TUMBLING_LENGTH()
A batch (tumbling) length window that holds and processes the number of events specified by the length parameter.
Syntax
WINDOW TUMBLING_LENGTH(length <INT>)
WINDOW TUMBLING_LENGTH(length <INT>, current.event <BOOL>)
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
length | The number of events the window collects before it tumbles. | | INT | No | No |
current.event | When set to true, the window streams out each current event as it arrives, while still expiring events in batches. | false | BOOL | Yes | No |
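The batching behavior with `current.event` left at its default can be modeled outside the stream engine. The following Python sketch is an illustration of the semantics described above, not the engine's implementation; the helper name `tumbling_length` is hypothetical.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def tumbling_length(events: Iterable[T], length: int) -> Iterator[List[T]]:
    """Yield non-overlapping batches of exactly `length` events.

    Hypothetical helper that models the window. Events left over at the
    end stay buffered and are not emitted, because the window is not full.
    """
    if length <= 0:
        raise ValueError("length must be a positive integer")
    batch: List[T] = []
    for event in events:
        batch.append(event)
        if len(batch) == length:
            yield batch
            batch = []  # the window tumbles: start a fresh batch


# Seven events with length 3 give two full batches; the seventh stays buffered.
batches = list(tumbling_length(range(1, 8), 3))
```

Each event belongs to exactly one batch, which is what distinguishes a tumbling window from a sliding one.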
Example 1
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, price double);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS price
FROM InputEventStream WINDOW TUMBLING_LENGTH(10);
This query collects 10 events, processes them as a batch, and outputs the result.
Example 2
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, sumPrice double);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS sumPrice
FROM InputEventStream WINDOW TUMBLING_LENGTH(10, true);
This window sends arriving events directly to the output, so sumPrice increases gradually. After every 10 events, it clears the window as a batch and resets sumPrice to zero.
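The gradual increase and periodic reset can be sketched in Python. This is a simplified model under the assumption that one output is produced per arriving event and that the window empties after every `length` events; the function name `running_sums` is hypothetical and the code is not the engine's implementation.

```python
from typing import Iterable, List


def running_sums(prices: Iterable[float], length: int) -> List[float]:
    """Model sum(price) when current.event is true.

    Assumption: each arriving event produces one output carrying the sum
    of the events currently held; the window clears after `length` events.
    """
    outputs: List[float] = []
    total = 0.0
    held = 0
    for price in prices:
        total += price
        held += 1
        outputs.append(total)  # emitted immediately, not at batch end
        if held == length:
            total, held = 0.0, 0  # batch expired: sum restarts from zero
    return outputs


# With length 3, the sum restarts on the fourth event.
sums = running_sums([1.0, 2.0, 3.0, 4.0, 5.0], length=3)
```

With the default `current.event` value of false, the same input would instead produce a single output per full batch.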
Example 3
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) TUMBLING_LENGTH(10) OUTPUT all events;
CREATE SINK STREAM OutputStream (symbol string, price double);
@info(name = 'query0')
INSERT INTO StockEventWindow
FROM InputEventStream;
@info(name = 'query1')
INSERT all events INTO OutputStream
SELECT symbol, sum(price) AS price
FROM StockEventWindow;
This example uses a defined window to process 10 events as a batch and output all events.
Example 4
This example shows aggregating events based on event count in a batch (tumbling) manner.
Stream Worker Code
CREATE STREAM TemperatureStream(sensorId string, temperature double);
CREATE SINK STREAM OverallTemperatureStream(avgTemperature double, maxTemperature double, numberOfEvents long);
CREATE SINK STREAM SensorIdTemperatureStream(sensorId string, avgTemperature double, maxTemperature double);
@info(name = 'Overall-analysis')
INSERT INTO OverallTemperatureStream
-- Calculate average, maximum, and count for `temperature` attribute.
SELECT avg(temperature) AS avgTemperature,
max(temperature) AS maxTemperature,
count() AS numberOfEvents
-- Aggregate every `4` events in a batch manner.
FROM TemperatureStream WINDOW TUMBLING_LENGTH(4);
@info(name = 'SensorId-analysis')
INSERT INTO SensorIdTemperatureStream
SELECT sensorId,
-- Calculate average, and maximum for `temperature`, by grouping events by `sensorId`.
avg(temperature) AS avgTemperature,
max(temperature) AS maxTemperature
-- Aggregate every `5` events in a batch manner.
FROM TemperatureStream WINDOW TUMBLING_LENGTH(5)
GROUP BY sensorId
-- Output events only when `avgTemperature` is greater than or equal to `20.0`.
HAVING avgTemperature >= 20.0;
Batch Event Count Aggregation Behavior
When events are sent to TemperatureStream, the following events are emitted at OverallTemperatureStream via the Overall-analysis query, and at SensorIdTemperatureStream via the SensorId-analysis query.
Input to TemperatureStream | Output at OverallTemperatureStream | Output at SensorIdTemperatureStream |
---|---|---|
['1001' , 19.0 ] | - | - |
['1002' , 26.0 ] | - | - |
['1002' , 24.0 ] | - | - |
['1001' , 20.0 ] | [22.25 , 26.0 , 4 ] | - |
['1001' , 21.0 ] | - | ['1002' , 25.0 , 26.0 ], ['1001' , 20.0 , 21.0 ] |
['1002' , 22.0 ] | - | - |
['1001' , 21.0 ] | - | - |
['1002' , 22.0 ] | [21.5 , 22.0 , 4 ] | - |
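The aggregates for this input sequence can be recomputed by hand or with a short script. The Python sketch below models both queries under the assumption that only full batches produce output; the helper `batches` is hypothetical, and the order of grouped outputs within a batch is not modeled.

```python
from collections import defaultdict
from statistics import mean

# Input events from the table above: (sensorId, temperature).
events = [
    ("1001", 19.0), ("1002", 26.0), ("1002", 24.0), ("1001", 20.0),
    ("1001", 21.0), ("1002", 22.0), ("1001", 21.0), ("1002", 22.0),
]


def batches(items, length):
    """Split items into full, non-overlapping batches of `length`."""
    full = len(items) - len(items) % length
    return [items[i:i + length] for i in range(0, full, length)]


# Overall-analysis: avg, max, and count over every 4 events.
overall = [
    (mean(t for _, t in batch), max(t for _, t in batch), len(batch))
    for batch in batches(events, 4)
]

# SensorId-analysis: per-sensor avg and max over every 5 events,
# keeping only groups whose average is at least 20.0.
per_sensor = []
for batch in batches(events, 5):
    groups = defaultdict(list)
    for sensor_id, temperature in batch:
        groups[sensor_id].append(temperature)
    for sensor_id, temps in groups.items():
        avg = mean(temps)
        if avg >= 20.0:
            per_sensor.append((sensor_id, avg, max(temps)))

print(overall)
print(per_sensor)
```

Only one batch of 5 completes within the eight input events, so the last three events remain buffered for the SensorId-analysis query.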