Hello again and welcome back! When we last left off, we had just given up on trying to find a “simple” solution to the following problem:
How can we read events from a table as they are being written without skipping records that haven’t committed at read time?
To provide a concrete example, we might run the following query:
SELECT id FROM events ORDER BY id WHERE id > 184
and get back
185, 186, 189 even though events
188 would show up in the results if we were to run this query sometime later. Why? Because it’s possible that at the time of our first read, the transaction that inserts
188 has not yet committed.
We can’t avoid this issue by looking at each
id and waiting for
id + 1 to show up before proceeding because the ids are generated by a Postgres sequence, and it is possible that there are gaps that will never be filled.
If this problem still seems unclear, please go back and read the first post.
Ideally, we would have a cleaner sequence of event ids. In particular, we want each id to be 1 greater than the previous id, with no gaps. This would permit us to read sequentially without skipping anything. As discussed in the previous post, the table we happen to be reading from is managed by a third party library, and we can’t dramatically modify how records are written to it.
But what if we could generate a table of surrogate event ids, arranged in the order that the event records were actually committed? And what if we could write these surrogates in such a way that we left no gaps in their id sequence and didn’t cause egregious performance problems? If this were possible, we could reliably process events by looping over the surrogate table to determine which events we still needed to process. This would permit us to process the events roughly in order, and ensure that none are skipped.
Before I go into this solution, I want to credit its source. Commanded is an awesome CQRS/ES framework written in Elixir. It includes an event store implementation which is a veritable treasure trove of Postgres tricks. The following is
directly lifted adapted from how that event store generates stream records1.
We will create a table called
events_stream. This table will consist of two columns,
id column will be an integer representing event insert/commit order, and the
event_id column will be a foreign key to the corresponding event record. While the
id column will be a primary key, it will not be associated with a Postgres sequence because, as discussed in the previous post, sequences can have gaps. Instead, we will generate them using a different mechanism.
We will create a second table
stream_ids containing a single column,
id, which is again an integer primary key not associated with a Postgres sequence. We will only write a single row to this table which will act as the id counter for the
events_stream table. Each time we want to insert records into
events_stream, we will increment the
id of the single record in the
stream_ids table by the number of records we are inserting into
events_stream. We will do this within the same transaction that inserts the
To give a concerete example, let’s say events
186 have just been inserted. We would run the following query to record them in the
WITH stream AS ( UPDATE stream_ids SET id = id + 2 RETURNING id - 2 AS original_id ), events (event_id, index) AS ( VALUES (185, 1), (186, 2) ) INSERT INTO events_stream (id, event_id) SELECT stream.original_id + events.index, events.event_id FROM events, stream;
stream common table expression in the above query contains an
UPDATE which increments the
id of our single
stream_ids record by the number of
event_stream records we are inserting, and returns the original value of the
id before it was incremented. The ids of the
events_stream records we are inserting are generated relative to this original value. So the first
events_stream record is inserted with original id + 1, the second original id + 2, and so on.
The beauty of this query is the relative id incrementing. Even if we are inserting many
events_stream records concurrently, we will never get a duplicate or a gap. It forces the sequence of
event_stream ids to have just the properties we want.
Okay, this is great, but when does the query to stream the events run? In Commanded, it’s run within the transaction that writes the events in the first place. There, the context is a little bit different because the streams are a first-class resource rather than something which was tacked on later.
Another good place to run it would be inside of a trigger that fires on inserts to the
events table. This works well provided that your event inserts are fairly fast because concurrent inserts can now force transactions writing events in other processes to be held open until they complete2.
If you have some event writes that take a long time and you don’t want these to interfere with the processing of other events, a third possibility is to use a queue. Here, you use a trigger on event insert to add the newly inserted event
id to a queue table. You then have a separate process that dequeues and streams items from this table in a single transaction.
Now, we can finally fix the code that caused all of this trouble in the first place. We upgrade the original code to loop over the
event_stream records to determine which events to process:
sql = "SELECT id, event_id FROM events_stream WHERE id > $1 ORDER BY ID" streamed_events = fetch_records(sql, last_seen) for streamed_event in streamed_events sql = "SELECT * FROM events WHERE id = $?" event = fetch_record(sql, streamed_event.event_id) process(event) last_seen = streamed_event.id end end
This approach ended up being somewhat more involved than we were expecting when we ran into the issue in the first place. On the bright side, we were able to implement it without having to make any changes to the 3rd party library that manages the events table, and it has been quite stable over the last few months in production.
NOTE: crossposted at https://tech.precisionnutrition.com
The reason for this is that when using the default
READ COMMITTED transaction isolation level, an
UPDATE will wait for any concurrent transactions open on its target row to commit or rollback before it proceeds. See here for more details. Hat tip again to @drteeth for digging deep enough to understand what was going on with concurrent queries of this type. ↩