Chapter 11 ("Stream Processing") of Kleppmann, Designing Data-Intensive Applications
- Stream (data processing) (684)
- Message queue (broker)
- When stream consumers lag producers
- Message topics
- Persistent and ephemeral message brokers as databases
- Persistent message brokers are based on partitioned logs
- Persistent message queues don’t care if a consumer goes offline
- Dual writes (704-5)
- Change data capture (707)
- Timestamps in stream processing
- Stream joins
Raw highlights from Kindle
Highlight(yellow) - Page 684 · Location 13717
In general, a “stream” refers to data that is incrementally made available over time. The concept appears in many places: in the stdin and stdout of Unix, programming languages (lazy lists) [2], filesystem APIs (such as Java’s FileInputStream), TCP connections, delivering audio and video over the internet, and so on.
Highlight(yellow) - Transmitting Event Streams > Page 684 · Location 13730
When the input is a file (a sequence of bytes), the first processing step is usually to parse it into a sequence of records. In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock (see “Monotonic Versus Time-of-Day Clocks”).
Highlight(yellow) - Transmitting Event Streams > Page 685 · Location 13745
Analogously, in streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients) [3]. In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.
Highlight(yellow) - Messaging Systems > Page 686 · Location 13766
Unix pipes and TCP connect exactly one sender with one recipient, whereas a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
Highlight(yellow) - Messaging Systems > Page 686 · Location 13776
What happens if the producers send messages faster than the consumers can process them? Broadly speaking, there are three options: the system can drop messages, buffer messages in a queue, or apply backpressure (also known as flow control; i.e., blocking the producer from sending more messages).
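A minimal sketch of the three options (mine, not from the book), using Python's standard-library queue; a bounded queue whose put() blocks is exactly backpressure:
```python
import queue

bounded = queue.Queue(maxsize=1000)   # for the drop and backpressure strategies
unbounded = queue.Queue()             # maxsize=0 means unbounded: pure buffering

def send(msg, strategy):
    if strategy == "drop":
        try:
            bounded.put_nowait(msg)   # discard the message when the buffer is full
        except queue.Full:
            pass
    elif strategy == "buffer":
        unbounded.put(msg)            # the queue simply grows without limit
    elif strategy == "backpressure":
        bounded.put(msg, block=True)  # producer blocks until the consumer catches up
```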
Highlight(yellow) - Messaging Systems > Page 689 · Location 13832
message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams [13].
Note - Messaging Systems > Page 689 · Location 13834
Contrasting message queues with direct messaging. The latter is more of what you think of when you think of message passing, and makes few or no reliability guarantees, but is fast and simple. By contrast, message brokers are essentially a kind of database, and hence are relatively heavyweight.
Highlight(yellow) - Messaging Systems > Page 689 · Location 13838
Faced with slow consumers, they generally allow unbounded queueing (as opposed to dropping messages or backpressure), although this choice may also depend on the configuration.
Highlight(yellow) - Messaging Systems > Page 689 · Location 13840
when a producer sends a message, it normally only waits for the broker to confirm that it has buffered the message
Highlight(yellow) - Messaging Systems > Page 690 · Location 13850
Since they quickly delete messages, most message brokers assume that their working set is fairly small—
Highlight(pink) - Messaging Systems > Page 691 · Location 13869
JMS
Highlight(pink) - Messaging Systems > Page 691 · Location 13870
AMQP
Note - Messaging Systems > Page 691 · Location 13877
Load balancing: each message is delivered to one consumer
Note - Messaging Systems > Page 691 · Location 13883
Fan-out: each message is delivered to all consumers (broadcast)
Highlight(yellow) - Messaging Systems > Page 692 · Location 13898
crashing. In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
Highlight(yellow) - Messaging Systems > Page 694 · Location 13914
Even if the message broker otherwise tries to preserve the order of messages (as required by both the JMS and AMQP standards), the combination of load balancing with redelivery inevitably leads to messages being reordered.
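A toy simulation of that reordering (hypothetical broker, my illustration): the queue itself is FIFO, but a crash before acknowledgment plus redelivery makes consumer c2 see m1 after m2.
```python
from collections import deque

q = deque(["m1", "m2", "m3"])   # FIFO queue on the broker
unacked = {}                    # messages delivered but not yet acknowledged

def deliver(consumer):
    msg = q.popleft()
    unacked[consumer] = msg
    return msg

deliver("c1")                          # c1 receives m1, then crashes before acking
print("c2 processed", deliver("c2"))   # c2 receives and acks m2
unacked.pop("c2")
q.appendleft(unacked.pop("c1"))        # broker redelivers c1's unacked message
print("c2 processed", deliver("c2"))   # c2 now processes m1 *after* m2
```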
Highlight(yellow) - Partitioned Logs > Page 695 · Location 13929
receiving a message is destructive if the acknowledgment causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.
Highlight(yellow) - Partitioned Logs > Page 696 · Location 13941
a producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially. If a consumer reaches the end of the log, it waits for a notification that a new message has been appended.
Note - Partitioned Logs > Page 696 · Location 13942
Log-based message brokers
Highlight(yellow) - Partitioned Logs > Page 696 · Location 13946
A topic can then be defined as a group of partitions that all carry messages of the same type.
Highlight(yellow) - Partitioned Logs > Page 696 · Location 13949
Within each partition, the broker assigns a monotonically increasing sequence number, or offset, to every message
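A minimal sketch of this model (mine, not the book's): appends return offsets, reads are non-destructive, and a consumer's position is just an integer.
```python
class PartitionLog:
    def __init__(self):
        self.messages = []               # the append-only log for one partition

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1    # the message's offset

    def read(self, offset):
        # Non-destructive: any consumer can re-read any retained offset.
        return self.messages[offset] if offset < len(self.messages) else None

log = PartitionLog()
log.append("event-a")
log.append("event-b")

offset = 0                               # the only per-consumer state needed
while (msg := log.read(offset)) is not None:
    print(msg)
    offset += 1                          # checkpointed periodically, not per message
```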
Highlight(yellow) - Partitioned Logs > Page 697 · Location 13958
Apache Kafka [17, 18], Amazon Kinesis Streams [19], and Twitter’s DistributedLog [20, 21] are log-based message brokers that work like this.
Highlight(yellow) - Partitioned Logs > Page 697 · Location 13978
The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log.
Highlight(yellow) - Partitioned Logs > Page 698 · Location 13983
Each client then consumes all the messages in the partitions it has been assigned.
Highlight(yellow) - Partitioned Logs > Page 698 · Location 13989
If a single message is slow to process, it holds up the processing of subsequent messages in that partition
Highlight(yellow) - Partitioned Logs > Page 698 · Location 14002
Consuming a partition sequentially makes it easy to tell which messages have been processed:
Highlight(yellow) - Partitioned Logs > Page 699 · Location 14004
Thus, the broker does not need to track acknowledgments for every single message—it only needs to periodically record the consumer offsets.
Highlight(yellow) - Partitioned Logs > Page 699 · Location 14007
This offset is in fact very similar to the log sequence number that is commonly found in single-leader database replication,
Highlight(yellow) - Partitioned Logs > Page 699 · Location 14011
the message broker behaves like a leader database, and the consumer like a follower.
Highlight(yellow) - Partitioned Logs > Page 699 · Location 14019
To reclaim disk space, the log is actually divided into segments, and from time to time old segments are deleted or moved to archive storage.
Highlight(yellow) - Partitioned Logs > Page 700 · Location 14023
Effectively, the log implements a bounded-size buffer that discards old messages when it gets full, also known as a circular buffer or ring buffer.
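Python's stdlib has this data structure directly; a tiny illustration (my analogy for the bounded log):
```python
from collections import deque

ring = deque(maxlen=3)      # bounded buffer: appending past maxlen drops the oldest
for i in range(5):
    ring.append(i)
print(list(ring))           # [2, 3, 4]: entries 0 and 1 were silently discarded
```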
Highlight(yellow) - Partitioned Logs > Page 701 · Location 14045
you can experimentally consume a production log for development, testing, or debugging purposes, without having to worry much about disrupting production services.
Note - Partitioned Logs > Page 701 · Location 14046
In log-based message brokers, message retention is independent of consumption; the system just writes to a circular log and deletes the oldest segments when the disk starts to fill up. So if a consumer goes offline, it simply stops consuming; nothing accumulates on its behalf. By contrast, in a traditional message broker (AMQP, JMS) setup, a queue with no consumers takes up more and more storage/memory until it hits some limit or breaks. This makes log-based brokers more like batch processing tools: you can rerun over the same set of messages as often as you like with no side effects.
Highlight(yellow) - Databases and Streams > Page 702 · Location 14082
The fact that something was written to a database is an event that can be captured, stored, and processed.
Highlight(yellow) - Databases and Streams > Page 703 · Location 14086
a replication log (see “Implementation of Replication Logs”) is a stream of database write events, produced by the leader as it processes transactions.
Highlight(pink) - Databases and Streams > Page 703 · Location 14092
state machine
Note - Databases and Streams > Page 703 · Location 14092
A mathematical abstraction of computing in which we assume that a system can be in exactly one of a fixed number of known states.
Highlight(yellow) - Keeping Systems in Sync > Page 704 · Location 14115
dual writes, in which the application code explicitly writes to each of the systems when data changes:
Highlight(yellow) - Keeping Systems in Sync > Page 704 · Location 14121
dual writes have some serious problems, one of which is a race condition
Highlight(yellow) - Keeping Systems in Sync > Page 705 · Location 14134
Another problem with dual writes is that one of the writes may fail while the other succeeds.
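Both failure modes are easy to reproduce in a few lines (hypothetical stores, my illustration); here is the race, with two clients' writes interleaving differently at the two systems:
```python
database, cache = {}, {}

# Neither client's pair of writes is atomic across the two systems:
database["x"] = "from-client-1"
database["x"] = "from-client-2"     # client 2 wins at the database...
cache["x"] = "from-client-2"
cache["x"] = "from-client-1"        # ...but client 1 wins at the cache
assert database["x"] != cache["x"]  # the two systems now disagree, permanently
```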
Highlight(yellow) - Change Data Capture > Page 706 · Location 14158
there has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.
Highlight(yellow) - Change Data Capture > Page 707 · Location 14173
change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database to the derived systems, since it preserves the ordering of messages
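A sketch of the follower side (assumed event shapes, not a real connector API): because every change flows through one ordered log, the derived store converges to the same state as the source.
```python
change_log = [                        # emitted by the source (leader) database
    {"op": "set", "key": "x", "value": 1},
    {"op": "set", "key": "x", "value": 2},
    {"op": "delete", "key": "x"},
]

derived = {}                          # a follower: cache, search index, warehouse...
for event in change_log:              # applied strictly in log order
    if event["op"] == "set":
        derived[event["key"]] = event["value"]
    else:                             # "delete"
        derived.pop(event["key"], None)
```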
Highlight(yellow) - Change Data Capture > Page 708 · Location 14193
LinkedIn’s Databus [25], Facebook’s Wormhole [26], and Yahoo!’s Sherpa [27] use this idea at large scale. Bottled Water implements CDC for PostgreSQL using an API that decodes the write-ahead log [28], Maxwell and Debezium do something similar for MySQL by parsing the binlog [29, 30, 31], Mongoriver reads the MongoDB oplog [32, 33], and GoldenGate provides similar facilities for Oracle [34, 35]. The Kafka Connect framework offers further CDC connectors for various databases.
Highlight(yellow) - Change Data Capture > Page 708 · Location 14220
if you don’t have the entire log history, you need to start with a consistent snapshot,
Highlight(yellow) - Change Data Capture > Page 709 · Location 14221
The snapshot of the database must correspond to a known position or offset in the change log,
Highlight(yellow) - Change Data Capture > Page 709 · Location 14232
the storage engine periodically looks for log records with the same key, throws away any duplicates, and keeps only the most recent update for each key.
Note - Change Data Capture > Page 709 · Location 14233
Log compaction
Highlight(yellow) - Change Data Capture > Page 709 · Location 14234
In a log-structured storage engine, an update with a special null value (a tombstone) indicates that a key was deleted, and causes it to be removed during log compaction. But as long as a key is not overwritten or deleted, it stays in the log forever.
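A minimal compaction pass (my own sketch): keep only the latest update per key, and drop keys whose latest update is a tombstone (represented here as None).
```python
def compact(log):
    latest = {}
    for key, value in log:       # later entries overwrite earlier ones
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]

log = [("a", 1), ("b", 2), ("a", 3), ("b", None)]   # None is a tombstone
print(compact(log))   # [('a', 3)]: 'b' is gone, 'a' keeps its latest value
```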
Highlight(yellow) - Change Data Capture > Page 710 · Location 14244
This log compaction feature is supported by Apache Kafka. As we shall see later in this chapter, it allows the message broker to be used for durable storage, not just for transient messaging.
Highlight(yellow) - Change Data Capture > Page 710 · Location 14255
Increasingly, databases are beginning to support change streams as a first-class interface, rather than the typical retrofitted and reverse-engineered CDC efforts.
Highlight(yellow) - Event Sourcing > Page 711 · Location 14284
There are some parallels between the ideas we’ve discussed here and event sourcing, a technique that was developed in the domain-driven design (DDD) community
Highlight(yellow) - Event Sourcing > Page 711 · Location 14298
In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log.
Highlight(yellow) - Event Sourcing > Page 712 · Location 14302
Event sourcing makes it easier to evolve applications over time, helps with debugging by making it easier to understand after the fact why something happened, and guards against application bugs
Highlight(yellow) - Event Sourcing > Page 712 · Location 14310
Event sourcing is similar to the chronicle data model [45], and there are also similarities between an event log and the fact table that you find in a star schema
Highlight(yellow) - Event Sourcing > Page 713 · Location 14333
with event sourcing, events are modeled at a higher level: an event typically expresses the intent of a user action, not the mechanics of the state update that occurred as a result of the action. In this case, later events typically do not override prior events, and so you need the full history of events to reconstruct the final state. Log compaction is not possible in the same way.
Highlight(yellow) - Event Sourcing > Page 714 · Location 14344
The event sourcing philosophy is careful to distinguish between events and commands
Highlight(yellow) - Event Sourcing > Page 714 · Location 14347
If the validation is successful and the command is accepted, it becomes an event, which is durable and immutable.
Highlight(yellow) - Event Sourcing > Page 714 · Location 14355
A consumer of the event stream is not allowed to reject an event: by the time the consumer sees the event, it is already an immutable part of the log, and it may have already been seen by other consumers.
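A sketch of the command/event split (hypothetical username-registration domain, mine): validation happens synchronously while the request is still a command; only an accepted command is appended as an immutable event.
```python
events = []                  # the append-only event log: durable, immutable facts
taken = set()                # derived state, used only for validating commands

def register_username(name):
    if name in taken:        # validate *before* the fact is recorded
        raise ValueError("username already taken")
    taken.add(name)
    events.append({"type": "UsernameRegistered", "name": name})  # now a fact

register_username("alice")   # accepted: consumers downstream cannot reject it
# register_username("alice") would fail here, while it is still just a command
```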
Highlight(yellow) - State, Streams, and Immutability > Page 715 · Location 14381
No matter how the state changes, there was always a sequence of events that caused those changes.
Highlight(yellow) - State, Streams, and Immutability > Page 716 · Location 14385
If you are mathematically inclined, you might say that the application state is what you get when you integrate an event stream over time, and a change stream is what you get when you differentiate the state by time,
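Written out (the book's analogy; the notation is mine):
```latex
\mathrm{state}(\mathit{now}) = \int_{t_0}^{\mathit{now}} \mathrm{stream}(t)\, dt
\qquad
\mathrm{stream}(t) = \frac{d}{dt}\, \mathrm{state}(t)
```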
Highlight(yellow) - State, Streams, and Immutability > Page 716 · Location 14401
The truth is the log. The database is a cache of a subset of the log.
Highlight(yellow) - State, Streams, and Immutability > Page 717 · Location 14412
Immutability in databases is an old idea. For example, accountants have been using immutability for centuries in financial bookkeeping.
Highlight(yellow) - State, Streams, and Immutability > Page 717 · Location 14418
If a mistake is made, accountants don’t erase or change the incorrect transaction in the ledger—instead, they add another transaction that compensates for the mistake,
Highlight(yellow) - State, Streams, and Immutability > Page 718 · Location 14436
by separating mutable state from the immutable event log, you can derive several different read-oriented representations from the same log of events.
Highlight(yellow) - State, Streams, and Immutability > Page 718 · Location 14446
Having an explicit translation step from an event log to a database makes it easier to evolve your application over time:
Highlight(yellow) - State, Streams, and Immutability > Page 719 · Location 14453
many of the complexities of schema design, indexing, and storage engines are the result of wanting to support certain query and access patterns (see Chapter 3). For this reason, you gain a lot of flexibility by separating the form in which data is written from the form it is read, and by allowing several different read views. This idea is sometimes known as command query responsibility segregation (CQRS)
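A toy CQRS sketch (mine): one write-optimized event log, two independent read-optimized views derived from it.
```python
events = [("cart_add", "alice", "book"),
          ("cart_add", "bob", "pen"),
          ("cart_add", "alice", "lamp")]      # the single write-side log

by_user, item_counts = {}, {}                 # two read views, same source of truth
for _, user, item in events:
    by_user.setdefault(user, []).append(item)          # view 1: items per user
    item_counts[item] = item_counts.get(item, 0) + 1   # view 2: popularity counts

print(by_user)        # {'alice': ['book', 'lamp'], 'bob': ['pen']}
print(item_counts)    # {'book': 1, 'pen': 1, 'lamp': 1}
```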
Highlight(yellow) - State, Streams, and Immutability > Page 719 · Location 14465
Debates about normalization and denormalization (see “Many-to-One and Many-to-Many Relationships”) become largely irrelevant if you can translate data from a write-optimized event log to read-optimized application state:
Highlight(yellow) - State, Streams, and Immutability > Page 719 · Location 14476
The biggest downside of event sourcing and change data capture is that the consumers of the event log are usually asynchronous, so there is a possibility that a user may make a write to the log, then read from a log-derived view and find that their write has not yet been reflected in the read view.
Highlight(yellow) - State, Streams, and Immutability > Page 720 · Location 14483
On the other hand, deriving the current state from an event log also simplifies some aspects of concurrency control.
Note - Processing Streams > Page 722 · Location 14540
Stream results can be stored, used to trigger software events, or used as input to a subsequent stream.
Highlight(yellow) - Uses of Stream Processing > Page 726 · Location 14620
Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters (which we encountered in “Performance optimizations”) for set membership, HyperLogLog [72] for cardinality estimation, and various percentile estimation algorithms (see “Percentiles in Practice”).
Highlight(yellow) - Reasoning About Time > Page 730 · Location 14704
A batch process may read a year’s worth of historical events within a few minutes; in most cases, the timeline of interest is the year of history, not the few minutes of processing.
Highlight(yellow) - Reasoning About Time > Page 730 · Location 14708
On the other hand, many stream processing frameworks use the local system clock on the processing machine (the processing time) to determine windowing [79].
Highlight(yellow) - Reasoning About Time > Page 732 · Location 14743
A tricky problem when defining windows in terms of event time is that you can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.
Highlight(yellow) - Reasoning About Time > Page 735 · Location 14792
A tumbling window has a fixed length, and every event belongs to exactly one window.
Highlight(yellow) - Reasoning About Time > Page 735 · Location 14797
A hopping window also has a fixed length, but allows windows to overlap in order to provide some smoothing.
Highlight(yellow) - Reasoning About Time > Page 735 · Location 14801
A sliding window contains all the events that occur within some interval of each other.
Highlight(yellow) - Reasoning About Time > Page 736 · Location 14807
Instead, it is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time (for example, if there have been no events for 30 minutes).
Note - Reasoning About Time > Page 736 · Location 14808
Session windowing
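A sketch of window assignment (mine; tumbling and hopping only, with event times in epoch seconds):
```python
SIZE, HOP = 300, 60         # 5-minute windows; hopping windows start every minute

def tumbling_window(ts):
    # Every event belongs to exactly one window, identified by its start time.
    return ts - ts % SIZE

def hopping_windows(ts):
    # Overlapping windows: each event falls into SIZE // HOP of them.
    last_start = ts - ts % HOP
    first_start = last_start - (SIZE - HOP)
    return [s for s in range(first_start, last_start + 1, HOP) if s >= 0]

print(tumbling_window(1130))   # 900
print(hopping_windows(1130))   # [840, 900, 960, 1020, 1080]
```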
Highlight(yellow) - Stream Joins > Page 736 · Location 14818
the fact that new events can appear anytime on a stream makes joins on streams more challenging than in batch jobs. To understand the situation better, let’s distinguish three different types of joins: stream-stream joins, stream-table joins, and table-table joins [84].
Highlight(yellow) - Stream Joins > Page 737 · Location 14839
To implement this type of join, a stream processor needs to maintain state: for example, all the events that occurred in the last hour, indexed by session ID. Whenever a search event or click event occurs, it is added to the appropriate index, and the stream processor also checks the other index to see if another event for the same session ID has already arrived. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.
Note - Stream Joins > Page 738 · Location 14844
Stream-stream join
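A sketch of that search/click join (hypothetical event shapes, mine; window expiry is omitted for brevity):
```python
searches, clicks = {}, {}   # join state: recent events indexed by session ID

def on_event(event):
    sid = event["session"]
    if event["type"] == "search":
        index, other = searches, clicks
    else:
        index, other = clicks, searches
    index[sid] = event      # add to this stream's index...
    if sid in other:        # ...and probe the other stream's index
        print("joined:", searches[sid], clicks[sid])

on_event({"type": "search", "session": "s1", "query": "ddia"})
on_event({"type": "click", "session": "s1", "url": "https://example.com"})
```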
Highlight(yellow) - Stream Joins > Page 738 · Location 14854
To perform this join, the stream process needs to look at one activity event at a time, look up the event’s user ID in the database, and add the profile information to the activity event.
Note - Stream Joins > Page 738 · Location 14855
Stream-table join
Highlight(yellow) - Stream Joins > Page 739 · Location 14868
A stream-table join is actually very similar to a stream-stream join; the biggest difference is that for the table changelog stream, the join uses a window that reaches back to the “beginning of time” (a conceptually infinite window), with newer versions of records overwriting older ones. For the stream input, the join might not maintain a window at all.
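A sketch (mine) of the stream processor keeping a local replica of the table, updated from its changelog, and enriching activity events from it:
```python
profiles = {}                             # local replica of the users table

def on_changelog(event):
    # "Infinite window": the newest version of each record simply wins.
    profiles[event["user_id"]] = event["profile"]

def on_activity(event):
    # Enrich one event at a time from the local replica, no remote lookup.
    event["profile"] = profiles.get(event["user_id"])
    return event

on_changelog({"user_id": 1, "profile": {"name": "Alice"}})
print(on_activity({"user_id": 1, "action": "login"}))
```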
Highlight(yellow) - Stream Joins > Page 740 · Location 14892
Another way of looking at this stream process is that it maintains a materialized view for a query that joins two tables
Note - Stream Joins > Page 740 · Location 14893
In stream processing, a table-table join is a process that maintains a materialized view. It is built by applying new events to an existing data store.
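A toy version (mine) of the book's Twitter-timeline example: events from two changelog streams maintain the join's materialized view incrementally.
```python
follows = {"alice": {"bob"}}      # state from the follows-table changelog
timelines = {"alice": []}         # the materialized view of the join

def on_tweet(user, text):
    # Apply one event from the tweets changelog: fan out to followers' timelines.
    for follower, followees in follows.items():
        if user in followees:
            timelines[follower].append(text)

on_tweet("bob", "hello")
print(timelines["alice"])         # ['hello']
```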
Highlight(yellow) - Stream Joins > Page 741 · Location 14912
The order of the events that maintain the state is important
Highlight(yellow) - Stream Joins > Page 741 · Location 14914
This raises a question: if events on different streams happen around a similar time, in which order are they processed?
Highlight(yellow) - Stream Joins > Page 741 · Location 14921
If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.
Highlight(yellow) - Stream Joins > Page 741 · Location 14928
In data warehouses, this issue is known as a slowly changing dimension (SCD), and it is often addressed by using a unique identifier for a particular version of the joined record: for example, every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale [88, 89]. This change makes the join deterministic, but has the consequence that log compaction is not possible, since all versions of the records in the table need to be retained.
Highlight(yellow) - Fault Tolerance > Page 742 · Location 14951
waiting until a task is finished before making its output visible is not an option, because a stream is infinite and so you can never finish processing it.
Highlight(yellow) - Fault Tolerance > Page 742 · Location 14958
One solution is to break the stream into small blocks, and treat each block like a miniature batch process.
Highlight(yellow) - Fault Tolerance > Page 743 · Location 14970
A variant approach, used in Apache Flink, is to periodically generate rolling checkpoints of state and write them to durable storage [92, 93].
Highlight(yellow) - Fault Tolerance > Page 743 · Location 14977
However, as soon as output leaves the stream processor (for example, by writing to a database, sending messages to an external message broker, or sending emails), the framework is no longer able to discard the output of a failed batch.
Highlight(yellow) - Fault Tolerance > Page 745 · Location 15014
Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata. For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
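A sketch of the offset trick (mine): store the offset of the last applied message next to the value, and ignore anything at or below it.
```python
store = {"value": None, "last_offset": -1}   # stand-in for the external database

def apply_update(offset, value):
    if offset <= store["last_offset"]:
        return                    # redelivered after a crash: already applied
    store["value"] = value
    store["last_offset"] = offset # written atomically together with the value

apply_update(7, "a")
apply_update(7, "a")              # duplicate delivery, no further effect
apply_update(8, "b")
```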