Through

You need to understand what a pipe is and what a stream step in that pipe does, and nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means with respect to reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.

Use only the throughAsync SDK method. The older through method is deprecated and no longer needed.

API Doc

This function creates a transform stream: a stream that sits after the source stream, receives events, does something with them and then sends them on to the next pipe step, which must exist.

Note that through intentionally doesn’t know anything about RStreams events. All it does is take in what it is given from the previous pipe step and then send on what you return from the function to the next pipe step.

For example, if your source pipe step produces an object of type Person then you are going to get an object of type Person and not ReadEvent<Person>. The small example below uses the popular event-stream library exported by the RStreams SDK to turn an array into a source stream to seed the pipe with content from the array.

const people: Person[] = [{name: 'jane doe'}];
await rsdk.streams.pipeAsync(
  // Turn the array into a source stream that seeds the pipe
  rsdk.streams.eventstream.readArray(people),
  // Receives a plain Person, not a ReadEvent<Person>
  rsdk.throughAsync<Person, Person>(async (event) => {
    event.name = 'john doe';
    return event;
  }),
  rsdk.streams.devnull()
);
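
To make the "plain objects in, plain objects out" behavior concrete without needing the SDK or a bus connection, here is a minimal sketch built only on Node's standard stream module. The throughSketch and runPipe names are hypothetical and exist only for this illustration; this is not how the SDK implements throughAsync, just a stand-in with the same shape: call a function with whatever arrives and pass its return value to the next step.

```typescript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

interface Person {
  name: string;
}

// Hypothetical stand-in for a through step (not the SDK implementation):
// it calls fn with whatever object arrives and pushes fn's result onward.
function throughSketch<T, U>(fn: (obj: T) => U | Promise<U>): Transform {
  return new Transform({
    objectMode: true,
    transform(chunk: T, _encoding, callback) {
      Promise.resolve()
        .then(() => fn(chunk))
        .then(
          (result) => callback(null, result),
          (err) => callback(err)
        );
    },
  });
}

// Runs a three-step pipe: array source -> through step -> collecting sink.
async function runPipe(people: Person[]): Promise<Person[]> {
  const collected: Person[] = [];
  await pipeline(
    Readable.from(people),
    throughSketch<Person, Person>((person) => ({ ...person, name: "john doe" })),
    new Writable({
      objectMode: true,
      write(chunk: Person, _encoding, callback) {
        collected.push(chunk);
        callback();
      },
    })
  );
  return collected;
}
```

The through step here never sees any event wrapper: it is handed a Person because that is what the source produced, and the sink receives exactly what the function returned.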

When would I use this?

  • I want to do something more involved so enrich doesn’t work for me
  • I want to take data events provided and transform them and send them through to the next pipe step

Runnable Examples

Example 1

This example reads 5 events from the rstreams-example.peopleplus RStreams queue. The pipe then creates a throughAsync stream step that takes each ReadEvent<Person> event read from the bus, turns it into a PersonLight object and sends it on to the toCSV stream, which makes a CSV line ready to write to a CSV file.

Finally, it uses the Node standard filesystem fs module to create a sink that writes the events flowing into it to a file. Pretty convenient.

The toCSV function’s first argument, if true, writes a CSV header as the first row. If the toCSV function’s first argument is an array of strings, it uses that as the CSV header first row. The second arg is options that come from the underlying fast-csv NPM module that generates the CSV file: fast-csv options.

Example 1 code
import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import fs from "fs";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();

  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  }

  await rsdk.streams.pipeAsync(
    rsdk.read<Person>('rstreams-example.peopleplus-to-jsonlines',
                      'rstreams-example.peopleplus', opts),
    rsdk.throughAsync<ReadEvent<Person>, PersonLight>(async (p: ReadEvent<Person>) => {
      return {
        firstName: p.payload.firstName,
        lastName: p.payload.lastName,
        email: p.payload.email
      }
    }),
    rsdk.streams.toCSV(true, {quote: '"'}),
    fs.createWriteStream("./output/people.csv"),
  );
}

interface PersonLight {
    firstName: string;
    lastName: string;
    email: string;
}

(async () => {
  try {
    await main();
  } catch(err) {
    console.log(err);
  }
})()
Generated people.csv file
firstName,lastName,email
Herman,Morris,herman.morris@example.com
Herman,Morris,herman.morris@example.com
Tomothy,Rogers,tomothy.rogers@example.com
Herman,Morris,herman.morris@example.com
Tomothy,Rogers,tomothy.rogers@example.com

Note: Person types referenced in the examples

Example 2

Here we see the real power of a through. We are reading data from a stream populated with people that are of type PersonRaw. We want to read them, translate them from PersonRaw to a simpler type named Person and then look at the country on each object and push those that live in the US to a queue named rstreams-example.peopleplus-us and the others to a queue named rstreams-example.peopleplus.

Things to learn from this example:

Reading Data

We read at most 5 objects and then close the source stream, which shuts down the pipe.

We don’t have a batch stream step between the read and the throughAsync because the logic in the throughAsync doesn’t do anything that takes time. If we reached out to, say, an API or database here to further enrich the person with external data, then we would absolutely want to use the batch stream step to micro-batch and get arrays of data sent to the throughAsync with a type of Array<ReadEvent<PersonRaw>>. This would then let you hit the database once per group of events, perhaps building a single SQL statement to get all the data, so your through doesn’t slow down.
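
The micro-batching idea can be sketched without the SDK. The code below is only an illustration under stated assumptions: ReadEventLike, PersonWithTier, toBatches, lookupTiers, enrichBatch and enrichAll are hypothetical names, the "database" is an in-memory function, and the SDK's own batch stream step is not used. It shows why handing arrays to the through step matters: one lookup per batch instead of one per event.

```typescript
// Minimal stand-ins for the SDK and example types (assumptions for this sketch).
interface ReadEventLike<T> {
  payload: T;
}
interface PersonRaw {
  email: string;
}
interface PersonWithTier extends PersonRaw {
  tier: string;
}

// Splits items into arrays of at most `size` elements.
function toBatches<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

let lookupCalls = 0; // counts round trips to the pretend database

// Hypothetical bulk lookup standing in for one SQL statement per batch.
async function lookupTiers(emails: string[]): Promise<Map<string, string>> {
  lookupCalls += 1;
  const tiers = new Map<string, string>();
  for (const email of emails) {
    tiers.set(email, email.endsWith("@example.com") ? "gold" : "basic");
  }
  return tiers;
}

// What a through step might do when it is handed an array of events.
async function enrichBatch(
  batch: ReadEventLike<PersonRaw>[]
): Promise<PersonWithTier[]> {
  const tiers = await lookupTiers(batch.map((e) => e.payload.email));
  return batch.map((e) => ({
    ...e.payload,
    tier: tiers.get(e.payload.email) ?? "basic",
  }));
}

// Drives the batches through enrichBatch one after another.
async function enrichAll(
  events: ReadEventLike<PersonRaw>[],
  batchSize: number
): Promise<PersonWithTier[]> {
  const out: PersonWithTier[] = [];
  for (const batch of toBatches(events, batchSize)) {
    out.push(...(await enrichBatch(batch)));
  }
  return out;
}
```

With three events and a batch size of two, lookupTiers runs twice rather than three times; the saving grows with the batch size.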

Creating a BaseEvent

Note that throughAsync doesn’t wrap your return value in a BaseEvent, which is what the load needs, so we make one ourselves and return it. Normally, there are only three attributes you need to care about:

  • payload: the data this event exists to wrap
  • event_source_timestamp: this is when the very first event hit any queue of the bus that eventually led to this event, no matter how many queues it flowed through to get here and how the event was transformed along the way. So, copying the source event’s (PersonRaw) event_source_timestamp is almost always the right thing to do so we can carry forward this date. It is used to understand how long it has taken for an object to propagate through the system and is very important.
  • correlation_id: The short answer is that you need to have this so be sure to just use the helper API rsdk.streams.createCorrelation to make one for you. What is correlation_id?
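
As a small illustration of why event_source_timestamp is copied forward, the sketch below derives two events in a row from an original and then measures how long the data has been travelling. EventLike, derive and propagationMs are hypothetical names for this example only, and the sketch assumes the timestamp is a number of milliseconds since the Unix epoch; check the SDK's type definitions before relying on that.

```typescript
// Minimal stand-in for the fields discussed above (an assumption, not the SDK type).
interface EventLike<T> {
  payload: T;
  event_source_timestamp: number; // assumed: milliseconds since the Unix epoch
}

// Builds a derived event that keeps the original event_source_timestamp.
function derive<T, U>(source: EventLike<T>, payload: U): EventLike<U> {
  return { payload, event_source_timestamp: source.event_source_timestamp };
}

// How long the data has been travelling, measured from the first queue it hit.
function propagationMs(
  event: EventLike<unknown>,
  now: number = Date.now()
): number {
  return now - event.event_source_timestamp;
}
```

Because each derived event copies the value rather than stamping a new one, the measurement stays anchored to the first event no matter how many transformations happen in between.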

Normally, we let the SDK set the other things on the BaseEvent for us. If we don’t include the event attribute, which is what queue to send the event to, then the downstream load stream’s default queue value will be put on the event and used to send the event to that queue. The second argument to the load on Line 28 is the default queue to send events to that don’t have the event attribute to tell the SDK which queue to send things to.
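
The default-queue rule just described can be modeled in a few lines. This is a sketch of the described behavior, not the SDK's code: OutgoingEvent and targetQueue are hypothetical names, and the real load stream does considerably more than this.

```typescript
// Minimal stand-in for an event on its way to a load step (an assumption).
interface OutgoingEvent<T> {
  event?: string; // the queue to send to; may be left unset
  payload: T;
}

// Picks the event's own queue when it has one, otherwise the load's default.
function targetQueue<T>(e: OutgoingEvent<T>, defaultQueue: string): string {
  return e.event ?? defaultQueue;
}
```

An event that sets event to rstreams-example.peopleplus-us resolves to that queue, while one that leaves event unset resolves to whatever default was given to the load.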

rstreams-example.peopleplus

Example 2 code
 1import { BaseEvent, ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
 2import { PersonRaw, Person } from '../lib/types';
 3
 4async function main() {
 5  const rsdk: RStreamsSdk  = new RStreamsSdk();
 6  const es = rsdk.streams.eventstream;
 7  const botId = 'rstreams-example.people-to-peopleplusandus';
 8
 9  const readOpts: ReadOptions = {
10    start: 'z/2022/04/20',
11    limit: 5
12  }
13
14  await rsdk.streams.pipeAsync(
15    rsdk.read<PersonRaw>(botId, 'rstreams-example.people', readOpts),
16    rsdk.throughAsync<ReadEvent<PersonRaw>, BaseEvent<Person>>((event) => {
17      const queue = event.payload.location.country === 'United States' ? 
18                                                       'rstreams-example.peopleplus-us' : 
19                                                       'rstreams-example.peopleplus'
20      const result: BaseEvent<Person> = {
21        event: queue,
22        payload: translate(event.payload),
23        event_source_timestamp: event.event_source_timestamp,
24        correlation_id: rsdk.streams.createCorrelation(event)
25      };
26      return result;
27    }),
28    rsdk.load(botId, 'rstreams-example.peopleplus', {force: true})
29  );
30}
31
32/**
33 * @param p The type from the public API we want to modify
34 * @returns The new type that is flatter and gets rid of some attributes we don't need
35 */
36 function translate(p: PersonRaw): Person {
37    return {
38      gender: p.gender,
39      firstName: p.name.first,
40      lastName: p.name.last,
41      email: p.email,
42      birthDate: p.dob.date,
43      nationality: p.nat,
44      addr: {
45        addr1: p.location.street.number + ' ' + p.location.street.name,
46        city: p.location.city, 
47        state: p.location.state,
48        country: p.location.country,
49        postcode: p.location.postcode,
50        longitude: p.location.coordinates.longitude,
51        latitude: p.location.coordinates.latitude,
52        tzOffset: p.location.timezone.offset,
53        tzDesc: p.location.timezone.description
54      }
55    }
56  }
57
58(async () => {
59  await main();
60})()

Note: Person types referenced in the examples