Skip to main content

TUMBLING_TIME()

A tumbling time batch window holds and processes events that arrive during the time period as a batch.

Syntax

WINDOW TUMBLING_TIME(time <INT|LONG|TIME>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, start <INT|LONG>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, current.event <BOOL>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, start <INT|LONG>, current.event <BOOL>)

Query Parameters

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
timeThe batch time period in which the window process the events.INT LONG TIMENoNo
startThis specifies an offset in milliseconds in order to start the window at a time different to the standard time.Timestamp of first eventINT LONGYesNo
current.eventLet the window stream the current events out as and when they arrive to the window while expiring them in batches.falseBOOLYesNo

Example 1

CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, price double);

@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS price
FROM InputEventStream WINDOW TUMBLING_TIME(20 sec);

This window collects and processes incoming events as a batch every 20 seconds and then outputs them to a stream.

Example 2

CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, sumPrice double);

@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS sumPrice
FROM InputEventStream WINDOW TUMBLING_TIME(20 sec, true);

This window sends the arriving events directly to the output letting the sumPrice to increase gradually and on every 20 second interval it clears the window as a batch resetting the sumPrice to zero.

Example 3

CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) TUMBLING_TIME(20 sec) output all events;
CREATE SINK STREAM OutputStream (symbol string, price double);


@info(name = 'query0')
INSERT INTO StockEventWindow
FROM InputEventStream;

@info(name = 'query1')
INSERT all events INTO OutputStream
SELECT symbol, sum(price) AS price
FROM StockEventWindow;

This uses a defined window to process events arrived every 20 seconds as a batch and output all events.

Example 4

This example shows aggregating events over time in a batch (tumbling) manner.

Stream Worker Code

CREATE STREAM TemperatureStream(sensorId string, temperature double);

CREATE SINK STREAM OverallTemperatureStream(avgTemperature double, maxTemperature double, numberOfEvents long);
CREATE SINK STREAM SensorIdTemperatureStream(sensorId string, avgTemperature double, maxTemperature double);

@info(name = 'Overall-analysis')
-- Calculate average, maximum, and count for `temperature` attribute.
INSERT INTO OverallTemperatureStream
SELECT avg(temperature) AS avgTemperature,
max(temperature) AS maxTemperature,
count() AS numberOfEvents
-- Aggregate events every `1 minute`, from the arrival of the first event.
FROM TemperatureStream WINDOW TUMBLING_TIME(1 min);


@info(name = 'SensorId-analysis')
INSERT INTO SensorIdTemperatureStream
SELECT sensorId,
-- Calculate average, and maximum for `temperature`, by grouping events by `sensorId`.
avg(temperature) AS avgTemperature,
max(temperature) AS maxTemperature
-- Aggregate events every `30 seconds` from epoch timestamp `0`.
FROM TemperatureStream WINDOW TUMBLING_TIME(30 sec, 0)
GROUP BY sensorId
-- Output events only when `avgTemperature` is greater than `20.0`.
WHERE avgTemperature > 20.0;

Batch Time Aggregation Behavior

When events are sent to TemperatureStream, the following events are emitted at OverallTemperatureStream via the Overall-analysis query, and SensorIdTemperatureStream via the SensorId-analysis query.

TimeInput to TemperatureStreamOutput at OverallTemperatureStreamOutput at SensorIdTemperatureStream
9:00:10['1001', 21.0]--
9:00:20['1002', 25.0]--
9:00:30--['1001', 21.0, 21.0],['1002', 25.0, 25.0]
9:00:35['1002', 26.0]--
9:00:40['1002', 27.0]--
9:00:55['1001', 19.0]--
9:00:00--['1002', 26.5, 26.0]
9:01:10-[23.6, 27.0, 5]-
9:01:20['1001', 21.0]--
9:01:30--['1001', 21.0, 21.0]
9:02:10-[21.0, 21.0, 1]-