History

Read

You need to understand what a pipe and stream step in that pipe is AND nothing comes for free. The cost of working with large amounts of data in near real-time environments with RStreams is you have to think about what you are doing and what it means with respect to reading and writing. It is strongly recommended you read the Read/Write at Scale article at some point.

API Doc

This function creates a source stream, fed by events from the specified RStreams queue, to act as the first step in a pipe. Just specify the RStreams queue and config to read efficiently and you’re done.

When would I use this?

  • I want to use a pipe to have a little more control over processing
  • The data I want to process comes from an RStreams queue

Runnable Examples

Example 1

This example reads 100 events from the rstreams-example.peopleplus RStreams queue and then shuts down the pipe. The read stream sends the events to the devnull stream. illustrates code running as a bot with ID of rstreams-example.people-to-peopleplus and getting exactly two events from queue rstreams-example.people, starting at position z/2022/04/20, and then transforms each event’s JSON by dropping unwanted attributes and simplifying the JSON structure. It also calls a totally free, public API that given a country name returns the standard two-char country code which we tack on to the event after which we return the modified event which tells the SDK to push it to the rstreams-example.people-to-peopleplus queue.

Two things to note here. First is that the transform function is typed for both the callback and async variety but please only use the async version going forward - all new features are only being added to the async approach.

Second, there are actually three arguments to the transform function, even though in our example we are only using the first. What is stored in an RStreams queue is an instance of a ReadEvent where the payload attribute is the data the queue exists for. The first argument is just the payload pulled out since usually that’s all you need. The second argument is the full event from the queue with the event ID and other sometimes useful things. The third argument is only used in the callback version where you call done exactly once to trigger the callback. It’s there for backwared compat. Don’t use it on new things.

The devnull at the end just acts as a sink and passing in true tells it to log. That’s all it’s for, to act as a sink. See the doc on Devnull for more details.

Example 1 code
 1import { ReadOptions, RStreamsSdk } from "leo-sdk";
 2import { Person } from "../lib/types";
 3import axios from "axios";
 4
 5async function main() {
 6  const rsdk: RStreamsSdk  = new RStreamsSdk();
 7
 8  const opts: ReadOptions = {
 9    start: 'z/2022/04/20',
10    limit: 5
11  }
12
13  await rsdk.streams.pipeAsync(
14    rsdk.read<Person>('rstreams-example.peopleplus-to-devnull',
15                      'rstreams-example.peopleplus', opts),
16    rsdk.streams.devnull(true)
17  );
18}
19
20(async () => {
21  try {
22    await main();
23  } catch(err) {
24    console.log(err);
25  }
26})()
Example 1 console output
  1➜  rstreams-runnable-examples ts-node apps/read-events-simple.ts
  2Reading event from z/2022/04/20 
  3devnull {
  4  "id": "rstreams-example.people-to-peopleplus",
  5  "event": "rstreams-example.peopleplus",
  6  "payload": {
  7    "gender": "male",
  8    "firstName": "Herman",
  9    "lastName": "Morris",
 10    "email": "herman.morris@example.com",
 11    "birthDate": "1959-04-25T19:28:13.361Z",
 12    "nationality": "IE",
 13    "addr": {
 14      "addr1": "9393 Mill Lane",
 15      "city": "Killarney",
 16      "state": "Galway City",
 17      "country": "Ireland",
 18      "postcode": 34192,
 19      "longitude": "-48.3422",
 20      "latitude": "23.2617",
 21      "tzOffset": "-12:00",
 22      "tzDesc": "Eniwetok, Kwajalein",
 23      "countryCode": "IE"
 24    }
 25  },
 26  "event_source_timestamp": 1650415833983,
 27  "eid": "z/2022/04/21/20/37/1650573479245-0000000",
 28  "correlation_id": {
 29    "source": "rstreams-example.people",
 30    "start": "z/2022/04/20/00/50/1650415834886-0000000",
 31    "units": 1
 32  },
 33  "timestamp": 1650573479299
 34}
 35devnull {
 36  "id": "rstreams-example.people-to-peopleplus",
 37  "event": "rstreams-example.peopleplus",
 38  "payload": {
 39    "gender": "male",
 40    "firstName": "Herman",
 41    "lastName": "Morris",
 42    "email": "herman.morris@example.com",
 43    "birthDate": "1959-04-25T19:28:13.361Z",
 44    "nationality": "IE",
 45    "addr": {
 46      "addr1": "9393 Mill Lane",
 47      "city": "Killarney",
 48      "state": "Galway City",
 49      "country": "Ireland",
 50      "postcode": 34192,
 51      "longitude": "-48.3422",
 52      "latitude": "23.2617",
 53      "tzOffset": "-12:00",
 54      "tzDesc": "Eniwetok, Kwajalein",
 55      "countryCode": "IE"
 56    }
 57  },
 58  "event_source_timestamp": 1650415833983,
 59  "eid": "z/2022/04/22/16/39/1650645572667-0000134",
 60  "correlation_id": {
 61    "source": "rstreams-example.people",
 62    "start": "z/2022/04/20/00/50/1650415834886-0000000",
 63    "units": 1
 64  },
 65  "timestamp": 1650645572513
 66}
 67devnull {
 68  "id": "rstreams-example.people-to-peopleplus",
 69  "event": "rstreams-example.peopleplus",
 70  "payload": {
 71    "gender": "male",
 72    "firstName": "Tomothy",
 73    "lastName": "Rogers",
 74    "email": "tomothy.rogers@example.com",
 75    "birthDate": "1967-01-22T18:32:59.793Z",
 76    "nationality": "AU",
 77    "addr": {
 78      "addr1": "6582 Adams St",
 79      "city": "Kalgoorlie",
 80      "state": "Australian Capital Territory",
 81      "country": "Australia",
 82      "postcode": 8157,
 83      "longitude": "33.3086",
 84      "latitude": "49.2180",
 85      "tzOffset": "+5:30",
 86      "tzDesc": "Bombay, Calcutta, Madras, New Delhi",
 87      "countryCode": "AU"
 88    }
 89  },
 90  "event_source_timestamp": 1650415833985,
 91  "eid": "z/2022/04/22/16/39/1650645572667-0000135",
 92  "correlation_id": {
 93    "source": "rstreams-example.people",
 94    "start": "z/2022/04/20/00/50/1650415834886-0000001",
 95    "units": 1
 96  },
 97  "timestamp": 1650645572690
 98}
 99devnull {
100  "id": "rstreams-example.people-to-peopleplus",
101  "event": "rstreams-example.peopleplus",
102  "payload": {
103    "gender": "male",
104    "firstName": "Herman",
105    "lastName": "Morris",
106    "email": "herman.morris@example.com",
107    "birthDate": "1959-04-25T19:28:13.361Z",
108    "nationality": "IE",
109    "addr": {
110      "addr1": "9393 Mill Lane",
111      "city": "Killarney",
112      "state": "Galway City",
113      "country": "Ireland",
114      "postcode": 34192,
115      "longitude": "-48.3422",
116      "latitude": "23.2617",
117      "tzOffset": "-12:00",
118      "tzDesc": "Eniwetok, Kwajalein",
119      "countryCode": "IE"
120    }
121  },
122  "event_source_timestamp": 1650415833983,
123  "eid": "z/2022/04/22/16/39/1650645583644-0000111",
124  "correlation_id": {
125    "source": "rstreams-example.people",
126    "start": "z/2022/04/20/00/50/1650415834886-0000009",
127    "units": 10
128  },
129  "timestamp": 1650645583447
130}
131devnull {
132  "id": "rstreams-example.people-to-peopleplus",
133  "event": "rstreams-example.peopleplus",
134  "payload": {
135    "gender": "male",
136    "firstName": "Tomothy",
137    "lastName": "Rogers",
138    "email": "tomothy.rogers@example.com",
139    "birthDate": "1967-01-22T18:32:59.793Z",
140    "nationality": "AU",
141    "addr": {
142      "addr1": "6582 Adams St",
143      "city": "Kalgoorlie",
144      "state": "Australian Capital Territory",
145      "country": "Australia",
146      "postcode": 8157,
147      "longitude": "33.3086",
148      "latitude": "49.2180",
149      "tzOffset": "+5:30",
150      "tzDesc": "Bombay, Calcutta, Madras, New Delhi",
151      "countryCode": "AU"
152    }
153  },
154  "event_source_timestamp": 1650415833983,
155  "eid": "z/2022/04/22/16/39/1650645583644-0000112",
156  "correlation_id": {
157    "source": "rstreams-example.people",
158    "start": "z/2022/04/20/00/50/1650415834886-0000009",
159    "units": 10
160  },
161  "timestamp": 1650645583448
162}

Note: Person types referenced in the examples