This primer provides exactly enough knowledge of streaming concepts for a developer to successfully write streaming applications using the RStreams SDK and bus. It is not intended as an exhaustive treatise on the vagaries of Node streams. We all have work to do.
Are you set up to run the examples?
All examples in the SDK documentation assume that, when these apps run, the RStreams SDK can discover the configuration it needs. That config is the set of AWS resource IDs of the RStreams Bus instance deployed in your AWS account, such as the ID of the Kinesis stream the bus uses.
Of course, in a production environment the SDK will get the config in an intelligent and safe manner, say from AWS Secrets Manager. See the RStreams Flow Configuring RStreams doc.
Here’s the TypeScript type of the config.
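As a minimal sketch, here is that shape reconstructed from the keys used in the constructor example later in this doc. Treat the ConfigurationResources type exported by leo-sdk as the source of truth. The isBusConfig helper is hypothetical: a quick sanity check you might run on a config blob before handing it to the SDK.

```typescript
// Sketch of the bus config shape, reconstructed from the keys used in this doc.
// leo-sdk's exported ConfigurationResources type is authoritative; this local
// copy exists only for illustration. Each value is an AWS resource ID/name.
interface BusConfigSketch {
  Region: string;
  LeoStream: string;
  LeoCron: string;
  LeoSettings: string;
  LeoEvent: string;
  LeoKinesisStream: string;
  LeoFirehoseStream: string;
  LeoS3: string;
}

// Keys every config must contain; kept in sync with the interface above.
const REQUIRED_KEYS: ReadonlyArray<keyof BusConfigSketch> = [
  "Region", "LeoStream", "LeoCron", "LeoSettings",
  "LeoEvent", "LeoKinesisStream", "LeoFirehoseStream", "LeoS3",
];

// Hypothetical helper: true only if every required key is a non-empty string.
function isBusConfig(value: unknown): value is BusConfigSketch {
  if (typeof value !== "object" || value === null) return false;
  const obj = value as Record<string, unknown>;
  return REQUIRED_KEYS.every((key) => {
    const v = obj[key];
    return typeof v === "string" && v.length > 0;
  });
}
```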
Get the config
You will first need to get this config. By default, the RStreams Bus stores the JSON config blob as a secret in AWS Secrets Manager named rstreams-<bus name>. Go get the JSON config from this secret.
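One way to script this step, as a minimal sketch: the helper below derives the secret name from the rstreams-<bus name> convention and parses the secret's JSON. The actual secret lookup is injected as a function (fetchSecret, a hypothetical name) so the sketch runs without AWS credentials.

```typescript
// Default secret name convention: rstreams-<bus name>.
function busSecretName(busName: string): string {
  return `rstreams-${busName}`;
}

// Hypothetical signature for whatever actually reads the secret string.
type SecretFetcher = (secretId: string) => Promise<string | undefined>;

// Fetch the bus config secret and parse its JSON blob.
async function loadBusConfig(
  busName: string,
  fetchSecret: SecretFetcher,
): Promise<Record<string, unknown>> {
  const secretId = busSecretName(busName);
  const raw = await fetchSecret(secretId);
  if (!raw) {
    throw new Error(`Secret ${secretId} is missing or empty`);
  }
  return JSON.parse(raw) as Record<string, unknown>;
}
```

In a real script you might back fetchSecret with GetSecretValueCommand from @aws-sdk/client-secrets-manager and return the response's SecretString.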
Save the config
As a file
Create a file named rstreams.config.json and put it in the directory you run your app from, or in any parent directory, and the SDK will find it and use it.
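To make the "any parent directory" rule concrete, here is a small illustration of that kind of upward search. It is not the SDK's actual discovery code: findConfigFile is a hypothetical helper, and the real SDK may check its config sources in a different order.

```typescript
import * as fs from "fs";
import * as path from "path";

// Illustrative only: walk from startDir up to the filesystem root and return
// the first rstreams.config.json found, or undefined if there is none.
function findConfigFile(
  startDir: string,
  fileName = "rstreams.config.json",
): string | undefined {
  let dir = path.resolve(startDir);
  while (true) {
    const candidate = path.join(dir, fileName);
    if (fs.existsSync(candidate)) return candidate;
    const parent = path.dirname(dir);
    if (parent === dir) return undefined; // reached the root
    dir = parent;
  }
}
```

For example, findConfigFile(process.cwd()) returns the nearest config file path, which you could then read with fs.readFileSync and JSON.parse.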
As an environment variable
Create an environment variable named RSTREAMS_CONFIG whose value is the config JSON blob.
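The value must be a single JSON object. The sketch below uses a hypothetical readConfigFromEnv helper to show how such a value parses; the SDK reads RSTREAMS_CONFIG on its own, so you would only need something like this for your own validation or tooling.

```typescript
// Hypothetical helper: parse RSTREAMS_CONFIG, returning undefined if unset.
// Throws if the value is set but is not a JSON object.
function readConfigFromEnv(
  env: Record<string, string | undefined> = process.env,
): Record<string, unknown> | undefined {
  const raw = env.RSTREAMS_CONFIG;
  if (raw === undefined || raw.trim() === "") return undefined;
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error("RSTREAMS_CONFIG must be a JSON object");
  }
  return parsed as Record<string, unknown>;
}
```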
As an argument to the SDK itself
Create a variable in the code that is the config and then pass it into the SDK’s constructor.
```typescript
import { ConfigurationResources, RStreamsSdk } from "leo-sdk";

const RSTREAMS_BUS_CONFIG: ConfigurationResources = {
  "Region": "some-value",
  "LeoStream": "some-value",
  "LeoCron": "some-value",
  "LeoSettings": "some-value",
  "LeoEvent": "some-value",
  "LeoKinesisStream": "some-value",
  "LeoFirehoseStream": "some-value",
  "LeoS3": "some-value"
};

const rsdk: RStreamsSdk = new RStreamsSdk(RSTREAMS_BUS_CONFIG);
```
Principal Operations
Write
You’re going to want to write to the bus, meaning send a data event to a specific queue of the bus. Queues preserve the order in which events were written, from the oldest to the newest.
Read
You’re going to want to read from the bus, meaning read events from a queue of the bus. You typically resume reading from wherever your bot last left off in a queue. If this is your bot’s first time reading from a queue, reading starts at the oldest event in the queue by default. Alternatively, you can read events from a specific range back in time in the queue.
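A toy, in-memory model can make these read semantics concrete: each bot keeps its own checkpoint, a bot with no checkpoint starts at the oldest event, and a range read looks back in time without moving any checkpoint. ToyQueue is purely illustrative; real RStreams queues live in AWS (Kinesis, DynamoDB, S3) and are accessed through the SDK.

```typescript
// Toy model only: illustrates ordering and per-bot checkpoints,
// not how RStreams actually stores queues.
interface ToyEvent<T> {
  id: number; // position in the queue, oldest = 0
  payload: T;
}

class ToyQueue<T> {
  private events: ToyEvent<T>[] = [];
  private checkpoints = new Map<string, number>(); // botId -> id of last event read

  // Append an event; events keep the order in which they were written.
  write(payload: T): number {
    const id = this.events.length;
    this.events.push({ id, payload });
    return id;
  }

  // Resume after the bot's checkpoint; with no checkpoint, start at the oldest event.
  read(botId: string, limit = Infinity): ToyEvent<T>[] {
    const start = (this.checkpoints.get(botId) ?? -1) + 1;
    const batch = this.events.slice(start, start + limit);
    if (batch.length > 0) {
      this.checkpoints.set(botId, batch[batch.length - 1].id);
    }
    return batch;
  }

  // Read an inclusive range back in time without moving any checkpoint.
  readRange(fromId: number, toId: number): ToyEvent<T>[] {
    return this.events.filter((e) => e.id >= fromId && e.id <= toId);
  }
}
```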
Transform
You’re going to want to read from the bus, change the data or cause a side effect such as writing to a database, and then write the changed data to a different queue.
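Transform is the read, change, write pattern. The following sketch shows its shape with plain Node.js streams: Readable.from stands in for reading a source queue and an in-memory Writable stands in for writing to the destination queue. Those stand-ins, and the RawPerson and Person types, are assumptions for illustration; in RStreams the SDK supplies the source and destination streams.

```typescript
import { Readable, Transform, TransformCallback, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical record shapes for illustration only.
interface RawPerson { first: string; last: string; }
interface Person { fullName: string; }

// Read -> transform -> write, with in-memory stand-ins for the bus queues.
async function transformPeople(input: RawPerson[]): Promise<Person[]> {
  const output: Person[] = [];
  await pipeline(
    // Stand-in for reading events from a source queue.
    Readable.from(input),
    // Change each event; a side effect such as a DB write could also go here.
    new Transform({
      objectMode: true,
      transform(raw: RawPerson, _enc: BufferEncoding, done: TransformCallback) {
        done(null, { fullName: `${raw.first} ${raw.last}` });
      },
    }),
    // Stand-in for writing the changed events to a destination queue.
    new Writable({
      objectMode: true,
      write(person: Person, _enc: BufferEncoding, done: (error?: Error | null) => void) {
        output.push(person);
        done();
      },
    }),
  );
  return output;
}
```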
Write to the bus
You want to write data to an RStreams queue.
Write a single object to the bus
Let’s say we want to populate an RStreams queue with people we retrieve from an API that generates random people. The steps to do that are:
- Line 6: Create an instance of the SDK
- Line 7: Get a single random person from a public API using the Axios library
- Line 8: Call the putEvent SDK API to send an event to the RStreams Bus
  - The first argument is the ID of the bot this code is running as
  - The second argument is the ID of the RStreams queue to send the event to
  - The third argument is the JSON object to send
```typescript
import { RStreamsSdk } from "leo-sdk";
import { PersonRaw, PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk(); // Line 6: create an SDK instance
  const person = await getRandomPerson(); // Line 7: fetch one random person
  await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person); // Line 8: send the event
}

async function getRandomPerson(): Promise<PersonRaw> {
  const NUM_EVENTS = 1;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&` +
    `exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);

  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }

  console.log('Person: ' + data.results[0].name.first + ' ' + data.results[0].name.last);

  return data.results[0];
}

// Report failures instead of leaving an unhandled promise rejection.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
```
```typescript
// lib/types.ts (imported by the examples as "../lib/types")
export interface PersonRaw {
  gender: string;
  name: {
    title: string;
    first: string;
    last: string;
  }
  location: {
    street: {
      number: number;
      name: string;
    }
    city: string;
    state: string;
    country: string;
    postcode: number | string; // some nationalities return alphanumeric postcodes
    coordinates: {
      longitude: string;
      latitude: string;
    }
    timezone: {
      offset: string;
      description: string;
    }
  }
  email: string;
  dob: {
    date: string;
    age: number;
  }
  nat: string;
}

export interface PersonRawResults {
  results: PersonRaw[];
}
```
View results in Botmon
If you go to Botmon, you will see that the rstreams-example.people queue now has an event in it.
Write multiple objects to the bus
So, instead of reading one person from the public API we used in the example above, let’s say we get 100 people at a time from the public API and we want to write them to the bus. Here’s what that looks like.
```typescript
import { RStreamsSdk } from "leo-sdk";
import { PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const people = await getRandomPeople();

  //HINT: this will have very bad performance. This is just to illustrate a point.
  // Don't use putEvent in a loop this way in practice!
  for (const person of people.results) {
    await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person);
  }
}

async function getRandomPeople(): Promise<PersonRawResults> {
  const NUM_EVENTS = 100;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&` +
    `exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);

  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }

  console.log('Person: ' + data.results[0].name.first + ' ' + data.results[0].name.last);

  return data;
}

// Report failures instead of leaving an unhandled promise rejection.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
```
The only difference in this example is that we ask the public API for 100 people, getting back 100 objects as an array. We then loop through them, making a separate call to the RStreams Bus for each and every event. It’s simple and it works, but it performs badly. The putEvent API is really only meant for one event, or at most a handful. To understand why, consider what the RStreams SDK does each time you call putEvent:
- It opens a connection to AWS Kinesis
- It sends the single event to Kinesis on that connection, every time
- The event flows through Kinesis until an RStreams Kinesis processor reads the single event and writes it to the RStreams DynamoDB queue table, putting the event in the correct queue
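A rough cost model shows why per-event calls add up. Assuming one network round trip per send, the sketch below counts round trips for a putEvent-style loop (batch size 1) versus grouping events into batches. chunk and countRoundTrips are hypothetical helpers, and real costs also depend on payload size and network latency.

```typescript
// Split items into consecutive batches of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new Error("size must be at least 1");
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Count simulated round trips, assuming one network call per send.
async function countRoundTrips<T>(events: T[], batchSize: number): Promise<number> {
  let roundTrips = 0;
  // Hypothetical sender standing in for a single call to Kinesis.
  const send = async (_batch: T[]): Promise<void> => {
    roundTrips++;
  };
  for (const batch of chunk(events, batchSize)) {
    await send(batch);
  }
  return roundTrips;
}
```

For the 100 people above, a batch size of 1 gives 100 round trips, while a batch size of 25 gives 4.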
RStreams is designed to handle a continuous flow of data events into a given queue, where they are read, mutated, and then sent on to other queues, and today it does this with very large volumes of concurrently received events. The RStreams SDK has a better way to send larger amounts of data to the bus, meaning to an RStreams queue.
Stream multiple objects to the bus fast
It’s time to tackle the idea of streams. If you aren’t well versed in streams, jump over and read the Streams Primer. It’s short and sweet and may well convert you to streams if you aren’t already.
```typescript
import { RStreamsSdk } from "leo-sdk";
import { PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const es = rsdk.streams.eventstream;
  const people = await getRandomPeople();

  // Stream the array through a single loader instead of calling putEvent per person.
  await rsdk.streams.pipeAsync(
    es.readArray(people.results),
    // Batching options: group up to 25 records or 5000 ms per write; useS3 routes
    // batches through S3 (see the SDK docs for the exact semantics).
    rsdk.load('rstreams-example.load-people', 'rstreams-example.people',
      {records: 25, time: 5000, useS3: true})
  );
}

async function getRandomPeople(): Promise<PersonRawResults> {
  const NUM_EVENTS = 100;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&` +
    `exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);

  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }

  return data;
}

// Report failures instead of leaving an unhandled promise rejection.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
```
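The records and time options suggest a batching loader that flushes when either threshold is reached. The sketch below is a hypothetical illustration of that idea with a Node.js Transform stream, not leo-sdk's load() implementation: createBatcher emits a batch once records items are buffered or timeMs has elapsed since the first buffered item, and flushes any leftover partial batch at end of input.

```typescript
import { Readable, Transform, TransformCallback } from "stream";

// Hypothetical batcher: emits an array once `records` items are buffered or
// `timeMs` has passed since the first buffered item, whichever comes first.
function createBatcher<T>(records: number, timeMs: number): Transform {
  let buffer: T[] = [];
  let timer: ReturnType<typeof setTimeout> | undefined;

  const stream = new Transform({
    objectMode: true,
    transform(item: T, _enc: BufferEncoding, done: TransformCallback) {
      buffer.push(item);
      if (buffer.length >= records) {
        emitBatch(); // size threshold reached
      } else if (timer === undefined) {
        timer = setTimeout(emitBatch, timeMs); // start the clock on the first item
      }
      done();
    },
    flush(done: TransformCallback) {
      emitBatch(); // emit any partial batch left at end of input
      done();
    },
  });

  function emitBatch(): void {
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
    if (buffer.length > 0) {
      stream.push(buffer);
      buffer = [];
    }
  }

  return stream;
}

// Usage helper: run an array through the batcher and collect the batches.
async function batchArray<T>(items: T[], records: number, timeMs: number): Promise<T[][]> {
  const batches: T[][] = [];
  for await (const batch of Readable.from(items).pipe(createBatcher<T>(records, timeMs))) {
    batches.push(batch as T[]);
  }
  return batches;
}
```

With 100 items and records set to 25, batchArray yields four batches of 25; the time threshold matters only when events trickle in more slowly than the record threshold fills.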