Some technical writing by Justin Giancola. I work on the platform team at Precision Nutrition.
Hello again and welcome back! When we last left off, we had just given up on trying to find a “simple” solution to the following problem:
How can we read events from a table as they are being written without skipping records that haven’t committed at read time?
To provide a concrete example, we might run the following query:
SELECT id FROM events WHERE id > 184 ORDER BY id;
and get back 185, 186, 189 even though events 187 and 188 would show up in the results if we were to run this query sometime later. Why? Because it’s possible that at the time of our first read, the transaction that inserts 187 and 188 has not yet committed.
We can’t avoid this issue by looking at each id and waiting for id + 1 to show up before proceeding because the ids are generated by a Postgres sequence, and it is possible that there are gaps that will never be filled.
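To make the failure mode concrete, here is a tiny in-memory model of it. This is only a sketch: the `committed` dictionary and the `read_new_ids` helper are made up for illustration and stand in for the events table and the query above.

```python
# Hypothetical model: event id -> True once the inserting transaction has committed.
committed = {185: True, 186: True, 187: False, 188: False, 189: True}


def read_new_ids(committed, last_seen):
    """Mimic the query above: only committed rows with id > last_seen are visible."""
    return sorted(i for i, done in committed.items() if done and i > last_seen)


# First poll: 187 and 188 have been inserted but are not visible yet.
first = read_new_ids(committed, 184)  # [185, 186, 189]
last_seen = first[-1]                 # 189

# The slow transaction commits after our read.
committed[187] = committed[188] = True

# Second poll: nothing is greater than 189, so 187 and 188 are never processed.
second = read_new_ids(committed, last_seen)  # []
```

Tracking the highest id we have seen is exactly what causes 187 and 188 to fall through the cracks.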
If this problem still seems unclear, please go back and read the first post.
Ideally, we would have a cleaner sequence of event ids. In particular, we want each id to be 1 greater than the previous id, with no gaps. This would permit us to read sequentially without skipping anything. As discussed in the previous post, the table we happen to be reading from is managed by a third party library, and we can’t dramatically modify how records are written to it.
But what if we could generate a table of surrogate event ids, arranged in the order that the event records were actually committed? And what if we could write these surrogates in such a way that we left no gaps in their id sequence and didn’t cause egregious performance problems? If this were possible, we could reliably process events by looping over the surrogate table to determine which events we still needed to process. This would permit us to process the events roughly in order, and ensure that none are skipped.
Before I go into this solution, I want to credit its source. Commanded is an awesome CQRS/ES framework written in Elixir. It includes an event store implementation which is a veritable treasure trove of Postgres tricks. The following is adapted from how that event store generates stream records [1].
We will create a table called events_stream. This table will consist of two columns, id and event_id. The id column will be an integer representing event insert/commit order, and the event_id column will be a foreign key to the corresponding event record. While the id column will be a primary key, it will not be associated with a Postgres sequence because, as discussed in the previous post, sequences can have gaps. Instead, we will generate the ids using a different mechanism.
We will create a second table, stream_ids, containing a single column, id, which is again an integer primary key not associated with a Postgres sequence. We will only ever write a single row to this table, and that row will act as the id counter for the events_stream table. Each time we want to insert records into events_stream, we will increment the id of the single record in the stream_ids table by the number of records we are inserting into events_stream. We will do this within the same transaction that inserts the events_stream records.
To give a concrete example, let’s say events 185 and 186 have just been inserted. We would run the following query to record them in the events_stream table:
WITH stream AS (
  UPDATE stream_ids SET id = id + 2
  RETURNING id - 2 AS original_id
),
events (event_id, index) AS (
  VALUES (185, 1), (186, 2)
)
INSERT INTO events_stream (id, event_id)
SELECT
  stream.original_id + events.index,
  events.event_id
FROM events, stream;
The stream common table expression in the above query contains an UPDATE which increments the id of our single stream_ids record by the number of events_stream records we are inserting, and returns the original value of the id before it was incremented. The ids of the events_stream records we are inserting are generated relative to this original value. So the first events_stream record is inserted with original id + 1, the second with original id + 2, and so on.
The beauty of this query is the relative id incrementing. Every insert has to update the same stream_ids row first, so concurrent inserts take turns: even if we are inserting many events_stream records concurrently, we will never get a duplicate or a gap. It forces the sequence of events_stream ids to have just the properties we want.
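If you want to play with the id arithmetic without a Postgres instance handy, here is a minimal sketch using Python's built-in sqlite3 module. SQLite is only a stand-in here: the `connect` and `stream_events` helpers are hypothetical, the foreign key to the events table is left out, and a single in-memory connection says nothing about the locking behaviour of concurrent Postgres transactions.

```python
import sqlite3


def connect():
    """Create an in-memory database holding the two tables described above."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE events_stream (
            id       INTEGER PRIMARY KEY,  -- assigned by us, not by a sequence
            event_id INTEGER NOT NULL
        );
        CREATE TABLE stream_ids (id INTEGER PRIMARY KEY);
        INSERT INTO stream_ids (id) VALUES (0);  -- the single counter row
    """)
    return conn


def stream_events(conn, event_ids):
    """Reserve len(event_ids) consecutive ids and insert the rows in one transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute("UPDATE stream_ids SET id = id + ?", (len(event_ids),))
        (new_id,) = conn.execute("SELECT id FROM stream_ids").fetchone()
        original_id = new_id - len(event_ids)
        conn.executemany(
            "INSERT INTO events_stream (id, event_id) VALUES (?, ?)",
            [(original_id + index, event_id)
             for index, event_id in enumerate(event_ids, start=1)],
        )


conn = connect()
stream_events(conn, [185, 186])
stream_events(conn, [189])
stream_events(conn, [187, 188])  # committed last, so streamed last
rows = conn.execute("SELECT id, event_id FROM events_stream ORDER BY id").fetchall()
# rows == [(1, 185), (2, 186), (3, 189), (4, 187), (5, 188)]
```

Because the counter update and the inserts share a transaction, a failed insert rolls the counter back too, which is what keeps the sequence free of gaps.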
Okay, this is great, but when does the query to stream the events run? In Commanded, it’s run within the transaction that writes the events in the first place. There, the context is a little bit different because the streams are a first-class resource rather than something which was tacked on later.
Another good place to run it would be inside of a trigger that fires on inserts to the events table. This works well provided that your event inserts are fairly fast, because a slow insert now forces concurrent transactions writing events in other processes to stay open until it completes [2].
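Here is a rough sketch of the trigger idea, again using SQLite through Python as a stand-in. In Postgres you would write a trigger function in PL/pgSQL instead, and the syntax differs. The `payload` column and the `stream_event` trigger name are invented for the example, and the trigger is assumed to fire once per inserted row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT);
    CREATE TABLE events_stream (
        id       INTEGER PRIMARY KEY,
        event_id INTEGER NOT NULL REFERENCES events (id)
    );
    CREATE TABLE stream_ids (id INTEGER PRIMARY KEY);
    INSERT INTO stream_ids (id) VALUES (0);

    -- Runs inside the inserting transaction, once per inserted event row.
    CREATE TRIGGER stream_event AFTER INSERT ON events
    BEGIN
        UPDATE stream_ids SET id = id + 1;
        INSERT INTO events_stream (id, event_id)
        SELECT id, NEW.id FROM stream_ids;
    END;
""")

# Event 189 commits first; 187 and 188 commit together afterwards.
with conn:
    conn.execute("INSERT INTO events (id, payload) VALUES (189, 'c')")
with conn:
    conn.executemany(
        "INSERT INTO events (id, payload) VALUES (?, ?)",
        [(187, "a"), (188, "b")],
    )

rows = conn.execute("SELECT id, event_id FROM events_stream ORDER BY id").fetchall()
# rows == [(1, 189), (2, 187), (3, 188)]
```

The code writing events never has to know about the stream: the surrogate ids follow the order in which the rows were actually written.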
If you have some event writes that take a long time and you don’t want these to interfere with the processing of other events, a third possibility is to use a queue. Here, you use a trigger on event insert to add the newly inserted event id to a queue table. You then have a separate process that dequeues and streams items from this table in a single transaction.
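The queue variant might look something like the following, once more with SQLite standing in for Postgres. The `stream_queue` table, its `position` column, the `enqueue_event` trigger, and the `drain_queue` function are all names I made up for this sketch. In Postgres, a `DELETE ... RETURNING` would be a more natural way to dequeue.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT);
    CREATE TABLE events_stream (id INTEGER PRIMARY KEY, event_id INTEGER NOT NULL);
    CREATE TABLE stream_ids (id INTEGER PRIMARY KEY);
    INSERT INTO stream_ids (id) VALUES (0);

    -- Hypothetical queue table; position only records enqueue order.
    CREATE TABLE stream_queue (
        position INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id INTEGER NOT NULL
    );
    -- The trigger now does the cheap part only: remember the new event id.
    CREATE TRIGGER enqueue_event AFTER INSERT ON events
    BEGIN
        INSERT INTO stream_queue (event_id) VALUES (NEW.id);
    END;
""")


def drain_queue(conn):
    """Move every queued event id into events_stream in a single transaction."""
    with conn:
        queued = conn.execute(
            "SELECT position, event_id FROM stream_queue ORDER BY position"
        ).fetchall()
        if not queued:
            return 0
        conn.execute("UPDATE stream_ids SET id = id + ?", (len(queued),))
        (new_id,) = conn.execute("SELECT id FROM stream_ids").fetchone()
        original_id = new_id - len(queued)
        conn.executemany(
            "INSERT INTO events_stream (id, event_id) VALUES (?, ?)",
            [(original_id + index, event_id)
             for index, (_, event_id) in enumerate(queued, start=1)],
        )
        # Delete exactly the rows we read, not whatever happens to be queued by now.
        conn.executemany(
            "DELETE FROM stream_queue WHERE position = ?",
            [(position,) for position, _ in queued],
        )
        return len(queued)


with conn:
    conn.executemany(
        "INSERT INTO events (id, payload) VALUES (?, ?)",
        [(185, "a"), (186, "b")],
    )
moved = drain_queue(conn)  # 2
rows = conn.execute("SELECT id, event_id FROM events_stream ORDER BY id").fetchall()
# rows == [(1, 185), (2, 186)]
```

Only the dequeuing process touches the stream_ids row here, so a slow event write no longer holds up anyone else's transaction.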
Now, we can finally fix the code that caused all of this trouble in the first place. We upgrade the original code to loop over the events_stream records to determine which events to process:
sql = "SELECT id, event_id FROM events_stream WHERE id > $1 ORDER BY id"
streamed_events = fetch_records(sql, last_seen)
for streamed_event in streamed_events
  # look up the event that this stream record points to
  sql = "SELECT * FROM events WHERE id = $1"
  event = fetch_record(sql, streamed_event.event_id)
  process(event)
  # remember our position so the next pass resumes after this record
  last_seen = streamed_event.id
end
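For a version of this loop that you can actually run, here is a sketch in Python against SQLite. The `process_new_events` function, the `payload` column, and the sample rows are assumptions for illustration only; the real code talks to Postgres through whatever `fetch_records` wraps.

```python
import sqlite3


def process_new_events(conn, last_seen, process):
    """Process every streamed event after last_seen and return the new position."""
    streamed = conn.execute(
        "SELECT id, event_id FROM events_stream WHERE id > ? ORDER BY id",
        (last_seen,),
    ).fetchall()
    for stream_id, event_id in streamed:
        event = conn.execute(
            "SELECT id, payload FROM events WHERE id = ?", (event_id,)
        ).fetchone()
        process(event)
        last_seen = stream_id
    return last_seen


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT);
    CREATE TABLE events_stream (id INTEGER PRIMARY KEY, event_id INTEGER NOT NULL);
    -- Events 185, 186 and 189 have committed; 187 and 188 have not.
    INSERT INTO events VALUES (185, 'a'), (186, 'b'), (189, 'e');
    INSERT INTO events_stream VALUES (1, 185), (2, 186), (3, 189);
""")

seen = []
last_seen = process_new_events(conn, 0, seen.append)  # 3

# The slow transaction commits: its events get the next stream ids.
conn.executescript("""
    INSERT INTO events VALUES (187, 'c'), (188, 'd');
    INSERT INTO events_stream VALUES (4, 187), (5, 188);
""")
last_seen = process_new_events(conn, last_seen, seen.append)  # 5
# [event[0] for event in seen] == [185, 186, 189, 187, 188]
```

Events 187 and 188 are processed after 189, but they are processed, which is the whole point.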
This approach ended up being somewhat more involved than we were expecting when we first ran into the issue. On the bright side, we were able to implement it without having to make any changes to the third-party library that manages the events table, and it has been quite stable over the last few months in production.
NOTE: crossposted at https://tech.precisionnutrition.com
[1] And if you’re curious, you can read the implementation right here.
[2] The reason for this is that when using the default READ COMMITTED transaction isolation level, an UPDATE will wait for any concurrent transactions open on its target row to commit or roll back before it proceeds. See here for more details. Hat tip again to @drteeth for digging deep enough to understand what was going on with concurrent queries of this type.