Batch vs Stream Processing: Four Approaches to the Same Problem
This project was inspired by chapters 10 and 11 of Martin Kleppmann's Designing Data-Intensive Applications. The core idea: take the same data problem — normalizing 100GB of messy CSV data from OpenFoodFacts into a relational schema — and solve it four different ways, from the simplest to the most production-ready.
The repository contains all four implementations side by side.
The Problem
OpenFoodFacts distributes a single massive CSV with 90+ columns per row. Tags, categories, and nutrients are denormalized — crammed into comma-separated strings within cells. We want a clean relational schema:
- products table (code, name, brand)
- tags table (id, tag_name)
- product_tags junction table (product_code, tag_id)
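To make the target concrete, here is a minimal sketch of the schema and of what normalizing a single row means. It uses SQLite in memory as a stand-in for PostgreSQL, and the sample row, the `normalize_row` helper, and the column types are assumptions for illustration only.

```python
import sqlite3

# Target schema from the list above; SQLite stands in for PostgreSQL here.
SCHEMA = """
CREATE TABLE products (code TEXT PRIMARY KEY, name TEXT, brand TEXT);
CREATE TABLE tags (id INTEGER PRIMARY KEY, tag_name TEXT UNIQUE NOT NULL);
CREATE TABLE product_tags (
    product_code TEXT REFERENCES products(code),
    tag_id INTEGER REFERENCES tags(id),
    PRIMARY KEY (product_code, tag_id)
);
"""


def normalize_row(conn, row):
    """Split one denormalized row into the three relations."""
    conn.execute(
        "INSERT OR IGNORE INTO products VALUES (?, ?, ?)",
        (row["code"], row["product_name"], row["brands"]),
    )
    for raw in row["tags"].split(","):
        tag = raw.strip().lower()
        if not tag:
            continue
        conn.execute("INSERT OR IGNORE INTO tags (tag_name) VALUES (?)", (tag,))
        (tag_id,) = conn.execute(
            "SELECT id FROM tags WHERE tag_name = ?", (tag,)
        ).fetchone()
        conn.execute(
            "INSERT OR IGNORE INTO product_tags VALUES (?, ?)", (row["code"], tag_id)
        )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
# Hypothetical sample row; real rows carry 90+ columns.
normalize_row(conn, {
    "code": "0001",
    "product_name": "Oat Drink",
    "brands": "ExampleBrand",
    "tags": "Vegan, organic,,vegan",
})
```

The duplicate and empty tags in the sample row collapse away: one product, two tags, two junction rows.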
With 100GB of input, a naive single-threaded loop on one machine is too slow to be practical.
Approach 1: Manual Script
The baseline. A single Python script that reads line by line, splits tags, and inserts into PostgreSQL. It works for small datasets but falls over at scale — no parallelism, no fault tolerance, and a single DB connection bottleneck.
```python
import csv

import psycopg2

conn = psycopg2.connect("dbname=foodfacts user=postgres")
cur = conn.cursor()
tag_cache = {}  # tag_name -> id, so known tags skip the database round trip

with open("foodfacts.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f, delimiter="\t")
    for row in reader:
        code = row["code"]
        name = row["product_name"]
        cur.execute(
            "INSERT INTO products (code, name) VALUES (%s, %s) ON CONFLICT DO NOTHING",
            (code, name),
        )

        # DictReader yields None for missing fields, so guard before splitting.
        tags = (row.get("tags") or "").split(",")
        for tag in tags:
            tag = tag.strip().lower()
            if not tag:
                continue
            if tag not in tag_cache:
                cur.execute(
                    "INSERT INTO tags (tag_name) VALUES (%s) "
                    "ON CONFLICT (tag_name) DO NOTHING RETURNING id",
                    (tag,),
                )
                result = cur.fetchone()
                if result:
                    tag_cache[tag] = result[0]
                else:
                    # The tag already existed, so RETURNING produced no row.
                    cur.execute("SELECT id FROM tags WHERE tag_name = %s", (tag,))
                    tag_cache[tag] = cur.fetchone()[0]
            cur.execute(
                "INSERT INTO product_tags (product_code, tag_id) VALUES (%s, %s) "
                "ON CONFLICT DO NOTHING",
                (code, tag_cache[tag]),
            )

conn.commit()
conn.close()
```
Simple. Correct. Unusable at 100GB.
Approach 2: Hadoop MapReduce
The classic batch processing approach. The input is split across a cluster. Mappers emit (tag, product_code) pairs. Shuffle & sort groups all products by tag. Reducers write the normalized relations.
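Before the Hadoop code, the three phases can be simulated in plain Python. This is only a single-process sketch of the data flow, not how Hadoop is implemented; the three-column input format and the deduplication in the reducer are assumptions added for the example.

```python
from collections import defaultdict


def map_phase(line):
    """Emit (tag, product_code) pairs for one tab-separated input line."""
    code, _name, tags = line.split("\t")
    for raw in tags.split(","):
        tag = raw.strip().lower()
        if tag:
            yield tag, code


def shuffle(pairs):
    """Group values by key and sort the keys, as the framework does between phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(tag, codes):
    """Emit one deduplicated (tag, product_code) relation per pair."""
    for code in sorted(set(codes)):
        yield tag, code


# Hypothetical three-column input; the real file has 90+ columns.
lines = ["0001\tOat Drink\tVegan, organic", "0002\tTofu\tvegan"]
mapped = [pair for line in lines for pair in map_phase(line)]
relations = [out for tag, codes in shuffle(mapped) for out in reduce_phase(tag, codes)]
```

In a real cluster, `map_phase` and `reduce_phase` run on many machines in parallel and `shuffle` moves data over the network; the contract between the phases is the same.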
Mapper
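```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OpenFoodFactsMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] col = value.toString().split("\t");
        // Skip short or malformed rows that do not reach column 25.
        if (col.length < 26) return;

        String code = col[0];
        String[] tags = col[25].split(",");
        for (String tag : tags) {
            String clean = tag.trim().toLowerCase();
            if (!clean.isEmpty()) {
                // Emit (tag, product_code); the shuffle groups pairs by tag.
                context.write(new Text(clean), new Text(code));
            }
        }
    }
}
```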
Reducer
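```java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TagRelationReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // All product codes for one tag arrive together; write one relation per code.
        for (Text productCode : values) {
            context.write(key, productCode);
        }
    }
}
```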
This works but is rigid. Every step writes intermediate results to HDFS. A multi-step normalization (extract unique tags → assign IDs → write relations) requires chaining multiple jobs, each with full disk I/O between them.
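The cost of chaining is easier to see in a toy version. In this sketch each "job" is a function that can only talk to the next one through files, which mimics how chained MapReduce jobs hand over data via HDFS. The JSON files, the function names, and the sequential ID scheme are assumptions made for the illustration.

```python
import json
import tempfile
from pathlib import Path


def job_extract_tags(pairs_path, out_path):
    """Job 1: read (tag, code) pairs, write the sorted unique tags."""
    pairs = json.loads(pairs_path.read_text())
    out_path.write_text(json.dumps(sorted({tag for tag, _ in pairs})))


def job_assign_ids(tags_path, out_path):
    """Job 2: read unique tags, write a tag -> id mapping."""
    tags = json.loads(tags_path.read_text())
    out_path.write_text(json.dumps({tag: i for i, tag in enumerate(tags, start=1)}))


def job_write_relations(pairs_path, ids_path, out_path):
    """Job 3: read pairs and the id mapping, write (product_code, tag_id) rows."""
    pairs = json.loads(pairs_path.read_text())
    ids = json.loads(ids_path.read_text())
    out_path.write_text(json.dumps([[code, ids[tag]] for tag, code in pairs]))


def run_pipeline(pairs):
    """Run the three jobs in order; every hand-over goes through a file."""
    with tempfile.TemporaryDirectory() as tmp:
        work = Path(tmp)
        (work / "pairs.json").write_text(json.dumps(pairs))
        job_extract_tags(work / "pairs.json", work / "tags.json")
        job_assign_ids(work / "tags.json", work / "ids.json")
        job_write_relations(work / "pairs.json", work / "ids.json", work / "out.json")
        return json.loads((work / "out.json").read_text())


relations = run_pipeline([["vegan", "0001"], ["organic", "0001"], ["vegan", "0002"]])
```

Every step fully serializes its output and the next step reads it back in, which is the overhead that dominates at 100GB.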
The Distributed Cache helps — you can broadcast the small tag-ID lookup table to every mapper so they can resolve IDs locally without network roundtrips:
```java
@Override
protected void setup(Context context) {
    // Load the tag-ID map from the Distributed Cache into a HashMap.
    // Every mapper gets this file locally.
    loadTagCache();
}
```
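The pattern itself (load a small table once per task, then resolve IDs with local lookups) is easy to sketch outside Hadoop. The `TagIdMapper` class and the `load_lookup` function below are hypothetical stand-ins; a real job would read the cached file from the task's local disk.

```python
class TagIdMapper:
    """Mimics a mapper that resolves tag IDs from a locally cached lookup table."""

    def __init__(self, load_lookup):
        self._load_lookup = load_lookup
        self.tag_ids = None

    def setup(self):
        # Runs once per task, like Mapper.setup(): load the small table into memory.
        self.tag_ids = self._load_lookup()

    def map(self, code, tags):
        # Each lookup is a local dict access; no shuffle or remote call is needed.
        for raw in tags.split(","):
            tag = raw.strip().lower()
            if tag in self.tag_ids:
                yield code, self.tag_ids[tag]


load_calls = []


def load_lookup():
    # Hypothetical loader that records how often it is called.
    load_calls.append(1)
    return {"organic": 1, "vegan": 2}


mapper = TagIdMapper(load_lookup)
mapper.setup()
rows = list(mapper.map("0001", "Vegan, organic")) + list(mapper.map("0002", "vegan"))
```

The lookup table is loaded once, no matter how many records the mapper processes. This only works while the table fits comfortably in each task's memory.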
Real-world MapReduce use cases
- Google PageRank: Map web pages to (linked_page, linking_page) pairs, reduce to compute link counts and rank scores
- Amazon "also bought": Map purchase histories to (product, co_purchased_product) pairs, reduce to find correlations
- Netflix transcoding: Split videos into chunks, map each chunk through ffmpeg at different bitrates, reduce to reassemble
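The "also bought" case follows the same map and reduce shape as the tag job. Here is a toy, single-process sketch with made-up baskets; it only counts co-occurrences and says nothing about how any real recommender is built.

```python
from collections import Counter
from itertools import combinations


def map_basket(basket):
    """Emit one (product, co_purchased_product) pair per unordered pair in a basket."""
    for a, b in combinations(sorted(set(basket)), 2):
        yield a, b


def reduce_counts(pairs):
    """Count how often each pair was bought together."""
    return Counter(pairs)


# Hypothetical purchase histories.
baskets = [["tea", "milk", "sugar"], ["milk", "tea"], ["sugar", "flour"]]
counts = reduce_counts(pair for basket in baskets for pair in map_basket(basket))
top_pair, top_count = counts.most_common(1)[0]
```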
Approach 3: Apache Spark
Spark replaced MapReduce's rigid two-phase model with DAGs (Directed Acyclic Graphs): arbitrary chains of transformations that are not forced to write intermediate results to HDFS between steps. Keeping intermediate data in memory where it fits can make Spark much faster than MapReduce, especially for multi-step and iterative jobs.
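A rough analogy for chained transformations without intermediate materialization is a pipeline of Python generators. This is a sketch of the idea only, not of Spark's internals; the `log` list exists purely to show when work actually happens.

```python
def read_rows(lines, log):
    """Stage 1: parse each line into [code, tags]."""
    for line in lines:
        log.append(("read", line))
        yield line.split("\t")


def explode_tags(rows, log):
    """Stage 2: turn one row into one (code, raw_tag) pair per tag."""
    for code, tags in rows:
        for raw in tags.split(","):
            log.append(("explode", code))
            yield code, raw


def clean(pairs):
    """Stage 3: normalize tags and drop empty ones."""
    for code, raw in pairs:
        tag = raw.strip().lower()
        if tag:
            yield code, tag


log = []
# Building the chain does no work yet, loosely like declaring Spark transformations.
pipeline = clean(explode_tags(read_rows(["0001\tVegan, organic", "0002\tvegan"], log), log))
steps_before_action = len(log)
# Consuming the chain plays the role of an action: records stream through
# all stages one at a time, with no intermediate collection in between.
result = list(pipeline)
```

The log shows the stages interleaving per record: the second line is only read after the first row has been fully exploded and cleaned.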
The same normalization in Spark:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, split, trim

spark = SparkSession.builder.appName("FoodFactsNormalization").getOrCreate()

# Placeholder JDBC URL; adjust host, database, and credentials for your setup.
db_url = "jdbc:postgresql://localhost:5432/foodfacts"

# Read: Spark auto-partitions across the cluster
df = spark.read.option("sep", "\t").csv("s3://bucket/foodfacts.csv", header=True)

# Products table
products = df.select("code", "product_name", "brands")
products.write.mode("append").jdbc(db_url, "products")

# Normalize tags: explode turns the comma-separated string into one row per tag.
# explode() cannot be nested inside other expressions, so clean up in a second step.
tag_relations = (
    df.select("code", explode(split(col("tags"), ",")).alias("raw_tag"))
    .select("code", trim(lower(col("raw_tag"))).alias("tag_name"))
    .filter(col("tag_name") != "")
)
tag_relations.write.mode("append").jdbc(db_url, "product_tags")
```
What took three chained MapReduce jobs is now a handful of lines. Spark's optimizer plans the shuffles and the physical execution for you. The abstraction is SQL-like: you describe what you want, not how to partition and sort.
Modern managed services like BigQuery and Athena push this further — you write SQL and the infrastructure handles everything. But understanding what happens underneath (partitioning, shuffling, joins) matters when queries get slow.
Approach 4: Kafka Streams (Stream Processing)
The logical continuation from batch. Same idea — input, transformation, output — but the input is unbounded. There is no "end of file." Events arrive continuously and must be processed in real time.
This changes everything:
- Time matters: When did the event happen? When did we receive it? What if events arrive out of order?
- State is local: A remote database is too slow for per-event lookups. Kafka Streams uses RocksDB as a local state store.
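- At-least-once delivery: By default, a record is guaranteed to be processed, but not guaranteed to be processed only once. Kafka Streams can provide exactly-once semantics for state kept inside Kafka, but that does not cover side effects such as writes to PostgreSQL, so your processing must be idempotent.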
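Idempotency is the point that bites first in practice, so here is a minimal sketch of a handler that tolerates redelivery. It uses SQLite in memory as a stand-in for PostgreSQL, and the event shape and the `handle` function are assumptions for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE product_tags ("
    "product_code TEXT, tag_id INTEGER, PRIMARY KEY (product_code, tag_id))"
)


def handle(event):
    """Idempotent handler: replaying the same event leaves the table unchanged."""
    # INSERT OR IGNORE is SQLite's counterpart to PostgreSQL's ON CONFLICT DO NOTHING.
    conn.execute(
        "INSERT OR IGNORE INTO product_tags VALUES (?, ?)",
        (event["code"], event["tag_id"]),
    )


events = [{"code": "0001", "tag_id": 2}, {"code": "0001", "tag_id": 1}]
for event in events + events[:1]:  # the first event is delivered twice
    handle(event)

row_count = conn.execute("SELECT COUNT(*) FROM product_tags").fetchone()[0]
```

The primary key does the deduplication, so the handler needs no bookkeeping about which events it has already seen.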
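```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Local state store for tag IDs (RocksDB)
StoreBuilder<KeyValueStore<String, Long>> tagStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("tag-id-cache"),
    Serdes.String(),
    Serdes.Long()
);
builder.addStateStore(tagStore);

// Attach the processor and give it access to the state store by name.
builder.stream("raw-products", Consumed.with(Serdes.String(), Serdes.String()))
    .process(NormalizationProcessor::new, "tag-id-cache");
```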
The processor handles each event — parsing, tag lookup/creation, and writing to PostgreSQL:
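```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

class NormalizationProcessor extends ContextualProcessor<String, String, Void, Void> {
    private KeyValueStore<String, Long> tagCache;
    private Connection db;

    @Override
    public void init(ProcessorContext<Void, Void> context) {
        super.init(context);
        this.tagCache = context.getStateStore("tag-id-cache");
        try {
            // DB_URL is a JDBC connection string defined elsewhere.
            this.db = DriverManager.getConnection(DB_URL);
        } catch (SQLException e) {
            // init() cannot throw checked exceptions, so fail the task explicitly.
            throw new IllegalStateException("Could not connect to PostgreSQL", e);
        }
    }

    @Override
    public void process(Record<String, String> record) {
        // Each message is expected as "code;name;tag1,tag2,...".
        String[] cols = record.value().split(";");
        if (cols.length < 3) return;

        String code = cols[0];
        String name = cols[1];
        String[] tags = cols[2].split(",");
        upsertProduct(code, name);

        for (String raw : tags) {
            String tag = raw.trim().toLowerCase();
            if (tag.isEmpty()) continue;

            // Check the local RocksDB store first; hit the database only on a miss.
            Long tagId = tagCache.get(tag);
            if (tagId == null) {
                tagId = getOrCreateTagInDb(tag);
                tagCache.put(tag, tagId);
            }
            linkProductToTag(code, tagId);
        }
    }

    // upsertProduct, getOrCreateTagInDb, and linkProductToTag are JDBC helpers omitted here.
}
```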
Message ordering and consumer groups
Two delivery patterns:
- Load balancing: Each event goes to exactly one consumer. Work is distributed across a consumer group.
- Fan-out: Every event goes to all consumers. Each consumer group gets a full copy.
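When a consumer crashes mid-processing, the message is not lost. In queue-style brokers such as Amazon SQS, it becomes visible again after a visibility timeout, because the broker does not delete it until it is acknowledged. In Kafka, the partition is reassigned to another consumer in the group, which resumes from the last committed offset. Either way, this is the at-least-once trade-off: it is more important that a message is never lost than that it is never processed twice.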
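Message groups solve ordering: events with the same group ID (e.g. user ID, order ID) are guaranteed to be processed sequentially. In SQS FIFO queues, the broker holds a lock on the group ID. Kafka achieves the same effect by sending all records with the same key to the same partition, which only one consumer in a group reads at a time. Either way, causal dependencies are preserved.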
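One common way to get per-key ordering is to route every event with the same key to the same partition, which a single consumer then reads in order. The sketch below assumes that model; CRC32 is only a stand-in hash (Kafka's default partitioner uses murmur2), and the order events are made up.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Map a key to a partition deterministically (CRC32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    """Append each (key, payload) event to its partition, preserving arrival order."""
    partitions = defaultdict(list)
    for key, payload in events:
        partitions[partition_for(key, num_partitions)].append((key, payload))
    return partitions


# Hypothetical order events keyed by order ID.
events = [
    ("order-1", "created"), ("order-2", "created"),
    ("order-1", "paid"), ("order-2", "cancelled"), ("order-1", "shipped"),
]
partitions = route(events, num_partitions=3)
order_1 = [p for part in partitions.values() for k, p in part if k == "order-1"]
```

Ordering holds within a key but not across keys: nothing here says whether `order-2` is cancelled before or after `order-1` is paid, and that is usually acceptable.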
Comparison
| | Manual | MapReduce | Spark | Kafka Streams |
|---|---|---|---|---|
| Input | Bounded | Bounded | Bounded | Unbounded |
| Parallelism | None | Cluster | Cluster | Cluster |
| Intermediate state | N/A | HDFS (disk) | Memory | RocksDB (local) |
| Fault tolerance | None | Job retry | Stage retry | Consumer rebalance |
| Latency | N/A | Minutes-hours | Seconds-minutes | Milliseconds |
| Complexity | Low | High | Medium | Medium-high |
| When to use | Prototyping | Legacy/Hadoop shops | Batch at scale | Real-time pipelines |
The progression is clear: each approach solves the limitations of the previous one. Manual scripts do not scale. MapReduce scales but is rigid and slow. Spark is fast but still batch. Kafka Streams handles the real world — where data never stops arriving.
Reference: Martin Kleppmann, Designing Data-Intensive Applications, Chapters 10-11.