
Stringify

You need to understand what a pipe is and what a stream step in that pipe is. Also, nothing comes for free: the cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means for reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.


This function creates a transform stream: a stream that receives events from the step before it, does something with them, and sends the results on to the next pipe step. That next step must exist.

It takes each event, converts it to a JSON string, appends a newline character, and sends that string on to the next step in the pipe. It is used to create JSON Lines content to write either to a file in S3 or to a file on the local file system.
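To make the behavior described above concrete, here is a minimal sketch built only on Node's stream module. It assumes, rather than confirms, that stringify() amounts to JSON.stringify(event) + "\n" for each event; stringifySketch, collectInto and demo are hypothetical names, not part of the RStreams SDK.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical stand-in for rsdk.streams.stringify(): an object-mode
// Transform that turns each incoming event into one JSON Lines record.
// Assumption: the real step uses JSON.stringify plus "\n".
function stringifySketch(): Transform {
  return new Transform({
    writableObjectMode: true,  // accepts JS objects from the upstream step
    readableObjectMode: false, // emits plain string/byte chunks downstream
    transform(event: unknown, _encoding, callback) {
      // One event in, one newline-terminated line out.
      callback(null, JSON.stringify(event) + "\n");
    },
  });
}

// Collects everything written to it, standing in for a file sink.
function collectInto(chunks: string[]): Writable {
  return new Writable({
    write(chunk: Buffer, _encoding, callback) {
      chunks.push(chunk.toString("utf8"));
      callback();
    },
  });
}

async function demo(): Promise<string> {
  const chunks: string[] = [];
  await pipeline(
    Readable.from([{ id: 1 }, { id: 2 }]), // source: two sample events
    stringifySketch(),                      // transform: object -> JSON line
    collectInto(chunks),                    // sink: something must consume the transform's output
  );
  return chunks.join("");
}
```

Awaiting demo() yields the two events as two lines, `{"id":1}` and `{"id":2}`, each followed by a newline.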

When would I use this?

  • I want to make a JSON lines file from the events flowing through the stream

Runnable Examples

Example 1

This example reads 5 events from the rstreams-example.peopleplus RStreams queue. The pipe then uses a throughAsync stream step that takes each ReadEvent<Person> event read from the bus, turns it into a PersonLight object, and sends it on to the stringify step, which turns it into a JSON line ready to go into a JSON Lines file.
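The throughAsync step here is a pure projection, so its logic can be checked without a running bus. The sketch below models only the fields the example touches: PersonSketch, ReadEventSketch, toPersonLight and toJsonLine are hypothetical names, the real Person and ReadEvent types carry more than this, and toJsonLine assumes stringify emits JSON.stringify plus a newline.

```typescript
// Hypothetical minimal shapes: the real Person type lives in ../lib/types and
// the real ReadEvent comes from leo-sdk; only the fields used here are modeled.
interface PersonSketch {
  firstName: string;
  lastName: string;
  email: string;
  [extra: string]: unknown; // the real Person likely has more fields
}

interface ReadEventSketch<T> {
  payload: T;
}

interface PersonLight {
  firstName: string;
  lastName: string;
  email: string;
}

// The same projection the throughAsync step performs: keep three fields
// from the payload and drop everything else.
function toPersonLight(event: ReadEventSketch<PersonSketch>): PersonLight {
  const { firstName, lastName, email } = event.payload;
  return { firstName, lastName, email };
}

// Projection followed by the assumed stringify logic: shows exactly which
// JSON line would end up in the file for one event.
function toJsonLine(event: ReadEventSketch<PersonSketch>): string {
  return JSON.stringify(toPersonLight(event)) + "\n";
}
```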

Finally, it uses the Node.js standard fs module to create a sink, fs.createWriteStream, that writes everything flowing into it to a file. Pretty convenient.
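The same write-to-file step can be tried outside RStreams. This sketch uses only Node's fs, os, path and stream modules; writeJsonLines and demo are hypothetical helpers, and the inline Transform stands in for rsdk.streams.stringify() on the assumption that it emits JSON.stringify(event) + "\n".

```typescript
import { mkdtempSync, createWriteStream, readFileSync } from "fs";
import { tmpdir } from "os";
import { join } from "path";
import { Readable, Transform } from "stream";
import { pipeline } from "stream/promises";

// Writes each object as one JSON line to filePath, with fs.createWriteStream
// as the sink. The Transform is a hypothetical stand-in for stringify().
async function writeJsonLines(objects: object[], filePath: string): Promise<void> {
  await pipeline(
    Readable.from(objects),
    new Transform({
      writableObjectMode: true,
      transform(obj: object, _encoding, callback) {
        callback(null, JSON.stringify(obj) + "\n");
      },
    }),
    createWriteStream(filePath), // sink: pipeline resolves after the write stream finishes
  );
}

async function demo(): Promise<string[]> {
  // Use a fresh temp directory so the sketch never touches ./output.
  const dir = mkdtempSync(join(tmpdir(), "jsonl-"));
  const file = join(dir, "people.jsonl");
  await writeJsonLines([{ firstName: "Herman" }, { firstName: "Tomothy" }], file);
  // Split on newlines and drop the empty string after the final "\n".
  return readFileSync(file, "utf8").split("\n").filter((l) => l.length > 0);
}
```

Writing into a temp directory sidesteps the fact that fs.createWriteStream does not create missing parent directories for you.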

Example 1 code
import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import fs from "fs";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();

  // Begin at event ID 'z/2022/04/20' and read at most 5 events.
  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  };

  // fs.createWriteStream does not create missing directories, so make sure ./output exists.
  fs.mkdirSync("./output", { recursive: true });

  await rsdk.streams.pipeAsync(
    // Source: read Person events from the queue, using the given bot ID.
    rsdk.read<Person>('rstreams-example.peopleplus-to-jsonlines',
                      'rstreams-example.peopleplus', opts),
    // Transform: keep only the fields needed in the output file.
    rsdk.streams.throughAsync<ReadEvent<Person>, PersonLight>(async (p: ReadEvent<Person>) => {
      return {
        firstName: p.payload.firstName,
        lastName: p.payload.lastName,
        email: p.payload.email
      };
    }),
    // Transform: turn each PersonLight object into one JSON line.
    rsdk.streams.stringify(),
    // Sink: write the JSON lines to a local file.
    fs.createWriteStream("./output/people.jsonl"),
  );
}

interface PersonLight {
  firstName: string;
  lastName: string;
  email: string;
}

(async () => {
  try {
    await main();
  } catch (err) {
    console.error(err);
  }
})();
Generated people.jsonl file
{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}
{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}
{"firstName":"Tomothy","lastName":"Rogers","email":"tomothy.rogers@example.com"}
{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}
{"firstName":"Tomothy","lastName":"Rogers","email":"tomothy.rogers@example.com"}
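Because each line of a JSON Lines file is a standalone JSON document, output like the file above can be read back line by line. A minimal sketch, assuming the whole file fits in a string; parseJsonLines is a hypothetical helper, the cast to PersonLight is unchecked, and for large files you would stream lines instead (for example with Node's readline module).

```typescript
interface PersonLight {
  firstName: string;
  lastName: string;
  email: string;
}

// Parses JSON Lines text: one JSON document per line, blank lines ignored.
// The cast to PersonLight is an assumption; nothing here validates the shape.
function parseJsonLines(text: string): PersonLight[] {
  return text
    .split("\n")
    .map((line) => line.trim())
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line) as PersonLight);
}

// Two lines taken from the generated people.jsonl shown above.
const sample =
  '{"firstName":"Herman","lastName":"Morris","email":"herman.morris@example.com"}\n' +
  '{"firstName":"Tomothy","lastName":"Rogers","email":"tomothy.rogers@example.com"}\n';

const people = parseJsonLines(sample);
console.log(people.map((p) => p.email));
```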

Note: the Person type used in the examples is imported from ../lib/types.