Before using this function, you need to understand what a pipe is and what a stream step within that pipe is. You also need to accept that nothing comes for free: the cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means for reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.
This function creates a transform stream: a stream that receives events from the step before it in the pipe, does something with them, and then sends them on to the next pipe step, which must exist.
It takes each event, serializes it to a JSON string, appends a newline character, and sends that string on to the next step in the pipe. It is used to create JSON lines content to feed either to a file in S3 or to a file on the local file system.
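To make that behavior concrete, here is a minimal sketch of an equivalent transform built only on Node's standard stream module. It is not the SDK's source code: toJsonLine and jsonLinesStringify are hypothetical names, and the sketch assumes stringify behaves like JSON.stringify followed by a newline, as described above.

```typescript
import { Transform, TransformCallback } from "stream";

// Hypothetical helper: serialize one event as a single JSON line.
// JSON.stringify escapes newlines inside string values, so the only raw
// "\n" in the result is the terminator appended here.
function toJsonLine(event: unknown): string {
  return JSON.stringify(event) + "\n";
}

// Hypothetical stand-in for rsdk.streams.stringify(): accepts objects on the
// writable side and emits text on the readable side for a file or S3 sink.
function jsonLinesStringify(): Transform {
  return new Transform({
    writableObjectMode: true,
    transform(event: unknown, _encoding: BufferEncoding, done: TransformCallback) {
      // Hand the serialized line to the next step in the pipe.
      done(null, toJsonLine(event));
    },
  });
}
```

For example, piping `Readable.from([{ a: 1 }, { b: 2 }])` through `jsonLinesStringify()` into `process.stdout` prints `{"a":1}` and `{"b":2}` on separate lines. Because embedded newlines are escaped, every event occupies exactly one line, which is what makes the output valid JSON lines content.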
When would I use this?
- I want to make a JSON lines file from the events flowing through the stream
Runnable Examples
Example 1
This example reads 5 events from the rstreams-example.peopleplus RStreams queue. The pipe then creates a throughAsync stream step that takes each ReadEvent<Person> event read from the bus, turns it into a PersonLight event, and sends it on to the stringify stream, which makes a JSON line ready to go into a JSON lines file. Finally, it uses the standard Node filesystem module, fs, to create a sink that writes the events flowing into it to a file. Pretty convenient.
import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import fs from "fs";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();

  // Read from queue position 'z/2022/04/20' onward, stopping after 5 events
  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  }

  await rsdk.streams.pipeAsync(
    // Source: read Person events from the queue as bot rstreams-example.peopleplus-to-jsonlines
    rsdk.read<Person>('rstreams-example.peopleplus-to-jsonlines',
      'rstreams-example.peopleplus', opts),
    // Transform: keep only the name and email fields of each event's payload
    rsdk.streams.throughAsync<ReadEvent<Person>, PersonLight>(async (p: ReadEvent<Person>) => {
      return {
        firstName: p.payload.firstName,
        lastName: p.payload.lastName,
        email: p.payload.email
      }
    }),
    // Transform: turn each PersonLight object into one JSON line ending in "\n"
    rsdk.streams.stringify(),
    // Sink: write the JSON lines to a local file (the ./output directory must already exist)
    fs.createWriteStream("./output/people.jsonl"),
  );
}

interface PersonLight {
  firstName: string;
  lastName: string;
  email: string;
}

(async () => {
  try {
    await main();
  } catch(err) {
    console.log(err);
  }
})()
Contents of ./output/people.jsonl after the run:
{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}
{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}
{"firstName":"Tomothy","lastName":"Rogers","email":"tomothy.rogers@example.com"}
{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}
{"firstName":"Tomothy","lastName":"Rogers","email":"tomothy.rogers@example.com"}
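If you want to see the shape of this pipeline without access to an RStreams bus, the sketch below mimics Example 1 using only Node's stream module. It is an offline approximation, not how the SDK is implemented: Readable.from stands in for rsdk.read, plain Transform streams stand in for throughAsync and stringify, and an in-memory Writable replaces fs.createWriteStream. FakeReadEvent, runOffline, and the trimmed Person type are hypothetical and exist only for illustration.

```typescript
import { Readable, Transform, TransformCallback, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical minimal shapes; the real ReadEvent and Person types carry more.
interface Person {
  firstName: string;
  lastName: string;
  email: string;
  [extra: string]: unknown; // other fields the real Person type may have
}
interface FakeReadEvent<T> { payload: T }
interface PersonLight { firstName: string; lastName: string; email: string }

async function runOffline(events: FakeReadEvent<Person>[]): Promise<string> {
  const chunks: string[] = [];
  await pipeline(
    // Source: stands in for rsdk.read(...) from the bus.
    Readable.from(events),
    // Map step: stands in for the throughAsync step, keeping three fields.
    new Transform({
      objectMode: true,
      transform(e: FakeReadEvent<Person>, _enc: BufferEncoding, done: TransformCallback) {
        const light: PersonLight = {
          firstName: e.payload.firstName,
          lastName: e.payload.lastName,
          email: e.payload.email,
        };
        done(null, light);
      },
    }),
    // Stringify step: one JSON document plus "\n" per event.
    new Transform({
      writableObjectMode: true,
      transform(obj: PersonLight, _enc: BufferEncoding, done: TransformCallback) {
        done(null, JSON.stringify(obj) + "\n");
      },
    }),
    // Sink: collects text in memory instead of writing a file.
    new Writable({
      write(chunk: Buffer, _enc: BufferEncoding, done: (error?: Error | null) => void) {
        chunks.push(chunk.toString());
        done();
      },
    }),
  );
  return chunks.join("");
}
```

Swapping the in-memory sink for `fs.createWriteStream(...)` would write a file in the same one-object-per-line format as the output above.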