Offload

You need to understand what a pipe is and what a stream step in that pipe is, and that nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means for reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.

API Doc

A standalone function, meaning one that doesn’t use pipes and streams. It reads events from the specified source RStreams queue and then calls your transform function, allowing you to do anything you want with the data.

When would I use this?

  • You want to read from a source queue and then write it to a resource or system that isn’t another RStreams queue

    • Write to a database
    • Send data to an API
  • You want to read from a source queue and perform aggregations/analytics on data before sending to another system

Runnable Examples

This expects you’ve run the examples in the enrich Operation to populate queues with data.

Example 1

The first example illustrates code running as a bot with an ID of rstreams-example.offload-one-peopleplus. It gets exactly two events from queue rstreams-example.people, starting at position z/2022/04/20, and then simply saves each event to another system by calling that system’s API. The endpoint here is a free, public API that lets you mock out the response and just throws away your request, but it works for our purposes.

Two things to note here. First, the transform function is typed for both the callback and async varieties, but please use only the async version going forward: all new features are being added only to the async approach.

Second, there are actually three arguments to the transform function, even though our example uses only the first. What is stored in an RStreams queue is an instance of a ReadEvent, where the payload attribute is the data the queue exists for. The first argument is just the payload pulled out, since usually that’s all you need. The second argument is the full event from the queue, with the event ID and other sometimes useful things. The third argument is used only in the callback version, where you call done exactly once to trigger the callback. It’s there for backward compatibility. Don’t use it in new code.
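
As a rough illustration of the first two arguments, here is a minimal sketch of an async transform that uses both the payload and the full event. It does not import the SDK: ReadEventLike is a simplified stand-in for the SDK’s ReadEvent that models only an eid and a payload, and the Person shape, the describeEvent helper, and the sample event ID are all hypothetical, chosen just for this example.

```typescript
// Simplified stand-in for the SDK's ReadEvent (assumption: only the two
// fields used here are modeled; the real type carries more).
interface ReadEventLike<T> {
  eid: string;   // the event ID
  payload: T;    // the data the queue exists for
}

// Hypothetical payload shape, for illustration only.
interface Person {
  firstName: string;
  lastName: string;
}

// Hypothetical helper: combines the payload (first argument) with the
// event ID taken from the full event (second argument).
function describeEvent(person: Person, event: ReadEventLike<Person>): string {
  return `${person.firstName} ${person.lastName} @ ${event.eid}`;
}

// Async-style transform: no third (done) argument is declared or used.
async function transform(person: Person, event: ReadEventLike<Person>): Promise<boolean> {
  console.log(`offloading ${describeEvent(person, event)}`);
  return true;
}

// Made-up sample event; the eid value is not a real queue position.
const sample: ReadEventLike<Person> = {
  eid: "z/2022/04/20/00/00/0000000001",
  payload: { firstName: "Ada", lastName: "Lovelace" },
};
transform(sample.payload, sample);
```

The payload is passed separately purely for convenience; the same data is also reachable as event.payload on the second argument.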

Returning from an offload async transform function

  • throw Error
    If you throw an error at any time, the pipe will error out and your upstream queue will not be checkpointed.
  • return true
    This tells the SDK to checkpoint for you in the upstream queue that was read from. If you’re not batching, this checkpoints the one event. If you’re batching, this checkpoints up to the final event in the batch.
  • return false
    This tells the SDK not to checkpoint this event in the upstream queue that was read from.

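
The three outcomes listed above can be modeled with a small toy classifier. This is a sketch of the described behavior only, not SDK code: the Outcome type and the outcomeOf and demo functions are hypothetical names, and no real queue or checkpoint is involved.

```typescript
type Outcome = "checkpoint" | "no-checkpoint" | "pipe-error";

// Toy model, not the SDK: maps what an async transform does to the
// outcome described in the list above.
async function outcomeOf(transform: () => Promise<boolean>): Promise<Outcome> {
  try {
    // true asks for a checkpoint; false asks for none
    return (await transform()) ? "checkpoint" : "no-checkpoint";
  } catch {
    // a thrown error errors out the pipe, so nothing is checkpointed
    return "pipe-error";
  }
}

async function demo(): Promise<Outcome[]> {
  return Promise.all([
    outcomeOf(async () => true),
    outcomeOf(async () => false),
    outcomeOf(async () => { throw new Error("save failed"); }),
  ]);
}

demo().then((outcomes) => console.log(outcomes));
// [ 'checkpoint', 'no-checkpoint', 'pipe-error' ]
```
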
Example 1 code
import { OffloadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import axios, { AxiosResponse } from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const opts: OffloadOptions<Person> = {
    id: 'rstreams-example.offload-one-peopleplus',
    inQueue: 'rstreams-example.people',
    start: 'z/2022/04/20',
    limit: 2,
    // Async transform: receives the payload of one event at a time
    transform: async (person: Person) => {
      await savePerson(person);
      return true; // tell the SDK to checkpoint this event
    }
  };

  await rsdk.offloadEvents<Person>(opts);
}

interface PostResponse {
  success: boolean;
}

/**
 * Save the person to another system.
 * @param person The person to save.
 */
async function savePerson(person: Person): Promise<void> {
  const url = `https://run.mocky.io/v3/83997150-ab13-43da-9fb9-66051ba06c10?mocky-delay=500ms`;
  const {data, status}: AxiosResponse<PostResponse, any> = await axios.post<PostResponse>(url, person);
  // Treat anything other than HTTP 200 with success === true as a failure
  if (status !== 200 || !data || data.success !== true) {
    throw new Error('Saving person to external system failed');
  }
}

(async () => {
  await main();
})();

Note: Person types referenced in the examples

Example 2

This example is nearly identical to Example 1 above, except that this time we are going to use config to tell the SDK to batch up events for us so we can be more efficient. The calls out to a public API to save the event elsewhere are intentionally delayed by 500ms each, a not uncommon API latency. So, if events arrive in the upstream queue very quickly, we’re at risk of not being able to read and offload them fast enough to keep up.

So, we’re going to ask the SDK to micro-batch events 10 at a time and then invoke our transform function with all ten at once. If the SDK has waited more than one second for 10 events to show up, our config tells it to go ahead and invoke transform with whatever it has so far. Then, in the offload transform function, we replace savePerson with savePeople, which makes concurrent POST API calls, one for each person we are saving, parallelizing the work and making it much faster so we can keep up. To make the example more interesting, we now set limit to 100 so we get a lot more events before we stop reading from the upstream queue.

The config inherited from ReadOptions is important for specifying how long we read from the upstream queue before we stop reading and shut down. If you’re running in a Lambda function, you have only 15 minutes before AWS shuts it down. That may sound like a long time, unless you are reading from a queue that is forever getting new events pushed into it, a pretty common case. By default, if you don’t set any config telling the SDK when to stop reading from the upstream queue, the SDK will read for up to 80% of the total time remaining for your Lambda, if you are in fact running as a Lambda. That leaves 20% of the time for you to finish processing.
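
To make the default 80/20 split concrete, here is a small arithmetic sketch. The readBudgetMs function and the DEFAULT_READ_FRACTION constant are hypothetical names for illustration and are not part of the SDK. In a real Lambda, the remaining time would typically come from the handler context (for example, context.getRemainingTimeInMillis() in the Node.js runtime); here it is hard-coded to the full 15 minutes.

```typescript
// Fraction of the remaining Lambda time spent reading by default,
// per the description above (hypothetical constant name).
const DEFAULT_READ_FRACTION = 0.8;

// How long to keep reading, given the time the Lambda has left.
function readBudgetMs(remainingMs: number, fraction: number = DEFAULT_READ_FRACTION): number {
  return Math.floor(remainingMs * fraction);
}

const remainingMs = 15 * 60 * 1000;          // a full 15-minute Lambda timeout
const readMs = readBudgetMs(remainingMs);

console.log(readMs);                // 720000 (12 minutes of reading)
console.log(remainingMs - readMs);  // 180000 (3 minutes left to finish processing)
```
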

You’ll notice that because we used OffloadBatchOptions to batch things up, the transform function arguments change. That’s because the SDK isn’t invoking transform with just one object but with the batch: an array of objects.

The first argument is the array of events directly from the upstream queue. The second argument is an event wrapper around the whole array of events, which is not really needed except in rare use cases. The third argument is for backward compatibility when using the offload as a callback instead of using async. Please use only async going forward, in which case you don’t need the third argument.
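
The count-or-time batching that produces these arrays can be sketched as a pure function over events with known arrival times. This is a simplified model under stated assumptions, not how the SDK implements it: microBatch and Arrival are hypothetical names, the time window is assumed to start at the first event of each batch, and a batch is only closed when a later event arrives, whereas a real implementation would be driven by a timer.

```typescript
// An event together with the time it arrived, in milliseconds (hypothetical type).
interface Arrival<T> {
  atMs: number;
  payload: T;
}

// Groups arrivals into batches. A batch is closed when it reaches `count`
// items, or when the next arrival comes more than `timeMs` after the
// first item in the batch (assumed semantics for this sketch).
function microBatch<T>(arrivals: Arrival<T>[], count: number, timeMs: number): T[][] {
  const batches: T[][] = [];
  let current: T[] = [];
  let startedAt = 0;

  for (const arrival of arrivals) {
    // Time limit exceeded: flush whatever has accumulated so far
    if (current.length > 0 && arrival.atMs - startedAt > timeMs) {
      batches.push(current);
      current = [];
    }
    if (current.length === 0) {
      startedAt = arrival.atMs;
    }
    current.push(arrival.payload);
    // Count limit reached: flush a full batch
    if (current.length === count) {
      batches.push(current);
      current = [];
    }
  }
  if (current.length > 0) {
    batches.push(current); // flush the final partial batch
  }
  return batches;
}

const grouped = microBatch(
  [
    { atMs: 0, payload: "a" },
    { atMs: 100, payload: "b" },
    { atMs: 1500, payload: "c" },
    { atMs: 1600, payload: "d" },
    { atMs: 1700, payload: "e" },
    { atMs: 1800, payload: "f" },
  ],
  3,     // count
  1000   // time in ms
);
console.log(JSON.stringify(grouped)); // [["a","b"],["c","d","e"],["f"]]
```

Each inner array corresponds to one invocation of the batch transform: the first batch is cut short by the time limit, the second is full, and the third holds the leftover event.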

When we’re done offloading the events, we simply return true telling the SDK to checkpoint for us in the upstream queue. See Returning from an offload async transform function above for more details.

Note: Person types referenced in the examples

Example 2 code
import { OffloadBatchOptions, ReadEvent, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const opts: OffloadBatchOptions<Person> = {
    id: 'rstreams-example.offload-one-peopleplus',
    inQueue: 'rstreams-example.people',
    // Invoke transform with up to 10 events, or with whatever has arrived after 1000 ms
    batch: {
      count: 10,
      time: 1000
    },
    start: 'z/2022/04/20',
    limit: 100,
    transform: async (people: ReadEvent<Person>[]) => {
      await savePeople(people);
      return true; // checkpoint up to the final event in the batch
    }
  };

  await rsdk.offloadEvents<Person>(opts);
}

interface PostResponse { success: boolean; }
interface PostResponseStatus extends PostResponse { status: number; }

/**
 * Save a batch of people to another system, one concurrent POST per person.
 * @param people The events whose payloads should be saved.
 */
async function savePeople(people: ReadEvent<Person>[]): Promise<void> {
  const url = `https://run.mocky.io/v3/83997150-ab13-43da-9fb9-66051ba06c10?mocky-delay=500ms`;

  // Issue all POSTs concurrently, then reduce each response to its status and success flag
  const responses: PostResponseStatus[] = (await Promise.all(
    people.map((person) => axios.post<PostResponse>(url, person.payload))
  )).map((obj) => {
    return { status: obj.status, success: obj.data ? obj.data.success : false };
  });

  responses.forEach((resp) => {
    if (resp.status !== 200 || resp.success !== true) {
      throw new Error('Saving person to external system failed');
    }
  });
}

(async () => {
  await main();
})();