Data Pipeline Examples
This page explains ways to create data pipelines.
Stream Joins
This example shows how to join two streams based on a condition.
For more information on other join operations, refer to the Stream Worker Query Guide.
Stream Joins Example
CREATE STREAM TemperatureStream (roomNo string, temperature double);
CREATE STREAM HumidityStream (roomNo string, humidity double);
@info(name = 'Equi-join')
-- Join latest `temperature` and `humidity` events arriving within 1 minute for each `roomNo`.
INSERT INTO TemperatureHumidityStream
SELECT t.roomNo, t.temperature, h.humidity
FROM TemperatureStream window unique:time(roomNo, 1 min) AS t
JOIN HumidityStream window unique:time(roomNo, 1 min) AS h
ON t.roomNo == h.roomNo;
@info(name = 'Join-on-temperature')
INSERT INTO EnrichedTemperatureStream
SELECT t.roomNo, t.temperature, h.humidity
-- Join when events arrive in `TemperatureStream`.
FROM TemperatureStream AS t
-- When events match in the `sliding_time()` window, all matched events are emitted; otherwise `null` is emitted.
LEFT OUTER JOIN HumidityStream window sliding_time(1 min) AS h
ON t.roomNo == h.roomNo;
Join Behavior
When events are sent to the TemperatureStream and HumidityStream streams, the following events are emitted at TemperatureHumidityStream via the Equi-join query, and at EnrichedTemperatureStream via the Join-on-temperature query:
Time | Input to TemperatureStream | Input to HumidityStream | Output at TemperatureHumidityStream | Output at EnrichedTemperatureStream |
---|---|---|---|---|
9:00:00 | ['1001', 18.0] | - | - | ['1001', 18.0, null] |
9:00:10 | - | ['1002', 72.0] | - | - |
9:00:15 | - | ['1002', 73.0] | - | - |
9:00:30 | ['1002', 22.0] | - | ['1002', 22.0, 73.0] | ['1002', 22.0, 72.0], ['1002', 22.0, 73.0] |
9:00:50 | - | ['1001', 60.0] | ['1001', 18.0, 60.0] | - |
9:01:10 | - | ['1001', 62.0] | - | - |
9:01:20 | ['1001', 17.0] | - | ['1001', 17.0, 62.0] | ['1001', 17.0, 60.0], ['1001', 17.0, 62.0] |
9:02:10 | ['1002', 23.5] | - | - | ['1002', 23.5, null] |
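To see why the two queries emit different events, here is a minimal Python sketch that approximates the Equi-join semantics. It is illustrative only and not the stream worker runtime: `unique:time(roomNo, 1 min)` is modeled as a dictionary that keeps only the latest event per `roomNo` and expires it after one minute, and the join fires whenever a new event on either side finds a live match on the other side. The helper names (`on_temperature`, `on_humidity`, `_store`) are hypothetical.

WINDOW_SECONDS = 60  # mirrors `1 min` in the Equi-join query above

temperature_window = {}  # roomNo -> (temperature, arrival time), as kept by unique:time
humidity_window = {}     # roomNo -> (humidity, arrival time), as kept by unique:time

def _store(window, room, value, now):
    # unique:time: keep only the latest event per roomNo and expire entries after 1 min.
    window[room] = (value, now)
    for key in [k for k, (_, ts) in window.items() if now - ts > WINDOW_SECONDS]:
        del window[key]

def on_temperature(room, temperature, now):
    _store(temperature_window, room, temperature, now)
    if room in humidity_window and now - humidity_window[room][1] <= WINDOW_SECONDS:
        print(room, temperature, humidity_window[room][0])

def on_humidity(room, humidity, now):
    _store(humidity_window, room, humidity, now)
    if room in temperature_window and now - temperature_window[room][1] <= WINDOW_SECONDS:
        print(room, temperature_window[room][0], humidity)

# Replay the first rows of the table (times in seconds after 9:00:00).
on_temperature('1001', 18.0, 0)   # no humidity yet -> no output
on_humidity('1002', 72.0, 10)
on_humidity('1002', 73.0, 15)     # replaces 72.0 as the unique humidity event for room 1002
on_temperature('1002', 22.0, 30)  # -> 1002 22.0 73.0
on_humidity('1001', 60.0, 50)     # -> 1001 18.0 60.0
on_humidity('1001', 62.0, 70)     # temperature 18.0 has expired -> no output

The Join-on-temperature query differs in two ways: it only fires on TemperatureStream events, and `sliding_time(1 min)` retains all humidity events from the last minute rather than only the latest one per room, which is why 9:00:30 and 9:01:20 each produce two enriched events.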
Partition Events by Value
This example shows how to partition events by attribute values. For more information on partitions, refer to the Stream Query Guide.
Partition Events by Value Example
CREATE STREAM LoginStream ( userID string, loginSuccessful bool);
-- Optional purging configuration to remove partition instances that haven't received events for `1 hour`, checked every `10 sec`.
@purge(enable='true', interval='10 sec', idle.period='1 hour')
-- Partitions the events based on `userID`.
partition with ( userID of LoginStream )
begin
@info(name='Aggregation-query')
-- Calculates success and failure login attempts from the last 3 events of each `userID`.
INSERT INTO #LoginAttempts
SELECT userID, loginSuccessful, count() AS attempts
FROM LoginStream WINDOW SLIDING_LENGTH(3)
GROUP BY loginSuccessful;
-- Inserts results into the `#LoginAttempts` inner stream, which is only accessible within the partition instance.
@info(name='Alert-query')
-- Consumes events from the inner stream, and suspends `userID`s that have 3 consecutive login failures.
INSERT INTO UserSuspensionStream
SELECT userID, "Three consecutive login failures!" AS message
FROM #LoginAttempts[loginSuccessful==false and attempts==3];
end;
Partition Behavior
When events are sent to the LoginStream stream, the following events are generated at the #LoginAttempts inner stream by the Aggregation-query, and at UserSuspensionStream by the Alert-query:
Input to LoginStream | At #LoginAttempts | Output at UserSuspensionStream |
---|---|---|
['1001', false] | ['1001', false, 1] | - |
['1002', true] | ['1002', true, 1] | - |
['1002', false] | ['1002', false, 1] | - |
['1002', false] | ['1002', false, 2] | - |
['1001', false] | ['1001', false, 2] | - |
['1001', true] | ['1001', true, 1] | - |
['1001', false] | ['1001', false, 2] | - |
['1002', false] | ['1002', false, 3] | ['1002', 'Three consecutive login failures!'] |
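The last row can be checked with a small Python sketch of the per-`userID` partition logic. This is an illustrative approximation, not the stream worker runtime: each partition instance keeps its own length-3 sliding window (a `deque(maxlen=3)`), and `count()` grouped by `loginSuccessful` is restricted to the group of the newly arrived event. The helper name `on_login` is hypothetical.

from collections import defaultdict, deque

# One sliding window of the last 3 login events per userID
# (mirrors SLIDING_LENGTH(3) inside a partition instance).
recent_logins = defaultdict(lambda: deque(maxlen=3))

def on_login(user_id, login_successful):
    window = recent_logins[user_id]
    window.append(login_successful)
    # count() grouped by loginSuccessful, for the group of the new event.
    attempts = sum(1 for outcome in window if outcome == login_successful)
    print("#LoginAttempts:", user_id, login_successful, attempts)
    if not login_successful and attempts == 3:
        print("UserSuspensionStream:", user_id, "Three consecutive login failures!")

# Replay the events for userID `1002` from the table above.
on_login("1002", True)   # attempts for successful logins = 1
on_login("1002", False)  # attempts for failed logins = 1
on_login("1002", False)  # attempts for failed logins = 2
on_login("1002", False)  # attempts for failed logins = 3 -> suspension

With a length-3 window, `attempts == 3` for a failure event can only occur when all three retained events are failures, which is what makes the `attempts==3` filter equivalent to detecting three consecutive failed logins.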
Scatter and Gather (String)
This example shows how to perform scatter and gather on string values.
Scatter and Gather (String) Example
CREATE STREAM PurchaseStream (userId string, items string, store string);
@info(name = 'Scatter-query')
-- Scatter the value of `items` into separate events, splitting on `,`.
INSERT INTO TokenizedItemStream
SELECT userId, token AS item, store
FROM PurchaseStream#str:tokenize(items, ',', true);
@info(name = 'Transform-query')
-- Concatenate each tokenized `item` with `store`.
INSERT INTO TransformedItemStream
SELECT userId, str:concat(store, "-", item) AS itemKey
FROM TokenizedItemStream;
@info(name = 'Gather-query')
INSERT INTO GroupedPurchaseItemStream
-- Concatenate the `itemKey` values of all events in the batch, separated by `,`.
SELECT userId, str:groupConcat(itemKey, ",") AS itemKeys
-- Collect events traveling as a batch via the `batch()` window.
FROM TransformedItemStream window batch();
Scatter and Gather (String) Input
The following event is sent to PurchaseStream:
['501', 'cake,cookie,bun,cookie', 'CA']
Scatter and Gather (String) Output
After processing, the events arrive at TokenizedItemStream:
['501', 'cake', 'CA'], ['501', 'cookie', 'CA'], ['501', 'bun', 'CA']
The events arrive at TransformedItemStream:
['501', 'CA-cake'], ['501', 'CA-cookie'], ['501', 'CA-bun']
The event arriving at GroupedPurchaseItemStream is:
['501', 'CA-cake,CA-cookie,CA-bun']
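The same scatter, transform, and gather steps can be approximated in a few lines of plain Python, which also makes it clear why the duplicate 'cookie' token appears only once: the third argument of `str:tokenize` in the query above requests distinct tokens, consistent with the single 'cookie' in the output. This is an illustrative sketch with a hypothetical helper name, not the stream worker runtime.

def scatter_transform_gather(user_id, items, store):
    # str:tokenize(items, ',', true): split on ',' and keep only distinct tokens.
    tokens = list(dict.fromkeys(items.split(",")))
    # str:concat(store, "-", item): build one itemKey per scattered event.
    item_keys = [f"{store}-{token}" for token in tokens]
    # window batch() + str:groupConcat(itemKey, ","): gather the batch back into one event.
    return user_id, ",".join(item_keys)

print(scatter_transform_gather("501", "cake,cookie,bun,cookie", "CA"))
# ('501', 'CA-cake,CA-cookie,CA-bun')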
Scatter and Gather (JSON)
This example shows how to perform scatter and gather on JSON values.
Scatter and Gather (JSON) Example
CREATE STREAM PurchaseStream (order string, store string);
@info(name = 'Scatter-query')
-- Scatter the elements under `$.order.items` into separate events.
INSERT INTO TokenizedItemStream
SELECT json:getString(order, '$.order.id') AS orderId,
jsonElement AS item,
store
FROM PurchaseStream#json:tokenize(order, '$.order.items');
@info(name = 'Transform-query')
-- Apply a `$5` discount to cakes.
INSERT INTO DiscountedItemStream
SELECT orderId,
ifThenElse(json:getString(item, 'name') == "cake",
json:toString(
json:setElement(item, 'price',
json:getDouble(item, 'price') - 5
)
),
item) AS item,
store
FROM TokenizedItemStream;
@info(name = 'Gather-query')
INSERT INTO GroupedItemStream
-- Combine `item` from all events in a batch as a single JSON array.
SELECT orderId, json:group(item) AS items, store
-- Collect events traveling as a batch via the `batch()` window.
FROM DiscountedItemStream window batch();
@info(name = 'Format-query')
INSERT INTO DiscountedOrderStream
-- Format the final JSON by combining `orderId`, `items`, and `store`.
SELECT str:fillTemplate("""
{"discountedOrder":
{"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
}""", orderId, items, store) AS discountedOrder
FROM GroupedItemStream;
Scatter and Gather (JSON) Input
The following event is sent to PurchaseStream:
[{
"order":{
"id":"501",
"items":[{"name":"cake", "price":25.0},
{"name":"cookie", "price":15.0},
{"name":"bun", "price":20.0}
]
}
}, 'CA']
Scatter and Gather (JSON) Output
After processing, the following events arrive at TokenizedItemStream:
['501', '{"name":"cake","price":25.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']
The events arrive at DiscountedItemStream:
['501', '{"name":"cake","price":20.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']
The event arriving at GroupedItemStream is:
['501', '[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"},{"price":20.0,"name":"bun"}]', 'CA']
The event arriving at DiscountedOrderStream is:
[
{"discountedOrder":
{
"id":"501",
"store":"CA",
"items":[{"price":20.0,"name":"cake"},
{"price":15.0,"name":"cookie"},
{"price":20.0,"name":"bun"}]
}
}
]
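End to end, the JSON pipeline above scatters the items array, discounts cakes, and regroups the result into one document. The following minimal Python sketch reproduces that flow for the sample input; it is illustrative only (the function name `discount_order` is hypothetical, and JSON key order in the real output may differ, as seen above).

import json

def discount_order(order_json, store):
    order = json.loads(order_json)["order"]
    # Scatter: one record per element under $.order.items.
    items = order["items"]
    # Transform: apply a $5 discount to items named "cake".
    for item in items:
        if item["name"] == "cake":
            item["price"] = item["price"] - 5
    # Gather + format: rebuild a single JSON document per order.
    return {"discountedOrder": {"id": order["id"], "store": store, "items": items}}

purchase = """{"order": {"id": "501",
                         "items": [{"name": "cake", "price": 25.0},
                                   {"name": "cookie", "price": 15.0},
                                   {"name": "bun", "price": 20.0}]}}"""
print(json.dumps(discount_order(purchase, "CA")))
# {"discountedOrder": {"id": "501", "store": "CA", "items": [{"name": "cake", "price": 20.0}, ...]}}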