You need to understand what a pipe is, what a stream step in that pipe is, and that nothing comes for free. The cost of working with large amounts of data in near-real-time environments with RStreams is that you have to think about what you are doing and what it means for reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.
A stand-alone function, meaning one that doesn’t use pipes and streams, that writes a single event to the specified RStreams queue.
When would I use this?
- You’re in an app and want to send a single event to an RStreams queue very infrequently
- You have a pipe that does something, and you want to enhance it so that a given stream step function also sends events to another RStreams queue as a side effect
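The second scenario can be sketched without the SDK. In this minimal sketch, `PutEvent` is a hypothetical type shaped like the `putEvent(botId, queue, payload)` calls in the examples below, and `Order`, `auditLargeOrders`, and the `rstreams-example.order-auditor` / `rstreams-example.large-orders` names are hypothetical. The step passes every item through unchanged and, only for items that meet a condition, sends one extra event to another queue as a side effect.

```typescript
// Hypothetical signature mirroring the putEvent(botId, queue, payload) calls in the examples.
type PutEvent = (botId: string, queue: string, payload: unknown) => Promise<void>;

// Hypothetical event shape flowing through the pipe.
interface Order {
  id: string;
  total: number;
}

// A stream-step-style function: it returns each item unchanged for the next step
// and, as a side effect, sends an occasional event to a different queue.
function auditLargeOrders(putEvent: PutEvent, threshold: number) {
  return async (order: Order): Promise<Order> => {
    if (order.total > threshold) {
      // Infrequent, one-off write: the kind of use putEvent is intended for.
      await putEvent("rstreams-example.order-auditor", "rstreams-example.large-orders", {
        orderId: order.id,
        total: order.total,
      });
    }
    return order;
  };
}

// In-memory stand-in for the SDK so the sketch runs on its own.
const sent: Array<{ botId: string; queue: string; payload: unknown }> = [];
const fakePutEvent: PutEvent = async (botId, queue, payload) => {
  sent.push({ botId, queue, payload });
};
```

With the real SDK you would pass something like `(b, q, p) => rsdk.putEvent(b, q, p)` instead of the in-memory stand-in; how the step is wired into a pipe depends on the pipe API and is not shown here.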
Runnable Examples
Example 1: Write a Single Object to the Bus
The first example is a naive one that sends data to an RStreams queue one event at a time. The code calls a free API that returns random people, gets a single person back, and then uses putEvent to send that person to the rstreams-example.people queue, doing so as a bot with the ID rstreams-example.load-people.
Note that the SDK offers both a callback and an async variety of this call, but please use only the async version going forward; all new features are being added only to the async approach.
```typescript
import { RStreamsSdk } from "leo-sdk";
import { PersonRaw, PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const person = await getRandomPerson();
  // Write one event to the rstreams-example.people queue as the bot rstreams-example.load-people
  await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person);
}

async function getRandomPerson(): Promise<PersonRaw> {
  const NUM_EVENTS = 1;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);

  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }

  console.log('Person: ' + data.results[0].name.first + ' ' + data.results[0].name.last);

  return data.results[0];
}

(async () => {
  await main();
})().catch((err) => {
  // Surface failures (network or SDK errors) instead of leaving an unhandled rejection
  console.error(err);
  process.exitCode = 1;
});
```
Note: the Person types used in the examples (PersonRaw and PersonRawResults) are imported from ../lib/types.
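The contents of ../lib/types are not shown here. A minimal sketch of what those types might look like, assuming the randomuser.me response is a `results` array whose entries carry a `name` with `first` and `last` fields (the only fields the examples read). The optional fields, `PersonName`, and the `fullName` helper are hypothetical additions for illustration.

```typescript
// Hypothetical shapes: only the fields the examples touch are modeled precisely.
export interface PersonName {
  title?: string;
  first: string;
  last: string;
}

export interface PersonRaw {
  gender?: string;
  name: PersonName;
  email?: string;
  // Any other fields returned by the API are kept but left untyped.
  [key: string]: unknown;
}

export interface PersonRawResults {
  results: PersonRaw[];
}

// Hypothetical helper equivalent to the string built in the examples' console.log.
export function fullName(person: PersonRaw): string {
  return `${person.name.first} ${person.name.last}`;
}
```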
View results in Botmon
If you go to Botmon, you will see that the rstreams-example.people queue now has an event in it.
Example 2: Write Multiple Objects to the Bus (Slow Performance)
This is an example of what not to do. When you want to write many events to an RStreams queue, use the Load Stream pipe step.
So, instead of getting one person from the public API used in the example above, let’s say we get 100 people at a time and want to write all of them to the bus.
The only difference in this example is that we ask the public API for 100 people, getting back 100 objects as an array. We then loop through them, making a connection to the RStreams Bus for each and every event.
It’s simple and it works, but it is bad practice. The putEvent API is really only meant to be called infrequently, for one or maybe a handful of events. To understand why, consider what the RStreams SDK does each time you call putEvent:
- It opens a connection to AWS Kinesis
- It sends the single event to Kinesis over that connection (the Kinesis connection is closed automatically when no longer needed)
- The event flows through Kinesis until an RStreams Kinesis processor reads that single event and writes it to the RStreams DynamoDB queue table, putting the event in the correct queue
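A back-of-the-envelope sketch of why this adds up: calling putEvent in a loop costs one request per event, while a batched writer can pack many records into one request. The 500-record ceiling below is the documented per-call record limit of the Kinesis PutRecords API; the sketch ignores per-request size limits, and whether or how the RStreams SDK batches internally is an assumption here, not a description of its internals. Both functions are hypothetical.

```typescript
// Maximum records per Kinesis PutRecords call (AWS documented limit).
const KINESIS_MAX_RECORDS_PER_CALL = 500;

// One request per event, as when putEvent is called in a loop.
function requestsWithPutEvent(eventCount: number): number {
  return eventCount;
}

// Hypothetical batched writer: events are grouped into requests of up to batchSize records.
function requestsWithBatching(
  eventCount: number,
  batchSize: number = KINESIS_MAX_RECORDS_PER_CALL
): number {
  if (batchSize < 1) throw new Error("batchSize must be at least 1");
  return Math.ceil(eventCount / batchSize);
}

for (const n of [1, 100, 10000]) {
  console.log(
    `${n} events: ${requestsWithPutEvent(n)} requests one at a time vs ${requestsWithBatching(n)} batched`
  );
}
```

For the 100 people in the example below, that is 100 round trips versus a single one.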
RStreams is designed to handle a continuous flow of data events that arrive in a given queue, are read from that queue, transformed, and then sent to other queues. Today it does this with very large numbers of concurrently received events, and it has optimizations for sending lots of data. The Load Stream pipe step is a much better way to send large amounts of data to the bus, meaning to an RStreams queue.
```typescript
import { RStreamsSdk } from "leo-sdk";
import { PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk = new RStreamsSdk();
  const people = await getRandomPeople();

  // HINT: this will have very bad performance. This is just to illustrate a point.
  // Don't use putEvent in a loop this way in practice; use the Load Stream pipe step (rsdk.load) instead!
  for (const person of people.results) {
    await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person);
  }
}

async function getRandomPeople(): Promise<PersonRawResults> {
  const NUM_EVENTS = 100;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&` +
    `exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);

  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }

  console.log(`Retrieved ${data.results.length} people`);

  return data;
}

(async () => {
  await main();
})().catch((err) => {
  // Surface failures (network or SDK errors) instead of leaving an unhandled rejection
  console.error(err);
  process.exitCode = 1;
});
```
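In contrast to the loop above, the general idea behind writing lots of data efficiently, buffering events and sending them in groups, can be sketched in a few lines. `BatchingWriter` and its `send` callback are hypothetical and are not the Load Stream implementation; a real bulk writer would also need to consider things this sketch ignores, such as time-based flushing, retries, and payload size limits.

```typescript
// Hypothetical transport: sends one batch of events in a single request.
type SendBatch<T> = (batch: T[]) => Promise<void>;

// Buffers events and hands them to `send` in groups of up to maxBatchSize.
class BatchingWriter<T> {
  private buffer: T[] = [];
  private readonly send: SendBatch<T>;
  private readonly maxBatchSize: number;

  constructor(send: SendBatch<T>, maxBatchSize: number) {
    if (maxBatchSize < 1) throw new Error("maxBatchSize must be at least 1");
    this.send = send;
    this.maxBatchSize = maxBatchSize;
  }

  // Queue one event; flush automatically once the buffer is full.
  async write(event: T): Promise<void> {
    this.buffer.push(event);
    if (this.buffer.length >= this.maxBatchSize) {
      await this.flush();
    }
  }

  // Send whatever is buffered; call once more when the input ends.
  async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    await this.send(batch);
  }
}
```

Writing 100 events with a batch size of 40 results in three sends (40, 40, and a final 20 on the closing flush) rather than 100.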