
ToCSV

You need to understand what a pipe is and what a stream step in that pipe is, and you need to accept that nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means for reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.

API Doc

This function creates a transform stream: a stream that receives events from the step before it in the pipe (such as the source stream), does something with them, and then sends them on to the next pipe step, which must exist.

It takes each event and turns it into a CSV line, ready to be written to a CSV file.
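To make the "event in, CSV line out" idea concrete, here is a minimal sketch built on Node's standard `stream.Transform`. It is not the SDK's implementation: `toCsvLine` and `csvLineTransform` are hypothetical names, and the RFC 4180 style quoting is an assumption about what a CSV writer typically does.

```typescript
import { Transform, TransformCallback } from "stream";

// Hypothetical helper (not the SDK's code): format one row of values as a
// CSV line. A field is quoted when it contains the delimiter, a double quote,
// or a line break, and embedded quotes are doubled (RFC 4180 style).
function toCsvLine(values: unknown[], delimiter = ","): string {
  const cells = values.map((v) => {
    const s = v === null || v === undefined ? "" : String(v);
    const needsQuotes = s.includes(delimiter) || /["\r\n]/.test(s);
    return needsQuotes ? `"${s.replace(/"/g, '""')}"` : s;
  });
  return cells.join(delimiter) + "\n";
}

// Hypothetical transform step: objects go in on the writable side, and one
// CSV text line per object comes out on the readable side for the next step.
function csvLineTransform(columns: string[]): Transform {
  return new Transform({
    writableObjectMode: true, // accept JS objects (events) as input
    readableObjectMode: false, // emit plain text, suitable for a file sink
    transform(event: Record<string, unknown>, _encoding: BufferEncoding, done: TransformCallback) {
      // Pick the requested columns in order and pass the line downstream
      done(null, toCsvLine(columns.map((c) => event[c])));
    },
  });
}
```

In a plain Node pipeline, `csvLineTransform(["firstName", "lastName", "email"])` would sit between an object-mode source and a sink such as `fs.createWriteStream(...)`; like `toCSV`, it is useless without a next step to consume its output.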

When would I use this?

  • I want to generate a CSV file from the events flowing through the stream

Runnable Examples

Example 1

This example reads 5 events from the rstreams-example.peopleplus RStreams queue. The pipe then creates a throughAsync stream step that takes each ReadEvent<Person> event read from the bus, turns it into a PersonLight event, and sends it on to the toCSV stream step, which turns it into a CSV line ready to go into a CSV file.

Finally, it writes the file using the Node standard filesystem module, fs, to create a sink that writes everything flowing into it to a file. Pretty convenient.

The toCSV function’s first argument controls the header row: if it is true, a CSV header is written as the first row; if it is an array of strings, that array is used as the header row. The second argument is an options object passed to the fast-csv NPM module that toCSV uses to generate the CSV; see the fast-csv options documentation.
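The effect of the first argument can be illustrated with a small stand-alone sketch. `renderCsv` is a hypothetical helper, not part of the SDK, and it assumes that an array of names both labels the header row and selects which object keys become columns, in that order; fast-csv's own header handling may differ in details, so treat this as an illustration only.

```typescript
type Row = Record<string, unknown>;

// Minimal field quoting so values containing commas, quotes, or line breaks
// stay in one cell (RFC 4180 style).
function cell(v: unknown): string {
  const s = v === null || v === undefined ? "" : String(v);
  return /[",\r\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
}

// Hypothetical helper, not the SDK's implementation. Header modes:
//   true     -> header row built from the first row's keys
//   string[] -> those names are the header row and (assumed) the keys
//               picked from each object, in that order
//   false    -> no header row
function renderCsv(rows: Row[], header: boolean | string[]): string {
  if (rows.length === 0) return "";
  const columns = Array.isArray(header) ? header : Object.keys(rows[0]);
  const lines: string[] = [];
  if (header !== false) lines.push(columns.map(cell).join(","));
  for (const row of rows) {
    lines.push(columns.map((c) => cell(row[c])).join(","));
  }
  return lines.join("\n") + "\n";
}
```

With a single PersonLight-shaped row, `renderCsv(rows, true)` starts with `firstName,lastName,email`, while `renderCsv(rows, ["email", "firstName"])` starts with `email,firstName` and writes only those two values per line.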

Example 1 code
import { ReadEvent, ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import fs from "fs";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();

  // Start reading at this position in the queue and stop after 5 events
  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  };

  // fs.createWriteStream fails with ENOENT if the directory does not exist
  fs.mkdirSync("./output", { recursive: true });

  await rsdk.streams.pipeAsync(
    // Source: read Person events from the queue as this bot
    rsdk.read<Person>('rstreams-example.peopleplus-to-jsonlines',
                      'rstreams-example.peopleplus', opts),
    // Transform: keep only the fields we want in the CSV
    rsdk.streams.throughAsync<ReadEvent<Person>, PersonLight>(async (p: ReadEvent<Person>) => {
      return {
        firstName: p.payload.firstName,
        lastName: p.payload.lastName,
        email: p.payload.email
      };
    }),
    // Transform: turn each PersonLight into a CSV line, writing a header row first
    rsdk.streams.toCSV(true, {quote: '"'}),
    // Sink: write the CSV lines to a file
    fs.createWriteStream("./output/people.csv"),
  );
}

interface PersonLight {
    firstName: string;
    lastName: string;
    email: string;
}

(async () => {
  try {
    await main();
  } catch(err) {
    console.log(err);
  }
})();
Generated people.csv file
firstName,lastName,email
Herman,Morris,herman.morris@example.com
Herman,Morris,herman.morris@example.com
Tomothy,Rogers,tomothy.rogers@example.com
Herman,Morris,herman.morris@example.com
Tomothy,Rogers,tomothy.rogers@example.com

Note: Person types referenced in the examples