To use this function well, you need to understand what a pipe is and what a stream step in that pipe is, and nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means with respect to reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.
Only use the throughAsync SDK method and not through, which is deprecated and no longer needed.
This function creates a transform stream: a stream that exists to receive events from the source stream, do something with them, and then send them on to the next pipe step, which must exist. Note that through intentionally doesn't know anything about RStreams events. All it does is take in what it is given from the previous pipe step and send whatever you return from the function on to the next pipe step.
For example, if your source pipe step produces an object of type Person, then you are going to get an object of type Person and not ReadEvent<Person>. The small example below uses the popular event-stream library, exported by the RStreams SDK, to turn an array into a source stream that seeds the pipe with the array's content.
const arr: Person[] = [{name: 'jane doe'}];
await rsdk.streams.pipeAsync(
  rsdk.streams.eventstream.readArray(arr),
  rsdk.throughAsync<Person>((person) => {
    person.name = 'john doe';
    return person;
  }),
  rsdk.streams.devnull()
);
When would I use this?
- I want to do something more involved, so enrich doesn’t work for me
- I want to take data events provided and transform them and send them through to the next pipe step
Runnable Examples
Example 1
This example reads 5 events from the rstreams-example.peopleplus RStreams queue. The pipe then creates a throughAsync stream step that takes each ReadEvent<Person> read from the bus, turns it into a PersonLight event, and sends it on to the toCSV stream to make a CSV line ready to stick in a CSV file. Finally, it uses the Node standard filesystem fs module to create a sink that writes the events flowing into it to a file. Pretty convenient.
The toCSV function’s first argument, if true, writes a CSV header as the first row; if it is an array of strings, that array is used as the CSV header row. The second argument is an options object passed to the underlying fast-csv NPM module that generates the CSV file: fast-csv options.
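For instance, you might pick the header names and delimiter yourself. This is just a sketch; delimiter is one of the format options fast-csv documents:

// Explicit header row; the options object is handed through to fast-csv
rsdk.streams.toCSV(["firstName", "lastName", "email"], { delimiter: "," })

Here is the full example: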
import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import fs from "fs";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();

  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  }

  await rsdk.streams.pipeAsync(
    rsdk.read<Person>('rstreams-example.peopleplus-to-jsonlines',
      'rstreams-example.peopleplus', opts),
    rsdk.throughAsync<ReadEvent<Person>, PersonLight>(async (p: ReadEvent<Person>) => {
      return {
        firstName: p.payload.firstName,
        lastName: p.payload.lastName,
        email: p.payload.email
      }
    }),
    rsdk.streams.toCSV(true, {quote: '"'}),
    fs.createWriteStream("./output/people.csv"),
  );
}

interface PersonLight {
  firstName: string;
  lastName: string;
  email: string;
}

(async () => {
  try {
    await main();
  } catch (err) {
    console.log(err);
  }
})()
Here is the resulting content of ./output/people.csv:

firstName,lastName,email
Herman,Morris,herman.morris@example.com
Herman,Morris,herman.morris@example.com
Tomothy,Rogers,tomothy.rogers@example.com
Herman,Morris,herman.morris@example.com
Tomothy,Rogers,tomothy.rogers@example.com
Note: Person types referenced in the examples
Example 2
Here we see the real power of a through. We are reading data from a stream populated with people of type PersonRaw. We want to read them, translate them from PersonRaw to a simpler type named Person, and then look at the country on each object, pushing those who live in the US to a queue named rstreams-example.peopleplus-us and the others to a queue named rstreams-example.peopleplus.
Things to learn from this example:
Reading Data
- We are only reading up to 5 objects and then closing the source stream, which closes down the pipe
- We don’t have a batch stream step between the read and the throughAsync because the logic in the throughAsync doesn’t do anything that takes time. If we reached out to, say, an API or a database here to further enrich the person with external data, then we would absolutely want to use the batch stream step to micro-batch, getting arrays of data sent to the throughAsync with a type of Array<ReadEvent<PersonRaw>>. This would let you hit the database once per group of events, perhaps building a single SQL statement to get all the data, so your through doesn’t slow down. A sketch of this pattern follows this list.
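Here is a minimal sketch of that micro-batching pattern. The enrichPeople helper is hypothetical, standing in for whatever batched database or API call you would make, and treat the exact shape of the batch options (count/time) as an assumption about the SDK’s BatchOptions:

import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { PersonRaw, Person } from "../lib/types";

// Hypothetical helper: one database/API round trip enriches a whole batch.
declare function enrichPeople(people: PersonRaw[]): Promise<Person[]>;

async function batchedEnrich() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const readOpts: ReadOptions = { start: 'z/2022/04/20', limit: 500 };

  await rsdk.streams.pipeAsync(
    rsdk.read<PersonRaw>('rstreams-example.people-to-peopleplusandus',
      'rstreams-example.people', readOpts),
    // Micro-batch: group up to 100 events (or whatever arrives within one
    // second) into a single array before the through sees them.
    rsdk.streams.batch({ count: 100, time: { seconds: 1 } }),
    rsdk.throughAsync<ReadEvent<PersonRaw>[], Person[]>(async (events) => {
      // One external call per batch instead of one call per event.
      return enrichPeople(events.map(e => e.payload));
    }),
    rsdk.streams.devnull()
  );
}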
Creating a BaseEvent
Note that throughAsync doesn’t wrap your return value in a BaseEvent, which is what the load stream needs, so we make one ourselves and return it. Normally, there are only three attributes you need to care about:
- payload: the data this event exists to wrap
- event_source_timestamp: when the very first event hit any queue of the bus that eventually led to this event, no matter how many queues it flowed through to get here and how the event was transformed along the way. Copying the source event’s (PersonRaw) event_source_timestamp is almost always the right thing to do so we can carry this date forward. It is used to understand how long it has taken for an object to propagate through the system and is very important.
- correlation_id: the short answer is that you need to have this, so be sure to just use the helper API rsdk.streams.createCorrelation to make one for you. What is correlation_id?
Normally, we let the SDK set the other attributes on the BaseEvent for us. If we don’t include the event attribute, which names the queue to send the event to, then the downstream load stream’s default queue will be put on the event and used to route it. The second argument to the load on Line 28, rstreams-example.peopleplus, is that default queue: it is used for any event that doesn’t carry an event attribute telling the SDK where to send it.
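As a sketch, if the through in the example below omitted the event attribute, those two pipe steps would look like this and every event would land in load’s default queue:

// Fragment of the pipe below: no `event` attribute on the returned BaseEvent,
// so everything is routed to load's default queue (its second argument).
rsdk.throughAsync<ReadEvent<PersonRaw>, BaseEvent<Person>>((event) => ({
  payload: translate(event.payload),
  event_source_timestamp: event.event_source_timestamp,
  correlation_id: rsdk.streams.createCorrelation(event)
})),
rsdk.load(botId, 'rstreams-example.peopleplus', { force: true })

Here is the full example: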
1import { BaseEvent, ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
2import { PersonRaw, Person } from '../lib/types';
3
4async function main() {
5 const rsdk: RStreamsSdk = new RStreamsSdk();
6 const es = rsdk.streams.eventstream;
7 const botId = 'rstreams-example.people-to-peopleplusandus';
8
9 const readOpts: ReadOptions = {
10 start: 'z/2022/04/20',
11 limit: 5
12 }
13
14 await rsdk.streams.pipeAsync(
15 rsdk.read<PersonRaw>(botId, 'rstreams-example.people', readOpts),
16 rsdk.throughAsync<ReadEvent<PersonRaw>, BaseEvent<Person>>((event) => {
17 const queue = event.payload.location.country === 'United States' ?
18 'rstreams-example.peopleplus-us' :
19 'rstreams-example.peopleplus';
20 const result: BaseEvent<Person> = {
21 event: queue,
22 payload: translate(event.payload),
23 event_source_timestamp: event.event_source_timestamp,
24 correlation_id: rsdk.streams.createCorrelation(event)
25 };
26 return result;
27 }),
28 rsdk.load(botId, 'rstreams-example.peopleplus', {force: true})
29 );
30}
31
32/**
33 * @param p The type from the public API we want to modify
34 * @returns The new type that is flatter and gets rid of some attributes we don't need
35 */
36 function translate(p: PersonRaw): Person {
37 return {
38 gender: p.gender,
39 firstName: p.name.first,
40 lastName: p.name.last,
41 email: p.email,
42 birthDate: p.dob.date,
43 nationality: p.nat,
44 addr: {
45 addr1: p.location.street.number + ' ' + p.location.street.name,
46 city: p.location.city,
47 state: p.location.state,
48 country: p.location.country,
49 postcode: p.location.postcode,
50 longitude: p.location.coordinates.longitude,
51 latitude: p.location.coordinates.latitude,
52 tzOffset: p.location.timezone.offset,
53 tzDesc: p.location.timezone.description
54 }
55 }
56 }
57
58(async () => {
59 await main();
60})()