You need to understand what a pipe is and what a stream step in that pipe does. Nothing comes for free: the cost of working with large amounts of data in near real-time environments with RStreams is that you have to think about what you are doing and what it means with respect to reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.
This function creates a source stream, fed by events from the array that you return from your CreateSourceFunction, to act as the first step in a pipe.
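To make that contract concrete, here is a minimal sketch that models it without the SDK. The SourceFn type and the drainSource helper are hypothetical stand-ins written for illustration, not RStreams APIs. They only mimic the behavior described here: each returned array feeds events downstream, and returning nothing closes the stream.

```typescript
// Hypothetical stand-in for the SDK's CreateSourceFunction: return an array of
// events to feed the stream, or nothing (undefined) to signal the stream is done.
type SourceFn<T, S> = (state: S) => Promise<T[] | undefined>;

// Hypothetical helper (not an RStreams API): repeatedly invokes the source
// function and collects every event, stopping when the function returns nothing.
async function drainSource<T, S>(fn: SourceFn<T, S>, state: S): Promise<T[]> {
  const events: T[] = [];
  while (true) {
    const batch = await fn(state);
    if (batch === undefined) {
      break; // the source signaled that it has no more data
    }
    for (const event of batch) {
      events.push(event);
    }
  }
  return events;
}

interface CounterState { batchesLeft: number; }

// Emits two events per invocation until the counter kept in the state runs out.
const countdownSource: SourceFn<string, CounterState> = async (state) => {
  if (state.batchesLeft === 0) {
    return undefined;
  }
  const batchNumber = state.batchesLeft--; // read the counter, then decrement it
  return [`batch-${batchNumber}-a`, `batch-${batchNumber}-b`];
};
```

Calling drainSource(countdownSource, { batchesLeft: 3 }) invokes the function four times: three invocations return a batch and the fourth returns nothing, which ends the loop.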
When would I use this?
- I want to create a pipe and seed the pipe from data from a database at scale
- I want to create a pipe and seed the pipe with data from an API at scale
- I want to create a pipe and seed the pipe with data from [fill-in-the-blank] at scale
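As a sketch of the database case, the state object can carry a paging cursor so that each invocation reads the next page and the function returns nothing once the table is exhausted. Everything here is hypothetical: fetchPage stands in for a real database query, the rows live in memory, and makePagedSource is an illustrative helper rather than part of the SDK. The function it returns is intended to have the same general shape as a CreateSourceFunction that receives state.

```typescript
interface Row { id: number; name: string; }
interface PageState { offset: number; }

// Hypothetical in-memory "table" standing in for a real database.
const TABLE: Row[] = [];
for (let i = 1; i <= 7; i++) {
  TABLE.push({ id: i, name: "person-" + i });
}

// Hypothetical query helper: returns up to `limit` rows starting at `offset`.
async function fetchPage(offset: number, limit: number): Promise<Row[]> {
  return TABLE.slice(offset, offset + limit);
}

// Builds a source function that reads one page per invocation, advances the
// cursor kept in the state, and returns nothing once no rows remain.
function makePagedSource(pageSize: number) {
  return async (state: PageState): Promise<Row[] | undefined> => {
    const rows = await fetchPage(state.offset, pageSize);
    if (rows.length === 0) {
      return undefined; // no more rows: signal that the stream should close
    }
    state.offset += rows.length;
    return rows;
  };
}
```

Keeping the cursor in the state rather than in a closure variable mirrors the pattern in the example below, where the SDK hands the same state object back on every invocation.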
Runnable Examples
Example 1
There’s a fair bit going on here, so stay with me. We read events from a public free API that generates random people, seeding a pipe by creating a source stream using the createSource API, and then write them to the bus, landing in the rstreams-example.people queue.
- Line 8
We define a type for the state we want the SDK to pass in each time it calls our function to generate more content to feed the stream. The createSource function lets you specify that you want state passed in with each invocation of your function and lets you initialize that state for the first time your function is called. You can then change that state in your function and it will be passed to each subsequent invocation.
- Line 10
We define the options we want to pass into the createSource function. Here we are telling the SDK to close the source stream, and thus shut down the pipe, after ten seconds.
- Line 11
We create an instance of our state. We only want to call out to the free public API that generates random people five times, so we initialize our state to 5 and then decrement it in our function below. When it gets to zero, we simply return from the function, which tells the SDK to close the stream and shut down the pipe.
- Line 14
We create a new source stream, specifying that the source stream will return arrays of PersonRaw objects and that we are asking the SDK to pass in a state object of type SourceState. The first argument to the createSource function is an anonymous function of type CreateSourceFunction that will be called each time the stream needs more data. It takes an optional state argument that the SDK passes in on our behalf each time our function is invoked.
- Lines 15 and 16
We copy the current counter from the state into a local variable and then decrement the counter on the state object itself so that the change carries over to subsequent invocations of this function.
- Lines 17 and 18
If our counter is at 0, we don’t want to continue, so we return nothing, which tells the SDK we’re done. It will close our stream, which causes the pipe to flush and then close down.
- Lines 20 and 21
We’re not done, so let’s get more data. Line 20 calls a function, defined below, that requests 100 random people objects from a public free API. Line 21 returns the array of PersonRaw objects we got from the API.
- Line 23
The second and third arguments to the createSource function: the optional options and the optional initial state, respectively.
- Line 24
We create a write stream sink that writes all events that reach it to the rstreams-example.people queue, doing so as the bot rstreams-example.load-people-faster.
 1  import { CreateSourceOptions, RStreamsSdk } from "leo-sdk";
 2  import { PersonRawResults, PersonRaw } from '../lib/types';
 3  import axios from "axios";
 4
 5  async function main() {
 6    const rsdk: RStreamsSdk = new RStreamsSdk();
 7
 8    interface SourceState {numApiCalls: number;}
 9
10    const opts: CreateSourceOptions = {milliseconds: 10000};
11    const state: SourceState = {numApiCalls: 5};
12
13    await rsdk.streams.pipeAsync(
14      rsdk.createSource<PersonRaw, SourceState>(async (state) => {
15        const numApiCalls = state.numApiCalls;
16        state.numApiCalls--;
17        if (numApiCalls === 0) {
18          return;
19        } else {
20          const prr: PersonRawResults = await getRandomPeople();
21          return prr.results;
22        }
23      }, opts, state),
24      rsdk.load('rstreams-example.load-people-faster', 'rstreams-example.people',
25                {records: 25, time: 5000, useS3: true})
26    );
27  }
28
29  async function getRandomPeople(): Promise<PersonRawResults> {
30    const NUM_EVENTS = 100;
31    const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&exc=login,registered,phone,cell,picture,id&noinfo`;
32    const {data, status} = await axios.get<PersonRawResults>(url);
33
34    if (status !== 200) {
35      throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
36    }
37
38    return data;
39  }
40
41  (async () => {
42    await main();
43  })()
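The example has two ways to stop: the milliseconds option and returning nothing from the function. The sketch below models how those two conditions could interact. It is an illustration under stated assumptions, not the SDK's implementation: runSource, its options type, and the injected clock are hypothetical, and the real SDK may check the time limit at different points than this loop does.

```typescript
interface RunOptions { milliseconds: number; }

// Hypothetical model of a source loop (not the SDK's implementation): invoke
// `fn` until it returns nothing or until the time budget has elapsed.
// `now` is injectable so the behavior can be exercised without real waiting.
async function runSource<T>(
  fn: () => Promise<T[] | undefined>,
  opts: RunOptions,
  now: () => number = Date.now
): Promise<{ events: T[]; stoppedBy: "timeout" | "done" }> {
  const deadline = now() + opts.milliseconds;
  const events: T[] = [];
  // Assumption: the deadline is only checked between invocations of `fn`.
  while (now() < deadline) {
    const batch = await fn();
    if (batch === undefined) {
      return { events, stoppedBy: "done" }; // the function said it is finished
    }
    for (const event of batch) {
      events.push(event);
    }
  }
  return { events, stoppedBy: "timeout" }; // the time budget ran out first
}
```

In this model, whichever condition is met first ends the stream. That matches how the example combines a ten-second limit with a five-call counter, so a slow API cannot keep the pipe open indefinitely.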