You need to understand what a pipe is and what a stream step in that pipe is, and that nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means with respect to reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.
This function creates a transform stream: a stream that sits after the source stream, receives events, does something with them and then sends them on to the next pipe step, which must exist.
It takes each event and turns it into a CSV line ready to be written to a CSV file.
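To make the idea of a CSV transform step concrete, here is a minimal sketch built only on Node's standard stream module. It is not how the SDK or fast-csv implement toCSV. The names csvField, csvLine, toCsvSketch and renderCsv are hypothetical and exist only for this illustration, the sample rows are made up, and the quoting rule (quote a field only when it contains a comma, a double quote or a line break) is an assumption based on common CSV practice. The header parameter loosely mirrors the first argument of toCSV described below; treating false as "no header" is also an assumption.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

type Row = Record<string, unknown>;

// Hypothetical helper: quote a field only when it contains a comma, quote or line break.
function csvField(value: unknown): string {
  const s = String(value ?? "");
  return /[",\r\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
}

// Hypothetical helper: turn one object into one CSV line, in the given column order.
function csvLine(columns: string[], row: Row): string {
  return columns.map((c) => csvField(row[c])).join(",") + "\n";
}

// Stand-in for a CSV transform step (NOT the SDK's implementation).
// header === true       -> use the keys of the first row as the header row
// header is a string[]  -> use that array as the header row and column order
// header === false      -> write no header row (assumption for this sketch)
function toCsvSketch(header: boolean | string[]): Transform {
  let columns: string[] | undefined = Array.isArray(header) ? header : undefined;
  let headerWritten = false;
  return new Transform({
    writableObjectMode: true, // objects in, text out
    transform(row: Row, _encoding, done) {
      const cols = columns ?? Object.keys(row);
      columns = cols;
      let out = "";
      if (header !== false && !headerWritten) {
        out += cols.map(csvField).join(",") + "\n";
        headerWritten = true;
      }
      out += csvLine(cols, row);
      done(null, out); // send the CSV text on to the next pipe step
    },
  });
}

// Run rows through the sketch and collect the CSV text from an in-memory sink.
async function renderCsv(rows: Row[], header: boolean | string[]): Promise<string> {
  const chunks: string[] = [];
  const sink = new Writable({
    write(chunk, _encoding, done) {
      chunks.push(chunk.toString());
      done();
    },
  });
  await pipeline(Readable.from(rows), toCsvSketch(header), sink);
  return chunks.join("");
}

const sampleRows: Row[] = [
  { firstName: "Ada", lastName: "Lovelace", email: "ada@example.com" },
  { firstName: "Grace", lastName: "Hopper, PhD", email: "grace@example.com" },
];

// Prints:
// firstName,lastName,email
// Ada,Lovelace,ada@example.com
// Grace,"Hopper, PhD",grace@example.com
renderCsv(sampleRows, true).then((csv) => process.stdout.write(csv));
```

The point of the sketch is the shape of the step: it accepts objects on its writable side, emits text on its readable side, and relies on a following step (here an in-memory sink) to consume that text.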
When would I use this?
- I want to generate a CSV file from the events flowing through the stream
Runnable Examples
Example 1
This example reads 5 events from the rstreams-example.peopleplus RStreams queue. The pipe then creates a throughAsync stream step that takes each ReadEvent<Person> event read from the bus, turns it into a PersonLight event and sends it on to the toCSV stream, which makes a CSV line ready to be written to a CSV file. Finally, it uses the Node standard filesystem fs module to create a sink that writes the events that flow into it to a file. Pretty convenient.
The toCSV function’s first argument, if true, writes a CSV header as the first row. If the first argument is an array of strings, toCSV uses that array as the CSV header row. The second argument is a set of options that come from the underlying fast-csv NPM module, which generates the CSV output: fast-csv options.
import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import fs from "fs";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();

  // Read at most 5 events, starting from the given position in the queue
  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  }

  await rsdk.streams.pipeAsync(
    // Source: read events from the rstreams-example.peopleplus queue
    rsdk.read<Person>('rstreams-example.peopleplus-to-jsonlines',
                      'rstreams-example.peopleplus', opts),
    // Transform: keep only the fields wanted in the CSV file
    rsdk.streams.throughAsync<ReadEvent<Person>, PersonLight>(async (p: ReadEvent<Person>) => {
      return {
        firstName: p.payload.firstName,
        lastName: p.payload.lastName,
        email: p.payload.email
      }
    }),
    // Transform: turn each PersonLight into a CSV line, writing a header row first
    rsdk.streams.toCSV(true, {quote: '"'}),
    // Sink: write the CSV lines to a file (the ./output directory must already exist)
    fs.createWriteStream("./output/people.csv"),
  );
}

interface PersonLight {
  firstName: string;
  lastName: string;
  email: string;
}

(async () => {
  try {
    await main();
  } catch(err) {
    console.log(err);
  }
})()
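One practical detail about the file sink: fs.createWriteStream does not create missing parent directories, so the example fails with an ENOENT error if ./output does not exist. A minimal sketch of one way to guard against that, using only Node's fs and path modules, follows. The createFileSink helper is a hypothetical name for this illustration and is not part of the SDK.

```typescript
import * as fs from "fs";
import * as path from "path";

// Hypothetical helper, not part of the SDK: make sure the parent directory
// exists, then open a write stream that can serve as the final pipe step.
function createFileSink(filePath: string): fs.WriteStream {
  // recursive: true creates any missing parents and does nothing
  // if the directory is already there
  fs.mkdirSync(path.dirname(filePath), { recursive: true });
  return fs.createWriteStream(filePath);
}
```

In the example above, the last pipe step could then be written as createFileSink("./output/people.csv") instead of calling fs.createWriteStream directly.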