TUMBLING_TIME()
A tumbling time window (a time batch window) holds the events that arrive during each
fixed, non-overlapping time period and processes them together as a batch.
Syntax
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, start <INT|LONG>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, current.event <BOOL>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, start <INT|LONG>, current.event <BOOL>)
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
time | The batch time period over which the window processes events. | | INT LONG TIME | No | No |
start | An offset in milliseconds that starts the window at a time other than the default. | Timestamp of first event | INT LONG | Yes | No |
current.event | When true, the window streams current events out as they arrive while still expiring them in batches. | false | BOOL | Yes | No |
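The effect of the start offset on window boundaries can be illustrated with a small Python model. This is a sketch of the arithmetic only, not the engine's implementation: the function name window_bounds and the half-open [begin, end) convention are assumptions made for illustration.

```python
def window_bounds(event_ts_ms, period_ms, start_ms):
    """Return the (begin, end) bounds, in ms, of the tumbling window holding event_ts_ms.

    Hypothetical helper: windows are modelled as half-open intervals
    [begin, end) aligned to start_ms. This convention is an assumption.
    """
    # Whole periods elapsed since the offset; floor division also handles
    # timestamps that precede the offset.
    index = (event_ts_ms - start_ms) // period_ms
    begin = start_ms + index * period_ms
    return begin, begin + period_ms


# 30-second windows aligned to epoch 0.
print(window_bounds(45_000, 30_000, 0))
# Same period, aligned to a first event that arrived at 10 s (the default start).
print(window_bounds(45_000, 30_000, 10_000))
```

With start set to 0, boundaries fall on multiples of the period. With the default, they are shifted by the arrival time of the first event.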
Example 1
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, price double);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS price
FROM InputEventStream WINDOW TUMBLING_TIME(20 sec);
This query collects incoming events for 20 seconds, processes them as a batch, and then sends the result to OutputStream.
Example 2
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, sumPrice double);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS sumPrice
FROM InputEventStream WINDOW TUMBLING_TIME(20 sec, true);
This window sends arriving events directly to the output, so sumPrice increases
gradually. Every 20 seconds, it clears the window as a batch, which resets sumPrice
to zero.
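The running-sum behavior described for Example 2 can be modelled in Python as follows. This is a simplified sketch under stated assumptions: the function running_sums and the sample events are hypothetical, the symbol attribute is ignored, and the reset is applied lazily when the next event arrives, whereas a real engine would expire the batch on a timer.

```python
def running_sums(events, period_s):
    """Model of current.event = true for a sum aggregate.

    events: list of (timestamp_s, price) tuples in arrival order.
    Returns the (timestamp_s, sumPrice) pair emitted as each event arrives.
    """
    outputs = []
    window_end = None
    total = 0.0
    for ts, price in events:
        if window_end is None:
            # Default start: the first event opens the first window.
            window_end = ts + period_s
        # Roll past every boundary that elapsed before this event,
        # clearing the batch (and so the sum) each time.
        while ts >= window_end:
            total = 0.0
            window_end += period_s
        total += price
        outputs.append((ts, total))
    return outputs


# Hypothetical sample data: three events in the first 20 s, two in the next.
sample = [(0, 10.0), (5, 20.0), (12, 5.0), (21, 7.0), (30, 3.0)]
for ts, sum_price in running_sums(sample, 20):
    print(ts, sum_price)
```

The sum grows with each event inside a window and starts again from the first event after a boundary.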
Example 3
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) TUMBLING_TIME(20 sec) output all events;
CREATE SINK STREAM OutputStream (symbol string, price double);
@info(name = 'query0')
INSERT INTO StockEventWindow
FROM InputEventStream;
@info(name = 'query1')
INSERT all events INTO OutputStream
SELECT symbol, sum(price) AS price
FROM StockEventWindow;
This query uses a defined window to process the events that arrive in each 20-second period as a batch, and outputs all events.
Example 4
This example shows aggregating events over time in a batch (tumbling) manner.
Stream Worker Code
CREATE STREAM TemperatureStream(sensorId string, temperature double);
CREATE SINK STREAM OverallTemperatureStream(avgTemperature double, maxTemperature double, numberOfEvents long);
CREATE SINK STREAM SensorIdTemperatureStream(sensorId string, avgTemperature double, maxTemperature double);
@info(name = 'Overall-analysis')
-- Calculate average, maximum, and count for `temperature` attribute.
INSERT INTO OverallTemperatureStream
SELECT avg(temperature) AS avgTemperature,
max(temperature) AS maxTemperature,
count() AS numberOfEvents
-- Aggregate events every `1 minute`, from the arrival of the first event.
FROM TemperatureStream WINDOW TUMBLING_TIME(1 min);
@info(name = 'SensorId-analysis')
INSERT INTO SensorIdTemperatureStream
SELECT sensorId,
-- Calculate average, and maximum for `temperature`, by grouping events by `sensorId`.
avg(temperature) AS avgTemperature,
max(temperature) AS maxTemperature
-- Aggregate events every `30 seconds` from epoch timestamp `0`.
FROM TemperatureStream WINDOW TUMBLING_TIME(30 sec, 0)
GROUP BY sensorId
-- Output events only when `avgTemperature` is greater than `20.0`.
HAVING avgTemperature > 20.0;
Batch Time Aggregation Behavior
When events are sent to TemperatureStream, the following events are emitted at OverallTemperatureStream via the Overall-analysis query, and at SensorIdTemperatureStream via the SensorId-analysis query.
Time | Input to TemperatureStream | Output at OverallTemperatureStream | Output at SensorIdTemperatureStream |
---|---|---|---|
9:00:10 | ['1001' , 21.0 ] | - | - |
9:00:20 | ['1002' , 25.0 ] | - | - |
9:00:30 | - | - | ['1001' , 21.0 , 21.0 ],['1002' , 25.0 , 25.0 ] |
9:00:35 | ['1002' , 26.0 ] | - | - |
9:00:40 | ['1002' , 27.0 ] | - | - |
9:00:55 | ['1001' , 19.0 ] | - | - |
9:01:00 | - | - | ['1002' , 26.5 , 27.0 ] |
9:01:10 | - | [23.6 , 27.0 , 5 ] | - |
9:01:20 | ['1001' , 21.0 ] | - | - |
9:01:30 | - | - | ['1001' , 21.0 , 21.0 ] |
9:02:10 | - | [21.0 , 21.0 , 1 ] | - |
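The aggregates in this table can be recomputed from the input events with a short Python model of the two queries. This is a sketch for checking the arithmetic, not the engine's implementation: the helper names (hms, batches, overall, per_sensor) are hypothetical, windows are assumed to be half-open intervals, and only windows that contain at least one event produce output here.

```python
from collections import defaultdict


def hms(text):
    """Convert an 'H:MM:SS' clock string to seconds since midnight."""
    h, m, s = (int(part) for part in text.split(":"))
    return h * 3600 + m * 60 + s


# Input events from the table: (arrival time, sensorId, temperature).
EVENTS = [
    ("9:00:10", "1001", 21.0),
    ("9:00:20", "1002", 25.0),
    ("9:00:35", "1002", 26.0),
    ("9:00:40", "1002", 27.0),
    ("9:00:55", "1001", 19.0),
    ("9:01:20", "1001", 21.0),
]


def batches(events, period, start):
    """Group events by the end time (in seconds) of their tumbling window."""
    grouped = defaultdict(list)
    for clock, sensor, temp in events:
        index = (hms(clock) - start) // period
        grouped[start + (index + 1) * period].append((sensor, temp))
    return dict(sorted(grouped.items()))


def overall(events):
    """Model of Overall-analysis: 1-minute windows from the first event."""
    first = hms(events[0][0])
    out = {}
    for end, batch in batches(events, 60, first).items():
        temps = [t for _, t in batch]
        out[end] = (sum(temps) / len(temps), max(temps), len(temps))
    return out


def per_sensor(events):
    """Model of SensorId-analysis: 30-second windows from 0, grouped by sensor."""
    out = {}
    for end, batch in batches(events, 30, 0).items():
        by_sensor = defaultdict(list)
        for sensor, temp in batch:
            by_sensor[sensor].append(temp)
        rows = [(s, sum(t) / len(t), max(t)) for s, t in by_sensor.items()]
        # Keep only groups whose average exceeds 20.0, as the query does.
        out[end] = [row for row in rows if row[1] > 20.0]
    return out


print(overall(EVENTS))
print(per_sensor(EVENTS))
```

The dictionary keys are window end times in seconds since midnight, so hms("9:01:10") looks up the first Overall-analysis batch. In the window ending at 9:01:00, sensor 1001 is dropped because its average of 19.0 does not exceed 20.0.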