akslack (Stream Processor)
This stream processor reorders out-of-order events, optimized for a given parameter, using the AQ-K-Slack algorithm. It is best suited to reordering events on attributes that are used for aggregations.
Syntax
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout, <LONG> max.k)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout, <LONG> max.k, <BOOL> discard.late.arrival)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout, <LONG> max.k, <BOOL> discard.late.arrival, <DOUBLE> error.threshold, <DOUBLE> confidence.level)
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
timestamp | The event timestamp on which the events should be ordered. | | LONG | No | Yes |
correlation.field | Alpha K-Slack dynamically optimizes its behavior by monitoring the changes in this field. This field is used to calculate the runtime window coverage threshold, which represents the upper limit set for unsuccessfully handled late arrivals. | | INT FLOAT LONG DOUBLE | No | Yes |
batch.size | The number of events that should be considered in the calculation of an alpha value. This should be greater than or equal to 15. | 10,000 | LONG | Yes | No |
timeout | A timeout value in milliseconds. Buffered events that are older than the given timeout period are flushed every second. | `-1` (timeout is infinite) | LONG | Yes | No |
max.k | The maximum K-Slack window threshold (K parameter). | 9,223,372,036,854,775,807 (The maximum Long value) | LONG | Yes | No |
discard.late.arrival | If set to true, the processor discards out-of-order events that arrive later than the K-Slack window; otherwise, it allows the late arrivals to proceed. | false | BOOL | Yes | No |
error.threshold | The error threshold to be applied in Alpha K-Slack algorithm. | 0.03 (3%) | DOUBLE | Yes | No |
confidence.level | The confidence level to be applied in Alpha K-Slack algorithm. | 0.95 (95%) | DOUBLE | Yes | No |
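
To make the roles of the K-Slack window and `discard.late.arrival` more concrete, the following Python sketch shows a plain K-Slack buffer with a fixed slack. It is an illustration under simplifying assumptions, not the processor's implementation: the real algorithm adapts K at runtime from `correlation.field`, `error.threshold`, and `confidence.level` (bounded by `max.k`), whereas here `k` is a constant, and the names `kslack_reorder`, `k`, and `discard_late_arrival` are hypothetical.

```python
import heapq


def kslack_reorder(events, k, discard_late_arrival=False):
    """Reorder (timestamp, payload) pairs using a fixed slack of k time units.

    Simplified illustration only: k is constant here, while the adaptive
    algorithm described above tunes it at runtime.
    """
    buffer = []          # min-heap ordered by timestamp
    output = []
    max_seen = None      # largest timestamp observed so far
    last_emitted = None  # timestamp of the most recently released event
    seq = 0              # tie-breaker so payloads are never compared

    for ts, payload in events:
        if last_emitted is not None and ts < last_emitted:
            # The event arrived after later events were already released.
            if not discard_late_arrival:
                output.append((ts, payload))  # let it proceed out of order
            continue

        heapq.heappush(buffer, (ts, seq, payload))
        seq += 1
        max_seen = ts if max_seen is None else max(max_seen, ts)

        # Release every buffered event at least k older than the newest one.
        while buffer and buffer[0][0] <= max_seen - k:
            out_ts, _, out_payload = heapq.heappop(buffer)
            output.append((out_ts, out_payload))
            last_emitted = out_ts

    # End of input: flush the remaining events in timestamp order.
    while buffer:
        out_ts, _, out_payload = heapq.heappop(buffer)
        output.append((out_ts, out_payload))
    return output
```

In this sketch, a larger `k` buffers events longer and tolerates more disorder at the cost of latency. An event that arrives too late for the window is either passed through out of order or dropped, which mirrors the choice that `discard.late.arrival` describes.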
Example 1
CREATE STREAM StockStream (eventTime long, symbol string, volume long);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT eventTime, symbol, sum(volume) AS total
FROM StockStream#reorder:akslack(eventTime, volume, 20) WINDOW SLIDING_TIME(5 min);
This query processes stock events from the `StockStream` and outputs aggregated volume information to the `OutputStream`.

The `#reorder:akslack(eventTime, volume, 20)` part of the query is an event reordering extension. It reorders events based on the `eventTime` attribute value and optimizes for aggregating the `volume` attribute, considering the last 20 events. This reordering technique helps to minimize the error in the aggregation result due to out-of-order events.

The query uses a `WINDOW SLIDING_TIME(5 min)` clause, which means that it calculates the aggregation result for a sliding window of 5 minutes.

The query's output includes the `eventTime`, `symbol`, and the total volume of stocks within the 5-minute sliding window (calculated using `sum(volume)`). The result is then directed to the `OutputStream`.