Chapter 11 of Designing Data-Intensive Applications (Kleppmann)

Raw highlights from Kindle

Highlight(yellow) - Page 684 · Location 13717

In general, a “stream” refers to data that is incrementally made available over time. The concept appears in many places: in the stdin and stdout of Unix, programming languages (lazy lists) [2], filesystem APIs (such as Java’s FileInputStream), TCP connections, delivering audio and video over the internet, and so on.

Highlight(yellow) - Transmitting Event Streams > Page 684 · Location 13730

When the input is a file (a sequence of bytes), the first processing step is usually to parse it into a sequence of records. In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock (see “Monotonic Versus Time-of-Day Clocks”).

Highlight(yellow) - Transmitting Event Streams > Page 685 · Location 13745

Analogously, in streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients) [3]. In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.

Highlight(yellow) - Messaging Systems > Page 686 · Location 13766

Unix pipes and TCP connections connect exactly one sender with one recipient, whereas a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.

Highlight(yellow) - Messaging Systems > Page 686 · Location 13776

What happens if the producers send messages faster than the consumers can process them? Broadly speaking, there are three options: the system can drop messages, buffer messages in a queue, or apply backpressure (also known as flow control; i.e., blocking the producer from sending more messages).
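
A toy sketch of my own (not from the book) contrasting the three options; the class and policy names are hypothetical, and a real broker would spill buffered messages to disk rather than keep them in memory.

```python
import collections
import queue

class BoundedTopic:
    """Toy model of a topic with one of three overload policies for a fast producer."""

    def __init__(self, capacity, policy):
        self.policy = policy                      # "drop", "buffer", or "backpressure"
        self.bounded = queue.Queue(maxsize=capacity)
        self.unbounded = collections.deque()      # used only by the "buffer" policy

    def send(self, msg):
        if self.policy == "buffer":
            self.unbounded.append(msg)            # unbounded queueing: never blocks, may exhaust memory
        elif self.policy == "drop":
            try:
                self.bounded.put_nowait(msg)
            except queue.Full:
                pass                              # silently discard the message when full
        elif self.policy == "backpressure":
            self.bounded.put(msg)                 # block the producer until a consumer catches up

    def receive(self):
        if self.policy == "buffer":
            return self.unbounded.popleft()
        return self.bounded.get()
```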

Highlight(yellow) - Messaging Systems > Page 689 · Location 13832

message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams [13].

Note - Messaging Systems > Page 689 · Location 13834

Contrasting message brokers with direct messaging. Direct messaging is closer to the intuitive notion of message passing: it makes few or no reliability guarantees, but it is fast and simple. By contrast, message brokers are essentially a kind of database, and hence relatively heavyweight.

Highlight(yellow) - Messaging Systems > Page 689 · Location 13838

Faced with slow consumers, they generally allow unbounded queueing (as opposed to dropping messages or backpressure), although this choice may also depend on the configuration.

Highlight(yellow) - Messaging Systems > Page 689 · Location 13840

when a producer sends a message, it normally only waits for the broker to confirm that it has buffered the message

Highlight(yellow) - Messaging Systems > Page 690 · Location 13850

Since they quickly delete messages, most message brokers assume that their working set is fairly small—

Highlight(pink) - Messaging Systems > Page 691 · Location 13869

JMS

Highlight(pink) - Messaging Systems > Page 691 · Location 13870

AMQP

Note - Messaging Systems > Page 691 · Location 13877

Load balancing: each message is delivered to one consumer

Note - Messaging Systems > Page 691 · Location 13883

Fan-out: each message is delivered to all consumers (broadcast)

Highlight(yellow) - Messaging Systems > Page 692 · Location 13898

In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.

Highlight(yellow) - Messaging Systems > Page 694 · Location 13914

Even if the message broker otherwise tries to preserve the order of messages (as required by both the JMS and AMQP standards), the combination of load balancing with redelivery inevitably leads to messages being reordered.
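
My own toy illustration of why this happens: two consumers share a queue, one crashes before acknowledging, and redelivery puts its message behind one that has already been processed.

```python
# Two consumers share one queue (load balancing). Consumer A gets m1, consumer B gets m2.
# A crashes before acknowledging m1, so the broker redelivers m1 to B -- but B has already
# processed m2, so the effective processing order becomes m2, m1.

unacked = {"A": "m1", "B": "m2"}
processed = []

processed.append(unacked.pop("B"))   # B finishes m2 and acknowledges it
# A crashes here without acknowledging m1; the broker redelivers it to B
processed.append(unacked.pop("A"))

print(processed)                     # ['m2', 'm1'] -- the messages were reordered
```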

Highlight(yellow) - Partitioned Logs > Page 695 · Location 13929

receiving a message is destructive if the acknowledgment causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.

Highlight(yellow) - Partitioned Logs > Page 696 · Location 13941

a producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially. If a consumer reaches the end of the log, it waits for a notification that a new message has been appended.

Note - Partitioned Logs > Page 696 · Location 13942

Log-based message brokers

Highlight(yellow) - Partitioned Logs > Page 696 · Location 13946

A topic can then be defined as a group of partitions that all carry messages of the same type.

Highlight(yellow) - Partitioned Logs > Page 696 · Location 13949

Within each partition, the broker assigns a monotonically increasing sequence number, or offset, to every message
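
A minimal sketch of the data structure being described, with names of my own; a real broker would persist partitions on disk and use a stable hash of the key.

```python
class PartitionedLog:
    """Toy log-based broker: a topic is a group of partitions, each an append-only list."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, message):
        # Messages with the same key go to the same partition, preserving their order.
        p = hash(key) % len(self.partitions)
        self.partitions[p].append(message)
        offset = len(self.partitions[p]) - 1   # monotonically increasing within the partition
        return p, offset

    def read(self, partition, offset):
        # Reading is non-destructive: any consumer can read any offset at any time.
        return self.partitions[partition][offset]

log = PartitionedLog(num_partitions=4)
print(log.append("user-123", "clicked"))   # (partition, offset), e.g. (3, 0)
```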

Highlight(yellow) - Partitioned Logs > Page 697 · Location 13958

Apache Kafka [17, 18], Amazon Kinesis Streams [19], and Twitter’s DistributedLog [20, 21] are log-based message brokers that work like this.

Highlight(yellow) - Partitioned Logs > Page 697 · Location 13978

The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log.

Highlight(yellow) - Partitioned Logs > Page 698 · Location 13983

Each client then consumes all the messages in the partitions it has been assigned.

Highlight(yellow) - Partitioned Logs > Page 698 · Location 13989

If a single message is slow to process, it holds up the processing of subsequent messages in that partition

Highlight(yellow) - Partitioned Logs > Page 698 · Location 14002

Consuming a partition sequentially makes it easy to tell which messages have been processed:

Highlight(yellow) - Partitioned Logs > Page 699 · Location 14004

Thus, the broker does not need to track acknowledgments for every single message—it only needs to periodically record the consumer offsets.
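
A sketch of that bookkeeping under my own assumptions: one committed offset per (consumer group, partition), checkpointed periodically, with replay from the last checkpoint after a crash.

```python
committed = {("analytics", 0): 0}          # consumer group "analytics", partition 0

def consume(partition_log, group, partition, handler):
    offset = committed[(group, partition)]
    while offset < len(partition_log):
        handler(partition_log[offset])
        offset += 1
        if offset % 100 == 0:              # checkpoint periodically, not per message
            committed[(group, partition)] = offset
    committed[(group, partition)] = offset # final checkpoint

plog = ["m0", "m1", "m2"]
consume(plog, "analytics", 0, handler=print)
print(committed)                           # {('analytics', 0): 3}

# After a crash, the consumer restarts from the last committed offset; any messages
# after that offset are simply processed again (at-least-once semantics).
```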

Highlight(yellow) - Partitioned Logs > Page 699 · Location 14007

This offset is in fact very similar to the log sequence number that is commonly found in single-leader database replication,

Highlight(yellow) - Partitioned Logs > Page 699 · Location 14011

the message broker behaves like a leader database, and the consumer like a follower.

Highlight(yellow) - Partitioned Logs > Page 699 · Location 14019

To reclaim disk space, the log is actually divided into segments, and from time to time old segments are deleted or moved to archive storage.

Highlight(yellow) - Partitioned Logs > Page 700 · Location 14023

Effectively, the log implements a bounded-size buffer that discards old messages when it gets full, also known as a circular buffer or ring buffer.

Highlight(yellow) - Partitioned Logs > Page 701 · Location 14045

you can experimentally consume a production log for development, testing, or debugging purposes, without having to worry much about disrupting production services.

Note - Partitioned Logs > Page 701 · Location 14046

In log-based message brokers, message retention is independent of consumption; the system just writes to a circular log and deletes the oldest record segments when the disk starts to fill up. Therefore, if a consumer goes offline, it just stops consuming bandwidth. By contrast, in a traditional message broker (AMQP, JMS) setup, a queue with no consumers will take up more and more storage/memory until it reaches some limit or breaks. This makes LBMBs more like batch processing tools; you can just keep running over the same set of messages as often as you like with no side effects.

Highlight(yellow) - Databases and Streams > Page 702 · Location 14082

The fact that something was written to a database is an event that can be captured, stored, and processed.

Highlight(yellow) - Databases and Streams > Page 703 · Location 14086

a replication log (see “Implementation of Replication Logs”) is a stream of database write events, produced by the leader as it processes transactions.

Highlight(pink) - Databases and Streams > Page 703 · Location 14092

state machine

Note - Databases and Streams > Page 703 · Location 14092

A mathematical abstraction of computing in which we assume that a system can be in exactly one of a fixed number of known states.

Highlight(yellow) - Keeping Systems in Sync > Page 704 · Location 14115

dual writes, in which the application code explicitly writes to each of the systems when data changes:

Highlight(yellow) - Keeping Systems in Sync > Page 704 · Location 14121

dual writes have some serious problems, one of which is a race condition

Highlight(yellow) - Keeping Systems in Sync > Page 705 · Location 14134

Another problem with dual writes is that one of the writes may fail while the other succeeds.

Highlight(yellow) - Change Data Capture > Page 706 · Location 14158

there has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.

Highlight(yellow) - Change Data Capture > Page 707 · Location 14173

change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database to the derived systems, since it preserves the ordering of messages
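
A toy end-to-end sketch of the idea (my own names, plain lists standing in for a log-based broker): writes to the leader database are captured as change events, and a derived system replays them in order.

```python
change_log = []                 # stands in for a log-based broker such as Kafka

def db_write(db, key, value):
    db[key] = value
    change_log.append(("set", key, value))   # capture the committed change as an event

def apply_to_index(index, event):
    op, key, value = event
    if op == "set":
        index[key] = value.lower()           # the follower keeps its own representation

source_db, search_index = {}, {}
db_write(source_db, "title:1", "Stream Processing")
for event in change_log:                     # replaying the log in order keeps followers consistent
    apply_to_index(search_index, event)
print(search_index)                          # {'title:1': 'stream processing'}
```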

Highlight(yellow) - Change Data Capture > Page 708 · Location 14193

LinkedIn’s Databus [25], Facebook’s Wormhole [26], and Yahoo!’s Sherpa [27] use this idea at large scale. Bottled Water implements CDC for PostgreSQL using an API that decodes the write-ahead log [28], Maxwell and Debezium do something similar for MySQL by parsing the binlog [29, 30, 31], Mongoriver reads the MongoDB oplog [32, 33], and GoldenGate provides similar facilities for Oracle [34, 35]. The Kafka Connect framework offers further CDC connectors for various databases.

Highlight(yellow) - Change Data Capture > Page 708 · Location 14220

if you don’t have the entire log history, you need to start with a consistent snapshot,

Highlight(yellow) - Change Data Capture > Page 709 · Location 14221

The snapshot of the database must correspond to a known position or offset in the change log,

Highlight(yellow) - Change Data Capture > Page 709 · Location 14232

the storage engine periodically looks for log records with the same key, throws away any duplicates, and keeps only the most recent update for each key.

Note - Change Data Capture > Page 709 · Location 14233

Log compaction

Highlight(yellow) - Change Data Capture > Page 709 · Location 14234

In a log-structured storage engine, an update with a special null value (a tombstone) indicates that a key was deleted, and causes it to be removed during log compaction. But as long as a key is not overwritten or deleted, it stays in the log forever.
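
A minimal sketch of the compaction idea (not Kafka's actual implementation): keep only the latest value per key and drop keys whose latest value is a tombstone.

```python
def compact(log):
    """Keep only the latest value per key; a value of None is a tombstone (deletion)."""
    latest = {}
    for key, value in log:            # later entries overwrite earlier ones
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]

log = [("a", 1), ("b", 2), ("a", 3), ("b", None)]   # "b" is deleted by a tombstone
print(compact(log))                                  # [('a', 3)]
```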

Highlight(yellow) - Change Data Capture > Page 710 · Location 14244

This log compaction feature is supported by Apache Kafka. As we shall see later in this chapter, it allows the message broker to be used for durable storage, not just for transient messaging.

Highlight(yellow) - Change Data Capture > Page 710 · Location 14255

Increasingly, databases are beginning to support change streams as a first-class interface, rather than the typical retrofitted and reverse-engineered CDC efforts.

Highlight(yellow) - Event Sourcing > Page 711 · Location 14284

There are some parallels between the ideas we’ve discussed here and event sourcing, a technique that was developed in the domain-driven design (DDD) community

Highlight(yellow) - Event Sourcing > Page 711 · Location 14298

In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log.

Highlight(yellow) - Event Sourcing > Page 712 · Location 14302

Event sourcing makes it easier to evolve applications over time, helps with debugging by making it easier to understand after the fact why something happened, and guards against application bugs

Highlight(yellow) - Event Sourcing > Page 712 · Location 14310

Event sourcing is similar to the chronicle data model [45], and there are also similarities between an event log and the fact table that you find in a star schema

Highlight(yellow) - Event Sourcing > Page 713 · Location 14333

with event sourcing, events are modeled at a higher level: an event typically expresses the intent of a user action, not the mechanics of the state update that occurred as a result of the action. In this case, later events typically do not override prior events, and so you need the full history of events to reconstruct the final state. Log compaction is not possible in the same way.
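
A small sketch of my own showing why the full history is needed: events record intent (items added and removed), and the current state is only obtainable by folding over all of them.

```python
events = [
    {"type": "item_added",   "item": "book"},
    {"type": "item_added",   "item": "pen"},
    {"type": "item_removed", "item": "book"},
]

def apply(state, event):
    if event["type"] == "item_added":
        return state | {event["item"]}
    if event["type"] == "item_removed":
        return state - {event["item"]}
    return state

cart = set()
for e in events:
    cart = apply(cart, e)
print(cart)   # {'pen'} -- derived entirely from the event history
```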

Highlight(yellow) - Event Sourcing > Page 714 · Location 14344

The event sourcing philosophy is careful to distinguish between events and commands

Highlight(yellow) - Event Sourcing > Page 714 · Location 14347

If the validation is successful and the command is accepted, it becomes an event, which is durable and immutable.

Highlight(yellow) - Event Sourcing > Page 714 · Location 14355

A consumer of the event stream is not allowed to reject an event: by the time the consumer sees the event, it is already an immutable part of the log, and it may have already been seen by other consumers.
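
A sketch (my own, with hypothetical names) of the command/event split described above: the command is validated synchronously and may be rejected, but once accepted it becomes an immutable event that downstream consumers must take as given.

```python
class ValidationError(Exception):
    pass

def handle_reserve_seat_command(seats_available, command):
    """Validate the command synchronously; only if it passes does it become an event."""
    if command["seat"] not in seats_available:
        raise ValidationError("seat already taken")   # rejected before anything is written
    return {"type": "seat_reserved",                   # the immutable fact appended to the log
            "user": command["user"],
            "seat": command["seat"]}

event_log = []
event = handle_reserve_seat_command({"14A", "14B"}, {"user": "u1", "seat": "14A"})
event_log.append(event)   # consumers of the log cannot reject this event
```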

Highlight(yellow) - State, Streams, and Immutability > Page 715 · Location 14381

No matter how the state changes, there was always a sequence of events that caused those changes.

Highlight(yellow) - State, Streams, and Immutability > Page 716 · Location 14385

If you are mathematically inclined, you might say that the application state is what you get when you integrate an event stream over time, and a change stream is what you get when you differentiate the state by time,
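
Restating that analogy as formulas, where state(t) is the application state at time t and stream(t) is the event/change stream:

$$\text{state}(t) = \int_{0}^{t} \text{stream}(\tau)\, d\tau, \qquad \text{stream}(t) = \frac{d\,\text{state}(t)}{dt}$$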

Highlight(yellow) - State, Streams, and Immutability > Page 716 · Location 14401

The truth is the log. The database is a cache of a subset of the log.

Highlight(yellow) - State, Streams, and Immutability > Page 717 · Location 14412

Immutability in databases is an old idea. For example, accountants have been using immutability for centuries in financial bookkeeping.

Highlight(yellow) - State, Streams, and Immutability > Page 717 · Location 14418

If a mistake is made, accountants don’t erase or change the incorrect transaction in the ledger—instead, they add another transaction that compensates for the mistake,

Highlight(yellow) - State, Streams, and Immutability > Page 718 · Location 14436

by separating mutable state from the immutable event log, you can derive several different read-oriented representations from the same log of events.

Highlight(yellow) - State, Streams, and Immutability > Page 718 · Location 14446

Having an explicit translation step from an event log to a database makes it easier to evolve your application over time:

Highlight(yellow) - State, Streams, and Immutability > Page 719 · Location 14453

many of the complexities of schema design, indexing, and storage engines are the result of wanting to support certain query and access patterns (see Chapter 3). For this reason, you gain a lot of flexibility by separating the form in which data is written from the form it is read, and by allowing several different read views. This idea is sometimes known as command query responsibility segregation (CQRS)
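
A toy sketch of the CQRS idea under my own assumptions: two differently shaped, read-optimized views derived from the same write-optimized event log.

```python
event_log = [
    {"user": "alice", "product": "p1", "qty": 2},
    {"user": "bob",   "product": "p1", "qty": 1},
    {"user": "alice", "product": "p2", "qty": 5},
]

purchases_by_user, units_by_product = {}, {}
for e in event_log:
    # View 1: which products each user bought (read pattern: lookups by user)
    purchases_by_user.setdefault(e["user"], []).append(e["product"])
    # View 2: total units sold per product (read pattern: aggregates by product)
    units_by_product[e["product"]] = units_by_product.get(e["product"], 0) + e["qty"]

print(purchases_by_user)   # {'alice': ['p1', 'p2'], 'bob': ['p1']}
print(units_by_product)    # {'p1': 3, 'p2': 5}
```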

Highlight(yellow) - State, Streams, and Immutability > Page 719 · Location 14465

Debates about normalization and denormalization (see “Many-to-One and Many-to-Many Relationships”) become largely irrelevant if you can translate data from a write-optimized event log to read-optimized application state:

Highlight(yellow) - State, Streams, and Immutability > Page 719 · Location 14476

The biggest downside of event sourcing and change data capture is that the consumers of the event log are usually asynchronous, so there is a possibility that a user may make a write to the log, then read from a log-derived view and find that their write has not yet been reflected in the read view.

Highlight(yellow) - State, Streams, and Immutability > Page 720 · Location 14483

On the other hand, deriving the current state from an event log also simplifies some aspects of concurrency control.

Note - Processing Streams > Page 722 · Location 14540

Stream results can be stored, used to trigger software events, or used as input to a subsequent stream.

Highlight(yellow) - Uses of Stream Processing > Page 726 · Location 14620

Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters (which we encountered in “Performance optimizations”) for set membership, HyperLogLog [72] for cardinality estimation, and various percentile estimation algorithms (see “Percentiles in Practice”).
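
A tiny Bloom filter sketch of my own (not a production design) to make the set-membership example concrete: it can report false positives but never false negatives, using far less memory than storing the set.

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: add items, then test approximate set membership."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, item):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        return all(self.bits[pos] for pos in self._positions(item))

bf = BloomFilter()
bf.add("user-42")
print(bf.might_contain("user-42"))   # True
print(bf.might_contain("user-99"))   # almost certainly False
```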

Highlight(yellow) - Reasoning About Time > Page 730 · Location 14704

A batch process may read a year’s worth of historical events within a few minutes; in most cases, the timeline of interest is the year of history, not the few minutes of processing.

Highlight(yellow) - Reasoning About Time > Page 730 · Location 14708

On the other hand, many stream processing frameworks use the local system clock on the processing machine (the processing time) to determine windowing [79].

Highlight(yellow) - Reasoning About Time > Page 732 · Location 14743

A tricky problem when defining windows in terms of event time is that you can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.

Highlight(yellow) - Reasoning About Time > Page 735 · Location 14792

A tumbling window has a fixed length, and every event belongs to exactly one window.

Highlight(yellow) - Reasoning About Time > Page 735 · Location 14797

A hopping window also has a fixed length, but allows windows to overlap in order to provide some smoothing.

Highlight(yellow) - Reasoning About Time > Page 735 · Location 14801

A sliding window contains all the events that occur within some interval of each other.

Highlight(yellow) - Reasoning About Time > Page 736 · Location 14807

Instead, it is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time (for example, if there have been no events for 30 minutes).

Note - Reasoning About Time > Page 736 · Location 14808

Session windowing
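
A sketch of my own for the first two window types above, assuming windows are aligned to multiples of the hop/size and times are in seconds.

```python
def tumbling_window(event_time, size):
    """Each event belongs to exactly one fixed-size window."""
    start = event_time - (event_time % size)
    return (start, start + size)

def hopping_windows(event_time, size, hop):
    """Fixed-size windows that overlap: an event can fall into several of them."""
    first_start = event_time - (event_time % hop) - (size - hop)
    return [(s, s + size)
            for s in range(max(0, first_start), event_time + 1, hop)
            if s <= event_time < s + size]

print(tumbling_window(event_time=307, size=300))         # (300, 600): the 5-minute bucket
print(hopping_windows(event_time=307, size=300, hop=60))  # five overlapping 5-minute windows
```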

Highlight(yellow) - Stream Joins > Page 736 · Location 14818

the fact that new events can appear anytime on a stream makes joins on streams more challenging than in batch jobs. To understand the situation better, let’s distinguish three different types of joins: stream-stream joins, stream-table joins, and table-table joins [84].

Highlight(yellow) - Stream Joins > Page 737 · Location 14839

To implement this type of join, a stream processor needs to maintain state: for example, all the events that occurred in the last hour, indexed by session ID. Whenever a search event or click event occurs, it is added to the appropriate index, and the stream processor also checks the other index to see if another event for the same session ID has already arrived. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.

Note - Stream Joins > Page 738 · Location 14844

Stream-stream join
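
A stripped-down sketch of that state (my own names): one index per stream keyed by session ID, with a join result emitted whenever a matching event is found in the other index. A real processor would also expire entries after the window (e.g., one hour) and emit "not clicked" events for searches that expire unmatched.

```python
from collections import defaultdict

searches = defaultdict(list)   # join state: recent search events, indexed by session ID
clicks = defaultdict(list)     # join state: recent click events, indexed by session ID
joined = []

def on_event(event):
    sid = event["session_id"]
    if event["type"] == "search":
        searches[sid].append(event)
        other_index = clicks
    else:
        clicks[sid].append(event)
        other_index = searches
    for other in other_index[sid]:           # emit a joined event for every match so far
        joined.append((sid, "search_clicked"))

on_event({"type": "search", "session_id": "s1", "query": "stream joins"})
on_event({"type": "click",  "session_id": "s1", "url": "https://example.com"})
print(joined)   # [('s1', 'search_clicked')]
```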

Highlight(yellow) - Stream Joins > Page 738 · Location 14854

To perform this join, the stream process needs to look at one activity event at a time, look up the event’s user ID in the database, and add the profile information to the activity event.

Note - Stream Joins > Page 738 · Location 14855

Stream-table join

Highlight(yellow) - Stream Joins > Page 739 · Location 14868

A stream-table join is actually very similar to a stream-stream join; the biggest difference is that for the table changelog stream, the join uses a window that reaches back to the “beginning of time” (a conceptually infinite window), with newer versions of records overwriting older ones. For the stream input, the join might not maintain a window at all.
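
A sketch of a stream-table join under my own assumptions: a local replica of the users table is kept current from the changelog (the conceptually infinite window), and each activity event is enriched from it.

```python
user_profiles = {}   # local replica of the users table, maintained from the changelog

def on_changelog_event(change):                 # from CDC: newer versions overwrite older ones
    user_profiles[change["user_id"]] = change["profile"]

def on_activity_event(activity):                # the stream side: enrich and emit
    profile = user_profiles.get(activity["user_id"], {})
    return {**activity, "profile": profile}

on_changelog_event({"user_id": 7, "profile": {"name": "Alice", "city": "Oslo"}})
print(on_activity_event({"user_id": 7, "action": "viewed_page"}))
```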

Highlight(yellow) - Stream Joins > Page 740 · Location 14892

Another way of looking at this stream process is that it maintains a materialized view for a query that joins two tables

Note - Stream Joins > Page 740 · Location 14893

In stream processing, a table-table join is a process that maintains a materialized view. It is built by applying new events to an existing data store.

Highlight(yellow) - Stream Joins > Page 741 · Location 14912

The order of the events that maintain the state is important

Highlight(yellow) - Stream Joins > Page 741 · Location 14914

This raises a question: if events on different streams happen around a similar time, in which order are they processed?

Highlight(yellow) - Stream Joins > Page 741 · Location 14921

If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.

Highlight(yellow) - Stream Joins > Page 741 · Location 14928

In data warehouses, this issue is known as a slowly changing dimension (SCD), and it is often addressed by using a unique identifier for a particular version of the joined record: for example, every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale [88, 89]. This change makes the join deterministic, but has the consequence that log compaction is not possible, since all versions of the records in the table need to be retained.

Highlight(yellow) - Fault Tolerance > Page 742 · Location 14951

waiting until a task is finished before making its output visible is not an option, because a stream is infinite and so you can never finish processing it.

Highlight(yellow) - Fault Tolerance > Page 742 · Location 14958

One solution is to break the stream into small blocks, and treat each block like a miniature batch process.

Highlight(yellow) - Fault Tolerance > Page 743 · Location 14970

A variant approach, used in Apache Flink, is to periodically generate rolling checkpoints of state and write them to durable storage [92, 93].

Highlight(yellow) - Fault Tolerance > Page 743 · Location 14977

However, as soon as output leaves the stream processor (for example, by writing to a database, sending messages to an external message broker, or sending emails), the framework is no longer able to discard the output of a failed batch.

Highlight(yellow) - Fault Tolerance > Page 745 · Location 15014

Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata. For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
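
A minimal sketch of that trick with a hypothetical in-memory key-value store: each stored value remembers the offset of the message that last wrote it, so replaying the same message is a no-op.

```python
store = {}   # key -> (value, offset_of_last_write)

def apply_update(key, value, message_offset):
    _, last_offset = store.get(key, (None, -1))
    if message_offset <= last_offset:
        return False                 # this message has already been applied; skip it
    store[key] = (value, message_offset)
    return True

apply_update("counter", 10, message_offset=41)
apply_update("counter", 10, message_offset=41)   # replayed after a crash: ignored
print(store["counter"])                          # (10, 41)
```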