Getting Started with Stream Workers
This page guides you through creating your first stream worker. Once published, the stream worker listens for new data written to a collection and then writes that data to a stream. You do not need to create the dependencies (in this case, the collection and the stream) beforehand, because the stream worker creates them for you at run time.
After you create the stream worker, follow the instructions on this page to test it.
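The data flow described above can be pictured with a small Python model. This is only a conceptual sketch and does not use any Macrometa API: the `Collection` and `Stream` classes and the `stream_worker` function are hypothetical stand-ins, assumed here purely to show that each newly inserted document has its `weight` field forwarded to the stream.

```python
from typing import Callable, Dict, List

Record = Dict[str, int]


class Stream:
    """Hypothetical stand-in for a stream: an append-only list of messages."""

    def __init__(self) -> None:
        self.messages: List[Record] = []

    def publish(self, message: Record) -> None:
        self.messages.append(message)


class Collection:
    """Hypothetical stand-in for a collection that notifies listeners on insert."""

    def __init__(self) -> None:
        self.documents: List[Record] = []
        self._listeners: List[Callable[[Record], None]] = []

    def subscribe(self, listener: Callable[[Record], None]) -> None:
        self._listeners.append(listener)

    def insert(self, document: Record) -> None:
        self.documents.append(document)
        for listener in self._listeners:
            listener(document)


def stream_worker(source: Collection, sink: Stream) -> None:
    """Forward the `weight` field of every newly inserted document to the sink."""
    source.subscribe(lambda doc: sink.publish({"weight": doc["weight"]}))


collection = Collection()
stream = Stream()
stream_worker(collection, stream)  # plays the role of publishing the worker

for w in range(1, 6):
    collection.insert({"weight": w})

print(stream.messages)
```

In this model, the five inserted documents end up as five messages on the stream, which mirrors the test you run later on this page.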
Create a Stream Worker
Refer to the Stream Workers section for an explanation of how stream workers function.
Click Stream Workers.
In the Editor tab, define the stream worker by copying and pasting the following code block into the code editor.
@App:name('sample-cargo-app')
@App:description('Basic stream worker to demonstrate reading data from a collection and sending it to a stream. The stream and collection are created automatically if they do not already exist.')
@App:qlVersion('2')
/**
Testing the stream worker:
1. Open stream `SampleCargoAppDestStream` in console. The output can be monitored here.
2. Navigate to Query Workers and run the following query:
LET data = [
{"weight":1},
{"weight":2},
{"weight":3},
{"weight":4},
{"weight":5}
]
FOR d IN data
INSERT d INTO SampleCargoAppInputTable
3. The following messages are shown in the `SampleCargoAppDestStream` stream console.
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
*/
-- Define the `SampleCargoAppInputTable` source.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc" , replication.type="global", map.type='json') (weight int);
-- Define the `SampleCargoAppDestStream` sink stream.
CREATE SINK STREAM SampleCargoAppDestStream (weight int);
-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
Click Validate. Macrometa checks to see that your code is valid.
Click Save.
Select edge locations, and then click Save. The locations that you select represent where the data for this stream worker will live.
Click Publish. The stream worker begins running the code as defined and continues until you unpublish it.
Test the Stream Worker
Use the following procedure to verify that the stream worker functions as expected.
1. Open a Stream Window
The Macrometa Streams console does not persist messages, so you must have the console open before you send messages in order to see them.
- In a new tab or window, open the Macrometa console.
- Click Streams.
- Click c8locals.SampleCargoAppDestStream.
2. Add Data to Collection and View Stream Updates
In the original tab or window (not the one showing the stream), navigate to Query Workers.
Copy and paste this sample into the editor:
LET data = [
{"weight":1},
{"weight":2},
{"weight":3},
{"weight":4},
{"weight":5}
]
FOR d IN data
INSERT d INTO SampleCargoAppInputTable
Click Run Query to add the five weight records to the collection.
Return to the stream tab or window. The results will be similar to the following:
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
The stream worker has pulled the weight information and sent it to the stream.
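If you want to check the output programmatically rather than by eye, one option is to copy the console lines into a short script. The following is a minimal sketch, assuming the console text can be copied out in the `[timestamp] {json}` form shown above. The helper name `parse_console_line` is hypothetical, and your timestamps will differ from the sample.

```python
import json
from typing import List, Tuple


def parse_console_line(line: str) -> Tuple[str, dict]:
    """Split '[timestamp] {json}' into the timestamp string and the decoded payload."""
    timestamp, _, payload = line.partition("] ")
    return timestamp.lstrip("["), json.loads(payload)


# Sample lines copied from the stream console output shown above.
console_output = """\
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
"""

parsed: List[Tuple[str, dict]] = [
    parse_console_line(line) for line in console_output.splitlines() if line.strip()
]
weights = [payload["weight"] for _, payload in parsed]

# The query inserted weights 1 through 5, so the stream should carry the same values.
assert weights == [1, 2, 3, 4, 5], f"unexpected weights: {weights}"
print("All five weight records reached the stream:", weights)
```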
Next Steps
For an additional challenge, you could extend this stream worker to transform stream data for use cases such as finding an average over a number of events, writing it to a new collection, or triggering a different downstream event or action.
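To make the averaging idea concrete, here is a minimal Python sketch of an average over the most recent events. It only illustrates the computation. In practice you would express this in the stream worker itself, and the names `sliding_average` and `window_size` are hypothetical, not part of any Macrometa API.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Iterator


def sliding_average(events: Iterable[Dict[str, int]], window_size: int) -> Iterator[float]:
    """Yield the average weight over the last `window_size` events after each event."""
    if window_size < 1:
        raise ValueError("window_size must be at least 1")
    # A deque with maxlen drops the oldest value automatically once it is full.
    window: Deque[int] = deque(maxlen=window_size)
    for event in events:
        window.append(event["weight"])
        yield sum(window) / len(window)


events = [{"weight": w} for w in range(1, 6)]
averages = list(sliding_average(events, window_size=3))
print(averages)  # [1.0, 1.5, 2.0, 3.0, 4.0]
```

Until the window fills, the average covers only the events seen so far. Whether that is the behavior you want, or whether results should be emitted only for full windows, is a design choice to settle when you extend the stream worker.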