You need to understand what a pipe and a stream step in that pipe are, and nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means with respect to reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.
This function creates a source stream, fed by events from the specified RStreams queue, to act as the first step in a pipe. Just specify the RStreams queue and config to read efficiently and you’re done.
When would I use this?
- I want to use a pipe to have a little more control over processing
- The data I want to process comes from an RStreams queue
Runnable Examples
Example 1
This example reads 5 events from the rstreams-example.peopleplus RStreams queue and then shuts down the pipe. The read stream sends the events to the devnull stream.
The events in the rstreams-example.peopleplus queue were written by a bot with ID rstreams-example.people-to-peopleplus. That bot gets exactly two events from queue rstreams-example.people, starting at position z/2022/04/20, and then transforms each event’s JSON by dropping unwanted attributes and simplifying the JSON structure. It also calls a totally free, public API that, given a country name, returns the standard two-char country code, which it tacks on to the event. It then returns the modified event, which tells the SDK to push it to the rstreams-example.peopleplus queue.
Two things to note here. First, the transform function is typed for both the callback and async variety, but please only use the async version going forward: all new features are only being added to the async approach.
Second, there are actually three arguments to the transform function, even though in our example we are only using the first. What is stored in an RStreams queue is an instance of a ReadEvent, where the payload attribute is the data the queue exists for. The first argument is just the payload pulled out, since usually that’s all you need. The second argument is the full event from the queue, with the event ID and other sometimes useful things. The third argument is only used in the callback version, where you call done exactly once to trigger the callback. It’s there for backward compatibility. Don’t use it on new things.
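To make the argument order concrete, here is a minimal sketch of an async-style transform function. It does not import the SDK: ReadEventSketch, PersonSketch, PersonPlusSketch, COUNTRY_CODES and transformSketch are all hypothetical names invented for illustration, and the in-memory lookup table merely stands in for the public country-code API mentioned above. The real ReadEvent type carries more fields than shown here.

```typescript
// Hypothetical, simplified stand-in for the SDK's ReadEvent type.
interface ReadEventSketch<T> {
  id: string;      // ID of the bot that wrote the event
  event: string;   // name of the queue the event lives in
  eid: string;     // event ID, i.e. the event's position in the queue
  payload: T;      // the data the queue exists for
}

// Hypothetical payload shapes, loosely modeled on the example data.
interface PersonSketch {
  firstName: string;
  lastName: string;
  country: string;
}

interface PersonPlusSketch extends PersonSketch {
  countryCode: string;
}

// Assumption: a local lookup table replaces the public country-code API.
const COUNTRY_CODES: Record<string, string | undefined> = {
  Ireland: "IE",
  Australia: "AU",
};

// Async-style transform: the first argument is the payload, the second is
// the full event. There is no third `done` argument in the async variety.
async function transformSketch(
  payload: PersonSketch,
  wrapper: ReadEventSketch<PersonSketch>
): Promise<PersonPlusSketch> {
  const countryCode = COUNTRY_CODES[payload.country] ?? "unknown";
  console.log(`transforming event ${wrapper.eid}`);
  // Returning the modified payload is what signals "push this downstream".
  return { ...payload, countryCode };
}
```

In this sketch the second argument is used only for logging the event ID, which mirrors the common case where the payload alone is enough to do the work.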
The devnull at the end just acts as a sink, and passing in true tells it to log. That’s all it’s for, to act as a sink. See the doc on Devnull for more details.
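For readers unfamiliar with the idea of a sink, the following sketch shows the general pattern using only Node's built-in stream module. It is not the SDK's devnull implementation: devnullSketch and drainSketch are hypothetical names, and the in-memory array stands in for a queue-backed source stream.

```typescript
import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical sink: consumes every object written to it and discards it,
// logging each one first when `log` is true.
function devnullSketch(log: boolean, onEvent: () => void): Writable {
  return new Writable({
    objectMode: true,
    write(chunk: unknown, _encoding, callback) {
      if (log) {
        console.log("devnull", JSON.stringify(chunk, null, 2));
      }
      onEvent();
      callback(); // signal that this event has been fully handled
    },
  });
}

// Pipes an in-memory source into the sink and resolves with the number of
// events consumed once the pipe has shut down.
async function drainSketch(events: object[], log: boolean): Promise<number> {
  let count = 0;
  await pipeline(
    Readable.from(events),
    devnullSketch(log, () => { count += 1; })
  );
  return count;
}
```

A pipe needs something at the end to pull data through it. A sink like this gives the source a consumer without doing any further work on the events.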
import { ReadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();

  // Start reading at the given position and stop after 5 events
  const opts: ReadOptions = {
    start: 'z/2022/04/20',
    limit: 5
  }

  await rsdk.streams.pipeAsync(
    // Source: read as bot rstreams-example.peopleplus-to-devnull
    // from queue rstreams-example.peopleplus
    rsdk.read<Person>('rstreams-example.peopleplus-to-devnull',
                      'rstreams-example.peopleplus', opts),
    // Sink: passing true logs each event
    rsdk.streams.devnull(true)
  );
}

(async () => {
  try {
    await main();
  } catch(err) {
    console.log(err);
  }
})()
➜ rstreams-runnable-examples ts-node apps/read-events-simple.ts
Reading event from z/2022/04/20
devnull {
  "id": "rstreams-example.people-to-peopleplus",
  "event": "rstreams-example.peopleplus",
  "payload": {
    "gender": "male",
    "firstName": "Herman",
    "lastName": "Morris",
    "email": "herman.morris@example.com",
    "birthDate": "1959-04-25T19:28:13.361Z",
    "nationality": "IE",
    "addr": {
      "addr1": "9393 Mill Lane",
      "city": "Killarney",
      "state": "Galway City",
      "country": "Ireland",
      "postcode": 34192,
      "longitude": "-48.3422",
      "latitude": "23.2617",
      "tzOffset": "-12:00",
      "tzDesc": "Eniwetok, Kwajalein",
      "countryCode": "IE"
    }
  },
  "event_source_timestamp": 1650415833983,
  "eid": "z/2022/04/21/20/37/1650573479245-0000000",
  "correlation_id": {
    "source": "rstreams-example.people",
    "start": "z/2022/04/20/00/50/1650415834886-0000000",
    "units": 1
  },
  "timestamp": 1650573479299
}
devnull {
  "id": "rstreams-example.people-to-peopleplus",
  "event": "rstreams-example.peopleplus",
  "payload": {
    "gender": "male",
    "firstName": "Herman",
    "lastName": "Morris",
    "email": "herman.morris@example.com",
    "birthDate": "1959-04-25T19:28:13.361Z",
    "nationality": "IE",
    "addr": {
      "addr1": "9393 Mill Lane",
      "city": "Killarney",
      "state": "Galway City",
      "country": "Ireland",
      "postcode": 34192,
      "longitude": "-48.3422",
      "latitude": "23.2617",
      "tzOffset": "-12:00",
      "tzDesc": "Eniwetok, Kwajalein",
      "countryCode": "IE"
    }
  },
  "event_source_timestamp": 1650415833983,
  "eid": "z/2022/04/22/16/39/1650645572667-0000134",
  "correlation_id": {
    "source": "rstreams-example.people",
    "start": "z/2022/04/20/00/50/1650415834886-0000000",
    "units": 1
  },
  "timestamp": 1650645572513
}
devnull {
  "id": "rstreams-example.people-to-peopleplus",
  "event": "rstreams-example.peopleplus",
  "payload": {
    "gender": "male",
    "firstName": "Tomothy",
    "lastName": "Rogers",
    "email": "tomothy.rogers@example.com",
    "birthDate": "1967-01-22T18:32:59.793Z",
    "nationality": "AU",
    "addr": {
      "addr1": "6582 Adams St",
      "city": "Kalgoorlie",
      "state": "Australian Capital Territory",
      "country": "Australia",
      "postcode": 8157,
      "longitude": "33.3086",
      "latitude": "49.2180",
      "tzOffset": "+5:30",
      "tzDesc": "Bombay, Calcutta, Madras, New Delhi",
      "countryCode": "AU"
    }
  },
  "event_source_timestamp": 1650415833985,
  "eid": "z/2022/04/22/16/39/1650645572667-0000135",
  "correlation_id": {
    "source": "rstreams-example.people",
    "start": "z/2022/04/20/00/50/1650415834886-0000001",
    "units": 1
  },
  "timestamp": 1650645572690
}
devnull {
  "id": "rstreams-example.people-to-peopleplus",
  "event": "rstreams-example.peopleplus",
  "payload": {
    "gender": "male",
    "firstName": "Herman",
    "lastName": "Morris",
    "email": "herman.morris@example.com",
    "birthDate": "1959-04-25T19:28:13.361Z",
    "nationality": "IE",
    "addr": {
      "addr1": "9393 Mill Lane",
      "city": "Killarney",
      "state": "Galway City",
      "country": "Ireland",
      "postcode": 34192,
      "longitude": "-48.3422",
      "latitude": "23.2617",
      "tzOffset": "-12:00",
      "tzDesc": "Eniwetok, Kwajalein",
      "countryCode": "IE"
    }
  },
  "event_source_timestamp": 1650415833983,
  "eid": "z/2022/04/22/16/39/1650645583644-0000111",
  "correlation_id": {
    "source": "rstreams-example.people",
    "start": "z/2022/04/20/00/50/1650415834886-0000009",
    "units": 10
  },
  "timestamp": 1650645583447
}
devnull {
  "id": "rstreams-example.people-to-peopleplus",
  "event": "rstreams-example.peopleplus",
  "payload": {
    "gender": "male",
    "firstName": "Tomothy",
    "lastName": "Rogers",
    "email": "tomothy.rogers@example.com",
    "birthDate": "1967-01-22T18:32:59.793Z",
    "nationality": "AU",
    "addr": {
      "addr1": "6582 Adams St",
      "city": "Kalgoorlie",
      "state": "Australian Capital Territory",
      "country": "Australia",
      "postcode": 8157,
      "longitude": "33.3086",
      "latitude": "49.2180",
      "tzOffset": "+5:30",
      "tzDesc": "Bombay, Calcutta, Madras, New Delhi",
      "countryCode": "AU"
    }
  },
  "event_source_timestamp": 1650415833983,
  "eid": "z/2022/04/22/16/39/1650645583644-0000112",
  "correlation_id": {
    "source": "rstreams-example.people",
    "start": "z/2022/04/20/00/50/1650415834886-0000009",
    "units": 10
  },
  "timestamp": 1650645583448
}
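Judging only from the sample output above, each eid appears to begin with z/ followed by the UTC year, month, day, hour and minute of the epoch-millisecond value that comes next, and a start position such as z/2022/04/20 looks like a shorter prefix of the same form. The sketch below builds such prefixes from a timestamp. This is an observation drawn from the sample data, not a documented guarantee, and eidMinutePrefixSketch and dayPositionSketch are hypothetical helpers, not SDK functions.

```typescript
// Builds a minute-level prefix such as "z/2022/04/21/20/37" from epoch ms.
// Assumption: positions use UTC date parts, as the sample eids suggest.
function eidMinutePrefixSketch(epochMs: number): string {
  // toISOString() yields e.g. "2022-04-21T20:37:59.245Z" (always UTC)
  const iso = new Date(epochMs).toISOString();
  // Keep "2022-04-21T20:37" and turn each separator into a slash
  return "z/" + iso.slice(0, 16).replace(/[-T:]/g, "/");
}

// Builds a day-level position such as "z/2022/04/21", the same granularity
// as the start value used in the example's read options.
function dayPositionSketch(epochMs: number): string {
  const iso = new Date(epochMs).toISOString();
  return "z/" + iso.slice(0, 10).replace(/-/g, "/");
}

// The first event in the output has eid z/2022/04/21/20/37/1650573479245-0000000
console.log(eidMinutePrefixSketch(1650573479245)); // z/2022/04/21/20/37
console.log(dayPositionSketch(1650573479245));     // z/2022/04/21
```

A helper along these lines could be handy for computing a start value for "today" or "yesterday" rather than hard-coding a date, provided the assumed format holds in your environment.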