Create Source

You need to understand what a pipe is and what a stream step within a pipe is. Nothing comes for free: the cost of working with large amounts of data in near-real-time environments with RStreams is that you have to think about what you are doing and what it means for reading and writing. It is strongly recommended that you read the Read/Write at Scale article at some point.

API Doc

This function creates a source stream, fed by events from the array that you return from your CreateSourceFunction, to act as the first step in a pipe.

When would I use this?

  • I want to create a pipe and seed the pipe with data from a database at scale
  • I want to create a pipe and seed the pipe with data from an API at scale
  • I want to create a pipe and seed the pipe with data from [fill-in-the-blank] at scale
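As a rough illustration of the database case, here is a minimal sketch of a stateful source function that pages through a table. It assumes only what this page describes: the function returns an array of events each time it is called and returns nothing when there is no more data. The names PageState, Row, TABLE, queryPage and pagedSource are hypothetical, and the in-memory array merely stands in for a real database query.

```typescript
// Hypothetical state: where we are in the table and how many rows to read per call.
interface PageState { offset: number; pageSize: number; }
interface Row { id: number; }

// In-memory stand-in for a database table (assumption: 250 rows).
const TABLE: Row[] = Array.from({ length: 250 }, (_, i) => ({ id: i + 1 }));

// Stand-in for a paged query such as "SELECT ... LIMIT limit OFFSET offset".
async function queryPage(offset: number, limit: number): Promise<Row[]> {
  return TABLE.slice(offset, offset + limit);
}

// Shaped like the function you would hand to createSource: it returns the next
// batch of events, or nothing once the table is exhausted.
const pagedSource = async (state: PageState): Promise<Row[] | undefined> => {
  const rows = await queryPage(state.offset, state.pageSize);
  if (rows.length === 0) {
    return undefined; // nothing left, so the stream can close
  }
  state.offset += rows.length; // remember our position for the next invocation
  return rows;
};
```

Because the position lives in the state object, each invocation picks up where the previous one stopped, and only one page of rows is held in memory at a time.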

Runnable Examples

Example 1

There’s a fair bit going on here, so stay with me. We read events from a public free API that generates random people, seed a pipe by creating a source stream with the createSource API, and then write the events to the bus, where they land in the rstreams-example.people queue.

  • Line 8 We define a type for the state we want the SDK to pass in each time it calls our function to generate more content to feed the stream. The createSource function lets you specify that you want state passed in with each invocation of your function and lets you initialize that state for the first time your function is called. You can then change that state in your function, and the changed state will be passed to each subsequent invocation.
  • Line 10 We define the options we want to pass into the createSource function. Here we are telling the SDK to close the source stream, and thus shut down the pipe, after ten seconds.
  • Line 11 We are creating an instance of our state. We only want to call out to the free public API that generates random people for us five times. So, we initialize our state to 5 and then in our actual function, below, we decrement that state. When it gets to zero, we simply return from the function which tells the SDK to close the stream and shut down the pipe.
  • Line 14 We create a new source stream, specifying that the source stream will return arrays of PersonRaw objects and that we will ask the SDK to pass in a state object of type SourceState. As the first argument to the createSource function we pass an anonymous function of type CreateSourceFunction that will be called each time the stream needs more data. That function takes an optional argument, the state, which the SDK passes in on our behalf each time our function is invoked.
  • Lines 15 and 16 We grab the state into a local variable and then decrement that state number itself so that it will be changed on subsequent invocations to this function.
  • Lines 17 and 18 If our counter is at 0, we don’t want to continue, so we return nothing, which tells the SDK we’re done. It will close our stream, which will cause the pipe to flush and then close down.
  • Lines 20 and 21 We’re not done, so let’s get more data. Line 20 is a call to a function below that makes a call out to get 100 random people objects from a public free API. Line 21 returns the array of PersonRaw objects we got from the API.
  • Line 23 We pass the second and third arguments to the createSource function: the optional options and the optional initial state, respectively.
  • Line 24 We create a write stream sink to write all events that make it to the sink to the rstreams-example.people queue doing so as the bot rstreams-example.load-people-faster.
Example 1 code
 1  import { CreateSourceOptions, RStreamsSdk } from "leo-sdk";
 2  import { PersonRawResults, PersonRaw } from '../lib/types';
 3  import axios from "axios";
 4
 5  async function main() {
 6    const rsdk: RStreamsSdk = new RStreamsSdk();
 7
 8    interface SourceState {numApiCalls: number;}
 9
10    const opts: CreateSourceOptions = {milliseconds: 10000};
11    const state: SourceState = {numApiCalls: 5};
12
13    await rsdk.streams.pipeAsync(
14      rsdk.createSource<PersonRaw, SourceState>(async (state) => {
15          const numApiCalls = state.numApiCalls;
16          state.numApiCalls--;
17          if (numApiCalls === 0) {
18              return;
19          } else {
20              const prr: PersonRawResults = await getRandomPeople();
21              return prr.results;
22          }
23      }, opts, state),
24      rsdk.load('rstreams-example.load-people-faster', 'rstreams-example.people',
25                {records: 25, time: 5000, useS3: true})
26    );
27  }
28
29  async function getRandomPeople(): Promise<PersonRawResults> {
30    const NUM_EVENTS = 100;
31    const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&exc=login,registered,phone,cell,picture,id&noinfo`;
32    const {data, status} = await axios.get<PersonRawResults>(url);
33
34    if (status !== 200) {
35      throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
36    }
37
38    return data;
39  }
40
41  (async () => {
42    await main();
43  })()
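
To see the countdown logic in isolation, the following sketch runs the same state pattern without the SDK or the network. It is an illustration under stated assumptions, not how the SDK is implemented: SourceFn is a local stand-in type rather than the SDK's CreateSourceFunction, drainSource is a hypothetical driver that simply keeps calling the function until it returns nothing, and fakeGetPeople replaces the call to the random-people API.

```typescript
// Local stand-in for the function shape described above; NOT the SDK's own type.
type SourceFn<T, S> = (state: S) => Promise<T[] | undefined>;

interface SourceState { numApiCalls: number; }

// Hypothetical driver: asks for batches until the function returns nothing.
async function drainSource<T, S>(fn: SourceFn<T, S>, state: S): Promise<T[][]> {
  const batches: T[][] = [];
  for (;;) {
    const batch = await fn(state);
    if (batch === undefined) {
      break; // the source signalled that it is done
    }
    batches.push(batch);
  }
  return batches;
}

// Fake data call standing in for getRandomPeople(); no network involved.
async function fakeGetPeople(count: number): Promise<string[]> {
  return Array.from({ length: count }, (_, i) => `person-${i}`);
}

// Same countdown logic as lines 15 through 21 of Example 1.
const countdownSource: SourceFn<string, SourceState> = async (state) => {
  const numApiCalls = state.numApiCalls;
  state.numApiCalls--;
  if (numApiCalls === 0) {
    return undefined;
  }
  return fakeGetPeople(100);
};
```

Calling drainSource(countdownSource, {numApiCalls: 5}) resolves to five batches of 100 items each. The sixth invocation sees a counter of 0 and returns nothing, which is the point at which the real source stream would close.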

Note: Person types referenced in the examples