Summary
A checkpoint is a saved position in an RStreams queue. The Node SDK maintains the read checkpoint for all bots that read from a given queue. The Node SDK maintains a write checkpoing for all bots that write to a given queue. When a bot is restarted and starts reading from a queue, it will by default begin reading from its checkpoint (think position) in the queue.
So, “to checkpoint” or the act of “checkpointing” means using the SDK to write the read or write checkpoint for a bot for the stream it is reading from or writing to.
Most of the time, the SDK automatically checkpoints for you and you don’t need to care. However, you need to understand generally what it is so you don’t get in trouble.
Checkpointing and Correlation
IF YOU HAVE AN EVENT THAT DOESN’T INCLUDE A VALID CORRELATION_ID THEN THE SDK CANNOT CHECKPOINT FOR YOU.
If the above sentence is all you remember from this article, then victory is assured :)
An event has an object called correlation_id. Jump over to the fundamentals doc and read the brief section on Correlation ID if you haven’t already.
The SDK will checkpoint for you if your event has a correlation_id
and without it the SDK can’t. Why? There is no magic to
how the SDK works. The correlation information tells the SDK the source queue and exact event that an event came from and
without this it simply can’t save your bot’s read or write position in a queue.
Read Checkpoints
When your bot reads an event from an RStreams queue, you are going to do something with that event and then want to read the next event in the queue. What if you read event A and before you process the event your bot crashes and shuts down.
When your bot restarts, you want to read event A. The good news is that all events in RStreams queues are persisted so no worries, your event is still there. But, where was event A in the queue? Each event has an ID that both uniquely identifies that event and marks it time-based position in that queue. So, if you know know the time of the last event in the queue was written to that queue, you could start reading from about that position by crafting an event ID that maybe gets you close to where you were reading. Here’s an event Id that will start reading the first event in the 13th minute of April 13 at 5PM UTC.
z/2022/04/13/17/13
But, that’s not great. So, the SDK keeps track of this for you. Here’s how?
Read Checkpointing for Offload and Enrich
When you have an operation that reads from a queue, such as offload or
enrich, the SDK will checkpoint events read from the source queue
when events flow out of the sink step that is hidden within these operations. In the case of offload
, that’s when the
offload function returns. In the cae of enrich
, it’s when an event is written to the destination queue.
Here’s exactly what “checkpoint events” means. It means that periodically, it will send an update back to the RStreams Bus
to have it save this bot’s read position in the queue the bot is reading data from using the correlation_id
of events
that have flowed into the offload
and the function has returned without error and in the case of enrich
for the events
that have been successfully written to the destination queue.
So, each time your bot starts up it will automatically call out to the RStreams Bus
and ask for this bot’s current read position,
the checkpoint, in the given queue it’s reading from and will continue pulling events from that position forward in time through
the queue.
Read Checkpointing for a Pipe
If your last pipe stream step, your sink, writes to a queue using a load
stream, then periodically as the SDK writes the
events to the desination queue it will also updatethis bot’s checkpoint in the queue that you read from in your source stream.
Read Checkpoint Examples
Example 1
Here we are creating a source stream that goes out and gets data not yet in a queue of the RStreams Bus
and we
write it to the RStreams queue named my-destination-queue
acting as a bot named my-cool-bot
.
The SDK will not checkpoint what you are reading from since your source isn’t an RStreams queue.
May sound obvious but just need to do a gut check that folks are getting it.
1import { RStreamsSdk } from "leo-sdk";
2
3async function main() {
4 const rsdk: RStreamsSdk = new RStreamsSdk();
5
6 await rsdk.streams.pipeAsync(
7 rsdk.createSource(async () => {
8 // Call out to a database to feed data into the pipe
9 }, opts, state),
10 rsdk.load('my-cool-bot', 'my-destination-queue')
11 );
12}
Example 2
Here we are reading events as a bot named my-cool-bot
and writing them to another system, in this case some database
external to RStreams. Notice that on line 12 we return true after saving the event to the database. This tells the
SDK that processing was successful and that we should checkpoint the position of this event for the my-cool-bot
in the my-source-queue
queue.
1import { RStreamsSdk } from "leo-sdk";
2
3async function main() {
4 const rsdk: RStreamsSdk = new RStreamsSdk();
5 const opts: OffloadOptions<Person> = {
6 id: 'my-cool-bot',
7 inQueue: 'my-source-queue',
8 start: 'z/2022/04/20',
9 limit: 2,
10 transform: async (person: Person) => {
11 // Save the person to a database
12 return true;
13 }
14 };
15
16 await rsdk.offloadEvents<Person>(opts);
17}
Example 3
Here we are reading events as a bot named my-cool-bot
from a queue named my-source-queue
, translating them from a PersonRaw
to
a Person
object and then writing them a queue named my-dest-queue
. You’ll notice that on line 15 we are returning the newly
translated person to be sent to the my-dest-queue
queue. The SDK will checkpoint the original event from my-source-queue
for you after it’s sure that the derived event that you returned from the transform
function was successfully written
to the my-dest-queue
queue.
Let’s say that you wanted to skip one person event and not write it to the dest queue but you still wanted to checkpoint the
source event from the souce queue? Well, in that case you’d just do this for that one event return true;
which tells the SDK
not to send an event to the destination but to go ahead and checkpoint in the source queue that the bot handled the event.
1import { EnrichOptions, RStreamsSdk } from "leo-sdk";
2
3async function main() {
4 const rsdk: RStreamsSdk = new RStreamsSdk();
5 const opts: EnrichOptions<PersonRaw, Person> = {
6 id: 'my-cool-bot',
7 inQueue: 'my-source-queue',
8 outQueue: 'my-dest-queue',
9 start: 'z/2022/04/20',
10 config: {
11 limit: 2
12 },
13 transform: async (person: PersonRaw) => {
14 const p: Person = translate(person);
15 return p;
16 }
17 };
18
19 await rsdk.enrichEvents<PersonRaw, Person>(opts);
20}
Example 4
Here we are reading events from queue my-source-queue
as a bot named my-cool-bot
and then
transforming the event from a PersonRaw
to a Person
object and then sending the new Person
object to the queue named my-dest-queue
doing so as a bot named my-cool-bot
.
The SDK will checkpoint that we’ve read and handled the event in my-source-queue
for us only after it’s sure
that the new derived event has been written to my-dest-queue
. That’s why it is so important that derived
events have the correct correlation_id
correlation bookkeeping information. The SDK uses that in derived events to know what source event
correlates to the new event so it can checkpoint for us.
1import { BaseEvent, ReadEvent, RStreamsSdk } from "leo-sdk";
2import { PersonRaw, Person } from '../lib/types';
3
4async function main() {
5 const rsdk: RStreamsSdk = new RStreamsSdk();
6 const botId = 'rstreams-example.people-to-peopleplusandus';
7
8 await rsdk.streams.pipeAsync(
9 rsdk.read<PersonRaw>('my-cool-bot', 'my-source-queue'),
10 rsdk.throughAsync<ReadEvent<PersonRaw>, BaseEvent<Person>>((event) => {
11 const result: BaseEvent<Person> = {
12 payload: translate(event.payload),
13 event_source_timestamp: event.event_source_timestamp,
14 correlation_id: rsdk.streams.createCorrelation(event)
15 };
16
17 return result;
18 }),
19 rsdk.load('my-cool-bot', 'my-dest-queue', {force: true})
20 );
21}