collect (Aggregate Function)

Collects multiple key-value pairs to construct a map. Keys in the map are distinct: if a duplicate key arrives, its value overrides the old value.
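The override rule can be illustrated outside the engine. The following is a minimal Python sketch of the behavior described above, not the engine's implementation; the function name `collect` and the sample events are hypothetical.

```python
def collect(pairs):
    """Fold (key, value) pairs into a map; a later duplicate key overrides the old value."""
    result = {}
    for key, value in pairs:
        result[key] = value  # same key seen again: the new value replaces the old one
    return result


# Hypothetical sample events: 'IBM' arrives twice.
events = [("IBM", 100.0), ("WSO2", 55.5), ("IBM", 101.5)]
stock_details = collect(events)
print(stock_details)  # {'IBM': 101.5, 'WSO2': 55.5}
```

Three pairs go in, but the map has two entries because 'IBM' is a duplicate key and only its most recent value is kept.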

Syntax

<OBJECT> map:collect(<INT|LONG|FLOAT|DOUBLE|BOOL|STRING> key, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

Query Parameters

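| Name  | Description            | Default Value | Possible Data Types                       | Optional | Dynamic |
|-------|------------------------|---------------|-------------------------------------------|----------|---------|
| key   | Key of the map entry   |               | INT LONG FLOAT DOUBLE BOOL STRING         | No       | Yes     |
| value | Value of the map entry |               | OBJECT INT LONG FLOAT DOUBLE BOOL STRING  | No       | Yes     |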

Example 1

@info(name = 'StockDetailsCollection')
INSERT INTO OutputStream
SELECT map:collect(symbol, price) AS stockDetails
FROM StockStream WINDOW TUMBLING_LENGTH(10);

In this example, the query named 'StockDetailsCollection' applies a tumbling window of length 10 to StockStream. For each window of events, map:collect(symbol, price) builds a map that uses the symbol attribute as the key and the price attribute as the value. For every ten events in StockStream, the query therefore forms a map of stock symbols to their prices. If a symbol occurs more than once in a window, its later price overrides the earlier one.

Upon the expiration of the window (i.e., after processing 10 events), the query emits a single event into OutputStream. This event contains a map named stockDetails, which holds the collected key-value pairs of stock symbols and their corresponding prices from the events in the window.
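The windowed behavior of Example 1 can be approximated in plain Python. This is a sketch under stated assumptions, not the engine's code: the function name `tumbling_length_collect` is hypothetical, a window length of 3 is used instead of 10 to keep the sample short, and an incomplete trailing window is assumed to emit nothing.

```python
def tumbling_length_collect(events, length):
    """Yield one symbol-to-price map per full batch of `length` events."""
    window = {}
    count = 0
    for symbol, price in events:
        window[symbol] = price  # a repeated symbol overrides its earlier price
        count += 1
        if count == length:
            yield window  # the window is full: emit one map
            window = {}   # start the next, non-overlapping window
            count = 0
    # Assumption: events left in an incomplete window are not emitted.


# Hypothetical sample events; 'ORCL' is left in an unfinished window.
events = [("IBM", 100.0), ("WSO2", 55.5), ("IBM", 101.5), ("ORCL", 70.0)]
maps = list(tumbling_length_collect(events, 3))
print(maps)  # [{'IBM': 101.5, 'WSO2': 55.5}]
```

One map is emitted for the first three events. It has two entries rather than three because 'IBM' appears twice in that window.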

Example 2

CREATE STREAM StockInputStream (symbol string, price double, volume int);
CREATE SINK STREAM StockPriceVolumeMapStream (symbol string, priceMap object, volumeMap object);

@info(name = 'CollectPriceAndVolumeDetails')
INSERT INTO StockPriceVolumeMapStream
SELECT symbol, map:collect('Price', price) AS priceMap, map:collect('Volume', volume) AS volumeMap
FROM StockInputStream WINDOW TUMBLING_TIME(5 min);

In this example, the query 'CollectPriceAndVolumeDetails' uses StockInputStream as its input stream. This stream comprises stock symbols, their prices, and trading volumes.

The output of the query is directed to StockPriceVolumeMapStream, which includes the symbol and two objects, priceMap and volumeMap. Each object is a map generated by the map:collect function.

The map:collect function is used twice in this query. In the first case, it pairs the key 'Price' with the corresponding prices from the input stream. In the second case, it pairs the key 'Volume' with the corresponding volumes from the input stream.

A tumbling time window of 5 minutes is applied to StockInputStream. At the end of each 5-minute period, the query emits an event to StockPriceVolumeMapStream. This event comprises the stock symbol and the two objects priceMap and volumeMap. Each map holds what was collected under its key ('Price' or 'Volume') from the events in the window.
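Both calls in Example 2 pass a constant key, so it is worth seeing what the duplicate-key rule implies for the result. The following Python sketch assumes that rule applies within each window exactly as stated in the function description; the function name `collect_price_and_volume` and the sample events are hypothetical.

```python
def collect_price_and_volume(events):
    """Collect prices under the constant key 'Price' and volumes under 'Volume'."""
    price_map = {}
    volume_map = {}
    for _symbol, price, volume in events:
        price_map["Price"] = price     # same key every time: the value is overridden
        volume_map["Volume"] = volume  # same key every time: the value is overridden
    return price_map, volume_map


# Hypothetical events (symbol, price, volume) that fall into one 5-minute window.
events = [("IBM", 100.0, 10), ("IBM", 101.5, 20), ("IBM", 99.75, 5)]
price_map, volume_map = collect_price_and_volume(events)
print(price_map, volume_map)  # {'Price': 99.75} {'Volume': 5}
```

Under that assumption, each map ends up with a single entry whose value comes from the last event in the window. To keep one entry per event or per symbol, the key would need to vary, as it does in Example 1.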