Skip to main content

Consuming Data Example

This stream worker consumes all data from ConsumerSalesTotalsStream and then sends it to PublishSalesTotalsStream. The stream worker includes an optional sink and query, currently commented out, for testing.

@App:description("Description of the plan")

CREATE SOURCE ConsumerSalesTotalsStream WITH (type='stream', stream.list='SalesTotalsEP', map.type='json', stream.type='local') (transNo int, product string, price int, quantity int, salesValue long);

CREATE SOURCE PublishSalesTotalsStream WITH (type='stream', stream.list='SalesTotals', map.type='json', replication.type='local') (transNo int, product string, price int, quantity int, salesValue long);

-- Define stream to consume data (Optional, used for testing)
-- CREATE SINK STREAM ConsumerSales (transNo int, product string, price int, quantity int, salesValue long);

-- Transfers data between sources
INSERT INTO PublishSalesTotalsStream
FROM ConsumerSalesTotalsStream;

-- Sends data to stream (Optional, used for testing)
-- INSERT INTO ConsumerSales
-- FROM PublishSalesTotalsStream;