Skip to main content

Functions

Core

and (Aggregate Function)

Returns the results of AND operation for all the events.

Syntax

    <BOOL> and(<BOOL> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be AND operation.BOOLNoYes

EXAMPLE 1

    insert into alertStream
select and(isFraud) as isFraudTransaction
from cscStream WINDOW TUMBLING_LENGTH(10);

This will returns the result for AND operation of isFraud values as a boolean value for event chunk expiry by window length batch.

avg (Aggregate Function)

Calculates the average for all the events.

Syntax

    <DOUBLE> avg(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that need to be averaged.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into barStream
select avg(temp) as avgTemp
from fooStream WINDOW TUMBLING_TIME;

avg(temp) returns the average temp value for all the events based on their arrival and expiry.

count (Aggregate Function)

Returns the count of all the events.

Syntax

    <LONG> count()
<LONG> count(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThis function accepts one parameter. It can belong to any one of the available types.INT LONG DOUBLE FLOAT STRING BOOL OBJECTYesYes

EXAMPLE 1

    insert into barStream
select count() as count
from fooStream WINDOW TUMBLING_TIME(10 sec);

This will return the count of all the events for time batch in 10 seconds.

distinctCount (Aggregate Function)

This returns the count of distinct occurrences for a given arg.

Syntax

    <LONG> distinctCount(<INT|LONG|DOUBLE|FLOAT|STRING> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe object for which the number of distinct occurences needs to be counted.INT LONG DOUBLE FLOAT STRINGNoYes

EXAMPLE 1

    insert into barStream
select distinctcount(pageID) as count
from fooStream;

distinctcount(pageID) for the following output returns 3 when the available values are as follows.

  • WEB_PAGE_1
  • WEB_PAGE_1
  • WEB_PAGE_2
  • WEB_PAGE_3
  • WEB_PAGE_1
  • WEB_PAGE_2

The three distinct occurences identified are WEB_PAGE_1, WEB_PAGE_2, and WEB_PAGE_3.

max (Aggregate Function)

Returns the maximum value for all the events.

Syntax

    <INT|LONG|DOUBLE|FLOAT> max(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be compared to find the maximum value.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into barStream
select max(temp) as maxTemp
from fooStream WINDOW TUMBLING_TIME(10 sec);

max(temp) returns the maximum temp value recorded for all the events based on their arrival and expiry.

maxForever (Aggregate Function)

This is the attribute aggregator to store the maximum value for a given attribute throughout the lifetime of the query regardless of any windows in-front.

Syntax

    <INT|LONG|DOUBLE|FLOAT> maxForever(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be compared to find the maximum value.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into outputStream
select maxForever(temp) as max
from inputStream;

maxForever(temp) returns the maximum temp value recorded for all the events throughout the lifetime of the query.

min (Aggregate Function)

Returns the minimum value for all the events.

Syntax

    <INT|LONG|DOUBLE|FLOAT> min(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be compared to find the minimum value.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into outputStream
select min(temp) as minTemp
from inputStream;

min(temp) returns the minimum temp value recorded for all the events based on their arrival and expiry.

minForever (Aggregate Function)

This is the attribute aggregator to store the minimum value for a given attribute throughout the lifetime of the query regardless of any windows in-front.

Syntax

    <INT|LONG|DOUBLE|FLOAT> minForever(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be compared to find the minimum value.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into outputStream
select minForever(temp) as max
from inputStream;

minForever(temp) returns the minimum temp value recorded for all the events throughout the lifetime of the query.

or (Aggregate Function)

Returns the results of OR operation for all the events.

Syntax

    <BOOL> or(<BOOL> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be OR operation.BOOLNoYes

EXAMPLE 1

    insert into alertStream
select or(isFraud) as isFraudTransaction
from cscStream WINDOW TUMBLING_LENGTH(10);

This will returns the result for OR operation of isFraud values as a boolean value for event chunk expiry by window length batch.

stdDev (Aggregate Function)

Returns the calculated standard deviation for all the events.

Syntax

    <DOUBLE> stdDev(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that should be used to calculate the standard deviation.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into outputStream
select stddev(temp) as stdTemp
from inputStream;

stddev(temp) returns the calculated standard deviation of temp for all the events based on their arrival and expiry.

sum (Aggregate Function)

Returns the sum for all the events.

Syntax

<LONG|DOUBLE> sum(<INT|LONG|DOUBLE|FLOAT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value that needs to be summed.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    insert into outputStream
select sum(volume) as sumOfVolume
from inputStream;

This will returns the sum of volume values as a long value for each event arrival and expiry.

unionSet (Aggregate Function)

Union multiple sets. This attribute aggregator maintains a union of sets. The given input set is put into the union set and the union set is returned.

Syntax

    <OBJECT> unionSet(<OBJECT> set)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
setThe java.util.Set object that needs to be added into the union set.OBJECTNoYes

EXAMPLE 1

    insert into initStream
select createSet(symbol) as initialSet
from stockStream

insert into distinctStockStream
select unionSet(initialSet) as distinctSymbols
from initStream WINDOW TUMBLING_TIME(10 sec);

distinctStockStream will return the set object which contains the distinct set of stock symbols received during a sliding window of 10 seconds.

UUID (Function)

Generates a UUID (Universally Unique Identifier).

Syntax

    <STRING> UUID()

EXAMPLE 1

    insert into RoomTempStream
select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID
from TempStream;

This will converts a room number to string, introducing a message ID to each event asUUID() returns a34eec40-32c2-44fe-8075-7f4fde2e2dd8 from TempStream select convert(roomNo, string) as roomNo, temp, UUID() as messageID insert into RoomTempStream;

cast (Function)

Converts the first parameter according to the cast.to parameter. Incompatible arguments cause Class Cast exceptions if further processed. This function is used with map extension that returns attributes of the object type. You can use this function to cast the object to an accurate and concrete type.

Syntax

    <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> cast(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> to.be.caster, <STRING> cast.to)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
to.be.casterThis specifies the attribute to be casted.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes
cast.toA string constant parameter expressing the cast to type using one of the following strings values: int, long, float, double, string, bool.STRINGNoYes

EXAMPLE 1

    insert into barStream
select symbol as name, cast(temp, 'double') as temp
from fooStream;

This will cast the fooStream temp field value into double format.

coalesce (Function)

Returns the value of the first input parameter that is not null, and all input parameters have to be on the same type.

Syntax

    <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> coalesce(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThis function accepts one or more parameters. They can belong to any one of the available types. All the specified parameters should be of the same type.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into barStream
select coalesce('123', null, '789') as value
from fooStream;

This will returns first null value 123.

EXAMPLE 2

    insert into barStream
select coalesce(null, 76, 567) as value
from fooStream;

This will returns first null value 76.

EXAMPLE 3

    insert into barStream
select coalesce(null, null, null) as value
from fooStream;

This will returns null as there are no notnull values.

convert (Function)

Converts the first input parameter according to the convertedTo parameter.

Syntax

    <INT|LONG|DOUBLE|FLOAT|STRING|BOOL> convert(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> to.be.converted, <STRING> converted.to)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
to.be.convertedThis specifies the value to be converted.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes
converted.toA string constant parameter to which type the attribute need to be converted using one of the following strings values: int, long, float, double, string, bool.STRINGNoYes

EXAMPLE 1

    insert into barStream
select convert(temp, 'double') as temp
from fooStream;

This will convert fooStream temp value into double.

EXAMPLE 2

    insert into barStream
select convert(temp, 'int') as temp
from fooStream;

This will convert fooStream temp value into int (value = "convert(45.9, int) returns 46").

createSet (Function)

Includes the given input parameter in a java.util.HashSet and returns the set.

Syntax

    <OBJECT> createSet(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL> input)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
inputThe input that needs to be added into the set.INT LONG DOUBLE FLOAT STRING BOOLNoYes

EXAMPLE 1

    insert into initStream
select createSet(symbol) as initialSet
from stockStream;

For every incoming stockStream event, the initStream stream will produce a set object having only one element: the symbol in the incoming stockStream.

currentTimeMillis (Function)

Returns the current timestamp of stream processor application in milliseconds.

Syntax

    <LONG> currentTimeMillis()

EXAMPLE 1

    insert into barStream
select symbol as name, currentTimeMillis() as eventTimestamp
from fooStream;

This will extract current stream processor application timestamp.

default (Function)

Checks if the attribute parameter is null and if so returns the value of the default parameter

Syntax

    <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> default(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> attribute, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> default)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
attributeThe attribute that could be null.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes
defaultThe default value that will be used when attribute parameter is nullINT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into StandardTempStream
select default(temp, 0.0) as temp, roomNum
from TempStream;

This will replace TempStream's temp attribute with default value if the temp is null.

eventTimestamp (Function)

Returns the timestamp of the processed event.

Syntax

    <LONG> eventTimestamp()

EXAMPLE 1

    insert into barStream
select symbol as name, eventTimestamp() as eventTimestamp
from fooStream;

This will extract current events timestamp.

ifThenElse (Function)

Evaluates the condition parameter and returns value of the if.expression parameter if the condition is true, or returns value of the else.expression parameter if the condition is false. Here both if.expression and else.expression should be of the same type.

Syntax

    <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> ifThenElse(<BOOL> condition, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> if.expression, <INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> else.expression)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
conditionThis specifies the if then else condition value.BOOLNoYes
if.expressionThis specifies the value to be returned if the value of the condition parameter is true.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes
else.expressionThis specifies the value to be returned if the value of the condition parameter is false.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    @info(name = 'query1')
insert into outputStream
select sensorValue, ifThenElse(sensorValue>35,'High','Low') as status
from sensorEventStream;

This will returns High if sensorValue = 50.

EXAMPLE 2

    @info(name = 'query1')
insert into outputStream
select sensorValue, ifThenElse(voltage < 5, 0, 1) as status
from sensorEventStream;

This will returns 1 if voltage= 12.

EXAMPLE 3

    @info(name = 'query1')
insert into outputStream
select userName, ifThenElse(password == 'admin', true, false) as passwordState
from userEventStream;

This will returns passwordState as true if password = admin.

instanceOfBoolean (Function)

Checks whether the parameter is an instance of Boolean or not.

Syntax

<BOOL> instanceOfBoolean(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe parameter to be checked.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into barStream
select instanceOfBoolean(switchState) as state
from fooStream;

This will return true if the value of switchState is true.

EXAMPLE 2

    insert into barStream
select instanceOfBoolean(value) as state
from fooStream;

if the value = 32 then this will returns false as the value is not an instance of the boolean.

instanceOfDouble (Function)

Checks whether the parameter is an instance of Double or not.

Syntax

<BOOL> instanceOfDouble(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe parameter to be checked.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1 insert into barStream select instanceOfDouble(value) as state from fooStream;

This will return true if the value field format is double ex : 56.45.

EXAMPLE 2

    insert into barStream
select instanceOfDouble(switchState) as state
from fooStream;

if the switchState = true then this will returns false as the value is not an instance of the double.

instanceOfFloat (Function)

Checks whether the parameter is an instance of Float or not.

Syntax

<BOOL> instanceOfFloat(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe parameter to be checked.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into barStream
select instanceOfFloat(value) as state
from fooStream;

This will return true if the value field format is float ex : 56.45f.

EXAMPLE 2

    insert into barStream
select instanceOfFloat(switchState) as state
from fooStream;

if the switchState = true then this will returns false as the value is an instance of the boolean not a float.

instanceOfInteger (Function)

Checks whether the parameter is an instance of Integer or not.

Syntax

<BOOL> instanceOfInteger(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe parameter to be checked.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into barStream
select instanceOfInteger(value) as state
from fooStream;

This will return true if the value field format is integer.

EXAMPLE 2

    insert into barStream
select instanceOfInteger(switchState) as state
from fooStream;

if the switchState = true then this will returns false as the value is an instance of the boolean not a long.

instanceOfLong (Function)

Checks whether the parameter is an instance of Long or not.

Syntax

<BOOL> instanceOfLong(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe parameter to be checked.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into barStream
select instanceOfLong(value) as state
from fooStream;

This will return true if the value field format is long ex : 56456l.

EXAMPLE 2

    insert into barStream
select instanceOfLong(switchState) as state
from fooStream;

if the switchState = true then this will returns false as the value is an instance of the boolean not a long.

instanceOfString (Function)

Checks whether the parameter is an instance of String or not.

Syntax

<BOOL> instanceOfString(<INT|LONG|DOUBLE|FLOAT|STRING|BOOL|OBJECT> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe parameter to be checked.INT LONG DOUBLE FLOAT STRING BOOL OBJECTNoYes

EXAMPLE 1

    insert into barStream
select instanceOfString(value) as state
from fooStream;

This will return true if the value field format is string ex : test.

EXAMPLE 2

    insert into barStream
select instanceOfString(switchState) as state
from fooStream;

if the switchState = true then this will returns false as the value is an instance of the boolean not a string.

maximum (Function)

Returns the maximum value of the input parameters.

Syntax

<INT|LONG|DOUBLE|FLOAT> maximum(<INT|LONG|DOUBLE|FLOAT> arg, <INT|LONG|DOUBLE|FLOAT> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThis function accepts one or more parameters. They can belong to any one of the available types. All the specified parameters should be of the same type.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    @info(name = 'query1') from inputStream
insert into outputStream
select maximum(price1, price2, price3) as max;

This will returns the maximum value of the input parameters price1, price2, price3.

minimum (Function)

Returns the minimum value of the input parameters.

Syntax

<INT|LONG|DOUBLE|FLOAT> minimum(<INT|LONG|DOUBLE|FLOAT> arg, <INT|LONG|DOUBLE|FLOAT> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThis function accepts one or more parameters. They can belong to any one of the available types. All the specified parameters should be of the same type.INT LONG DOUBLE FLOATNoYes

EXAMPLE 1

    @info(name = 'query1') from inputStream
insert into outputStream
select maximum(price1, price2, price3) as max;

This will returns the minimum value of the input parameters price1, price2, price3.

sizeOfSet (Function)

Returns the size of an object of type java.util.Set.

Syntax

<INT> sizeOfSet(<OBJECT> set)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
setThe set object. This parameter should be of type java.util.Set. A set object may be created by the set attribute aggregator.OBJECTNoYes

EXAMPLE 1 insert into initStream select initSet(symbol) as initialSet from stockStream;

insert into distinctStockStream
select union(initialSet) as distinctSymbols
from initStream WINDOW TUMBLING_TIME(10 sec);

insert into sizeStream
select sizeOfSet(distinctSymbols) sizeOfSymbolSet
from distinctStockStream;

The sizeStream stream will output the number of distinct stock symbols received during a sliding window of 10 seconds.

pol2Cart (Stream Function)

The pol2Cart function calculating the cartesian coordinates x & y for the given theta, rho coordinates and adding them as new attributes to the existing events.

Syntax

pol2Cart(<DOUBLE> theta, <DOUBLE> rho)
pol2Cart(<DOUBLE> theta, <DOUBLE> rho, <DOUBLE> z)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
thetaThe theta value of the coordinates.DOUBLENoYes
rhoThe rho value of the coordinates.DOUBLENoYes
zz value of the cartesian coordinates.If z value is not given, drop the third parameter of the output.DOUBLEYesYes

EXAMPLE 1

insert into outputStream
select x, y
from PolarStream#pol2Cart(theta, rho);

This will return cartesian coordinates (4.99953024681082, 0.06853693328228748) for theta: 0.7854 and rho: 5.

EXAMPLE 2

insert into outputStream
select x, y, z
from PolarStream#pol2Cart(theta, rho, 3.4);

This will return cartesian coordinates (4.99953024681082, 0.06853693328228748, 3.4)for theta: 0.7854 and rho: 5 and z: 3.4.

log (Stream Processor)

Logs the message on the given priority with or without the processed event.

Syntax

log()
log(<STRING> log.message)
log(<BOOL> is.event.logged)
log(<STRING> log.message, <BOOL> is.event.logged)
log(<STRING> priority, <STRING> log.message)
log(<STRING> priority, <STRING> log.message, <BOOL> is.event.logged)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
priorityThe priority/type of this log message (INFO, DEBUG, WARN, FATAL, ERROR, OFF, TRACE).INFOSTRINGYesNo
log.messageThis message will be logged.:STRINGYesYes
is.event.loggedTo log the processed event.trueBOOLYesNo

EXAMPLE 1

insert into BarStream
select *
from FooStream#log();

Logs events with StreamApp name message prefix on default log level INFO.

EXAMPLE 2

insert into BarStream
select *
from FooStream#log("Sample Event :");

Logs events with the message prefix "Sample Event :" on default log level INFO.

EXAMPLE 3

insert into BarStream
select *
from FooStream#log("DEBUG", "Sample Event :", true);

Logs events with the message prefix "Sample Event :" on log level DEBUG.

EXAMPLE 4

insert into BarStream
select *
from FooStream#log("Event Arrived", false);

For each event logs a message "Event Arrived" on default log level INFO.

EXAMPLE 5

insert into BarStream
select *
from FooStream#log("Sample Event :", true);

Logs events with the message prefix "Sample Event :" on default log level INFO.

EXAMPLE 6

insert into BarStream
select *
from FooStream#log(true);

Logs events with on default log level INFO.

batch (Window)

A window that holds an incoming events batch. When a new set of events arrives, the previously arrived old events will be expired. Batch window can be used to aggregate events that comes in batches. If it has the parameter length specified, then batch window process the batch as several chunks.

Syntax

batch()
batch(<INT> window.length)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.lengthThe length of a chunkIf length value was not given it assign 0 as length and process the whole batch as onceINTYesNo

EXAMPLE 1

CREATE STREAM consumerItemStream (itemId string, price float)
insert into outputStream
select price, str:groupConcat(itemId) as itemIds
from consumerItemStream WINDOW TUMBLING()
group by price;

This will output comma separated items IDs that have the same price for each incoming batch of events.

cron (Window)

This window outputs the arriving events as and when they arrive, and resets (expires) the window periodically based on the given cron expression.

Syntax

cron(<STRING> cron.expression)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
cron.expressionThe cron expression that resets the window.STRINGNoNo

EXAMPLE 1

CREATE STREAM  InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
insert into OutputStream
select symbol, sum(price) as totalPrice
from InputEventStream#cron('*/5 * * * * ?');

This let the totalPrice to gradually increase and resets to zero as a batch every 5 seconds.

EXAMPLE 2

CREATE STREAM StockEventStream (symbol string, price float, volume int)
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) cron('*/5 * * * * ?');

@info(name = 'query0')
insert into StockEventWindow
from StockEventStream;

@info(name = 'query1')
insert into OutputStream
select symbol, sum(price) as totalPrice
from StockEventWindow;

The defined window will let the totalPrice to gradually increase and resets to zero as a batch every 5 seconds.

delay (Window)

A delay window holds events for a specific time period that is regarded as a delay period before processing them.

Syntax

delay(<INT|LONG|TIME> window.delay)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.delayThe time period (specified in sec, min, ms) for which the window should delay the events.INT LONG TIMENoNo

EXAMPLE 1

CREATE WINDOW delayWindow(symbol string, volume int) delay(1 hour);
CREATE STREAM PurchaseStream(symbol string, volume int);
CREATE STREAM DeliveryStream(symbol string);
CREATE STREAM OutputStream(symbol string);

@info(name='query1')
insert into delayWindow
select symbol, volume
from PurchaseStream;

@info(name='query2')
insert into OutputStream
select delayWindow.symbol
from delayWindow join DeliveryStream
on delayWindow.symbol == DeliveryStream.symbol;

In this example, purchase events that arrive in the PurchaseStream stream are directed to a delay window. At any given time, this delay window holds purchase events that have arrived within the last hour. These purchase events in the window are matched by the symbol attribute, with delivery events that arrive in the DeliveryStream stream. This monitors whether the delivery of products is done with a minimum delay of one hour after the purchase.

externalTime (Window)

A sliding time window based on external time. It holds events that arrived during the last windowTime period from the external timestamp, and gets updated on every monotonically increasing timestamp.

Syntax

externalTime(<LONG> timestamp, <INT|LONG|TIME> window.time)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
timestampThe time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.LONGNoYes
window.timeThe sliding time period for which the window should hold events.INT LONG TIMENoNo

EXAMPLE 1

CREATE WINDOW cseEventWindow (symbol string, price float, volume int) externalTime(eventTime, 20 sec) output expired events;

@info(name = 'query0')
insert into cseEventWindow
from cseEventStream;

@info(name = 'query1')
insert expired events into outputStream
select symbol, sum(price) as price
from cseEventWindow;

processing events arrived within the last 20 seconds from the eventTime and output expired events.

externalTimeBatch (Window)

A batch (tumbling) time window based on external time, that holds events arrived during windowTime periods, and gets updated for every windowTime.

Syntax

externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time)
externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time, <INT|LONG|TIME> start.time)
externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time, <INT|LONG|TIME> start.time, <INT|LONG|TIME> timeout)
externalTimeBatch(<LONG> timestamp, <INT|LONG|TIME> window.time, <INT|LONG|TIME> start.time, <INT|LONG|TIME> timeout, <BOOL> replace.with.batchtime)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
timestampThe time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.LONGNoYes
window.timeThe batch time period for which the window should hold events.INT LONG TIMENoNo
start.timeUser defined start time. This could either be a constant (of type int, long or time) or an attribute of the corresponding stream (of type long). If an attribute is provided, initial value of attribute would be considered as startTime.Timestamp of first eventINT LONG TIMEYesYes
timeoutTime to wait for arrival of new event, before flushing and giving output for events belonging to a specific batch.System waits till an event from next batch arrives to flush current batchINT LONG TIMEYesNo
replace.with.batchtimeThis indicates to replace the expired event timeStamp as the batch end timeStampSystem waits till an event from next batch arrives to flush current batchBOOLYesNo

EXAMPLE 1

CREATE WINDOW cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 1 sec) output expired events;
@info(name = 'query0')
insert into cseEventWindow
from cseEventStream;

@info(name = 'query1')
insert expired events into outputStream
select symbol, sum(price) as price
from cseEventWindow;

This will processing events that arrive every 1 seconds from the eventTime.

EXAMPLE 2

CREATE WINDOW cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 20 sec, 0) output expired events;

This will processing events that arrive every 1 seconds from the eventTime. Starts on 0th millisecond of an hour.

EXAMPLE 3

CREATE WINDOW cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 2 sec, eventTimestamp, 100) output expired events;

This will processing events that arrive every 2 seconds from the eventTim. Considers the first event's eventTimestamp value as startTime. Waits 100 milliseconds for the arrival of a new event before flushing current batch.

frequent (Window)

Deprecated

This window returns the latest events with the most frequently occurred value for a given attribute(s). Frequency calculation for this window processor is based on Misra-Gries counting algorithm.

Syntax

frequent(<INT> event.count)
frequent(<INT> event.count, <STRING> attribute)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
event.countThe number of most frequent events to be emitted to the stream.INTNoNo
attributeThe attributes to group the events. If no attributes are given, the concatenation of all the attributes of the event is considered.The concatenation of all the attributes of the event is considered.STRINGYesYes

EXAMPLE 1

@info(name = 'query1')
select cardNo, price
from purchase[price >= 30] WINDOW FREQUENT(2)
insert all events into PotentialFraud;

This will returns the 2 most frequent events.

EXAMPLE 2

@info(name = 'query1')
select cardNo, price
from purchase[price >= 30] WINDOW FREQUENT(2, cardNo)
insert all events into PotentialFraud;

This will returns the 2 latest events with the most frequently appeared card numbers.

length (Window)

A sliding length window that holds the last window.length events at a given time, and gets updated for each arrival and expiry.

Syntax

length(<INT> window.length)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.lengthThe number of events that should be included in a sliding length window.INTNoNo

EXAMPLE 1

CREATE WINDOW StockEventWindow (symbol string, price float, volume int) length(10) output all events;

@info(name = 'query0')
insert into StockEventWindow
from StockEventStream;

@info(name = 'query1')
insert all events into outputStream
select symbol, sum(price) as price
from StockEventWindow;

This will process last 10 events in a sliding manner.

lengthBatch (Window)

A batch (tumbling) length window that holds and process a number of events as specified in the window.length.

Syntax

lengthBatch(<INT> window.length)
lengthBatch(<INT> window.length, <BOOL> stream.current.event)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.lengthThe number of events the window should tumble.INTNoNo
stream.current.eventLet the window stream the current events out as and when they arrive to the window while expiring them in batches.falseBOOLYesNo

EXAMPLE 1

CREATE STREAM InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
insert into OutputStream
select symbol, sum(price) as price
from InputEventStream#lengthBatch(10);

This collect and process 10 events as a batch and output them.

EXAMPLE 2

CREATE STREAM InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
insert into OutputStream
select symbol, sum(price) as sumPrice
from InputEventStream#lengthBatch(10, true);

This window sends the arriving events directly to the output letting the sumPrice to increase gradually, after every 10 events it clears the window as a batch and resets the sumPrice to zero.

EXAMPLE 3

CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) lengthBatch(10) output all events;

@info(name = 'query0')
insert into StockEventWindow
from InputEventStream;

@info(name = 'query1')
insert all events into OutputStream
select symbol, sum(price) as price
from StockEventWindow;

This uses an defined window to process 10 events as a batch and output all events.

lossyFrequent (Window)

Deprecated

This window identifies and returns all the events of which the current frequency exceeds the value specified for the supportThreshold parameter.

Syntax

lossyFrequent(<DOUBLE> support.threshold)
lossyFrequent(<DOUBLE> support.threshold, <DOUBLE> error.bound)
lossyFrequent(<DOUBLE> support.threshold, <DOUBLE> error.bound, <STRING> attribute)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
support.thresholdThe support threshold value.DOUBLENoNo
error.boundThe error bound value.`support.threshold`/10DOUBLEYesNo
attributeThe attributes to group the events. If no attributes are given, the concatenation of all the attributes of the event is considered.The concatenation of all the attributes of the event is considered.STRINGYesYes

EXAMPLE 1

CREATE STREAM purchase (cardNo string, price float);
CREATE WINDOW purchaseWindow (cardNo string, price float) lossyFrequent(0.1, 0.01);

@info(name = 'query0')
insert into purchaseWindow
from purchase[price >= 30];

@info(name = 'query1')
insert all events into PotentialFraud
select cardNo, price
from purchaseWindow;

lossyFrequent(0.1, 0.01) returns all the events of which the current frequency exceeds 0.1, with an error bound of 0.01.

EXAMPLE 2

CREATE STREAM purchase (cardNo string, price float);
CREATE WINDOW purchaseWindow (cardNo string, price float) lossyFrequent(0.3, 0.05, cardNo);

@info(name = 'query0')
insert into purchaseWindow
from purchase[price >= 30];

@info(name = 'query1')
insert all events into PotentialFraud
select cardNo, price
from purchaseWindow;

lossyFrequent(0.3, 0.05, cardNo) returns all the events of which the cardNo attributes frequency exceeds 0.3, with an error bound of 0.05.

session (Window)

Holds events that belong to a session. Events belong to a specific session are identified by a session key, and a session gap is determines the time period after which the session is considered to be expired. To have meaningful aggregation on session windows, the events need to be aggregated based on session key via a group by clause.

Syntax

session(<INT|LONG|TIME> session.gap)
session(<INT|LONG|TIME> session.gap, <STRING> session.key)
session(<INT|LONG|TIME> session.gap, <STRING> session.key, <INT|LONG|TIME> allowed.latency)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
session.gapThe time period after which the session is considered to be expired.INT LONG TIMENoNo
session.keyThe session identification attribute. Used to group events belonging to a specific session.default-keySTRINGYesYes
allowed.latencyThe time period for which the session window is valid after the expiration of the session, to accept late event arrivals. This time period should be less than the session.gap parameter.0INT LONG TIMEYesNo

EXAMPLE 1

CREATE STREAM PurchaseEventStream (user string, item_number int, price float, quantity int);

@info(name='query1)
insert into OutputStream
select user, sum(quantity) as totalQuantity, sum(price) as totalPrice
from PurchaseEventStream WINDOW SESSION(5 sec, user)
group by user;

From the events arriving at the PurchaseEventStream, a session window with 5 seconds session gap is processed based on user attribute as the session group identification key. All events falling into the same session are aggregated based on user attribute, and outputted to the OutputStream.

EXAMPLE 2

CREATE STREAM PurchaseEventStream (user string, item_number int, price float, quantity int);

@info(name='query2)
insert into OutputStream
select user, sum(quantity) as totalQuantity, sum(price) as totalPrice
from PurchaseEventStream WINDOW SESSION(5 sec, user, 2 sec)
group by user;

From the events arriving at the PurchaseEventStream, a session window with 5 seconds session gap is processed based on user attribute as the session group identification key. This session window is kept active for 2 seconds after the session expiry to capture late (out of order) event arrivals. If the event timestamp falls in to the last session the session is reactivated. Then all events falling into the same session are aggregated based on user attribute, and outputted to the OutputStream.

sort (Window)

This window holds a batch of events that equal the number specified as the windowLength and sorts them in the given order.

Syntax

sort(<INT> window.length, <STRING|DOUBLE|INT|LONG|FLOAT|LONG> attribute)
sort(<INT> window.length, <STRING|DOUBLE|INT|LONG|FLOAT|LONG> attribute, <STRING> order, <STRING> ...)
sort(<INT> window.length, <STRING|DOUBLE|INT|LONG|FLOAT|LONG> attribute, <STRING> order, <STRING|DOUBLE|INT|LONG|FLOAT|LONG> attribute, <STRING|DOUBLE|INT|LONG|FLOAT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.lengthThe size of the window length.INTNoNo
attributeThe attribute that should be checked for the order.The concatenation of all the attributes of the event is considered.STRING DOUBLE INT LONG FLOAT LONGNoYes
orderThe order define as "asc" or "desc".ascSTRINGYesNo

EXAMPLE 1

CREATE STREAM cseEventStream (symbol string, price float, volume long);
CREATE WINDOW cseEventWindow (symbol string, price float, volume long) sort(2,volume, 'asc');

@info(name = 'query0')
insert into cseEventWindow
from cseEventStream;

@info(name = 'query1')
insert all events into outputStream
select volume
from cseEventWindow;

sort(5, price, asc) keeps the events sorted by price in the ascending order. Therefore, at any given time, the window contains the 5 lowest prices.

time (Window)

A sliding time window that holds events that arrived during the last windowTime period at a given time, and gets updated for each event arrival and expiry.

Syntax

time(<INT|LONG|TIME> window.time)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.timeThe sliding time period for which the window should hold events.INT LONG TIMENoNo

EXAMPLE 1

CREATE WINDOW cseEventWindow (symbol string, price float, volume int) time(20) output all events;

@info(name = 'query0')
insert into cseEventWindow
from cseEventStream;

@info(name = 'query1')
insert all events into outputStream
select symbol, sum(price) as price
from cseEventWindow;

This will processing events that arrived within the last 20 milliseconds.

timeBatch (Window)

A batch (tumbling) time window that holds and process events that arrive during window.time period as a batch.

Syntax

timeBatch(<INT|LONG|TIME> window.time)
timeBatch(<INT|LONG|TIME> window.time, <INT|LONG> start.time)
timeBatch(<INT|LONG|TIME> window.time, <BOOL> stream.current.event)
timeBatch(<INT|LONG|TIME> window.time, <INT|LONG> start.time, <BOOL> stream.current.event)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.timeThe batch time period in which the window process the events.INT LONG TIMENoNo
start.timeThis specifies an offset in milliseconds in order to start the window at a time different to the standard time.Timestamp of first eventINT LONGYesNo
stream.current.eventLet the window stream the current events out as and when they arrive to the window while expiring them in batches.falseBOOLYesNo

EXAMPLE 1

CREATE STREAM InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
insert into OutputStream
select symbol, sum(price) as price
from InputEventStream#timeBatch(20 sec);

This collect and process incoming events as a batch every 20 seconds and output them.

EXAMPLE 2

CREATE STREAM InputEventStream (symbol string, price float, volume int);

@info(name = 'query1')
insert into OutputStream
select symbol, sum(price) as sumPrice
from InputEventStream#timeBatch(20 sec, true);

This window sends the arriving events directly to the output letting the sumPrice to increase gradually and on every 20 second interval it clears the window as a batch resetting the sumPrice to zero.

EXAMPLE 3

CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) timeBatch(20 sec) output all events;

@info(name = 'query0')
insert into StockEventWindow
from InputEventStream;

@info(name = 'query1')
insert all events into OutputStream
select symbol, sum(price) as price
from StockEventWindow;

This uses an defined window to process events arrived every 20 seconds as a batch and output all events.

timeLength (Window)

A sliding time window that, at a given time holds the last window.length events that arrived during last window.time period, and gets updated for every event arrival and expiry.

Syntax

timeLength(<INT|LONG|TIME> window.time, <INT> window.length)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
window.timeThe sliding time period for which the window should hold events.INT LONG TIMENoNo
window.lengthThe number of events that should be be included in a sliding length window..INTNoNo

EXAMPLE 1

CREATE STREAM cseEventStream (symbol string, price float, volume int);
CREATE WINDOW cseEventWindow (symbol string, price float, volume int) timeLength(2 sec, 10);

@info(name = 'query0')
insert into cseEventWindow
from cseEventStream;

@info(name = 'query1')
insert all events into outputStream
from cseEventWindow select symbol, price, volume;

window.timeLength(2 sec, 10) holds the last 10 events that arrived during last 2 seconds and gets updated for every event arrival and expiry.

Js

eval (Function)

This extension evaluates a given string and return the output according to the user specified data type.

Syntax

<INT|LONG|DOUBLE|FLOAT|STRING|BOOL> js:eval(<STRING> expression, <STRING> return.type)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
expressionAny single line js expression or function.STRINGNoYes
return.typeThe return type of the evaluated expression. Supported types are int|long|float|double|bool|string.STRINGNoNo

EXAMPLE 1

js:eval("700 > 800", 'bool')

In this example, the expression 700 > 800 will be evaluated and return result as false because user specified return type as bool.

Json

group (Aggregate Function)

This function aggregates the JSON elements and returns a JSON object by adding enclosing.element if it is provided. If enclosing.element is not provided it aggregate the JSON elements returns a JSON array.

Syntax

<OBJECT> json:group(<STRING|OBJECT> json)
<OBJECT> json:group(<STRING|OBJECT> json, <BOOL> distinct)
<OBJECT> json:group(<STRING|OBJECT> json, <STRING> enclosing.element)
<OBJECT> json:group(<STRING|OBJECT> json, <STRING> enclosing.element, <BOOL> distinct)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON element that needs to be aggregated.STRING OBJECTNoYes
enclosing.elementThe JSON element used to enclose the aggregated JSON elements.EMPTY_STRINGSTRINGYesYes
distinctThis is used to only have distinct JSON elements in the concatenated JSON object/array that is returned.falseBOOLYesYes

EXAMPLE 1

select json:group("json") as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"12:20"}, it returns [{"date":"2013-11-19","time":"10:30"}{"date":"2013-11-19","time":"12:20"}] to the OutputStream.

EXAMPLE 2

select json:group("json", true) as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"10:30"}, it returns [{"date":"2013-11-19","time":"10:30"}] to the OutputStream.

EXAMPLE 3

select json:group("json", "result") as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"12:20"}, it returns {"result":[{"date":"2013-11-19","time":"10:30"},{"date":"2013-11-19","time":"12:20"}} to the OutputStream.

EXAMPLE 4

select json:group("json", "result", true) as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"10:30"}, it returns {"result":[{"date":"2013-11-19","time":"10:30"}]} to the OutputStream.

groupAsObject (Aggregate Function)

This function aggregates the JSON elements and returns a JSON object by adding enclosing.element if it is provided. If enclosing.element is not provided it aggregate the JSON elements returns a JSON array.

Syntax

<OBJECT> json:groupAsObject(<STRING|OBJECT> json)
<OBJECT> json:groupAsObject(<STRING|OBJECT> json, <BOOL> distinct)
<OBJECT> json:groupAsObject(<STRING|OBJECT> json, <STRING> enclosing.element)
<OBJECT> json:groupAsObject(<STRING|OBJECT> json, <STRING> enclosing.element, <BOOL> distinct)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON element that needs to be aggregated.STRING OBJECTNoYes
enclosing.elementThe JSON element used to enclose the aggregated JSON elements.EMPTY_STRINGSTRINGYesYes
distinctThis is used to only have distinct JSON elements in the concatenated JSON object/array that is returned.falseBOOLYesYes

EXAMPLE 1

select json:groupAsObject("json") as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"12:20"}, it returns [{"date":"2013-11-19","time":"10:30"}{"date":"2013-11-19","time":"12:20"}] to the OutputStream.

EXAMPLE 2

select json:groupAsObject("json", true) as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"10:30"}, it returns [{"date":"2013-11-19","time":"10:30"}] to the OutputStream.

EXAMPLE 3

select json:groupAsObject("json", "result") as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"12:20"}, it returns {"result":[{"date":"2013-11-19","time":"10:30"},{"date":"2013-11-19","time":"12:20"}} to the OutputStream.

EXAMPLE 4

select json:groupAsObject("json", "result", true) as groupedJSONArray
from InputStream WINDOW SLIDING_LENGTH(5)
input OutputStream;

When we input events having values for the json as {"date":"2013-11-19","time":"10:30"} and {"date":"2013-11-19","time":"10:30"}, it returns {"result":[{"date":"2013-11-19","time":"10:30"}]} to the OutputStream.

getBool (Function)

Function retrieves the boolean value specified in the given path of the JSON element.

Syntax

<BOOL> json:getBool(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing boolean value.STRING OBJECTNoYes
pathThe JSON path to fetch the boolean value.STRINGNoYes

EXAMPLE 1

json:getBool(json,'$.married')

If the json is the format {'name' : 'John', 'married' : true}, the function returns true as there is a matching boolean at $.married.

EXAMPLE 2

json:getBool(json,'$.name')

If the json is the format {'name' : 'John', 'married' : true}, the function returns null as there is no matching boolean at $.name.

EXAMPLE 3

json:getBool(json,'$.foo')

If the json is the format {'name' : 'John', 'married' : true}, the function returns null as there is no matching element at $.foo.

getDouble (Function)

Function retrieves the double value specified in the given path of the JSON element.

Syntax

<DOUBLE> json:getDouble(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing double value.STRING OBJECTNoYes
pathThe JSON path to fetch the double value.STRINGNoYes

EXAMPLE 1

json:getDouble(json,'$.salary')

If the json is the format {'name' : 'John', 'salary' : 12000.0}, the function returns 12000.0 as there is a matching double at $.salary.

EXAMPLE 2

json:getDouble(json,'$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching element at $.salary.

EXAMPLE 3

json:getDouble(json,'$.name')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching double at $.name.

getFloat (Function)

Function retrieves the float value specified in the given path of the JSON element.

Syntax

<FLOAT> json:getFloat(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing float value.STRING OBJECTNoYes
pathThe JSON path to fetch the float value.STRINGNoYes

EXAMPLE 1

json:getFloat(json,'$.salary')

If the json is the format {'name' : 'John', 'salary' : 12000.0}, th function returns 12000 as there is a matching float at $.salary.

EXAMPLE 2

json:getFloat(json,'$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching element at $.salary.

EXAMPLE 3

json:getFloat(json,'$.name')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching float at $.name.

getInt (Function)

Function retrieves the int value specified in the given path of the JSON element.

Syntax

<INT> json:getInt(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing int value.STRING OBJECTNoYes
pathThe JSON path to fetch the int value.STRINGNoYes

EXAMPLE 1

json:getInt(json,'$.age')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns 23 as there is a matching int at $.age.

EXAMPLE 2

json:getInt(json,'$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching element at $.salary.

EXAMPLE 3

json:getInt(json,'$.name')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching int at $.name.

getLong (Function)

Function retrieves the long value specified in the given path of the JSON element.

Syntax

<LONG> json:getLong(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing long value.STRING OBJECTNoYes
pathThe JSON path to fetch the long value.STRINGNoYes

EXAMPLE 1

json:getLong(json,'$.age')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns 23 as there is a matching long at $.age.

EXAMPLE 2

json:getLong(json,'$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching element at $.salary.

EXAMPLE 3

json:getLong(json,'$.name')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching long at $.name.

getObject (Function)

Function retrieves the object specified in the given path of the JSON element.

Syntax

<OBJECT> json:getObject(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing the object.STRING OBJECTNoYes
pathThe JSON path to fetch the object.STRINGNoYes

EXAMPLE 1

json:getObject(json,'$.address')

If the json is the format {'name' : 'John', 'address' : {'city' : 'NY', 'country' : 'USA'}}, the function returns {'city' : 'NY', 'country' : 'USA'} as there is a matching object at $.address.

EXAMPLE 2

json:getObject(json,'$.age')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns 23 as there is a matching object at $.age.

EXAMPLE 3

json:getObject(json,'$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching element at $.salary.

getString (Function)

Function retrieves value specified in the given path of the JSON element as a string.

Syntax

<STRING> json:getString(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input containing value.STRING OBJECTNoYes
pathThe JSON path to fetch the value.STRINGNoYes

EXAMPLE 1

json:getString(json,'$.name')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns John as there is a matching string at $.name.

EXAMPLE 2

json:getString(json,'$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns null as there are no matching element at $.salary.

EXAMPLE 3

json:getString(json,'$.age')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns 23 as a string as there is a matching element at $.age.

EXAMPLE 4

json:getString(json,'$.address')

If the json is the format {'name' : 'John', 'address' : {'city' : 'NY', 'country' : 'USA'}}, the function returns {'city' : 'NY', 'country' : 'USA'} as a string as there is a matching element at $.address.

isExists (Function)

Function checks whether there is a JSON element present in the given path or not.

Syntax

<BOOL> json:isExists(<STRING|OBJECT> json, <STRING> path)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON input that needs to be searched for an elements.STRING OBJECTNoYes
pathThe JSON path to check for the element.STRINGNoYes

EXAMPLE 1

json:isExists(json, '$.name')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns true as there is an element in the given path.

EXAMPLE 2

json:isExists(json, '$.salary')

If the json is the format {'name' : 'John', 'age' : 23}, the function returns false as there is no element in the given path.

setElement (Function)

Function sets JSON element into a given JSON at the specific path.

Syntax

<OBJECT> json:setElement(<STRING|OBJECT> json, <STRING> path, <STRING|BOOL|DOUBLE|FLOAT|INT|LONG|OBJECT> json.element)
<OBJECT> json:setElement(<STRING|OBJECT> json, <STRING> path, <STRING|BOOL|DOUBLE|FLOAT|INT|LONG|OBJECT> json.element, <STRING> key)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe JSON to which a JSON element needs to be added/replaced.STRING OBJECTNoYes
pathThe JSON path where the JSON element should be added/replaced.STRINGNoYes
json.elementThe JSON element being added.STRING BOOL DOUBLE FLOAT INT LONG OBJECTNoYes
keyThe key to be used to refer the newly added element in the input JSON.Assumes the element is added to a JSON array, or the element selected by the JSON path will be updated.STRINGYesYes

EXAMPLE 1

json:setElement(json, '$', "{'country' : 'USA'}", 'address')

If the json is the format {'name' : 'John', 'married' : true},the function updates the json as {'name' : 'John', 'married' : true, 'address' : {'country' : 'USA'}} by adding address element and returns the updated JSON.

EXAMPLE 2

json:setElement(json, '$', 40, 'age')

If the json is the format {'name' : 'John', 'married' : true},the function updates the json as {'name' : 'John', 'married' : true, 'age' : 40} by adding age element and returns the updated JSON.

EXAMPLE 3

json:setElement(json, '$', 45, 'age')

If the json is the format {'name' : 'John', 'married' : true, 'age' : 40}, the function updates the json as {'name' : 'John', 'married' : true, 'age' : 45} by replacing age element and returns the updated JSON.

EXAMPLE 4

json:setElement(json, '$.items', 'book')

If the json is the format {'name' : 'Stationary', 'items' : ['pen', 'pencil']}, the function updates the json as {'name' : 'John', 'items' : ['pen', 'pencil', 'book']} by adding book in the items array and returns the updated JSON.

EXAMPLE 5

json:setElement(json, '$.item', 'book')

If the json is the format {'name' : 'Stationary', 'item' : 'pen'}, the function updates the json as {'name' : 'John', 'item' : 'book'} by replacing item element and returns the updated JSON.

EXAMPLE 6

json:setElement(json, '$.address', 'city', 'SF')

If the json is the format {'name' : 'John', 'married' : true},the function will not update, but returns the original JSON as there are no valid path for $.address.

toObject (Function)

Function generate JSON object from the given JSON string.

Syntax

<OBJECT> json:toObject(<STRING> json)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonA valid JSON string that needs to be converted to a JSON object.STRINGNoYes

EXAMPLE 1

json:toJson(json)

This returns the JSON object corresponding to the given JSON string.

toString (Function)

Function generates a JSON string corresponding to a given JSON object.

Syntax

<STRING> json:toString(<OBJECT> json)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonA valid JSON object to generates a JSON string.OBJECTNoYes

EXAMPLE 1

json:toString(json)

This returns the JSON string corresponding to a given JSON object.

tokenize (Stream Processor)

Stream processor tokenizes the given JSON into to multiple JSON string elements and sends them as separate events.

Syntax

json:tokenize(<STRING|OBJECT> json, <STRING> path)
json:tokenize(<STRING|OBJECT> json, <STRING> path, <BOOL> fail.on.missing.attribute)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe input JSON that needs to be tokenized.STRING OBJECTNoYes
pathThe path of the set of elements that will be tokenized.STRINGNoYes
fail.on.missing.attributeIf there are no element on the given path, when set to true the system will drop the event, and when set to false the system will pass null value to the jsonElement output attribute.trueBOOLYesNo

Extra Return Attributes

NameDescriptionPossible Types
jsonElementThe JSON element retrieved based on the given path will be returned as a JSON string. If the path selects a JSON array then the system returns each element in the array as a JSON string via a separate events.STRING

EXAMPLE 1

CREATE STREAM InputStream (json string, path string);

@info(name = 'query1')
insert into OutputStream
select path, jsonElement
from InputStream#json:tokenizeAsObject(json, path);

If the input json is {name:'John', enrolledSubjects:['Mathematics', 'Physics']}, and the path is passed as $.enrolledSubjects then for both the elements in the selected JSON array, it generates it generates events as ('$.enrolledSubjects', 'Mathematics'), and ('$.enrolledSubjects', 'Physics'). For the same input JSON, if the path is passed as $.name then it will only produce one event ('$.name', 'John') as the path provided a single JSON element.

EXAMPLE 2

CREATE STREAM InputStream (json string, path string);

@info(name = 'query1')
insert into OutputStream
select path, jsonElement
from InputStream#json:tokenizeAsObject(json, path, true);

If the input json is {name:'John', age:25},and the path is passed as $.salary then the system will produce ('$.salary', null), as the fail.on.missing.attribute is true and there are no matching element for $.salary.

tokenizeAsObject (Stream Processor)

Stream processor tokenizes the given JSON into to multiple JSON object elements and sends them as separate events.

Syntax

json:tokenizeAsObject(<STRING|OBJECT> json, <STRING> path)
json:tokenizeAsObject(<STRING|OBJECT> json, <STRING> path, <BOOL> fail.on.missing.attribute)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jsonThe input JSON that needs to be tokenized.STRING OBJECTNoYes
pathThe path of the set of elements that will be tokenized.STRINGNoYes
fail.on.missing.attributeIf there are no element on the given path, when set to true the system will drop the event, and when set to false the system will pass null value to the jsonElement output attribute.trueBOOLYesNo

Extra Return Attributes

NameDescriptionPossible Types
jsonElementThe JSON element retrieved based on the given path will be returned as a JSON object. If the path selects a JSON array then the system returns each element in the array as a JSON object via a separate events.OBJECT

EXAMPLE 1

CREATE STREAM InputStream (json string, path string);

@info(name = 'query1')
insert into OutputStream
select path, jsonElement
from InputStream#json:tokenizeAsObject(json, path);

If the input json is {name:'John', enrolledSubjects:['Mathematics', 'Physics']}, and the path is passed as $.enrolledSubjects then for both the elements in the selected JSON array, it generates it generates events as ('$.enrolledSubjects', 'Mathematics'), and ('$.enrolledSubjects', 'Physics'). For the same input JSON, if the path is passed as $.name then it will only produce one event ('$.name', 'John') as the path provided a single JSON element.

EXAMPLE 2

CREATE STREAM InputStream (json string, path string);

@info(name = 'query1')
insert into OutputStream
select path, jsonElement
from InputStream#json:tokenizeAsObject(json, path, true);

If the input json is {name:'John', age:25},and the path is passed as $.salary then the system will produce ('$.salary', null), as the fail.on.missing.attribute is true and there are no matching element for $.salary.

List

collect (Aggregate Function)

Collects multiple values to construct a list.

Syntax

<OBJECT> list:collect(<OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)
<OBJECT> list:collect(<OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value, <BOOL> is.distinct)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
valueValue of the list elementOBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes
is.distinctIf true only distinct elements are collectedfalseBOOLYesYes

EXAMPLE 1

insert into OutputStream
select list:collect(symbol) as stockSymbols
from StockStream WINDOW TUMBLING_LENGTH(10);

For the window expiry of 10 events, the collect() function will collect attributes of symbol to a single list and return as stockSymbols.

merge (Aggregate Function)

Collects multiple lists to merge as a single list.

Syntax

<OBJECT> list:merge(<OBJECT> list)
<OBJECT> list:merge(<OBJECT> list, <BOOL> is.distinct)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listList to be mergedOBJECTNoYes
is.distinctWhether to return list with distinct valuesfalseBOOLYesYes

EXAMPLE 1

insert into OutputStream
select list:merge(list) as stockSymbols
from StockStream WINDOW TUMBLING_LENGTH(2);

For the window expiry of 2 events, the merge() function will collect attributes of list and merge them to a single list, returned as stockSymbols.

add (Function)

Function returns the updated list after adding the given value.

Syntax

<OBJECT> list:add(<OBJECT> list, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)
<OBJECT> list:add(<OBJECT> list, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value, <INT> index)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list to which the value should be added.OBJECTNoYes
valueThe value to be added.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes
indexThe index in which the value should to be added.lastINTYesYes

EXAMPLE 1

list:add(stockSymbols, 'IBM')

Function returns the updated list after adding the value IBM in the last index.

EXAMPLE 2

list:add(stockSymbols, 'IBM', 0)

Function returns the updated list after adding the value IBM in the 0th index`.

addAll (Function)

Function returns the updated list after adding all the values from the given list.

Syntax

<OBJECT> list:addAll(<OBJECT> to.list, <OBJECT> from.list)
<OBJECT> list:addAll(<OBJECT> to.list, <OBJECT> from.list, <BOOL> is.distinct)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
to.listThe list into which the values need to copied.OBJECTNoYes
from.listThe list from which the values are copied.OBJECTNoYes
is.distinctIf true returns list with distinct valuesfalseBOOLYesYes

EXAMPLE 1

list:putAll(toList, fromList)

If toList contains values (IBM, gdn), and if fromList contains values (IBM, XYZ) then the function returns updated toList with values (IBM, gdn, IBM, XYZ).

EXAMPLE 2

list:putAll(toList, fromList, true)

If toList contains values (IBM, gdn), and if fromList contains values (IBM, XYZ) then the function returns updated toList with values (IBM, gdn, XYZ).

clear (Function)

Function returns the cleared list.

Syntax

<OBJECT> list:clear(<OBJECT> list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list which needs to be clearedOBJECTNoYes

EXAMPLE 1

list:clear(stockDetails)

Returns an empty list.

clone (Function)

Function returns the cloned list.

Syntax

<OBJECT> list:clone(<OBJECT> list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list to which needs to be cloned.OBJECTNoYes

EXAMPLE 1

list:clone(stockSymbols)

Function returns cloned list of stockSymbols.

contains (Function)

Function checks whether the list contains the specific value.

Syntax

<BOOL> list:contains(<OBJECT> list, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be checked on whether it contains the value or not.OBJECTNoYes
valueThe value that needs to be checked.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

list:contains(stockSymbols, 'IBM')

Returns true if the stockSymbols list contains value IBM else it returns false.

containsAll (Function)

Function checks whether the list contains all the values in the given list.

Syntax

<BOOL> list:containsAll(<OBJECT> list, <OBJECT> given.list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be checked on whether it contains all the values or not.OBJECTNoYes
given.listThe list which contains all the values to be checked.OBJECTNoYes

EXAMPLE 1

list:containsAll(stockSymbols, latestStockSymbols)

Returns true if the stockSymbols list contains values in latestStockSymbols else it returns false.

create (Function)

Function creates a list containing all values provided.

Syntax

<OBJECT> list:create()
<OBJECT> list:create(<OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value1)
<OBJECT> list:create(<OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value1, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
value1Value 1OBJECT INT LONG FLOAT DOUBLE BOOL STRINGYesYes

EXAMPLE 1

list:create(1, 2, 3, 4, 5, 6)

This returns a list with values 1, 2, 3, 4, 5 and 6.

EXAMPLE 2

list:create()

This returns an empty list.

get (Function)

Function returns the value at the specific index, null if index is out of range.

Syntax

<OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> list:get(<OBJECT> list, <INT> index)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listAttribute containing the listOBJECTNoYes
indexIndex of the elementINTNoYes

EXAMPLE 1

list:get(stockSymbols, 1)

This returns the element in the 1st index in the stockSymbols list.

indexOf (Function)

Function returns the last index of the given element.

Syntax

<INT> list:indexOf(<OBJECT> list, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list to be checked to get index of an element.OBJECTNoYes
valueValue for which last index needs to be identified.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

list:indexOf(stockSymbols. `IBM`)

Returns the last index of the element IBM if present else it returns -1.

isEmpty (Function)

Function checks if the list is empty.

Syntax

<BOOL> list:isEmpty(<OBJECT> list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be checked whether it's empty or not.OBJECTNoYes

EXAMPLE 1

list:isEmpty(stockSymbols)

Returns true if the stockSymbols list is empty else it returns false.

isList (Function)

Function checks if the object is type of a list.

Syntax

<BOOL> list:isList(<OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe argument the need to be determined whether it`s a list or not.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

list:isList(stockSymbols)

Returns true if the stockSymbols is and an instance of java.util.List else it returns false.

lastIndexOf (Function)

Function returns the index of the given value.

Syntax

<INT> list:lastIndexOf(<OBJECT> list, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list to be checked to get index of an element.OBJECTNoYes
valueValue for which last index needs to be identified.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

list:lastIndexOf(stockSymbols. `IBM`)

Returns the last index of the element IBM if present else it returns -1.

remove (Function)

Function returns the updated list after removing the element with the specified value.

Syntax

<OBJECT> list:remove(<OBJECT> list, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be updated.OBJECTNoYes
valueThe value of the element that needs to removed.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

list:remove(stockSymbols, 'IBM')

This returns the updated list, stockSymbols after stockSymbols the value IBM.

removeAll (Function)

Function returns the updated list after removing all the element with the specified list.

Syntax

<OBJECT> list:removeAll(<OBJECT> list, <OBJECT> given.list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be updated.OBJECTNoYes
given.listThe list with all the elements that needs to removed.OBJECTNoYes

EXAMPLE 1

list:removeAll(stockSymbols, latestStockSymbols)

This returns the updated list, stockSymbols after removing all the values in latestStockSymbols.

removeByIndex (Function)

Function returns the updated list after removing the element with the specified index.

Syntax

<OBJECT> list:removeByIndex(<OBJECT> list, <INT> index)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be updated.OBJECTNoYes
indexThe index of the element that needs to removed.INTNoYes

EXAMPLE 1

list:removeByIndex(stockSymbols, 0)

This returns the updated list, stockSymbols after removing value at 0 th index.

retainAll (Function)

Function returns the updated list after retaining all the elements in the specified list.

Syntax

<OBJECT> list:retainAll(<OBJECT> list, <OBJECT> given.list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list that needs to be updated.OBJECTNoYes
given.listThe list with all the elements that needs to reatined.OBJECTNoYes

EXAMPLE 1

list:retainAll(stockSymbols, latestStockSymbols)

This returns the updated list, stockSymbols after retaining all the values in latestStockSymbols.

setValue (Function)

Function returns the updated list after replacing the element in the given index by the given value.

Syntax

<OBJECT> list:setValue(<OBJECT> list, <INT> index, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list to which the value should be updated.OBJECTNoYes
indexThe index in which the value should to be updated.INTNoYes
valueThe value to be updated with.OBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

list:set(stockSymbols, 0, 'IBM')

Function returns the updated list after replacing the value at 0th index with the value IBM

size (Function)

Function to return the size of the list.

Syntax

<INT> list:size(<OBJECT> list)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list for which size should be returned.OBJECTNoYes

EXAMPLE 1

list:size(stockSymbols)

Returns size of the stockSymbols list.

sort (Function)

Function returns lists sorted in ascending or descending order.

Syntax

<OBJECT> list:sort(<OBJECT> list)
<OBJECT> list:sort(<OBJECT> list, <STRING> order)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listThe list which should be sorted.OBJECTNoYes
orderOrder in which the list needs to be sorted (ASC/DESC/REV).REVSTRINGYesNo

EXAMPLE 1

list:sort(stockSymbols)

Function returns the sorted list in ascending order.

EXAMPLE 2

list:sort(stockSymbols, 'DESC')

Function returns the sorted list in descending order.

tokenize (Stream Processor)

Tokenize the list and return each key, value as new attributes in events

Syntax

list:tokenize(<OBJECT> list)
list:tokenize(<OBJECT> list, <OBJECT> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
listArray list which needs to be tokenizedOBJECTNoYes

Extra Return Attributes

NameDescriptionPossible Types
indexIndex of an entry consisted in the listINT
valueValue of an entry consisted in the listOBJECT

EXAMPLE 1

list:tokenize(customList)

If custom list contains (gdn, IBM, XYZ) elements, then tokenize function will return 3 events with value attributes gdn, IBM and XYZ respectively.

Map

collect (Aggregate Function)

Collect multiple key-value pairs to construct a map. Only distinct keys are collected, if a duplicate key arrives, it overrides the old value

Syntax

<OBJECT> map:collect(<INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key, <OBJECT|INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
keyKey of the map entryINT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes
valueValue of the map entryOBJECT INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

insert into OutputStream
select map:collect(symbol, price) as stockDetails
from StockStream WINDOW TUMBLING_LENGTH(10);

For the window expiry of 10 events, the collect() function will collectattributes of key and value to a single map and return as stockDetails.

merge (Aggregate Function)

Collect multiple maps to merge as a single map. Only distinct keys are collected, if a duplicate key arrives, it overrides the old value.

Syntax

<OBJECT> map:merge(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapMaps to be collectedOBJECTNoYes

EXAMPLE 1

insert into OutputStream
select map:merge(map) as stockDetails
from StockStream WINDOW TUMBLING_LENGTH(2);

For the window expiry of 2 events, the merge() function will collect attributes of map and merge them to a single map, returned as stockDetails.

clear (Function)

Function returns the cleared map.

Syntax

<OBJECT> map:clear(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map which needs to be clearedOBJECTNoYes

EXAMPLE 1

map:clear(stockDetails)

Returns an empty map.

clone (Function)

Function returns the cloned map.

Syntax

<OBJECT> map:clone(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map to which needs to be cloned.OBJECTNoYes

EXAMPLE 1

map:clone(stockDetails)

Function returns cloned map of stockDetails.

combineByKey (Function)

Function returns the map after combining all the maps given as parameters, such that the keys, of all the maps will be matched with an Array list of values from each map respectively.

Syntax

<OBJECT> map:combineByKey(<OBJECT> map, <OBJECT> map)
<OBJECT> map:combineByKey(<OBJECT> map, <OBJECT> map, <OBJECT> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map into which the key-values need to copied.OBJECTNoYes

EXAMPLE 1

map:combineByKey(map1, map2)

If map2 contains key-value pairs (symbol: gdn), (volume :100), and if map2 contains key-value pairs (symbol: IBM),(price : 12), then the function returns the map with key value pairs as follows, (symbol: ArrayList(gdn, IBM)), (volume: ArrayList(100, null)) and (price: ArrayList(null, 12))

containsKey (Function)

Function checks if the map contains the key.

Syntax

<BOOL> map:containsKey(<OBJECT> map, <INT|LONG|FLOAT|DOUBLE|BOOL|STRING> key)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map the needs to be checked on containing the key or not.OBJECTNoYes
keyThe key to be checked.INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

map:containsKey(stockDetails, '1234')

Returns true if the stockDetails map contains key 1234 else it returns false.

containsValue (Function)

Function checks if the map contains the value.

Syntax

<BOOL> map:containsValue(<OBJECT> map, <INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map the needs to be checked on containing the value or not.OBJECTNoYes
valueThe value to be checked.INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

map:containsValue(stockDetails, 'IBM')

Returns true if the stockDetails map contains value IBM else it returns false.

create (Function)

Function creates a map pairing the keys and their corresponding values.

Syntax

<OBJECT> map:create()
<OBJECT> map:create(<OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key1, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> value1)
<OBJECT> map:create(<OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key1, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> value1, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
key1Key 1-OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGYesYes
value1Value 1-OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGYesYes

EXAMPLE 1

map:create(1, 'one', 2, 'two', 3, 'three')

This returns a map with keys 1, 2, 3 mapped with their corresponding values, one, two, three.

EXAMPLE 2

map:create()

This returns an empty map.

createFromJSON (Function)

Function returns the map created by pairing the keys with their corresponding values given in the JSON string.

Syntax

<OBJECT> map:createFromJSON(<STRING> json.string)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
json.stringJSON as a string, which is used to create the map.STRINGNoYes

EXAMPLE 1

map:createFromJSON("{€˜symbol' : 'IBM', 'price' : 200, 'volume' : 100}")

This returns a map with the keys symbol, price, and volume, and their values, IBM, 200 and 100 respectively.

createFromXML (Function)

Function returns the map created by pairing the keys with their corresponding values,given as an XML string.

Syntax

<OBJECT> map:createFromXML(<STRING> xml.string)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
xml.stringThe XML string, which is used to create the map.STRINGNoYes

EXAMPLE 1

map:createFromXML("<stock>
<symbol>IBM</symbol>
<price>200</price>
<volume>100</volume>
</stock>")

This returns a map with the keys symbol, price, volume, and with their values IBM, 200 and 100 respectively.

get (Function)

Function returns the value corresponding to the given key from the map.

Syntax

<OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> map:get(<OBJECT> map, <INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key)
<OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> map:get(<OBJECT> map, <INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> default.value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map from where the value should be obtained.OBJECTNoYes
keyThe key to fetch the value.INT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes
default.valueThe value to be returned if the map does not have the key.OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGYesYes

EXAMPLE 1

map:get(companyMap, 1)

If the companyMap has key 1 and value ABC in it's set of key value pairs. The function returns ABC.

EXAMPLE 2

map:get(companyMap, 2)

If the companyMap does not have any value for key 2 then the function returns null.

EXAMPLE 3

map:get(companyMap, 2, 'two')

If the companyMap does not have any value for key 2 then the function returns two.

isEmpty (Function)

Function checks if the map is empty.

Syntax

<BOOL> map:isEmpty(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map the need to be checked whether it's empty or not.OBJECTNoYes

EXAMPLE 1

map:isEmpty(stockDetails)

Returns true if the stockDetails map is empty else it returns false.

isMap (Function)

Function checks if the object is type of a map.

Syntax

<BOOL> map:isMap(<OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> arg)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe argument the need to be determined whether it's a map or not.OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes

EXAMPLE 1

map:isMap(stockDetails)

Returns true if the stockDetails is and an instance of java.util.Map else it returns false.

keys (Function)

Function to return the keys of the map as a list.

Syntax

<OBJECT> map:keys(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map from which list of keys to be returned.OBJECTNoYes

EXAMPLE 1

map:keys(stockDetails)

Returns keys of the stockDetails map.

put (Function)

Function returns the updated map after adding the given key-value pair. If the key already exist in the map the key is updated with the new value.

Syntax

<OBJECT> map:put(<OBJECT> map, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map to which the value should be added.OBJECTNoYes
keyThe key to be added.OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes
valueThe value to be added.OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes

EXAMPLE 1

map:put(stockDetails , 'IBM' , '200')

Function returns the updated map named stockDetails after adding the value 200 with the key IBM.

putAll (Function)

Function returns the updated map after adding all the key-value pairs from another map. If there are duplicate keys, the key will be assignedn new values from the map that's being copied.

Syntax

<OBJECT> map:putAll(<OBJECT> to.map, <OBJECT> from.map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
to.mapThe map into which the key-values need to copied.OBJECTNoYes
from.mapThe map from which the key-values are copied.OBJECTNoYes

EXAMPLE 1

map:putAll(toMap, fromMap)

If toMap contains key-value pairs (symbol: gdn), (volume: 100), and if fromMap contains key-value pairs (symbol: IBM),(price : 12), then the function returns updated toMap with key-value pairs (symbol: IBM), (price : 12), (volume :100).

putIfAbsent (Function)

Function returns the updated map after adding the given key-value pair if key is absent.

Syntax

<OBJECT> map:putIfAbsent(<OBJECT> map, <INT|LONG|FLOAT|DOUBLE|BOOL|STRING> key, <INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map to which the value should be added.OBJECTNoYes
keyThe key to be added.INT LONG FLOAT DOUBLE BOOL STRINGNoYes
valueThe value to be added.INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

map:putIfAbsent(stockDetails , 1234 , 'IBM')

Function returns the updated map named stockDetails after adding the value IBM with the key 1234 if key is absent from the original map.

remove (Function)

Function returns the updated map after removing the element with the specified key.

Syntax

<OBJECT> map:remove(<OBJECT> map, <OBJECT|INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map that needs to be updated.OBJECTNoYes
keyThe key of the element that needs to removed.OBJECT INT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes

EXAMPLE 1

map:remove(stockDetails, 1234)

This returns the updated map, stockDetails after removing the key-value pair corresponding to the key 1234.

replace (Function)

Function returns the updated map after replacing the given key-value pair only if key is present.

Syntax

<OBJECT> map:replace(<OBJECT> map, <INT|LONG|FLOAT|DOUBLE|FLOAT|BOOL|STRING> key, <INT|LONG|FLOAT|DOUBLE|BOOL|STRING> value)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map to which the key-value should be replaced.OBJECTNoYes
keyThe key to be replaced.INT LONG FLOAT DOUBLE FLOAT BOOL STRINGNoYes
valueThe value to be replaced.INT LONG FLOAT DOUBLE BOOL STRINGNoYes

EXAMPLE 1

map:replace(stockDetails , 1234 , 'IBM')

Function returns the updated map named stockDetails after replacing the value IBM with the key 1234 if present.

replaceAll (Function)

Function returns the updated map after replacing all the key-value pairs from another map, if keys are present.

Syntax

<OBJECT> map:replaceAll(<OBJECT> to.map, <OBJECT> from.map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
to.mapThe map into which the key-values need to copied.OBJECTNoYes
from.mapThe map from which the key-values are copied.OBJECTNoYes

EXAMPLE 1

map:replaceAll(toMap, fromMap)

If toMap contains key-value pairs (symbol: gdn), (volume: 100), and if fromMap contains key-value pairs (symbol: IBM), (price : 12), then the function returns updated toMap with key-value pairs (symbol: IBM), (volume : 100).

size (Function)

Function to return the size of the map.

Syntax

<INT> map:size(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map for which size should be returned.OBJECTNoYes

EXAMPLE 1

map:size(stockDetails)

Returns size of the stockDetails map.

toJSON (Function)

Function converts a map into a JSON object and returns the JSON as a string.

Syntax

<STRING> map:toJSON(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map that needs to be converted to JSONOBJECTNoYes

EXAMPLE 1

map:toJSON(company)

If company is a map with key-value pairs, (symbol:gdn),(volume : 100), and (price, 200), it returns the JSON string {"symbol" : "gdn", "volume" : 100 , "price" : 200}.

toXML (Function)

Function returns the map as an XML string.

Syntax

<STRING> map:toXML(<OBJECT> map)
<STRING> map:toXML(<OBJECT> map, <OBJECT|STRING> root.element.name)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map that needs to be converted to XML.OBJECTNoYes
root.element.nameThe root element of the map.The XML root element will be ignoredOBJECT STRINGYesYes

EXAMPLE 1

toXML(company, 'abcCompany')

If company is a map with key-value pairs, (symbol : gdn),(volume : 100), and (price : 200), this function returns XML as a string, <abcCompany><symbol>gdn</symbol><volume><100></volume><price>200</price></abcCompany>.

EXAMPLE 2

toXML(company)

If company is a map with key-value pairs, (symbol : gdn), (volume : 100), and (price : 200), this function returns XML without root element as a string, <symbol>gdn</symbol><volume><100></volume><price>200</price>.

values (Function)

Function to return the values of the map.

Syntax

<OBJECT> map:values(<OBJECT> map)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapThe map from which list if values to be returned.OBJECTNoYes

EXAMPLE 1

map:values(stockDetails)

Returns values of the stockDetails map.

tokenize (Stream Processor)

Tokenize the map and return each key, value as new attributes in events

Syntax

map:tokenize(<OBJECT> map)
map:tokenize(<OBJECT> map, <OBJECT> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
mapHash map containing key value pairsOBJECTNoYes

Extra Return Attributes

NameDescriptionPossible Types
keyKey of an entry consisted in the mapOBJECT
valueValue of an entry consisted in the map. If more than one map is given, then an Array List of values from each map is returned for the value attribute.OBJECT

EXAMPLE 1

CREATE STREAM StockStream(symbol string, price float);

insert into TempStream
select map:collect(symbol, price) as symbolPriceMap
from StockStream WINDOW TUMBLING_LENGTH(2);

insert into SymbolStream
select key, value
from TempStream#map:tokenize(customMap);

Based on the length batch window, symbolPriceMap will collect two events, and the map will then again tokenized to give 2 events with key and values being symbol name and price respectively.

Math

percentile (Aggregate Function)

This functions returns the pth percentile value of a given argument.

Syntax

<DOUBLE> math:percentile(<INT|LONG|FLOAT|DOUBLE> arg, <DOUBLE> p)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
argThe value of the parameter whose percentile should be found.INT LONG FLOAT DOUBLENoYes
pEstimate of the percentile to be found (pth percentile) where p is any number greater than 0 or lesser than or equal to 100.DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (sensorId int, temperature double);

insert into OutMediationStream
select math:percentile(temperature, 97.0) as percentile
from InValueStream;

This function returns the percentile value based on the argument given. For example, math:percentile(temperature, 97.0) returns the 97th percentile value of all the temperature events.

abs (Function)

This function returns the absolute value of the given parameter. It wraps the java.lang.Math.abs() function.

Syntax

<DOUBLE> math:abs(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The parameter whose absolute value is found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:abs(inValue) as absValue
from InValueStream;

Irrespective of whether the invalue in the input stream holds a value of abs(3) or abs(-3),the function returns 3 since the absolute value of both 3 and -3 is 3. The result directed to OutMediationStream stream.

acos (Function)

If -1 \<= p1 \<= 1, this function returns the arc-cosine (inverse cosine) value of p1.If the domain is invalid, it returns NULL. The value returned is in radian scale. This function wraps the java.lang.Math.acos() function.

Syntax

<DOUBLE> math:acos(<FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose arc-cosine (inverse cosine) value is found.FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:acos(inValue) as acosValue
from InValueStream;

If the inValue in the input stream is given, the function calculates the arc-cosine value of it and returns the arc-cosine value to the output stream, OutMediationStream. For example, acos(0.5) returns 1.0471975511965979.

asin (Function)

If -1 \<= p1 \<= 1, this function returns the arc-sin (inverse sine) value of p1. If the domain is invalid, it returns NULL. The value returned is in radian scale. This function wraps the java.lang.Math.asin() function.

Syntax

<DOUBLE> math:asin(<FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose arc-sin (inverse sine) value is found.FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:asin(inValue) as asinValue
from InValueStream;

If the inValue in the input stream is given, the function calculates the arc-sin value of it and returns the arc-sin value to the output stream, OutMediationStream. For example, asin(0.5) returns 0.5235987755982989.

atan (Function)

1. If a single p1 is received, this function returns the arc-tangent (inverse tangent) value of p1. 2. If p1 is received along with an optional p1, it considers them as x and y coordinates and returns the arc-tangent (inverse tangent) value. The returned value is in radian scale. This function wraps the java.lang.Math.atan() function.

Syntax

<DOUBLE> math:atan(<INT|LONG|FLOAT|DOUBLE> p1)
<DOUBLE> math:atan(<INT|LONG|FLOAT|DOUBLE> p1, <INT|LONG|FLOAT|DOUBLE> p2)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose arc-tangent (inverse tangent) is found. If the optional second parameter is given this represents the x coordinate of the (x,y) coordinate pair.INT LONG FLOAT DOUBLENoYes
p2This optional parameter represents the y coordinate of the (x,y) coordinate pair.0DINT LONG FLOAT DOUBLEYesYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double, inValue2 double);

insert into OutMediationStream
select math:atan(inValue1, inValue2) as convertedValue
from InValueStream;

If the inValue1 in the input stream is given, the function calculates the arc-tangent value of it and returns the arc-tangent value to the output stream, OutMediationStream. If both the inValue1 and inValue2 are given, then the function considers them to be x and y coordinates respectively and returns the calculated arc-tangent value to the output stream, OutMediationStream. For example, atan(12d, 5d) returns 1.1760052070951352.

bin (Function)

This function returns a string representation of the p1 argument, that is of either integer or long data type, as an unsigned integer in base 2. It wraps the java.lang.Integer.toBinaryString and java.lang.Long.toBinaryString` methods.

Syntax

<STRING> math:bin(<INT|LONG> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value in either integer or long, that should be converted into an unsigned integer of base 2.INT LONGNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue long);

insert into OutMediationStream
select math:bin(inValue) as binValue
from InValueStream;

If the inValue in the input stream is given, the function converts it into an unsigned integer in base 2 and directs the output to the output stream, OutMediationStream. For example, bin(9) returns 1001.

cbrt (Function)

This function returns the cube-root of p1 which is in radians. It wraps the java.lang.Math.cbrt() function.

Syntax

<DOUBLE> math:cbrt(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose cube-root should be found. Input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:cbrt(inValue) as cbrtValue
from InValueStream;

If the inValue is given, the function calculates the cube-root value for the same and directs the output to the output stream, OutMediationStream. For example, cbrt(17d) returns 2.5712815906582356.

ceil (Function)

This function returns the smallest double value, i.e., the closest to the negative infinity, that is greater than or equal to the p1 argument, and is equal to a mathematical integer. It wraps the java.lang.Math.ceil() method.

Syntax

<DOUBLE> math:ceil(<FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose ceiling value is found.FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:ceil(inValue) as ceilingValue
from InValueStream;

This function calculates the ceiling value of the given inValue and directs the result to OutMediationStream output stream. For example, ceil(423.187d) returns 424.0.

conv (Function)

This function converts a from the fromBase base to the toBase base.

Syntax

<STRING> math:conv(<STRING> a, <INT> from.base, <INT> to.base)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
aThe value whose base should be changed. Input should be given as a String.STRINGNoYes
from.baseThe source base of the input parameter a.INTNoYes
to.baseThe target base that the input parameter a should be converted into.INTNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue string,fromBase int,toBase int);

insert into OutMediationStream
select math:conv(inValue,fromBase,toBase) as convertedValue
from InValueStrea;

If the inValue in the input stream is given, and the base in which it currently resides in and the base to which it should be converted to is specified, the function converts it into a string in the target base and directs it to the output stream, OutMediationStream. For example, conv("7f", 16, 10) returns "127".

copySign (Function)

This function returns a value of an input with the received magnitude and sign of another input. It wraps the java.lang.Math.copySign() function.

Syntax

<DOUBLE> math:copySign(<INT|LONG|FLOAT|DOUBLE> magnitude, <INT|LONG|FLOAT|DOUBLE> sign)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
magnitudeThe magnitude of this parameter is used in the output attribute.INT LONG FLOAT DOUBLENoYes
signThe sign of this parameter is used in the output attribute.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double, inValue2 double);

insert into OutMediationStream
select math:copySign(inValue1,inValue2) as copysignValue
from InValueStream;

If two values are provided as inValue1 and inValue2, the function copies the magnitude and sign of the second argument into the first one and directs the result to the output stream, OutMediatonStream. For example, copySign(5.6d, -3.0d) returns -5.6.

cos (Function)

This function returns the cosine of p1 which is in radians. It wraps the java.lang.Math.cos() function.

Syntax

<DOUBLE> math:cos(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose cosine value should be found.The input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:cos(inValue) as cosValue
from InValueStream;

If the inValue is given, the function calculates the cosine value for the same and directs the output to the output stream, OutMediationStream. For example, cos(6d) returns 0.9601702866503661.

cosh (Function)

This function returns the hyperbolic cosine of p1 which is in radians. It wraps the java.lang.Math.cosh() function.

Syntax

<DOUBLE> math:cosh(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose hyperbolic cosine should be found. The input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:cosh(inValue) as cosValue
from InValueStream;

If the inValue is given, the function calculates the hyperbolic cosine value for the same and directs the output to the output stream, OutMediationStream. For example, cosh (6d) returns 201.7156361224559.

e (Function)

This function returns the java.lang.Math.E constant, which is the closest double value to e, where e is the base of the natural logarithms.

Syntax

<DOUBLE> math:e()

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:e() as eValue
from InValueStream;

This function returns the constant, 2.7182818284590452354 which is then closest double value to e and directs the output to OutMediationStream output stream.

exp (Function)

This function returns the Euler's number e raised to the power of p1. It wraps the java.lang.Math.exp() function.

Syntax

<DOUBLE> math:exp(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The power that the Euler's number e is raised to.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:exp(inValue) as expValue
from InValueStream;

If the inValue in the inputstream holds a value, this function calculates the corresponding Euler's number e and directs it to the output stream, OutMediationStream. For example, exp(10.23) returns 27722.51006805505.

floor (Function)

This function wraps the java.lang.Math.floor() function and returns the largest value, i.e., closest to the positive infinity, that is less than or equal to p1, and is equal to a mathematical integer.

Syntax

<DOUBLE> math:floor(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose floor value should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:floor(inValue) as floorValue
from InValueStream;

This function calculates the floor value of the given inValue input and directs the output to the OutMediationStream output stream. For example, (10.23) returns 10.0.

getExponent (Function)

This function returns the unbiased exponent that is used in the representation of p1. This function wraps the java.lang.Math.getExponent() function.

Syntax

<INT> math:getExponent(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of whose unbiased exponent representation should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:getExponent(inValue) as expValue
from InValueStream;

This function calculates the unbiased exponent of a given input, inValue and directs the result to the OutMediationStream output stream. For example, getExponent(60984.1) returns 15.

hex (Function)

This function wraps the java.lang.Double.toHexString() function. It returns a hexadecimal string representation of the input, p1`.

Syntax

<STRING> math:hex(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose hexadecimal value should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue int);

insert into OutMediationStream
select math:hex(inValue) as hexString
from InValueStream;

If the inValue in the input stream is provided, the function converts this into its corresponding hexadecimal format and directs the output to the output stream, OutMediationStream. For example, hex(200) returns "c8".

isInfinite (Function)

This function wraps the java.lang.Float.isInfinite() and java.lang.Double.isInfinite() and returns true if p1 is infinitely large in magnitude and false if otherwise.

Syntax

<BOOL> math:isInfinite(<FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1This is the value of the parameter that the function determines to be either infinite or finite.FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double,inValue2 int);

insert into OutMediationStream
select math:isInfinite(inValue1) as isInfinite
from InValueStream;

If the value given in the inValue in the input stream is of infinitely large magnitude, the function returns the value, true and directs the result to the output stream, OutMediationStream. For example, isInfinite(java.lang.Double.POSITIVE_INFINITY) returns true.

isNan (Function)

This function wraps the java.lang.Float.isNaN() and java.lang.Double.isNaN() functions and returns true if p1 is NaN (Not-a-Number), and returns false if otherwise.

Syntax

<BOOL> math:isNan(<FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter which the function determines to be either NaN or a number.FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double,inValue2 int);

insert into OutMediationStream
select math:isNan(inValue1) as isNaN
from InValueStream;

If the inValue1 in the input stream has a value that is undefined, then the function considers it as an NaN value and directs True to the output stream, OutMediationStream. For example, isNan(java.lang.Math.log(-12d)) returns true.

ln (Function)

This function returns the natural logarithm (base e) of p1.

Syntax

<DOUBLE> math:ln(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose natural logarithm (base e) should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:ln(inValue) as lnValue
from InValueStream;

If the inValue in the input stream is given, the function calculates its natural logarithm (base e) and directs the results to the output stream, OutMeditionStream. For example, ln(11.453) returns 2.438251704415579.

log (Function)

This function returns the logarithm of the received number as per the given base.

Syntax

<DOUBLE> math:log(<INT|LONG|FLOAT|DOUBLE> number, <INT|LONG|FLOAT|DOUBLE> base)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
numberThe value of the parameter whose base should be changed.INT LONG FLOAT DOUBLENoYes
baseThe base value of the ouput.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (number double, base double);

insert into OutMediationStream
select math:log(number, base) as logValue
from InValueStream;

If the number and the base to which it has to be converted into is given in the input stream, the function calculates the number to the base specified and directs the result to the output stream, OutMediationStream. For example, log(34, 2f) returns 5.08746284125034.

log10 (Function)

This function returns the base 10 logarithm of p1.

Syntax

<DOUBLE> math:log10(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose base 10 logarithm should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:log10(inValue) as lnValue
from InValueStream;

If the inValue in the input stream is given, the function calculates the base 10 logarithm of the same and directs the result to the output stream, OutMediatioStream. For example, log10(19.234) returns 1.2840696117100832.

log2 (Function)

This function returns the base 2 logarithm of p1.

Syntax

<DOUBLE> math:log2(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose base 2 logarithm should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:log2(inValue) as lnValue
from InValueStream;

If the inValue in the input stream is given, the function calculates the base 2 logarithm of the same and returns the value to the output stream, OutMediationStream. For example log2(91d) returns 6.507794640198696.

max (Function)

This function returns the greater value of p1 and p2.

Syntax

<DOUBLE> math:max(<INT|LONG|FLOAT|DOUBLE> p1, <INT|LONG|FLOAT|DOUBLE> p2)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1One of the input values to be compared in order to find the larger value of the twoINT LONG FLOAT DOUBLENoYes
p2The input value to be compared with p1 in order to find the larger value of the two.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double,inValue2 int);

insert into OutMediationStream
select math:max(inValue1,inValue2) as maxValue
from InValueStream;

If two input values inValue1, and inValue2 are given, the function compares them and directs the larger value to the output stream, OutMediationStream. For example, max(123.67d, 91) returns 123.67.

min (Function)

This function returns the smaller value of p1 and p2.

Syntax

<DOUBLE> math:min(<INT|LONG|FLOAT|DOUBLE> p1, <INT|LONG|FLOAT|DOUBLE> p2)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1One of the input values that are to be compared in order to find the smaller value.INT LONG FLOAT DOUBLENoYes
p2The input value that is to be compared with p1 in order to find the smaller value.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double,inValue2 int);

insert into OutMediationStream
select math:min(inValue1,inValue2) as minValue
from InValueStream;

If two input values, inValue1 and inValue2 are given, the function compares them and directs the smaller value of the two to the output stream, OutMediationStream. For example, min(123.67d, 91) returns 91.

oct (Function)

This function converts the input parameter p1 to octal.

Syntax

<STRING> math:oct(<INT|LONG> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose octal representation should be found.INT LONGNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue long);

insert into OutMediationStream
select math:oct(inValue) as octValue
from InValueStream;

If the inValue in the input stream is given, this function calculates the octal value corresponding to the same and directs it to the output stream, OutMediationStream. For example, oct(99l) returns "143".

parseDouble (Function)

This function returns the double value of the string received.

Syntax

<DOUBLE> math:parseDouble(<STRING> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value that should be converted into a double value.STRINGNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue string);

insert into OutMediationStream
select math:parseDouble(inValue) as output
from InValueStream;

If the inValue in the input stream holds a value, this function converts it into the corresponding double value and directs it to the output stream, OutMediationStream. For example, parseDouble("123") returns 123.0.

parseFloat (Function)

This function returns the float value of the received string.

Syntax

<FLOAT> math:parseFloat(<STRING> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value that should be converted into a float value.STRINGNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue string);

insert into OutMediationStream
select math:parseFloat(inValue) as output
from InValueStream;

The function converts the input value given in inValue,into its corresponding float value and directs the result into the output stream, OutMediationStream. For example, parseFloat("123") returns 123.0.

parseInt (Function)

This function returns the integer value of the received string.

Syntax

<INT> math:parseInt(<STRING> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value that should be converted to an integer.STRINGNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue string);

insert into OutMediationStream
select math:parseInt(inValue) as output
from InValueStream;

The function converts the inValue into its corresponding integer value and directs the output to the output stream, OutMediationStream. For example, parseInt("123") returns 123.

parseLong (Function)

This function returns the long value of the string received.

Syntax

<LONG> math:parseLong(<STRING> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value that should be converted to a long value.STRINGNoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue string);

insert into OutMediationStream
select math:parseLong(inValue) as output
from InValueStream;

The function converts the inValue to its corresponding long value and directs the result to the output stream, OutMediationStream. For example, parseLong("123") returns 123.

pi (Function)

This function returns the java.lang.Math.PI constant, which is the closest value to pi, i.e., the ratio of the circumference of a circle to its diameter.

Syntax

<DOUBLE> math:pi()

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:pi() as piValue
from InValueStream;

pi() always returns 3.141592653589793.

power (Function)

This function raises the given value to a given power.

Syntax

<DOUBLE> math:power(<INT|LONG|FLOAT|DOUBLE> value, <INT|LONG|FLOAT|DOUBLE> to.power)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
valueThe value that should be raised to the power of to.power input parameter.INT LONG FLOAT DOUBLENoYes
to.powerThe power to which the value input parameter should be raised.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue1 double, inValue2 double);

insert into OutMediationStream
select math:power(inValue1,inValue2) as powerValue
from InValueStream;

This function raises the inValue1 to the power of inValue2 and directs the output to the output stream, OutMediationStream. For example, (5.6d, 3.0d) returns 175.61599999999996.

rand (Function)

This returns a stream of pseudo-random numbers when a sequence of calls are sent to the rand(). Optionally, it is possible to define a seed, i.e., rand(seed) using which the pseudo-random numbers are generated. These functions internally use the java.util.Random class.

Syntax

<DOUBLE> math:rand()
<DOUBLE> math:rand(<INT|LONG> seed)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
seedAn optional seed value that will be used to generate the random number sequence.defaultSeedINT LONGYesYes

EXAMPLE 1

CREATE STREAM InValueStream (symbol string, price long, volume long);

insert into OutMediationStream
select math:oct(inValue) as octValue
from InValueStream select symbol, math:rand() as randNumber;

In the example given above, a random double value between 0 and 1 will be generated using math:rand().

round (Function)

This function returns the value of the input argument rounded off to the closest integer/long value.

Syntax

<INT|LONG> math:round(<FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value that should be rounded off to the closest integer/long value.FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:round(inValue) as roundValue
from InValueStream;

The function rounds off inValue1 to the closest int/long value and directs the output to the output stream, OutMediationStream. For example, round(3252.353) returns 3252.

signum (Function)

This returns +1, 0, or -1 for the given positive, zero and negative values respectively. This function wraps the java.lang.Math.signum() function.

Syntax

<INT> math:signum(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value that should be checked to be positive, negative or zero.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:signum(inValue) as sign
from InValueStream;

The function evaluates the inValue given to be positive, negative or zero and directs the result to the output stream, OutMediationStream. For example, signum(-6.32d) returns -1.

sin (Function)

This returns the sine of the value given in radians. This function wraps the java.lang.Math.sin() function.

Syntax

<DOUBLE> math:sin(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose sine value should be found. Input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:sin(inValue) as sinValue
from InValueStream;

The function calculates the sine value of the given inValue and directs the output to the output stream, OutMediationStream. For example, sin(6d) returns -0.27941549819892586.

sinh (Function)

This returns the hyperbolic sine of the value given in radians. This function wraps the java.lang.Math.sinh() function.

Syntax

<DOUBLE> math:sinh(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose hyperbolic sine value should be found. Input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:sinh(inValue) as sinhValue
from InValueStream;

This function calculates the hyperbolic sine value of inValue and directs the output to the output stream, OutMediationStream. For example, sinh(6d) returns 201.71315737027922.

sqrt (Function)

This function returns the square-root of the given value. It wraps the java.lang.Math.sqrt()s function.

Syntax

<DOUBLE> math:sqrt(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose square-root value should be found.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:sqrt(inValue) as sqrtValue
from InValueStream;

The function calculates the square-root value of the inValue and directs the output to the output stream, OutMediationStream. For example, sqrt(4d) returns 2.

tan (Function)

This function returns the tan of the given value in radians. It wraps the java.lang.Math.tan() function.

Syntax

<DOUBLE> math:tan(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose tan value should be found. Input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
select math:tan(inValue) as tanValue
from InValueStream;

This function calculates the tan value of the inValue given and directs the output to the output stream, OutMediationStream. For example, tan(6d) returns -0.29100619138474915.

tanh (Function)

This function returns the hyperbolic tangent of the value given in radians. It wraps the java.lang.Math.tanh() function.

Syntax

<DOUBLE> math:tanh(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The value of the parameter whose hyperbolic tangent value should be found. Input is required to be in radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
from InValueStream
select math:tanh(inValue) as tanhValue;

If the inVaue in the input stream is given, this function calculates the hyperbolic tangent value of the same and directs the output to OutMediationStream stream. For example, tanh(6d) returns 0.9999877116507956.

toDegrees (Function)

This function converts the value given in radians to degrees. It wraps the java.lang.Math.toDegrees() function.

Syntax

<DOUBLE> math:toDegrees(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The input value in radians that should be converted to degrees.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
from InValueStream
select math:toDegrees(inValue) as degreesValue;

The function converts the inValue in the input stream from radians to degrees and directs the output to OutMediationStream output stream. For example, toDegrees(6d) returns 343.77467707849394.

toRadians (Function)

This function converts the value given in degrees to radians. It wraps the java.lang.Math.toRadians() function.

Syntax

<DOUBLE> math:toRadians(<INT|LONG|FLOAT|DOUBLE> p1)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
p1The input value in degrees that should be converted to radians.INT LONG FLOAT DOUBLENoYes

EXAMPLE 1

CREATE STREAM InValueStream (inValue double);

insert into OutMediationStream
from InValueStream
select math:toRadians(inValue) as radiansValue;

This function converts the input, from degrees to radians and directs the result to OutMediationStream output stream. For example, toRadians(6d) returns 0.10471975511965977.

Rdbms

cud (Stream Processor)

This function performs SQL CUD (INSERT, UPDATE, DELETE) queries on data sources.

Syntax

rdbms:cud(<STRING> datasource.name, <STRING> query)
rdbms:cud(<STRING> datasource.name, <STRING> query, <STRING|BOOL|INT|DOUBLE|FLOAT|LONG> parameter)
rdbms:cud(<STRING> datasource.name, <STRING> query, <STRING|BOOL|INT|DOUBLE|FLOAT|LONG> parameter, <STRING|BOOL|INT|DOUBLE|FLOAT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
datasource.nameThe name of the datasource for which the query should be performed.STRINGNoNo
queryThe update, delete, or insert query(formatted according to the relevant database type) that needs to be performed.STRINGNoYes
parameterIf the second parameter is a parametrised SQL query, then stream processor attributes can be passed to set the values of the parametersSTRING BOOL INT DOUBLE FLOAT LONGYesYes

System Parameters

NameDescriptionDefault ValuePossible Parameters
perform.CUD.operationsIf this parameter is set to true, the RDBMS CUD function is enabled to perform CUD operations.falsetrue false

Extra Return Attributes

NameDescriptionPossible Types
numRecordsThe number of records manipulated by the query.INT

EXAMPLE 1

insert into  RecordStream
select numRecords
from TriggerStream#rdbms:cud("SAMPLE_DB", "UPDATE Customers_Table SET customerName='abc' where customerName='xyz'");

This query updates the events from the input stream named TriggerStream with an additional attribute named numRecords, of which the value indicates the number of records manipulated. The updated events are inserted into an output stream named RecordStream.

EXAMPLE 2

insert into  RecordStream
select numRecords
from TriggerStream#rdbms:cud("SAMPLE_DB", "UPDATE Customers_Table SET customerName=? where customerName=?", changedName, previousName);

This query updates the events from the input stream named TriggerStream with an additional attribute named numRecords, of which the value indicates the number of records manipulated. The updated events are inserted into an output stream named RecordStream. Here the values of attributes changedName and previousName in the event will be set to the query.

query (Stream Processor)

This function performs SQL retrieval queries on data sources.

Syntax

rdbms:query(<STRING> datasource.name, <STRING> attribute.definition.list, <STRING> query)
rdbms:query(<STRING> datasource.name, <STRING> attribute.definition.list, <STRING> query, <STRING|BOOL|INT|DOUBLE|FLOAT|LONG> parameter)
rdbms:query(<STRING> datasource.name, <STRING> attribute.definition.list, <STRING> query, <STRING|BOOL|INT|DOUBLE|FLOAT|LONG> parameter, <STRING|BOOL|INT|DOUBLE|FLOAT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
datasource.nameThe name of the datasource for which the query should be performed.STRINGNoNo
attribute.definition.listThis is provided as a comma-separated list in the <AttributeName AttributeType> format. The SQL query is expected to return the attributes in the given order. e.g., If one attribute is defined here, the SQL query should return one column result set. If more than one column is returned, then the first column is processed. The data types supported are STRING, INT, LONG, DOUBLE, FLOAT, and BOOL. Mapping of the data type to the database data type can be done as follows, * Datatype* -> *Datasource Datatype* STRING -> CHAR,VARCHAR,LONGVARCHAR INT -> INTEGER LONG -> BIGINT DOUBLE-> DOUBLE FLOAT -> REAL BOOL -> BITSTRINGNoNo
queryThe select query(formatted according to the relevant database type) that needs to be performedSTRINGNoYes
parameterIf the second parameter is a parametrised SQL query, then stream processor attributes can be passed to set the values of the parametersSTRING BOOL INT DOUBLE FLOAT LONGYesYes

Extra Return Attributes

NameDescriptionPossible Types
attributeNameThe return attributes will be the ones defined in the parameterattribute.definition.list.STRING INT LONG DOUBLE FLOAT BOOL

EXAMPLE 1

insert into recordStream
select creditcardno, country, transaction, amount
from TriggerStream#rdbms:query('SAMPLE_DB', 'creditcardno string, country string, transaction string, amount int', 'select * from Transactions_Table');

Events inserted into recordStream includes all records matched for the query i.e an event will be generated for each record retrieved from the datasource. The event will include as additional attributes, the attributes defined in the attribute.definition.list(creditcardno, country, transaction, amount).

EXAMPLE 2

insert into recordStream
select creditcardno, country, transaction, amount
from TriggerStream#rdbms:query('SAMPLE_DB', 'creditcardno string, country string,transaction string, amount int', 'select * from where country=?', countrySearchWord);

Events inserted into recordStream includes all records matched for the query i.e an event will be generated for each record retrieved from the datasource. The event will include as additional attributes, the attributes defined in the attribute.definition.list(creditcardno, country, transaction, amount). countrySearchWord value from the event will be set in the query when querying the datasource.

Regex

find (Function)

Finds the subsequence that matches the given regex pattern.

Syntax

<BOOL> regex:find(<STRING> regex, <STRING> input.sequence)
<BOOL> regex:find(<STRING> regex, <STRING> input.sequence, <INT> starting.index)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
regexA regular expression that is matched to a sequence in order to find the subsequence of the same. For example, \d\d(.*)gdn.STRINGNoYes
input.sequenceThe input sequence to be matched with the regular expression. For example, 21 products are produced by gdn.STRINGNoYes
starting.indexThe starting index of the input sequence from where the input sequence ismatched with the given regex pattern.For example, 10.0INTYesYes

EXAMPLE 1

    regex:find('\d\d(.*)gdn', '21 products are produced by gdn currently')

This method attempts to find the subsequence of the input.sequence that matches the regex pattern, \d\d(.*)gdn. It returns true as a subsequence exists.

EXAMPLE 2

    regex:find('\d\d(.*)gdn', '21 products are produced by gdn.', 4)

This method attempts to find the subsequence of the input.sequence that matches the regex pattern, \d\d(.*)gdn starting from index 4. It returns false as subsequence does not exists.

group (Function)

Returns the subsequence captured by the given group during the regex match operation.

Syntax

<STRING> regex:group(<STRING> regex, <STRING> input.sequence, <INT> group.id)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
regexA regular expression. For example, \d\d(.*)gdn.STRINGNoYes
input.sequenceThe input sequence to be matched with the regular expression. For example, 21 products are produced by gdn.STRINGNoYes
group.idThe given group id of the regex expression. For example, 2.INTNoYes

EXAMPLE 1

    regex:group('\d\d(.*)(gdn.*)(gdn.*)', '21 products are produced within 10 years by gdn currently by gdn employees', 3)

Function returns gdn employees, the subsequence captured by the groupID 3 according to the regex pattern, \d\d(.*)(gdn.*)(gdn.*).

lookingAt (Function)

Matches the input.sequence from the beginning against the regex pattern, and unlike regex:matches() it does not require that the entire input.sequence be matched.

Syntax

<BOOL> regex:lookingAt(<STRING> regex, <STRING> input.sequence)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
regexA regular expression. For example, \d\d(.*)gdn.STRINGNoYes
input.sequenceThe input sequence to be matched with the regular expression. For example, 21 products are produced by gdn.STRINGNoYes

EXAMPLE 1

regex:lookingAt('\d\d(.*)(gdn.*)', '21 products are produced by gdn currently in Sri Lanka')

Function matches the input.sequence against the regex pattern, \d\d(.*)(gdn.*) from the beginning, and as it matches it returns true.

EXAMPLE 2

regex:lookingAt('gdn(.*)middleware(.*)', 'sample test string and gdn is situated in trace and it's a middleware company')

Function matches the input.sequence against the regex pattern, gdn(.*)middleware(.*) from the beginning, and as it does not match it returns false.

matches (Function)

Matches the entire input.sequence against the regex pattern.

Syntax

<BOOL> regex:matches(<STRING> regex, <STRING> input.sequence)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
regexA regular expression. For example, \d\d(.*)gdn.STRINGNoYes
input.sequenceThe input sequence to be matched with the regular expression. For example, 21 products are produced by gdn.STRINGNoYes

EXAMPLE 1

regex:matches('gdn(.*)middleware(.*)', 'gdn is situated in trace and its a middleware company')

Function matches the entire input.sequence against gdn(.*)middleware(.*) regex pattern, and as it matches it returns true.

EXAMPLE 2

    regex:matches('gdn(.*)middleware', 'gdn is situated in trace and its a middleware company')

Function matches the entire input.sequence against gdn(.*)middleware regex pattern. As it does not match it returns false.

Reorder

akslack (Stream Processor)

Stream processor performs reordering of out-of-order events optimized for a givenparameter using [AQ-K-Slack algorithm](http://dl.acm.org/citation.cfm?doid=2675743.2771828). This is best for reordering events on attributes those are used for aggregations.data .

Syntax

reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout, <LONG> max.k)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout, <LONG> max.k, <BOOL> discard.late.arrival)
reorder:akslack(<LONG> timestamp, <INT|FLOAT|LONG|DOUBLE> correlation.field, <LONG> batch.size, <LONG> timeout, <LONG> max.k, <BOOL> discard.late.arrival, <DOUBLE> error.threshold, <DOUBLE> confidence.level)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
timestampThe event timestamp on which the events should be ordered.LONGNoYes
correlation.fieldBy monitoring the changes in this field Alpha K-Slack dynamically optimises its behavior. This field is used to calculate the runtime window coverage threshold, which represents the upper limit set for unsuccessfully handled late arrivals.INT FLOAT LONG DOUBLENoYes
batch.sizeThe parameter batch.size denotes the number of events that should be considered in the calculation of an alpha value. This should be greater than or equal to 15.`10,000`LONGYesNo
timeoutA timeout value in milliseconds, where the buffered events who are older than the given timeout period get flushed every second.`-1` (timeout is infinite)LONGYesNo
max.kThe maximum K-Slack window threshold (K parameter).`9,223,372,036,854,775,807` (The maximum Long value)LONGYesNo
discard.late.arrivalIf set to true the processor would discarded the out-of-order events arriving later than the K-Slack window, and in otherwise it allows the late arrivals to proceed.falseBOOLYesNo
error.thresholdThe error threshold to be applied in Alpha K-Slack algorithm.`0.03` (3%)DOUBLEYesNo
confidence.levelThe confidence level to be applied in Alpha K-Slack algorithm.`0.95` (95%)DOUBLEYesNo

EXAMPLE 1

CREATE STREAM StockStream (eventTime long, symbol string, volume long);

@info(name = 'query1')
insert into OutputStream
select eventTime, symbol, sum(volume) as total
from StockStream#reorder:akslack(eventTime, volume, 20) WINDOW SLIDING_TIME(5 min);

The query reorders events based on the eventTime attribute value and optimises for aggregating volume attribute considering last 20 events.

kslack (Stream Processor)

Stream processor performs reordering of out-of-order events using [K-Slack algorithm](https://www2.informatik.uni-erlangen.de/publication/download/IPDPS2013.pdf).

Syntax

reorder:kslack(<LONG> timestamp)
reorder:kslack(<LONG> timestamp, <LONG> timeout)
reorder:kslack(<LONG> timestamp, <BOOL> discard.late.arrival)
reorder:kslack(<LONG> timestamp, <LONG> timeout, <LONG> max.k)
reorder:kslack(<LONG> timestamp, <LONG> timeout, <BOOL> discard.late.arrival)
reorder:kslack(<LONG> timestamp, <LONG> timeout, <LONG> max.k, <BOOL> discard.late.arrival)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
timestampThe event timestamp on which the events should be ordered.LONGNoYes
timeoutA timeout value in milliseconds, where the buffered events who are older than the given timeout period get flushed every second.-1 (timeout is infinite)LONGYesNo
max.kThe maximum K-Slack window threshold (K parameter).`9,223,372,036,854,775,807` (The maximum Long value)LONGYesNo
discard.late.arrivalIf set to true the processor would discarded the out-of-order events arriving later than the K-Slack window, and in otherwise it allows the late arrivals to proceed.falseBOOLYesNo

EXAMPLE 1

CREATE STREAM StockStream (eventTime long, symbol string, volume long);

@info(name = 'query1')
insert into OutputStream
select eventTime, symbol, volume
from StockStream#reorder:kslack(eventTime, 5000);

The query reorders events based on the eventTime attribute value, and it forcefully flushes all the events who have arrived older than the given timeout value (5000 milliseconds) every second.

Script

javascript (Script)

This extension allows you to include JavaScript functions within the Query Language.

Syntax

define function <FunctionName>[javascript] return <type> {
// Script code
};

EXAMPLE 1

define function concatJ[JavaScript] return string {"  var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var res = str1.concat(str2,str3);
return res;
};

This JS function will consume 3 var variables, concatenate them and will return as a string

Sink

email (Sink)

The email sink uses the smtp server to publish events via emails. The events can be published in text, xml or json formats. The user can define email sink parameters in either the \<SP_HOME>/conf/<PROFILE>/deployment yaml file or in the stream definition. The email sink first checks the stream definition for parameters, and if they are no configured there, it checks the deployment.yaml file. If the parameters are not configured in either place, default values are considered for optional parameters. If you need to configure server system parameters that are not provided as options in the stream definition, then those parameters need to be defined them in the deployment.yaml file under email sink properties. For more information about the SMTP server parameters, see https://javaee.github.io/javamail/SMTP-Transport. Further, some email accounts are required to enable the access to less secure apps option. For gmail accounts, you can enable this option via https://myaccount.google.com/lesssecureapps.

Syntax

CREATE SINK <NAME> WITH (type="email", map.type="<STRING>", username="<STRING>", address="<STRING>", password="<STRING>", host="<STRING>", port="<INT>", ssl.enable="<BOOL>", auth="<BOOL>", content.type="<STRING>", subject="<STRING>", to="<STRING>", cc="<STRING>", bcc="<STRING>", attachments="<STRING>", connection.pool.size="<INT>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
usernameThe username of the email account that is used to send emails. e.g., abc is the username of the [email protected] account.STRINGNoNo
addressThe address of the email account that is used to send emails.STRINGNoNo
passwordThe password of the email account.STRINGNoNo
hostThe host name of the SMTP server. e.g., smtp.gmail.com is a host name for a gmail account. The default value smtp.gmail.com is only valid if the email account is a gmail account.smtp.gmail.comSTRINGYesNo
portThe port that is used to create the connection.465 the default value is only valid is SSL is enabled.INTYesNo
ssl.enableThis parameter specifies whether the connection should be established via a secure connection or not. The value can be either true or false. If it is true, then the connection is establish via the 493 port which is a secure connection.trueBOOLYesNo
authThis parameter specifies whether to use the AUTH command when authenticating or not. If the parameter is set to true, an attempt is made to authenticate the user using the AUTH command.trueBOOLYesNo
content.typeThe content type can be either text/plain or text/html.text/plainSTRINGYesNo
subjectThe subject of the mail to be send.STRINGNoYes
toThe address of the to recipient. If there are more than one to recipients, then all the required addresses can be given as a comma-separated list.STRINGNoYes
ccThe address of the cc recipient. If there are more than one cc recipients, then all the required addresses can be given as a comma-separated list.NoneSTRINGYesNo
bccThe address of the bcc recipient. If there are more than one bcc recipients, then all the required addresses can be given as a comma-separated list.NoneSTRINGYesNo
attachmentsFile paths of the files that need to be attached to the email. These paths should be absolute paths. They can be either directories or files . If the path is to a directory, all the files located at the first level (i.e., not within another sub directory) are attached.NoneSTRINGYesYes
connection.pool.sizeNumber of concurrent Email client connections.1INTYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
mail.smtp.ssl.trustIf this parameter is se, and a socket factory has not been specified, it enables the use of a MailSSLSocketFactory. If this parameter is set to "*", all the hosts are trusted. If it is set to a whitespace-separated list of hosts, only those specified hosts are trusted. If not, the hosts trusted depends on the certificate presented by the server.*String
mail.smtp.connectiontimeoutThe socket connection timeout value in milliseconds.infinite timeoutAny Integer
mail.smtp.timeoutThe socket I/O timeout value in milliseconds.infinite timeoutAny Integer
mail.smtp.fromThe email address to use for the SMTP MAIL command. This sets the envelope return address.Defaults to msg.getFrom() or InternetAddress.getLocalAddress().Any valid email address
mail.smtp.localportThe local port number to bind to when creating the SMTP socket.Defaults to the port number picked by the Socket class.Any Integer
mail.smtp.ehloIf this parameter is set to false, you must not attempt to sign in with the EHLO command.truetrue or false
mail.smtp.auth.login.disableIf this is set to true, it is not allowed to use the AUTH LOGIN command.falsetrue or false
mail.smtp.auth.plain.disableIf this parameter is set to true, it is not allowed to use the AUTH PLAIN command.falsetrue or false
mail.smtp.auth.digest-md5.disableIf this parameter is set to true, it is not allowed to use the AUTH DIGEST-MD5 command.falsetrue or false
mail.smtp.auth.ntlm.disableIf this parameter is set to true, it is not allowed to use the AUTH NTLM commandfalsetrue or false
mail.smtp.auth.ntlm.domainThe NTLM authentication domain.NoneThe valid NTLM authentication domain name.
mail.smtp.auth.ntlm.flagsNTLM protocol-specific flags. For more details, see http://curl.haxx.se/rfc/ntlm.html\#theNtlmFlags.NoneValid NTLM protocol-specific flags.
mail.smtp.dsn.notifyThe NOTIFY option to the RCPT command.NoneEither NEVER, or a combination of SUCCESS, FAILURE, and DELAY (separated by commas).
mail.smtp.dsn.retThe RET option to the MAIL command.NoneEither FULL or HDRS.
mail.smtp.sendpartialIf this parameter is set to true and a message is addressed to both valid and invalid addresses, the message is sent with a log that reports the partial failure with a SendFailedException error. If this parameter is set to false (which is default), the message is not sent to any of the recipients when the recipient lists contain one or more invalid addresses.falsetrue or false
mail.smtp.sasl.enableIf this parameter is set to true, the system attempts to use the javax.security.sasl package to choose an authentication mechanism for the login.falsetrue or false
mail.smtp.sasl.mechanismsEnter a space or a comma-separated list of SASL mechanism names that the system shouldt try to use.None
mail.smtp.sasl.authorizationidThe authorization ID to be used in the SASL authentication. If no value is specified, the authentication ID (i.e., username) is used.usernameValid ID
mail.smtp.sasl.realmThe realm to be used with the DIGEST-MD5 authentication.None
mail.smtp.quitwaitIf this parameter is set to false, the QUIT command is issued and the connection is immediately closed. If this parameter is set to true (which is default), the transport waits for the response to the QUIT command.falsetrue or false
mail.smtp.reportsuccessIf this parameter is set to true, the transport to includes an SMTPAddressSucceededException for each address to which the message is successfully delivered.falsetrue or false
mail.smtp.socketFactoryIf this parameter is set to a class that implements the javax.net.SocketFactory interface, this class is used to create SMTP sockets.NoneSocket Factory
mail.smtp.socketFactory.classIf this parameter is set, it specifies the name of a class that implements the javax.net.SocketFactory interface. This class is used to create SMTP sockets.None
mail.smtp.socketFactory.fallbackIf this parameter is set to true, the failure to create a socket using the specified socket factory class causes the socket to be created using the java.net.Socket class.truetrue or false
mail.smtp.socketFactory.portThis specifies the port to connect to when using the specified socket factory.25Valid port number
mail.smtp.ssl.protocolsThis specifies the SSL protocols that need to be enabled for the SSL connections.NoneThis parameter specifies a whitespace separated list of tokens that are acceptable to the javax.net.ssl.SSLSocket.setEnabledProtocols method.
mail.smtp.starttls.enableIf this parameter is set to true, it is possible to issue the STARTTLS command (if supported by the server) to switch the connection to a TLS-protected connection before issuing any login commands.falsetrue or false
mail.smtp.starttls.requiredIf this parameter is set to true, it is required to use the STARTTLS command. If the server does not support the STARTTLS command, or if the command fails, the connection method will fail.falsetrue or false
mail.smtp.socks.hostThis specifies the host name of a SOCKS5 proxy server to be used for the connections to the mail server.None
mail.smtp.socks.portThis specifies the port number for the SOCKS5 proxy server. This needs to be used only if the proxy server is not using the standard port number 1080.1080valid port number
mail.smtp.auth.ntlm.disableIf this parameter is set to true, the AUTH NTLM command cannot be issued.falsetrue or false
mail.smtp.mailextensionThe extension string to be appended to the MAIL command.None
mail.smtp.usersetIf this parameter is set to true, you should use the RSET command instead of the NOOP command in the isConnected method. In some scenarios, sendmail responds slowly after many NOOP commands. This is avoided by using RSET instead.falsetrue or false

EXAMPLE 1

CREATE SINK FooStream WITH (type='email', map.type ='json', username='sender.account', address='[email protected]',password='account.password',subject='Alerts from gdn Stream Processor',to='{{email}}') (email string, loginId int, name string);

This example illustrates how to publish events via an email sink based on the values provided for the mandatory parameters. As shown in the example, it publishes events from the FooStream in json format as emails to the specified to recipients via the email sink. The email is sent from the [email protected] email address via a secure connection.

EXAMPLE 2

CREATE SINK FooStream WITH type='email', map.type ='json', subject='Alerts from gdn Stream Processor',to='{{email}}') (email string, loginId int, name string);

This example illustrates how to configure the query parameters and the system parameters in the deployment.yaml file.

As shown in the example, events from the FooStream are published in json format via the email sink as emails to the given to recipients. The email is sent from the [email protected] address via a secure connection.

EXAMPLE 3

CREATE SINK FooStream WITH (type='email', map.type ='json', username='sender.account', address='[email protected]',password='account.password',host='smtp.gmail.com',port='465',ssl.enable='true',auth='true',content.type='text/html',subject='Alerts from gdn Stream Processor-{{name}}',to='[email protected], [email protected]',cc='[email protected], [email protected]',bcc='[email protected]) (name string, age int, country string);

This example illustrates how to publish events via the email sink. Events from the FooStream stream are published in xml format via the email sink as a text/html message and sent to the specified to, cc, and bcc recipients via a secure connection. The name namespace in the subject attribute is the value of the name parameter in the corresponding output event.

EXAMPLE 4

CREATE SINK FooStream WITH (type='email', map.type ='json', username='sender.account', address='[email protected]',password='account.password',host='smtp.gmail.com',port='465',ssl.enable='true',auth='true',content.type='text/html',subject='Alerts from gdn Stream Processor-{{name}}',to='[email protected], [email protected]',cc='[email protected], [email protected]',bcc='[email protected]= '{{attachments}}') (name string, age int, country string, attachments string);

This example illustrates how to publish events via the email sink. Here, the email also contains attachments. Events from the FooStream are published in xml format via the email sink as a text/html message to the specified to,cc, and bcc recipients via a secure connection. The name namespace in the subject attribute is the value for the name parameter in the corresponding output event. The attachments included in the email message are the local files available in the path specified as the value for the attachments attribute.

grpc (Sink)

This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes added. The default service is called "EventService". If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the stream processor-tooling folder if we use it with stream processor-tooling). This grpc sink is used for scenarios where we send a request and don't expect a response back. I.e getting a google.protobuf.Empty response back.

Syntax

CREATE SINK <NAME> WITH (type="grpc", map.type="<STRING>", publisher.url="<STRING>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
publisher.urlThe url to which the outgoing events should be published via this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName> For example: grpc://0.0.0.0:9763/org.gdn.grpc.EventService/consumeSTRINGNoNo
headersGRPC Request headers in format "'<key>:<value>','<key>:<value>'". If header parameter is not provided just the payload is sent-STRINGYesNo
idle.timeoutSet the duration in seconds without ongoing RPCs before going to idle mode.1800LONGYesNo
keep.alive.timeSets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.Long.MAX_VALUELONGYesNo
keep.alive.timeoutSets the time in seconds waiting for read activity after sending a keepalive ping.20LONGYesNo
keep.alive.without.callsSets whether keepalive will be performed when there are no outstanding RPC on a connection.falseBOOLYesNo
enable.retryEnables the retry mechanism provided by the gRPC library.falseBOOLYesNo
max.retry.attemptsSets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.5INTYesNo
retry.buffer.sizeSets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.16777216LONGYesNo
per.rpc.buffer.sizeSets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.1048576LONGYesNo
channel.termination.waiting.timeThe time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.5LONGYesNo
truststore.filethe file path of truststore. If this is provided then server authentication is enabled-STRINGYesNo
truststore.passwordthe password of truststore. If this is provided then the integrity of the keystore is checked-STRINGYesNo
truststore.algorithmthe encryption algorithm to be used for server authentication-STRINGYesNo
tls.store.typeTLS store type-STRINGYesNo
keystore.filethe file path of keystore. If this is provided then client authentication is enabled-STRINGYesNo
keystore.passwordthe password of keystore-STRINGYesNo
keystore.algorithmthe encryption algorithm to be used for client authentication-STRINGYesNo
enable.sslto enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
mutual.auth.enabledto enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo

EXAMPLE 1

CREATE SINK FooStream WITH (type='grpc', map.type='JSON', publisher.url = 'grpc://134.23.43.35:8080/org.gdn.grpc.EventService/consume') (message String);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/consume the sink will be operating in default mode

EXAMPLE 2

CREATE SINK FooStream WITH (type='grpc', map.type='JSON', publisher.url = 'grpc://134.23.43.35:8080/org.gdn.grpc.EventService/consume', headers='{{headers}}', map.payload='{{message}}') (message String, headers String);

A similar example to above but with headers. Headers are also send into the stream as a data. In the sink headers dynamic property reads the value and sends it as MetaData with the request

EXAMPLE 3

CREATE SINK FooStream WITH (type='grpc', map.type='protobuf', publisher.url = 'grpc://134.23.43.35:8080/org.gdn.grpc.MyService/send') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080 since there is no mapper provided, attributes of stream definition should be as same as the attributes of protobuf message definition.

EXAMPLE 4

CREATE SINK FooStream WITH (type='grpc', map.type='protobuf', publisher.url = 'grpc://134.23.43.35:8080/org.gdn.grpc.MyService/testMap') (stringValue string, intValue int,map object);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 134.23.43.35 listening to port 8080. The map object in the stream definition defines that this stream is going to use Map object with grpc service. We can use any map object that extends java.util.AbstractMap class.

EXAMPLE 5

CREATE SINK FooStream WITH (type='grpc', map.type='protobuf', publisher.url = 'grpc://134.23.43.35:8080/org.gdn.grpc.MyService/testMap', map.payload="stringValue='a',longValue='b',intValue='c',booleanValue='d',floatValue = 'e', doubleValue = 'f'"') (a string, b long, c int,d bool,e float,f double);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. map.payload is provided in this stream, therefore we can use any name for the attributes in the stream definition, but we should correctly map those names with protobuf message attributes. If we are planning to send metadata within a stream we should use map.payload to map attributes to identify the metadata attribute and the protobuf attributes separately.

EXAMPLE 6

CREATE SINK FooStream WITH (type='grpc', map.type='protobuf', publisher.url = 'grpc://194.23.98.100:8888/org.gdn.grpc.test.StreamService/clientStream') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here in the grpc sink, we are sending a stream of requests to the server that runs on 194.23.98.100 and port 8888. When we need to send a stream of requests from the grpc sink we have to define a client stream RPC method.Then the stream processor will identify whether it's a unary method or a stream method and send requests according to the method type.

grpc-call (Sink)

This extension publishes event data encoded into GRPC Classes as defined in the user input jar. This extension has a default gRPC service classes jar added. The default service is called "EventService". If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the stream processor-tooling folder if we use it with stream processor-tooling). This grpc-call sink is used for scenarios where we send a request out and expect a response back. In default mode this will use EventService process method. grpc-call-response source is used to receive the responses. A unique sink.id is used to correlate between the sink and its corresponding source.

Syntax

CREATE SINK <NAME> WITH (type="grpc-call", map.type="<STRING>" publisher.url="<STRING>", sink.id="<INT>", headers="<STRING>", idle.timeout="<LONG>", keep.alive.time="<LONG>", keep.alive.timeout="<LONG>", keep.alive.without.calls="<BOOL>", enable.retry="<BOOL>", max.retry.attempts="<INT>", retry.buffer.size="<LONG>", per.rpc.buffer.size="<LONG>", channel.termination.waiting.time="<LONG>", max.inbound.message.size="<LONG>", max.inbound.metadata.size="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
publisher.urlThe url to which the outgoing events should be published via this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName> For example: grpc://0.0.0.0:9763/org.gdn.grpc.EventService/consumeSTRINGNoNo
sink.ida unique ID that should be set for each grpc-call-sink. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the source also.INTNoNo
headersGRPC Request headers in format "'<key>:<value>','<key>:<value>'". If header parameter is not provided just the payload is sent-STRINGYesNo
idle.timeoutSet the duration in seconds without ongoing RPCs before going to idle mode.1800LONGYesNo
keep.alive.timeSets the time in seconds without read activity before sending a keepalive ping. Keepalives can increase the load on services so must be used with caution. By default set to Long.MAX_VALUE which disables keep alive pinging.Long.MAX_VALUELONGYesNo
keep.alive.timeoutSets the time in seconds waiting for read activity after sending a keepalive ping.20LONGYesNo
keep.alive.without.callsSets whether keepalive will be performed when there are no outstanding RPC on a connection.falseBOOLYesNo
enable.retryEnables the retry and hedging mechanism provided by the gRPC library.falseBOOLYesNo
max.retry.attemptsSets max number of retry attempts. The total number of retry attempts for each RPC will not exceed this number even if service config may allow a higher number.5INTYesNo
retry.buffer.sizeSets the retry buffer size in bytes. If the buffer limit is exceeded, no RPC could retry at the moment, and in hedging case all hedges but one of the same RPC will cancel.16777216LONGYesNo
per.rpc.buffer.sizeSets the per RPC buffer limit in bytes used for retry. The RPC is not retriable if its buffer limit is exceeded.1048576LONGYesNo
channel.termination.waiting.timeThe time in seconds to wait for the channel to become terminated, giving up if the timeout is reached.5LONGYesNo
max.inbound.message.sizeSets the maximum message size allowed to be received on the channel in bytes4194304LONGYesNo
max.inbound.metadata.sizeSets the maximum size of metadata allowed to be received in bytes8192LONGYesNo
truststore.filethe file path of truststore. If this is provided then server authentication is enabled-STRINGYesNo
truststore.passwordthe password of truststore. If this is provided then the integrity of the keystore is checked-STRINGYesNo
truststore.algorithmthe encryption algorithm to be used for server authentication-STRINGYesNo
tls.store.typeTLS store type-STRINGYesNo
keystore.filethe file path of keystore. If this is provided then client authentication is enabled-STRINGYesNo
keystore.passwordthe password of keystore-STRINGYesNo
keystore.algorithmthe encryption algorithm to be used for client authentication-STRINGYesNo
enable.sslto enable ssl. If set to true and truststore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
mutual.auth.enabledto enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo

EXAMPLE 1

CREATE SINK FooStream WITH (type='grpc-call', map.type='json', publisher.url = 'grpc://194.23.98.100:8080/EventService/process', sink.id= '1') (message String);

CREATE SOURCE BarStream WITH (type='grpc-call-response', sink.id= '1') (message String);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. sink.id is set to 1 here. So we can write a source with sink.id 1 so that it will listen to responses for requests published from this stream. Note that since we are using EventService/process the sink will be operating in default mode

EXAMPLE 2

CREATE SINK FooStream WITH (type='grpc-call`, map.type='json', publisher.url = 'grpc://194.23.98.100:8080/EventService/process', sink.id= '1',) (message String);

CREATE SOURCE BarStream WITH (type='grpc-call-response', sink.id= '1') (message String);

Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream.

EXAMPLE 3

CREATE SINK FooStream WITH (type='grpc-call`, map.type='protobuf', publisher.url = 'grpc://194.23.98.100:8888/org.gdn.grpc.test.MyService/process', sink.id= '1') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

CREATE SOURCE FooStream WITH (type='grpc-call-response', map.type='protobuf', receiver.url = 'grpc://localhost:8888/org.gdn.grpc.MyService/process', sink.id= '1',) (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here a stream named FooStream is defined with grpc sink. A grpc server should be running at 194.23.98.100 listening to port 8080. We have added another stream called BarStream which is a grpc-call-response source with the same sink.id 1 and as same as FooStream definition. So the responses for calls sent from the FooStream will be added to BarStream. Since there is no mapping available in the stream definition attributes names should be as same as the attributes of the protobuf message definition. (Here the only reason we provide receiver.url in the grpc-call-response source is for protobuf mapper to map Response into a stream processor event, we can give any address and any port number in the URL, but we should provide the service name and the method name correctly)

EXAMPLE 4

CREATE SINK FooStream WITH (type='grpc-call', map.type='protobuf', publisher.url = 'grpc://194.23.98.100:8888/org.gdn.grpc.test.MyService/process', sink.id= '1', map.payload="stringValue='a',longValue='c',intValue='b',booleanValue='d',floatValue = 'e', doubleValue = 'f'") (a string, b int,c long,d bool,e float,f double);

CREATE SOURCE FooStream WITH (type='grpc-call-response', map.type='protobuf', receiver.url = 'grpc://localhost:8888/org.gdn.grpc.test.MyService/process', sink.id= '1', map.attributes="a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue'") (a string, b int,c long,d bool,e float,f double);

Here with the same FooStream definition we have added a BarStream which has a grpc-call-response source with the same sink.id 1. So the responses for calls sent from the FooStream will be added to BarStream. In this stream we provided mapping for both the sink and the source. so we can use any name for the attributes in the stream definition, but we have to map those attributes with correct protobuf attributes. As same as the grpc-sink, if we are planning to use metadata we should map the attributes.

grpc-service-response (Sink)

This extension is used to send responses back to a gRPC client after receiving requests through grpc-service source. This correlates with the particular source using a unique source.id

Syntax

CREATE SINK <NAME> WITH (type="grpc-service-response", map-type="<STRING>", source.id="<INT>", )

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
source.idA unique id to identify the correct source to which this sink is mapped. There is a 1:1 mapping between source and sinkINTNoNo

EXAMPLE 1

CREATE SINK BarStream WITH (type='grpc-service-response', source.id='1', map.type='json') (messageId String, message String);

CREATE SOURCE FooStream WITH (type='grpc-service',url='grpc://134.23.43.35:8080/org.gdn.grpc.EventService/process', source.id='1', map.type='json', map.attributes="messageId='trp:messageId', message='message'") (messageId String, message String);

insert into BarStream
select *
from FooStream;

The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough as we are selecting everything from FooStream and inserting into BarStream.

http (Sink)

HTTP sink publishes messages via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats text, XML and JSON. It can also publish to endpoints protected by basic authentication or OAuth 2.0.

Syntax

CREATE SINK <NAME> WITH (type="http", map.type="<STRING>" publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", tls.store.type="<STRING>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
publisher.urlThe URL to which the outgoing events should be published. Examples: http://localhost:8080/endpoint, https://localhost:8080/endpointSTRINGNoNo
basic.auth.usernameThe username to be included in the authentication header when calling endpoints protected by basic authentication. basic.auth.password property should be also set when using this property.-STRINGYesNo
basic.auth.passwordThe password to be included in the authentication header when calling endpoints protected by basic authentication. basic.auth.username property should be also set when using this property.-STRINGYesNo
https.truststore.fileThe file path of the client truststore when sending messages through https protocol.`\${carbon.home}/resources/security/client-truststore.jks`STRINGYesNo
https.truststore.passwordThe password for the client-truststore.gdncarbonSTRINGYesNo
oauth.usernameThe username to be included in the authentication header when calling endpoints protected by OAuth 2.0. oauth.password property should be also set when using this property.-STRINGYesNo
oauth.passwordThe password to be included in the authentication header when calling endpoints protected by OAuth 2.0. oauth.username property should be also set when using this property.-STRINGYesNo
consumer.keyConsumer key used for calling endpoints protected by OAuth 2.0-STRINGYesNo
consumer.secretConsumer secret used for calling endpoints protected by OAuth 2.0-STRINGYesNo
token.urlToken URL to generate a new access tokens when calling endpoints protected by OAuth 2.0-STRINGYesNo
refresh.tokenRefresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0-STRINGYesNo
headersHTTP request headers in format "'<key>:<value>','<key>:<value>'". When Content-Type header is not provided the system derives the Content-Type based on the provided sink mapper as following: - map.type='xml': application/xml - map.type='json': application/json - map.type='text': plain/text - map.type='keyvalue': application/x-www-form-urlencoded - For all other cases system defaults to plain/text Also the Content-Length header need not to be provided, as the system automatically defines it by calculating the size of the payload.Content-Type and Content-Length headersSTRINGYesNo
methodThe HTTP method used for calling the endpoint.POSTSTRINGYesNo
socket.idle.timeoutSocket timeout in millis.6000INTYesNo
chunk.disabledDisable chunked transfer encoding.falseBOOLYesNo
ssl.protocolSSL/TLS protocol.TLSSTRINGYesNo
ssl.verification.disabledDisable SSL verification.falseBOOLYesNo
tls.store.typeTLS store type.JKSSTRINGYesNo
ssl.configurationsSSL/TSL configurations in format "'<key>:<value>','<key>:<value>'". Some supported parameters: - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2' - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' - Enable session creation: 'client.enable.session.creation:true' - Supported server names: 'server.suported.server.names:server' - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'-STRINGYesNo
proxy.hostProxy server host-STRINGYesNo
proxy.portProxy server port-STRINGYesNo
proxy.usernameProxy server username-STRINGYesNo
proxy.passwordProxy server password-STRINGYesNo
client.bootstrap.configurationsClient bootstrap configurations in format "'<key>:<value>','<key>:<value>'". Some supported configurations : - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000' - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15' - Client socket reuse: 'client.bootstrap.socket.reuse:true' - Enable TCP no delay: 'client.bootstrap.nodelay:true' - Enable client keep alive: 'client.bootstrap.keepalive:true' - Send buffer size: 'client.bootstrap.sendbuffersize:1048576' - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576'-STRINGYesNo
max.pool.active.connectionsMaximum possible number of active connection per client pool.-1INTYesNo
min.pool.idle.connectionsMinimum number of idle connections that can exist per client pool.0INTYesNo
max.pool.idle.connectionsMaximum number of idle connections that can exist per client pool.100INTYesNo
min.evictable.idle.timeMinimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.300000STRINGYesNo
time.between.eviction.runsTime between two eviction operations (in millis) on the client pool.30000STRINGYesNo
max.wait.timeThe maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.60000STRINGYesNo
test.on.borrowEnable connections to be validated before being borrowed from the client pool.trueBOOLYesNo
test.while.idleEnable connections to be validated during the eviction operation (if any).trueBOOLYesNo
exhausted.actionAction that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following. 0 - Fail the request. 1 - Block the request, until a connection returns to the pool. 2 - Grow the connection pool size.1 (Block when exhausted)INTYesNo
hostname.verification.enabledEnable hostname verification.trueBOOLYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
clientBootstrapClientGroupSizeNumber of client threads to perform non-blocking read and write to one or more channels.(Number of available processors) * 2Any positive integer
clientBootstrapBossGroupSizeNumber of boss threads to accept incoming connections.Number of available processorsAny positive integer
clientBootstrapWorkerGroupSizeNumber of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.(Number of available processors) * 2Any positive integer
trustStoreLocationThe default truststore file path.`\${carbon.home}/resources/security/client-truststore.jks`Path to client truststore `.jks` file
trustStorePasswordThe default truststore password.gdncarbonTruststore password as string

EXAMPLE 1

CREATE SINK StockStream WITH (type = 'http', map.type = 'json', publisher.url = 'http://stocks.com/stocks') (symbol string, price float, volume long);

Events arriving on the StockStream will be published to the HTTP endpoint http://stocks.com/stocks using POST method with Content-Type application/json by converting those events to the default JSON format as following:

{
"event": {
"symbol": "FB",
"price": 24.5,
"volume": 5000
}
}

EXAMPLE 2

CREATE SINK FooStream WITH (type='http', map.type='xml', publisher.url = 'http://localhost:8009/foo', client.bootstrap.configurations = "'client.bootstrap.socket.timeout:20'", max.pool.active.connections = '1', headers = "{{headers}}", map.payload="""<stock>{{payloadBody}}</stock>""") (payloadBody String, headers string);

Events arriving on FooStream will be published to the HTTP endpoint http://localhost:8009/foo using POST method with Content-Type application/xml and setting payloadBody and header attribute values. If the payloadBody contains

<symbol>gdn</symbol>
<price>55.6</price>
<volume>100</volume>

and header contains 'topic:foobar' values, then the system will generate an output with the body:

<stock>
<symbol>gdn</symbol>
<price>55.6</price>
<volume>100</volume>
</stock>

and HTTP headers: Content-Length:xxx, Content-Location:'xxx', Content-Type:'application/xml', HTTP_METHOD:'POST'

http-call (Sink)

The http-call sink publishes messages to endpoints via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats text, XML or JSON and consume responses through its corresponding http-call-response source. It also supports calling endpoints protected with basic authentication or OAuth 2.0.

Syntax

CREATE SINK <NAME> WITH (type="http-call", map.type="<STRING>", publisher.url="<STRING>", sink.id="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", blocking.io="<BOOL>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
publisher.urlThe URL which should be called. Examples: http://localhost:8080/endpoint, https://localhost:8080/endpointSTRINGNoNo
sink.idIdentifier to correlate the http-call sink to its corresponding http-call-response sources to retrieved the responses.STRINGNoNo
basic.auth.usernameThe username to be included in the authentication header when calling endpoints protected by basic authentication. basic.auth.password property should be also set when using this property.-STRINGYesNo
basic.auth.passwordThe password to be included in the authentication header when calling endpoints protected by basic authentication. basic.auth.username property should be also set when using this property.-STRINGYesNo
https.truststore.fileThe file path of the client truststore when sending messages through https protocol.`\${carbon.home}/resources/security/client-truststore.jks`STRINGYesNo
https.truststore.passwordThe password for the client-truststore.gdncarbonSTRINGYesNo
oauth.usernameThe username to be included in the authentication header when calling endpoints protected by OAuth 2.0. oauth.password property should be also set when using this property.-STRINGYesNo
oauth.passwordThe password to be included in the authentication header when calling endpoints protected by OAuth 2.0. oauth.username property should be also set when using this property.-STRINGYesNo
consumer.keyConsumer key used for calling endpoints protected by OAuth 2.0-STRINGYesNo
consumer.secretConsumer secret used for calling endpoints protected by OAuth 2.0-STRINGYesNo
token.urlToken URL to generate a new access tokens when calling endpoints protected by OAuth 2.0-STRINGYesNo
refresh.tokenRefresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0-STRINGYesNo
headersHTTP request headers in format "'<key>:<value>','<key>:<value>'". When the Content-Type header is not provided the system decides the Content-Type based on the provided sink mapper as following: - map.type='xml': application/xml - map.type='json': application/json - map.type='text': plain/text - map.type='keyvalue': application/x-www-form-urlencoded - For all other cases system defaults to plain/text Also the Content-Length header need not to be provided, as the system automatically defines it by calculating the size of the payload.Content-Type and Content-Length headersSTRINGYesNo
methodThe HTTP method used for calling the endpoint.POSTSTRINGYesNo
downloading.enabledEnable response received by the http-call-response source to be written to a file. When this is enabled the download.path property should be also set.falseBOOLYesNo
download.pathThe absolute file path along with the file name where the downloads should be saved.-STRINGYesYes
blocking.ioBlocks the request thread until a response it received from HTTP call-response source before sending any other request.falseBOOLYesNo
socket.idle.timeoutSocket timeout in millis.6000INTYesNo
chunk.disabledDisable chunked transfer encoding.falseBOOLYesNo
ssl.protocolSSL/TLS protocol.TLSSTRINGYesNo
ssl.verification.disabledDisable SSL verification.falseBOOLYesNo
ssl.configurationsSSL/TSL configurations. Expected format "'<key>:<value>','<key>:<value>'". Some supported parameters: - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2' - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' - Enable session creation: 'client.enable.session.creation:true' - Supported server names: 'server.suported.server.names:server' - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'-STRINGYesNo
proxy.hostProxy server host-STRINGYesNo
proxy.portProxy server port-STRINGYesNo
proxy.usernameProxy server username-STRINGYesNo
proxy.passwordProxy server password-STRINGYesNo
client.bootstrap.configurationsClient bootstrap configurations in format "'<key>:<value>','<key>:<value>'". Some supported configurations : - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000' - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15' - Client socket reuse: 'client.bootstrap.socket.reuse:true' - Enable TCP no delay: 'client.bootstrap.nodelay:true' - Enable client keep alive: 'client.bootstrap.keepalive:true' - Send buffer size: 'client.bootstrap.sendbuffersize:1048576' - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576'-STRINGYesNo
max.pool.active.connectionsMaximum possible number of active connection per client pool.-1INTYesNo
min.pool.idle.connectionsMinimum number of idle connections that can exist per client pool.0INTYesNo
max.pool.idle.connectionsMaximum number of idle connections that can exist per client pool.100INTYesNo
min.evictable.idle.timeMinimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.300000STRINGYesNo
time.between.eviction.runsTime between two eviction operations (in millis) on the client pool.30000STRINGYesNo
max.wait.timeThe maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.60000STRINGYesNo
test.on.borrowEnable connections to be validated before being borrowed from the client pool.trueBOOLYesNo
test.while.idleEnable connections to be validated during the eviction operation (if any).trueBOOLYesNo
exhausted.actionAction that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following. 0 - Fail the request. 1 - Block the request, until a connection returns to the pool. 2 - Grow the connection pool size.1 (Block when exhausted)INTYesNo
hostname.verification.enabledEnable hostname verificationtrueBOOLYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
clientBootstrapClientGroupSizeNumber of client threads to perform non-blocking read and write to one or more channels.(Number of available processors) * 2Any positive integer
clientBootstrapBossGroupSizeNumber of boss threads to accept incoming connections.Number of available processorsAny positive integer
clientBootstrapWorkerGroupSizeNumber of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.(Number of available processors) * 2Any positive integer
trustStoreLocationThe default truststore file path.`\${carbon.home}/resources/security/client-truststore.jks`Path to client truststore `.jks` file
trustStorePasswordThe default truststore password.gdncarbonTruststore password as string

EXAMPLE 1

    CREATE SINK FooStream WITH (type='http-call', sink.id='foo', publisher.url='http://localhost:8009/foo', map.type='xml', map.payload="'{{payloadBody}}'") (payloadBody string);

CREATE SOURCE ResponseStream WITH (type='http-call-response', sink.id='foo', map.type='text', regex.A='((.|\n)*)', map.attributes="headers='trp:headers', message='A[1]'") (message string, headers string);

When events arrive in `FooStream`, http-call sink makes calls to
endpoint on url `http://localhost:8009/foo` with `POST` method and
Content-Type `application/xml`. If the event `payloadBody` attribute
contains following XML:

<item>
<name>apple</name>
<price>55</price>
<quantity>5</quantity>
</item>

the http-call sink maps that and sends it to the endpoint. When endpoint sends a response it will be consumed by the corresponding http-call-response source correlated via the same sink.id foo and that will map the response message and send it via ResponseStream steam by assigning the message body as message attribute and response headers as headers attribute of the event.

EXAMPLE 2

    CREATE SINK DownloadRequestStream WITH (type='http-call', publisher.url='http://localhost:8005/files/{{name}}', downloading.enabled='true', download.path='{{downloadPath}}{{name}}', method='GET', sink.id='download', map.type='json') (name String, id int, downloadPath string);

CREATE SOURCE ResponseStream2xx WITH (type='http-call-response', sink.id='download', http.status.code='2\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="name='trp:name', id='trp:id', file='A[1]'") (name string, id string, file string);

CREATE SOURCE ResponseStream4xx WITH (type='http-call-response', sink.id='download', http.status.code='4\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="errorMsg='A[1]'") (errorMsg string);

When events arrive in DownloadRequestStream with name:foo.txt, id:75 and downloadPath:/user/download/ the http-call sink sends a GET request to the url http://localhost:8005/files/foo.txt to download the file to the given path /user/download/foo.txt and capture the response via its corresponding http-call-response source based on the response status code. If the response status code is in the range of 200 the message will be received by the http-call-response source associated with the ResponseStream2xx stream which expects http.status.code with regex 2\\d+ while downloading the file to the local file system on the path /user/download/foo.txt and mapping the response message having the absolute file path to event's file attribute. If the response status code is in the range of 400 then the message will be received by the http-call-response source associated with the ResponseStream4xx stream which expects http.status.code with regex 4\\d+ while mapping the error response to the errorMsg attribute of the event.

http-request (Sink)

Deprecated

_(Use http-call sink instead)._ The http-request sink publishes messages to endpoints via HTTP or HTTPS protocols using methods such as POST, GET, PUT, and DELETE on formats text, XML or JSON and consume responses through its corresponding http-response source. It also supports calling endpoints protected with basic authentication or OAuth 2.0.

Syntax

CREATE SINK <NAME> WITH (sink.type="http-request", map.type="<STRING>", publisher.url="<STRING>", sink.id="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", downloading.enabled="<BOOL>", download.path="<STRING>", blocking.io="<BOOL>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
publisher.urlThe URL which should be called. Examples: http://localhost:8080/endpoint, https://localhost:8080/endpointSTRINGNoNo
sink.idIdentifier to correlate the http-request sink to its corresponding http-response sources to retrieved the responses.STRINGNoNo
basic.auth.usernameThe username to be included in the authentication header when calling endpoints protected by basic authentication. basic.auth.password property should be also set when using this property.-STRINGYesNo
basic.auth.passwordThe password to be included in the authentication header when calling endpoints protected by basic authentication. basic.auth.username property should be also set when using this property.-STRINGYesNo
https.truststore.fileThe file path of the client truststore when sending messages through https protocol.`\${carbon.home}/resources/security/client-truststore.jks`STRINGYesNo
https.truststore.passwordThe password for the client-truststore.gdncarbonSTRINGYesNo
oauth.usernameThe username to be included in the authentication header when calling endpoints protected by OAuth 2.0. oauth.password property should be also set when using this property.-STRINGYesNo
oauth.passwordThe password to be included in the authentication header when calling endpoints protected by OAuth 2.0. oauth.username property should be also set when using this property.-STRINGYesNo
consumer.keyConsumer key used for calling endpoints protected by OAuth 2.0-STRINGYesNo
consumer.secretConsumer secret used for calling endpoints protected by OAuth 2.0-STRINGYesNo
token.urlToken URL to generate a new access tokens when calling endpoints protected by OAuth 2.0-STRINGYesNo
refresh.tokenRefresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0-STRINGYesNo
headersHTTP request headers in format "'<key>:<value>','<key>:<value>'". When the Content-Type header is not provided the system decides the Content-Type based on the provided sink mapper as following: - map.type='xml': application/xml - map.type='json': application/json - map.type='text': plain/text - map.type='keyvalue': application/x-www-form-urlencoded - For all other cases system defaults to plain/text Also the Content-Length header need not to be provided, as the system automatically defines it by calculating the size of the payload.Content-Type and Content-Length headersSTRINGYesNo
methodThe HTTP method used for calling the endpoint.POSTSTRINGYesNo
downloading.enabledEnable response received by the http-response source to be written to a file. When this is enabled the download.path property should be also set.falseBOOLYesNo
download.pathThe absolute file path along with the file name where the downloads should be saved.-STRINGYesYes
blocking.ioBlocks the request thread until a response it received from HTTP call-response source before sending any other request.falseBOOLYesNo
socket.idle.timeoutSocket timeout in millis.6000INTYesNo
chunk.disabledDisable chunked transfer encoding.falseBOOLYesNo
ssl.protocolSSL/TLS protocol.TLSSTRINGYesNo
ssl.verification.disabledDisable SSL verification.falseBOOLYesNo
ssl.configurationsSSL/TSL configurations in format "'<key>:<value>','<key>:<value>'". Some supported parameters: - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2' - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256' - Enable session creation: 'client.enable.session.creation:true' - Supported server names: 'server.suported.server.names:server' - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher'-STRINGYesNo
proxy.hostProxy server host-STRINGYesNo
proxy.portProxy server port-STRINGYesNo
proxy.usernameProxy server username-STRINGYesNo
proxy.passwordProxy server password-STRINGYesNo
client.bootstrap.configurationsClient bootstrap configurations in format "'<key>:<value>','<key>:<value>'". Some supported configurations : - Client connect timeout in millis: 'client.bootstrap.connect.timeout:15000' - Client socket timeout in seconds: 'client.bootstrap.socket.timeout:15' - Client socket reuse: 'client.bootstrap.socket.reuse:true' - Enable TCP no delay: 'client.bootstrap.nodelay:true' - Enable client keep alive: 'client.bootstrap.keepalive:true' - Send buffer size: 'client.bootstrap.sendbuffersize:1048576' - Receive buffer size: 'client.bootstrap.recievebuffersize:1048576'-STRINGYesNo
max.pool.active.connectionsMaximum possible number of active connection per client pool.-1INTYesNo
min.pool.idle.connectionsMinimum number of idle connections that can exist per client pool.0INTYesNo
max.pool.idle.connectionsMaximum number of idle connections that can exist per client pool.100INTYesNo
min.evictable.idle.timeMinimum time (in millis) a connection may sit idle in the client pool before it become eligible for eviction.300000STRINGYesNo
time.between.eviction.runsTime between two eviction operations (in millis) on the client pool.30000STRINGYesNo
max.wait.timeThe maximum time (in millis) the pool will wait (when there are no available connections) for a connection to be returned to the pool.60000STRINGYesNo
test.on.borrowEnable connections to be validated before being borrowed from the client pool.trueBOOLYesNo
test.while.idleEnable connections to be validated during the eviction operation (if any).trueBOOLYesNo
exhausted.actionAction that should be taken when the maximum number of active connections are being used. This action should be indicated as an int and possible action values are following. 0 - Fail the request. 1 - Block the request, until a connection returns to the pool. 2 - Grow the connection pool size.1 (Block when exhausted)INTYesNo
hostname.verification.enabledEnable hostname verificationtrueBOOLYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
clientBootstrapClientGroupSizeNumber of client threads to perform non-blocking read and write to one or more channels.(Number of available processors) * 2Any positive integer
clientBootstrapBossGroupSizeNumber of boss threads to accept incoming connections.Number of available processorsAny positive integer
clientBootstrapWorkerGroupSizeNumber of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels.(Number of available processors) * 2Any positive integer
trustStoreLocationThe default truststore file path.`\${carbon.home}/resources/security/client-truststore.jks`Path to client truststore `.jks` file
trustStorePasswordThe default truststore password.gdncarbonTruststore password as string

EXAMPLE 1

    CREATE SINK FooStream WITH (type='http-request', sink.id='foo', publisher.url='http://localhost:8009/foo', map.type='xml', map.payload="'{{payloadBody}}'") (payloadBody string);

CREATE SOURCE ResponseStream WITH (type='http-response', sink.id='foo', map.type='text', map.regex.A='((.|\n)*)', map.attributes="headers='trp:headers', message='A[1]'") (message string, headers string);

When events arrive in `FooStream`, http-request sink makes calls to
endpoint on url `http://localhost:8009/foo` with `POST` method and
Content-Type `application/xml`. If the event `payloadBody` attribute
contains following XML:

<item>
<name>apple</name>
<price>55</price>
<quantity>5</quantity>
</item>

the http-request sink maps that and sends it to the endpoint. When
endpoint sends a response it will be consumed by the corresponding
http-response source correlated via the same `sink.id` `foo` and that
will map the response message and send it via `ResponseStream` steam by
assigning the message body as `message` attribute and response headers
as `headers` attribute of the event.

EXAMPLE 2

    CREATE SINK DownloadRequestStream WITH (type='http-request', publisher.url='http://localhost:8005/files/{{name}}', downloading.enabled='true', download.path='{{downloadPath}}{{name}}', method='GET', sink.id='download', map.type='json') (name String, id int, downloadPath string);

CREATE SOURCE ResponseStream2xx WITH (type='http-response', sink.id='download', http.status.code='2\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="name='trp:name', id='trp:id', file='A[1]'") (name string, id string, file string);

CREATE SOURCE ResponseStream4xx WITH (type='http-response', sink.id='download', http.status.code='4\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="errorMsg='A[1]'") (errorMsg string);

When events arrive in DownloadRequestStream with name:foo.txt, id:75 and downloadPath:/user/download/ the http-request sink sends a GET request to the url http://localhost:8005/files/foo.txt to download the file to the given path /user/download/foo.txt and capture the response via its corresponding http-response source based on the response status code. If the response status code is in the range of 200 the message will be received by the http-response source associated with the ResponseStream2xx stream which expects http.status.code with regex 2\\d+ while downloading the file to the local file system on the path /user/download/foo.txt and mapping the response message having the absolute file path to event's file attribute. If the response status code is in the range of 400 then the message will be received by the http-response source associated with the ResponseStream4xx stream which expects http.status.code with regex 4\\d+ while mapping the error response to the errorMsg attribute of the event.

http-response (Sink)

Deprecated

(Use http-service-response sink instead). The http-response sink send responses of the requests consumed by its corresponding http-request source, by mapping the response messages to formats such as text, XML and JSON.

Syntax

CREATE SINK <NAME> WITH (type="http-response", map.type="<STRING>", source.id="<STRING>", message.id="<STRING>", headers="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
source.idIdentifier to correlate the http-response sink to its corresponding http-request source which consumed the request.STRINGNoNo
message.idIdentifier to correlate the response with the request received by http-request source.STRINGNoYes
headersHTTP request headers in format "'<key>:<value>','<key>:<value>'". When the Content-Type header is not provided the system decides the Content-Type based on the provided sink mapper as following: - map.type='xml': application/xml - map.type='json': application/json - map.type='text': plain/text - map.type='keyvalue': application/x-www-form-urlencoded - For all other cases system defaults to plain/text Also the Content-Length header need not to be provided, as the system automatically defines it by calculating the size of the payload.Content-Type and Content-Length headersSTRINGYesNo

EXAMPLE 1

CREATE SOURCE AddStream WITH (type='http-request', receiver.url='http://localhost:5005/add', source.id='adder', map.type='json, map.attributes="messageId='trp:messageId', value1='$.event.value1', value2='$.event.value2'") (messageId string, value1 long, value2 long);

CREATE SINK ResultStream WITH (type='http-response', source.id='adder', message.id='{{messageId}}', map.type='json') (messageId string, results long);

@info(name = 'query1')
insert into ResultStream
select messageId, value1 + value2 as results
from AddStream;

The http-request source on stream AddStream listens on url http://localhost:5005/stocks for JSON messages with format:

{
"event": {
"value1": 3,
"value2": 4
}
}

and when events arrive it maps to AddStream events and pass them to query query1 for processing. The query results produced on ResultStream are sent as a response via http-response sink with format:

{
"event": {
"results": 7
}
}

Here the request and response are correlated by passing the messageId produced by the http-request to the respective http-response sink.

http-service-response (Sink)

The http-service-response sink send responses of the requests consumed by its corresponding http-service source, by mapping the response messages to formats such as text, XML and JSON.

Syntax

CREATE SINK <NAME> WITH (type="http-service-response", map.type="<STRING>", source.id="<STRING>", message.id="<STRING>", headers="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
source.idIdentifier to correlate the http-service-response sink to its corresponding http-service source which consumed the request.STRINGNoNo
message.idIdentifier to correlate the response with the request received by http-service source.STRINGNoYes
headersHTTP request headers in format "'<key>:<value>','<key>:<value>'". When the Content-Type header is not provided the system decides the Content-Type based on the provided sink mapper as following: - map.type='xml': application/xml - map.type='json': application/json - map.type='text': plain/text - map.type='keyvalue': application/x-www-form-urlencoded - For all other cases system defaults to plain/text Also the Content-Length header need not to be provided, as the system automatically defines it by calculating the size of the payload.Content-Type and Content-Length headersSTRINGYesNo

EXAMPLE 1

CREATE SOURCE AddStream WITH (type='http-service', receiver.url='http://localhost:5005/add', source.id='adder', map.type='json, map.attributes="messageId='trp:messageId', value1='$.event.value1', value2='$.event.value2'") (messageId string, value1 long, value2 long);

CREATE SINK ResultStream WITH (type='http-service-response', source.id='adder', message.id='{{messageId}}', map.type='json') (messageId string, results long);

@info(name = 'query1')
from AddStream
select messageId, value1 + value2 as results
insert into ResultStream;

The http-service source on stream AddStream listens on url http://localhost:5005/stocks for JSON messages with format:

{
"event": {
"value1": 3,
"value2": 4
}
}

and when events arrive it maps to AddStream events and pass them to query query1 for processing. The query results produced on ResultStream are sent as a response via http-service-response sink with format:

{
"event": {
"results": 7
}
}

Here the request and response are correlated by passing the messageId produced by the http-service to the respective http-service-response sink.

inMemory (Sink)

In-memory sink publishes events to In-memory sources that are subscribe to the same topic to which the sink publishes. This provides a way to connect multiple Stream Apps deployed under the same Stream Apps Manager (JVM). Here both the publisher and subscriber should have the same event schema (stream definition) for successful data transfer.

Syntax

CREATE SINK <NAME> WITH (type="inMemory", map.type="<STRING>", topic="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
topicEvent are delivered to allthe subscribers subscribed on this topic.STRINGNoNo

EXAMPLE 1

CREATE SINK StocksStream WITH (type='inMemory', topic='Stocks', map.type='passThrough') (symbol string, price float, volume long);

Here the StocksStream uses inMemory sink to emit the Stream App events to all the inMemory sources deployed in the same JVM and subscribed to the topic Stocks.

jms (Sink)

JMS Sink allows users to subscribe to a JMS broker and publish JMS messages.

Syntax

CREATE SINK <NAME> WITH (type="jms", map.type="<STRING>", connection.factory.jndi.name="<STRING>", factory.initial="<STRING>", provider.url="<STRING>", connection.factory.type="<STRING>", connection.username="<STRING>", connection.password="<STRING>", connection.factory.nature="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
destinationQueue/Topic name which JMS Source should subscribe toSTRINGNoYes
connection.factory.jndi.nameJMS Connection Factory JNDI name. This value will be used for the JNDI lookup to find the JMS Connection Factory.QueueConnectionFactorySTRINGYesNo
factory.initialNaming factory initial valueSTRINGNoNo
provider.urlJava naming provider URL. Property for specifying configuration information for the service provider to use. The value of the property should contain a URL string (e.g. "ldap://somehost:389")STRINGNoNo
connection.factory.typeType of the connection connection factory. This can be either queue or topic.queueSTRINGYesNo
connection.usernameusername for the broker.NoneSTRINGYesNo
connection.passwordPassword for the brokerNoneSTRINGYesNo
connection.factory.natureConnection factory nature for the broker(cached/pooled).defaultSTRINGYesNo

EXAMPLE 1

CREATE SINK inputStream WITH (type='jms', map.type='xml', factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='vm://localhost', destination='DAS_JMS_OUTPUT_TEST', connection.factory.type='topic', connection.factory.jndi.name='TopicConnectionFactory') (name string, age int, country string);

This example shows how to publish to an ActiveMQ topic.

EXAMPLE 2

CREATE SINK inputStream WITH (type='jms', map.type='xml', factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='vm://localhost',destination='DAS_JMS_OUTPUT_TEST') (name string, age int, country string);

This example shows how to publish to an ActiveMQ queue. Note that we are not providing properties like connection factory type

kafka (Sink)

A Kafka sink publishes events processed by gdn SP to a topic with a partition for a Kafka cluster. The events can be published in the TEXT XML JSON or Binary format. If the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be a dynamic value taken from the Stream App event. To configure a sink to use the Kafka transport, the type parameter should have kafka as its value.

Syntax

CREATE SINK <NAME> WITH (type="kafka", map.type="<STRING>", bootstrap.servers="<STRING>", topic="<STRING>", partition.no="<INT>", sequence.id="<STRING>", key="<STRING>", is.binary.message="<BOOL>", optional.configuration="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
bootstrap.serversThis parameter specifies the list of Kafka servers to which the Kafka sink must publish events. This list should be provided as a set of comma separated values. e.g., localhost:9092,localhost:9093.STRINGNoNo
topicThe topic to which the Kafka sink needs to publish events. Only one topic must be specified.STRINGNoNo
partition.noThe partition number for the given topic. Only one partition ID can be defined. If no value is specified for this parameter, the Kafka sink publishes to the default partition of the topic (i.e., 0)0INTYesNo
sequence.idA unique identifier to identify the messages published by this sink. This ID allows receivers to identify the sink that published a specific message.nullSTRINGYesNo
keyThe key contains the values that are used to maintain ordering in a Kafka partition.nullSTRINGYesNo
is.binary.messageIn order to send the binary events via kafka sink, this parameter is set to True.nullBOOLNoNo
optional.configurationThis parameter contains all the other possible configurations that the producer is created with. e.g., producer.type:async,batch.size:200nullSTRINGYesNo

EXAMPLE 1

@App:name('TestExecutionPlan')
CREATE STREAM FooStream (symbol string, price float, volume long);

@info(name = 'query1')
CREATE SINK BarStream WITH (type='kafka', topic='topic_with_partitions', partition.no='0', bootstrap.servers='localhost:9092', map.type='xml') (symbol string, price float, volume long);

insert into BarStream
from FooStream select symbol, price, volume ;

This Kafka sink configuration publishes to 0th partition of the topic named topic_with_partitions.

EXAMPLE 2

@App:name('TestExecutionPlan')
CREATE STREAM FooStream (symbol string, price float, volume long);

@info(name = 'query1')
CREATE SINK BarStream WITH (type='kafka', topic='{{symbol}}', partition.no='{{volume}}', bootstrap.servers='localhost:9092', map.type='xml') (symbol string, price float, volume long);

insert into BarStream
from FooStream select symbol, price, volume ;

This query publishes dynamic topic and partitions that are taken from the Stream App event. The value for partition.no is taken from the volume attribute, and the topic value is taken from the symbol attribute.

kafkaMultiDC (Sink)

A Kafka sink publishes events processed by gdn SP to a topic with a partition for a Kafka cluster. The events can be published in the TEXT XML JSON or Binary format. If the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be a dynamic value taken from the Stream App event. To configure a sink to publish events via the Kafka transport, and using two Kafka brokers to publish events to the same topic, the type parameter must have kafkaMultiDC as its value.

Syntax

CREATE SINK <NAME> WITH (type="kafkaMultiDC", map.type="<STRING>", bootstrap.servers="<STRING>", topic="<STRING>", sequence.id="<STRING>", key="<STRING>", partition.no="<INT>", is.binary.message="<BOOL>", optional.configuration="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
bootstrap.serversThis parameter specifies the list of Kafka servers to which the Kafka sink must publish events. This list should be provided as a set of comma -separated values. There must be at least two servers in this list. e.g., localhost:9092,localhost:9093.STRINGNoNo
topicThe topic to which the Kafka sink needs to publish events. Only one topic must be specified.STRINGNoNo
sequence.idA unique identifier to identify the messages published by this sink. This ID allows receivers to identify the sink that published a specific message.nullSTRINGYesNo
keyThe key contains the values that are used to maintain ordering in a Kafka partition.nullSTRINGYesNo
partition.noThe partition number for the given topic. Only one partition ID can be defined. If no value is specified for this parameter, the Kafka sink publishes to the default partition of the topic (i.e., 0)0INTYesNo
is.binary.messageIn order to send the binary events via kafkaMultiDCSink, it is required to set this parameter to true.nullBOOLNoNo
optional.configurationThis parameter contains all the other possible configurations that the producer is created with. e.g., producer.type:async,batch.size:200nullSTRINGYesNo

EXAMPLE 1

@App:name('TestExecutionPlan')
CREATE STREAM FooStream (symbol string, price float, volume long);

@info(name = 'query1')
CREATE SINK BarStream WITH (type='kafkaMultiDC', topic='myTopic', partition.no='0', bootstrap.servers='host1:9092, host2:9092', map.type='xml') (symbol string, price float, volume long);

insert into BarStream
from FooStream select symbol, price, volume ;

This query publishes to the default (i.e., 0th) partition of the brokers in two data centers

log (Sink)

This is a sink that can be used as a logger. This will log the output events in the output stream with user specified priority and a prefix

Syntax

CREATE SINK <NAME> WITH (type="log", map.type="<STRING>", priority="<STRING>", prefix="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
priorityThis will set the logger priority i.e log level. Accepted values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, TRACEINFOSTRINGYesNo
prefixThis will be the prefix to the output message. If the output stream has event [2,4] and the prefix is given as "Hello" then the log will show "Hello : [2,4]"default prefix will be :STRINGYesNo

EXAMPLE 1

CREATE SINK BarStream WITH (type='log', prefix='My Log', priority='DEBUG') (symbol string, price float, volume long)

In this example BarStream uses log sink and the prefix is given as My Log. Also the priority is set to DEBUG.

EXAMPLE 2

CREATE SINK BarStream WITH (type='log', priority='DEBUG') (symbol string, price float, volume long)

In this example BarStream uses log sink and the priority is set to DEBUG. User has not specified prefix so the default prefix will be in the form \<Stream App App Name> : \<Stream Name>

EXAMPLE 3

CREATE SINK BarStream WITH (type='log', prefix='My Log') (symbol string, price float, volume long)

In this example BarStream uses log sink and the prefix is given as My Log. User has not given a priority so it will be set to default INFO.

EXAMPLE 4

CREATE SINK BarStream WITH (type='log') (symbol string, price float, volume long)

In this example BarStream uses log sink. The user has not given prefix or priority so they will be set to their default values.

nats (Sink)

NATS Sink allows users to subscribe to a NATS broker and publish messages.

Syntax

CREATE SINK <NAME> WITH (type="nats", map.type="<STRING>", destination="<STRING>", bootstrap.servers="<STRING>", client.id="<STRING>", cluster.id="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
destinationSubject name which NATS sink should publish to.STRINGNoYes
bootstrap.serversThe NATS based url of the NATS server.nats://localhost:4222STRINGYesNo
client.idThe identifier of the client publishing/connecting to the NATS broker. Should be unique for each client connecting to the server/cluster.NoneSTRINGYesNo
cluster.idThe identifier of the NATS server/cluster.test-clusterSTRINGYesNo

EXAMPLE 1

CREATE SINK outputStream WITH (type='nats', map.type='xml', destination='SP_NATS_OUTPUT_TEST', bootstrap.servers='nats://localhost:4222',client.id='nats_client',server.id='test-cluster') (name string, age int, country string);

This example shows how to publish to a NATS subject with all supporting configurations. With the following configuration the sink identified as nats-client will publish to a subject named as SP_NATS_OUTPUT_TEST which resides in a nats instance with a cluster id of test-cluster, running in localhost and listening to the port 4222 for client connection.

EXAMPLE 2

CREATE SINK outputStream WITH (type='nats', map.type='xml', destination='SP_NATS_OUTPUT_TEST') (name string, age int, country string);

This example shows how to publish to a NATS subject with mandatory configurations. With the following configuration the sink identified with an auto generated client id will publish to a subject named as SP_NATS_OUTPUT_TEST which resides in a nats instance with a cluster id of test-cluster, running in localhost and listening to the port 4222 for client connection.

prometheus (Sink)

This sink publishes events processed by Stream App into Prometheus metrics and exposes them to the Prometheus server at the specified URL. The created metrics can be published to Prometheus via server or pushGateway, depending on your preference. The metric types that are supported by the Prometheus sink are counter, gauge, histogram, and summary. The values and labels of the Prometheus metrics can be updated through the events.

Syntax

CREATE SINK <NAME> WITH (type="prometheus", map.type="<STRING>", job="<STRING>", publish.mode="<STRING>", push.url="<STRING>", server.url="<STRING>", metric.type="<STRING>", metric.help="<STRING>", metric.name="<STRING>", buckets="<STRING>", quantiles="<STRING>", quantile.error="<DOUBLE>", value.attribute="<STRING>", push.operation="<STRING>", grouping.key="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
jobThis parameter specifies the job name of the metric. This must be the same job name that is defined in the Prometheus configuration file.stream processor JobSTRINGYesNo
publish.modeThe mode in which the metrics need to be exposed to the Prometheus server.The possible publishing modes are server and pushgateway.The server mode exposes the metrics through an HTTP server at the specified URL, and the pushGateway mode pushes the metrics to the pushGateway that needs to be running at the specified URL.serverSTRINGYesNo
push.urlThis parameter specifies the target URL of the Prometheus pushGateway. This is the URL at which the pushGateway must be listening. This URL needs to be defined in the Prometheus configuration file as a target before it can be used here.http://localhost:9091STRINGYesNo
server.urlThis parameter specifies the URL where the HTTP server is initiated to expose metrics in the server publish mode. This URL needs to be defined in the Prometheus configuration file as a target before it can be used here.http://localhost:9080STRINGYesNo
metric.typeThe type of Prometheus metric that needs to be created at the sink. The supported metric types are counter, gauge, histogram and summary.STRINGNoNo
metric.helpA brief description of the metric and its purpose.STRINGYesNo
metric.nameThis parameter allows you to assign a preferred name for the metric. The metric name must match the regex format, i.e., [a-zA-Z:][a-zA-Z0-9:]*.STRINGYesNo
bucketsThe bucket values preferred by the user for histogram metrics. The bucket values must be in the string format with each bucket value separated by a comma as shown in the example below. "2,4,6,8"nullSTRINGYesNo
quantilesThis parameter allows you to specify quantile values for summary metrics as preferred. The quantile values must be in the string format with each quantile value separated by a comma as shown in the example below. "0.5,0.75,0.95"nullSTRINGYesNo
quantile.errorThe error tolerance value for calculating quantiles in summary metrics. This must be a positive value, but less than 1.0.001DOUBLEYesNo
value.attributeThe name of the attribute in the stream definition that specifies the metric value. The defined value attribute must be included in the stream definition. The system increases the metric value for the counter and gauge metric types by the value of the value attribute. The system observes the value of the value attribute for the calculations of summary and histogram metric types.valueSTRINGYesNo
push.operationThis parameter defines the mode for pushing metrics to the pushGateway. The available push operations are push and pushadd. The operations differ according to the existing metrics in pushGateway where push operation replaces the existing metrics, and pushadd operation only updates the newly created metrics.pushaddSTRINGYesNo
grouping.keyThis parameter specifies the grouping key of created metrics in key-value pairs. The grouping key is used only in pushGateway mode in order to distinguish the metrics from already existing metrics. The expected format of the grouping key is as follows: "key1:value1,key2:value2"STRINGYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
jobNameThis property specifies the default job name for the metric. This job name must be the same as the job name defined in the Prometheus configuration file.stream processor JobAny string
publishModeThe default publish mode for the Prometheus sink for exposing metrics to the Prometheus server. The mode can be either server or pushgateway.serverserver or pushgateway
serverURLThis property configures the URL where the HTTP server is initiated to expose metrics. This URL needs to be defined in the Prometheus configuration file as a target to be identified by Prometheus before it can be used here. By default, the HTTP server is initiated at http://localhost:9080.http://localhost:9080Any valid URL
pushURLThis property configures the target URL of the Prometheus pushGateway (where the pushGateway needs to listen). This URL needs to be defined in the Prometheus configuration file as a target to be identified by Prometheus before it can be used here.http://localhost:9091Any valid URL
groupingKeyThis property configures the grouping key of created metrics in key-value pairs. Grouping key is used only in pushGateway mode in order to distinguish these metrics from already existing metrics under the same job. The expected format of the grouping key is as follows: "key1:value1,key2:value2" .nullAny key value pairs in the supported format

EXAMPLE 1

CREATE SINK FooCountStream WITH (type='prometheus',job='fooOrderCount', server.url ='http://localhost:9080', publish.mode='server', metric.type='counter', metric.help= 'Number of foo orders', map.type='keyvalue') (Name String, quantity int, value int);

In the above example, the Prometheus-sink creates a counter metric with the stream name and defined attributes as labels. The metric is exposed through an HTTP server at the target URL.

EXAMPLE 2

CREATE SINK InventoryLevelStream WITH (type='prometheus',job='inventoryLevel', push.url='http://localhost:9080', publish.mode='pushGateway', metric.type='gauge', metric.help= 'Current level of inventory', map.type='keyvalue') (Name String, value int);

In the above example, the Prometheus-sink creates a gauge metric with the stream name and defined attributes as labels.The metric is pushed to the Prometheus pushGateway at the target URL.

rabbitmq (Sink)

The rabbitmq sink pushes the events into a rabbitmq broker using the AMQP protocol

Syntax

CREATE SINK <NAME> WITH (type="rabbitmq", map.type="<STRING>", uri="<STRING>", heartbeat="<INT>", exchange.name="<STRING>", exchange.type="<STRING>", exchange.durable.enabled="<BOOL>", exchange.autodelete.enabled="<BOOL>", delivery.mode="<INT>", content.type="<STRING>", content.encoding="<STRING>", priority="<INT>", correlation.id="<STRING>", reply.to="<STRING>", expiration="<STRING>", message.id="<STRING>", timestamp="<STRING>", type="<STRING>", user.id="<STRING>", app.id="<STRING>", routing.key="<STRING>", headers="<STRING>", tls.enabled="<BOOL>", tls.truststore.path="<STRING>", tls.truststore.password="<STRING>", tls.truststore.type="<STRING>", tls.version="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
uriThe URI that used to connect to an AMQP server. If no URI is specified, an error is logged in the CLI.e.g., amqp://guest:guest, amqp://guest:[email protected]:5672STRINGNoNo
heartbeatThe period of time (in seconds) after which the peer TCP connection should be considered unreachable (down) by RabbitMQ and client libraries.60INTYesNo
exchange.nameThe name of the exchange that decides what to do with a message it sends.If the exchange.name already exists in the RabbitMQ server, then the system uses that exchange.name instead of redeclaring.STRINGNoYes
exchange.typeThe type of the exchange.name. The exchange types available are direct, fanout, topic and headers. For a detailed description of each type, see [RabbitMQ - AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html)directSTRINGYesYes
exchange.durable.enabledIf this is set to true, the exchange remains declared even if the broker restarts.falseBOOLYesYes
exchange.autodelete.enabledIf this is set to true, the exchange is automatically deleted when it is not used anymore.falseBOOLYesYes
delivery.modeThis determines whether the connection should be persistent or not. The value must be either 1 or 2.If the delivery.mode = 1, then the connection is not persistent. If the delivery.mode = 2, then the connection is persistent.1INTYesNo
content.typeThe message content type. This should be the MIME content type.nullSTRINGYesNo
content.encodingThe message content encoding. The value should be MIME content encoding.nullSTRINGYesNo
prioritySpecify a value within the range 0 to 9 in this parameter to indicate the message priority.0INTYesYes
correlation.idThe message correlated to the current message. e.g., The request to which this message is a reply. When a request arrives, a message describing the task is pushed to the queue by the front end server. After that the frontend server blocks to wait for a response message with the same correlation ID. A pool of worker machines listen on queue, and one of them picks up the task, performs it, and returns the result as message. Once a message with right correlation ID arrives, thefront end server continues to return the response to the caller.nullSTRINGYesYes
reply.toThis is an anonymous exclusive callback queue. When the RabbitMQ receives a message with the reply.to property, it sends the response to the mentioned queue. This is commonly used to name a reply queue (or any other identifier that helps a consumer application to direct its response).nullSTRINGYesNo
expirationThe expiration time after which the message is deleted. The value of the expiration field describes the TTL (Time To Live) period in milliseconds.nullSTRINGYesNo
message.idThe message identifier. If applications need to identify messages, it is recommended that they use this attribute instead of putting it into the message payload.nullSTRINGYesYes
timestampTimestamp of the moment when the message was sent. If you do not specify a value for this parameter, the system automatically generates the current date and time as the timestamp value. The format of the timestamp value is dd/mm/yyyy.current timestampSTRINGYesNo
typeThe type of the message. e.g., The type of the event or the command represented by the message.nullSTRINGYesNo
user.idThe user ID specified here is verified by RabbitMQ against theuser name of the actual connection. This is an optional parameter.nullSTRINGYesNo
app.idThe identifier of the application that produced the message.nullSTRINGYesNo
routing.keyThe key based on which the excahnge determines how to route the message to the queue. The routing key is similar to an address for the message.emptySTRINGYesYes
headersThe headers of the message. The attributes used for routing are taken from the this paremeter. A message is considered matching if the value of the header equals the value specified upon binding.nullSTRINGYesYes
tls.enabledThis parameter specifies whether an encrypted communication channel should be established or not. When this parameter is set to true, the tls.truststore.path and tls.truststore.password parameters are initialized.falseBOOLYesNo
tls.truststore.pathThe file path to the location of the truststore of the client that sends the RabbitMQ events via the AMQP protocol. A custom client-truststore can be specified if required. If a custom truststore is not specified, then the system uses the default client-trustore in the ${carbon.home}/resources/security directory.\${carbon.home}/resources/security/client-truststore.jksSTRINGYesNo
tls.truststore.passwordThe password for the client-truststore. A custom password can be specified if required. If no custom password is specified, then the system uses gdncarbon as the default password.gdncarbonSTRINGYesNo
tls.truststore.typeThe type of the truststore.JKSSTRINGYesNo
tls.versionThe version of the tls/ssl.SSLSTRINGYesNo

EXAMPLE 1

@App:name('TestExecutionPlan')
CREATE STREAM FooStream (symbol string, price float, volume long);

@info(name = 'query1')
CREATE SINK BarStream WITH (type ='rabbitmq', uri = 'amqp://guest:[email protected]:5672', exchange.name = 'direct', routing.key= 'direct', map.type='xml') (symbol string, price float, volume long);

insert into BarStream
from FooStream select symbol, price, volume ;

This query publishes events to the direct exchange with the direct exchange type and the directTest routing key.

s3 (Sink)

S3 sink publishes events as Amazon AWS S3 buckets.

Syntax

CREATE SINK <NAME> WITH (type="s3", map.type="<STRING>", credential.provider.class="<STRING>", aws.access.key="<STRING>", aws.secret.key="<STRING>", bucket.name="<STRING>", aws.region="<STRING>", versioning.enabled="<BOOL>", object.path="<STRING>", storage.class="<STRING>", content.type="<STRING>", bucket.acl="<STRING>", node.id="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
credential.provider.classAWS credential provider class to be used. If blank along with the username and the password, default credential provider will be used.EMPTY_STRINGSTRINGYesNo
aws.access.keyAWS access key. This cannot be used along with the credential.provider.classEMPTY_STRINGSTRINGYesNo
aws.secret.keyAWS secret key. This cannot be used along with the credential.provider.classEMPTY_STRINGSTRINGYesNo
bucket.nameName of the S3 bucketSTRINGNoNo
aws.regionThe region to be used to create the bucketEMPTY_STRINGSTRINGYesNo
versioning.enabledFlag to enable versioning support in the bucketfalseBOOLYesNo
object.pathPath for each S3 objectSTRINGNoYes
storage.classAWS storage classstandardSTRINGYesNo
content.typeContent type of the eventapplication/octet-streamSTRINGYesYes
bucket.aclAccess control list for the bucketEMPTY_STRINGSTRINGYesNo
node.idThe node ID of the current publisher. This needs to be unique for each publisher instance as it may cause object overwrites while uploading the objects to same S3 bucket from different publishers.EMPTY_STRINGSTRINGYesNo

EXAMPLE 1

CREATE SINK UserStream WITH (type='s3', bucket.name='user-stream-bucket',object.path='bar/users', credential.provider='software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider', flush.size='3', map.type='json', enclosing.element='$.user', map.payload=""""{"name": "{{name}}", "age": {{age}}}"""") (name string, age int);  

This creates a S3 bucket named user-stream-bucket. Then this will collect 3 events together and create a JSON object and save that in S3.

tcp (Sink)

A Stream App application can be configured to publish events via the TCP transport by adding the type='tcp' annotation at the top of an event stream definition.

Syntax

CREATE SINK <NAME> WITH (type="tcp", map.type="<STRING>", url="<STRING>", sync="<STRING>", tcp.no.delay="<BOOL>", keep.alive="<BOOL>", worker.threads="<INT|LONG>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
urlThe URL to which outgoing events should be published via TCP. The URL should adhere to tcp://<host>:<port>/<context> format.STRINGNoNo
syncThis parameter defines whether the events should be published in a synchronized manner or not. If sync = true, then the worker will wait for the ack after sending the message. Else it will not wait for an ack.falseSTRINGYesYes
tcp.no.delayThis is to specify whether to disable Nagle algorithm during message passing. If tcp.no.delay = true, the execution of Nagle algorithm will be disabled in the underlying TCP logic. Hence there will be no delay between two successive writes to the TCP connection. Else there can be a constant ack delay.trueBOOLYesNo
keep.aliveThis property defines whether the server should be kept alive when there are no connections available.trueBOOLYesNo
worker.threadsNumber of threads to publish events.10INT LONGYesNo

EXAMPLE 1

CREATE SINK Foo WITH (type = 'tcp', url='tcp://localhost:8080/abc', sync='true' map.type='binary') (attribute1 string, attribute2 int);

A sink of type tcp has been defined. All events arriving at Foo stream via TCP transport will be sent to the url tcp://localhost:8080/abc in a synchronous manner.

Sinkmapper

avro (Sink Mapper)

This extension is a Stream App Event to Avro Message output mapper.Transports that publish messages to Avro sink can utilize this extension to convert Stream App events to Avro messages. You can either specify the Avro schema or provide the schema registry URL and the schema reference ID as parameters in the stream definition. If no Avro schema is specified, a flat Avro schema of the record type is generated with the stream attributes as schema fields.

Syntax

CREATE SINK <NAME> WITH (map.type="avro", map.schema.def="<STRING>", map.schema.registry="<STRING>", map.schema.id="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
schema.defThis specifies the required Avro schema to be used to convert Stream App events to Avro messages. The schema needs to be specified as a quoted JSON string.STRINGNoNo
schema.registryThis specifies the URL of the schema registry.STRINGNoNo
schema.idThis specifies the ID of the avro schema. This ID is the global ID that is returned from the schema registry when posting the schema to the registry. The specified ID is used to retrieve the schema from the schema registry.STRINGNoNo

EXAMPLE 1

CREATE SINK StockStream WITH (type='inMemory', topic='stock', map.type='avro', map.schema.def = """{"type":"record","name":"stock","namespace":"stock.example","fields":[{"name":"symbol","type":"string"},{"name":"price","type":"float"},{"name":"volume","type":"long"}]}""") (symbol string, price float, volume long);

The above configuration performs a default Avro mapping that generates an Avro message as an output ByteBuffer.

EXAMPLE 2

CREATE SINK StockStream WITH (type='inMemory', topic='stock', map.type='avro', map.schema.registry = 'http://localhost:8081', map.schema.id ='22', map.payload=""""{"Symbol":{{symbol}},"Price":{{price}},"Volume":{{volume}}}"""") (symbol string, price float, volume long);

The above configuration performs a custom Avro mapping that generates an Avro message as an output ByteBuffer. The Avro schema is retrieved from the given schema registry (localhost:8081) using the schema ID provided.

binary (Sink Mapper)

This section explains how to map events processed via Stream App in order to publish them in the binary format.

Syntax

CREATE SINK <NAME> WITH (map.type="binary")

EXAMPLE 1

CREATE SINK FooStream WITH (type='inMemory', topic='gdn', map.type='binary') (symbol string, price float, volume long);

This will publish Stream App event in binary format.

csv (Sink Mapper)

This output mapper extension allows you to convert Stream App events processed by the gdn SP to CSV message before publishing them. You can either use custom placeholder to map a custom CSV message or use pre-defined CSV format where event conversion takes place without extra configurations.

Syntax

CREATE SINK <NAME> WITH (map.type="csv", map.delimiter="<STRING>", map.header="<BOOL>", map.event.grouping.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
delimiterThis parameter used to separate the output CSV data, when converting a Stream App event to CSV format,,STRINGYesNo
headerThis parameter specifies whether the CSV messages will be generated with header or not. If this parameter is set to true, message will be generated with headerfalseBOOLYesNo
event.grouping.enabledIf this parameter is set to true, events are grouped via a line.separator when multiple events are received. It is required to specify a value for the System.lineSeparator() when the value for this parameter is true.falseBOOLYesNo

EXAMPLE 1

CREATE SINK BarStream WITH (type='inMemory', topic='{{symbol}}', map.type='csv') (symbol string, price float, volume long);

Above configuration will perform a default CSV output mapping, which will generate output as follows: symbol-price-volumegdn-55.6-100

If header is true and delimiter is "-", then the output will be as follows: symbol-price-volume

EXAMPLE 2

CREATE SINK BarStream WITH (type='inMemory', topic='{{symbol}}', map.type='csv', map.header='true', map.delimiter='-', map.payload="symbol='0',price='2',volume='1'") (symbol string, price float,volume long);

Above configuration will perform a custom CSV mapping. Here, user can add custom place order in the @payload. The place order indicates that where the attribute name's value will be appear in the output message, The output will be produced output as follows: gdn,100,55.6

If header is true and delimiter is "-", then the output will be as follows: price-volume-symbol 55.6-100-gdn

If event grouping is enabled, then the output is as follows: price-volume-symbol 55.6-100-gdnSystem.lineSeparator() 55.6-100-IBMSystem.lineSeparator() 55.6-100-IFSSystem.lineSeparator()

json (Sink Mapper)

This extension is an Event to JSON output mapper. Transports that publish messages can utilize this extension to convert Stream App events to JSON messages. You can either send a pre-defined JSON format or a custom JSON message.

Syntax

CREATE SINK <NAME> WITH (map.type="json", map.validate.json="<BOOL>", map.enclosing.element="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
validate.jsonIf this property is set to true, it enables JSON validation for the JSON messages generated. When validation is carried out, messages that do not adhere to proper JSON standards are dropped. This property is set to false by default.falseBOOLYesNo
enclosing.elementThis specifies the enclosing element to be used if multiple events are sent in the same JSON message. Stream App treats the child elements of the given enclosing element as events and executes JSON expressions on them. If an enclosing.element is not provided, the multiple event scenario is disregarded and JSON path is evaluated based on the root element.\$STRINGYesNo

EXAMPLE 1

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='json') (symbol string, price float, volume long);

EXAMPLE 2

CREATE SINK BarStream WITH (type='inMemory', topic='{{symbol}}', map.type='json', map.enclosing.element='$.portfolio', map.validate.json='true', map.payload="""{"StockData":{"Symbol":"{{symbol}}","Price":{{price}}}}""") (symbol string, price float, volume long);

keyvalue (Sink Mapper)

The Event to Key-Value Map output mapper extension allows you to convert Stream App events processed by gdn SP to key-value map events before publishing them. You can either use pre-defined keys where conversion takes place without extra configurations, or use custom keys with which the messages can be published.

Syntax

CREATE SINK <NAME> WITH (map.type="keyvalue")

EXAMPLE 1

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='keyvalue') (symbol string, price float, volume long);

This query performs a default Key-Value output mapping. The expected output is something similar to the following: symbol:gdn price : 55.6f volume: 100L

EXAMPLE 2

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='keyvalue', map.payload="a='symbol',b='price',c='volume'") (symbol string, price float, volume long);

This query performs a custom Key-Value output mapping where values are passed as objects. Values for symbol, price, and volume attributes are published with the keys a, b and c respectively. The expected output is a map similar to the following: a:gdn b : 55.6f c: 100L

EXAMPLE 3

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='keyvalue', map.payload="a='{{symbol}} is here',b='`price`',c='volume'") (symbol string, price float, volume long);

This query performs a custom Key-Value output mapping where the values of the a and b attributes are strings and c is object. The expected output should be a Map similar to the following: a:gdn is here b : price c: 100L

passThrough (Sink Mapper)

Pass-through mapper passed events (Event[]) through without any mapping or modifications.

Syntax

CREATE SINK <NAME> WITH (map.type="passThrough")

EXAMPLE 1

CREATE SINK BarStream WITH (type='inMemory', map.type='passThrough') (symbol string, price float, volume long);

In the following example BarStream uses passThrough outputmapper which emit Stream App event directly without any transformation into sink.

protobuf (Sink Mapper)

This output mapper allows you to convert Events to protobuf messages before publishing them. To work with this mapper you have to add auto-generated protobuf classes to the project classpath. When you use this output mapper, you can either define stream attributes as the same names as the protobuf message attributes or you can use custom mapping to map stream definition attributes with the protobuf attributes. When you use this mapper with stream processor-io-grpc you don't have to provide the protobuf message class in the class parameter.

Syntax

CREATE SINK <NAME> WITH (type="protobuf", class="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
classThis specifies the class name of the protobuf message class, If sink type is grpc then it's not necessary to provide this parameter.-STRINGYesNo

EXAMPLE 1

CREATE SINK BarStream WITH (type='inMemory', topic='test01', map.type='protobuf', map.class='io.streamprocessor.extension.map.protobuf.autogenerated.Request') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

This will map BarStream values into io.streamprocessor.extension.map.protobuf.autogenerated.Request protobuf message type.

EXAMPLE 2

CREATE SINK BarStream WITH (type='grpc', publisher.url='grpc://localhost:2000/org.gdn.grpc.test.MyService/process, map.type='protobuf') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double)

Above definition will map BarStream values into the protobuf messages. Since this is a grpc sink, protobuf mapper will get the type of the protobuf class by the publisher.url.

EXAMPLE 3

CREATE SINK BarStream WITH (type='grpc', publisher.url = 'grpc://localhost:2000/org.gdn.grpc.test.MyService/process, map.type='protobuf', map.payload="stringValue='a',longValue='b',intValue='c',booleanValue='d',floatValue = 'e', doubleValue  = 'f'") (a string, b long, c int,d bool,e float,f double);

This will map BarStream values to request message type of the process method in MyService service. and stream values will map like this, - value of a will be assign stringValue variable in the message class

  • value of b will be assign longValue variable in the message class
  • value of c will be assign intValue variable in the message class - value of d will be assign booleanValue variable in the message class
  • value of e will be assign floatValue variable in the message class
  • value of f will be assign doubleValue variable in the message class

EXAMPLE 4

CREATE SINK BarStream WITH (type='inMemory', topic='test01', map.type='protobuf', map.class='io.streamprocessor.extension.map.protobuf.autogenerated.RequestWithList') (stringValue string,intValue int,stringList object, intList object);

This will map BarStream values into io.streamprocessor.extension.map.protobuf.autogenerated.RequestWithList. If you want to map data types other than the scalar data types, you have to use object as the data type as shown in above(stringList object).

text (Sink Mapper)

This extension is a Event to Text output mapper. Transports that publish text messages can utilize this extension to convert the Stream App events to text messages. Users can use a pre-defined text format where event conversion is carried out without any additional configurations, or use custom placeholder(using {{ and }}) to map custom text messages. Again, you can also enable mustache based custom mapping. In mustache based custom mapping you can use custom placeholder (using {{ and }} or {{{ and }}}) to map custom text. In mustache based custom mapping, all variables are HTML escaped by default. For example: & is replaced with &amp; " is replaced with &quot; = is replaced with &#61; If you want to return unescaped HTML, use the triple mustache {{{ instead of double {{.

Syntax

CREATE SINK <NAME> WITH (type="text", event.grouping.enabled="<BOOL>", delimiter="<STRING>", new.line.character="<STRING>", mustache.enabled="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
event.grouping.enabledIf this parameter is set to true, events are grouped via a delimiter when multiple events are received. It is required to specify a value for the delimiter parameter when the value for this parameter is true.falseBOOLYesNo
delimiterThis parameter specifies how events are separated when a grouped event is received. This must be a whole line and not a single character.~~~~~~~~~~STRINGYesNo
new.line.characterThis attribute indicates the new line character of the event that is expected to be received. This is used mostly when communication between 2 types of operating systems is expected. For example, Linux uses \n whereas Windows uses \r\n as the end of line character.\nSTRINGYesNo
mustache.enabledIf this parameter is set to true, then mustache mapping gets enabled forcustom text mapping.falseBOOLYesNo

EXAMPLE 1

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='text') (symbol string, price float, volume long);

This query performs a default text input mapping. The expected output is as follows: symbol:"gdn", price:55.6, volume:100

EXAMPLE 2

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='text', map.event.grouping.enabled='true') (symbol string, price float, volume long);

This query performs a default text input mapping with event grouping. The expected output is as follows: symbol:"gdn", price:55.6, volume:100 ~~ symbol:"gdn", price:55.6, volume:100

EXAMPLE 3

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='text',  map.payload="SensorID : {{symbol}}/{{volume}}, SensorPrice : Rs{{price}}/=, Value : {{volume}}ml") (symbol string, price float, volume long);

This query performs a custom text mapping. The expected output is as follows: SensorID : gdn/100, SensorPrice : Rs1000/=, Value : 100ml for the following stream processor event. {gdn,1000,100}

EXAMPLE 4

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='text', map.event.grouping.enabled='true', map.payload="Stock price of {{symbol}} is {{price}}") (symbol string, price float, volume long);

This query performs a custom text mapping with event grouping. The expected output is as follows: Stock price of gdn is 55.6 ~~ Stock price of gdn is 55.6 ~~\ Stock price of gdn is 55.6 for the following stream processor event. {gdn,55.6,10}

EXAMPLE 5

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='text', map.mustache.enabled='true',  map.payload="SensorID : {{{symbol}}}/{{{volume}}}, SensorPrice : Rs{{{price}}}/=, Value : {{{volume}}}ml") (symbol string, price float, volume long);

This query performs a custom text mapping to return unescaped HTML. The expected output is as follows: SensorID : a&b/100, SensorPrice : Rs1000/=, Value : 100ml for the following stream processor event. {a&b,1000,100}

xml (Sink Mapper)

This mapper converts Stream App output events to XML before they are published via transports that publish in XML format. Users can either send a pre-defined XML format or a custom XML message containing event data.

Syntax

CREATE SINK <NAME> WITH (map.type="xml", validate.xml="<BOOL>", enclosing.element="<STRING>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
validate.xmlThis parameter specifies whether the XML messages generated should be validated or not. If this parameter is set to true, messages that do not adhere to proper XML standards are dropped.falseBOOLYesNo
enclosing.elementWhen an enclosing element is specified, the child elements (e.g., the immediate child elements) of that element are considered as events. This is useful when you need to send multiple events in a single XML message. When an enclosing element is not specified, one XML message per every event will be emitted without enclosing.None in custom mapping and \<events> in default mappingSTRINGYesNo

EXAMPLE 1

CREATE SINK FooStream WITH (type='inMemory', topic='stock', map.type='xml') (symbol string, price float, volume long);

Above configuration will do a default XML input mapping which will generate below output:

<events>     
<event>
<symbol>gdn</symbol>
<price>55.6</price>
<volume>100</volume>
</event>
</events>

EXAMPLE 2

CREATE SINK BarStream WITH (type='inMemory', topic='{{symbol}}', map.type='xml', map.enclosing.element='<portfolio>', map.validate.xml='true', map.payload="<StockData><Symbol>{{symbol}}</Symbol><Price>{{price}}</Price></StockData>") (symbol string, price float, volume long);

Above configuration will perform a custom XML mapping. Inside \@payload you can specify the custom template that you want to send the messages out and addd placeholders to places where you need to add event attributes.Above config will produce below output XML message:

<portfolio>     
<StockData>
<Symbol>gdn</Symbol>
<Price>55.6</Price>
</StockData>
</portfolio>

Source

cdc (Source)

The CDC source receives events when change events (i.e., INSERT, UPDATE, DELETE) are triggered for a database table. Events are received in the key-value format. There are two modes you could perform CDC: Listening mode and Polling mode. In polling mode, the datasource is periodically polled for capturing the changes. The polling period can be configured. In polling mode, you can only capture INSERT and UPDATE changes. On listening mode, the Source will keep listening to the Change Log of the database and notify in case a change has taken place. Here, you are immediately notified about the change, compared to polling mode. The key values of the map of a CDC change event are as follows. For listening mode: For insert: Keys are specified as columns of the table. For delete: Keys are followed by the specified table columns. This is achieved via before_. e.g., specifying before_X results in the key being added before the column named X. For update: Keys are followed followed by the specified table columns. This is achieved via before_. e.g., specifying before_X results in the key being added before the column named X. For polling mode: Keys are specified as the columns of the table.#### Preparations required for working with Oracle Databases in listening mode Using the extension in Windows, Mac OSX and AIX are pretty straight forward inorder to achieve the required behaviour please follow the steps given below - Download the compatible version of oracle instantclient for the database version from [here](https://www.oracle.com/database/technologies/instant-client/downloads.html) and extract - Extract and set the environment variable LD_LIBRARY_PATH to the location of instantclient which was exstracted as shown below

    export LD_LIBRARY_PATH=<path to the instant client location>
  • Inside the instantclient folder which was download there are two jars xstreams.jar and ojdbc<version>.jar convert them to OSGi bundles using the tools which were provided in the <distribution>/bin for converting the ojdbc.jar use the tool spi-provider.sh|bat and for the conversion of xstreams.jar use the jni-provider.sh as shown below(Note: this way of converting Xstreams jar is applicable only for Linux environments for other OSs this step is not required and converting it through the jartobundle.sh tool is enough)

        ./jni-provider.sh <input-jar> <destination> <comma seperated native library names>

once ojdbc and xstreams jars are converted to OSGi copy the generated jars to the <distribution>/lib. Currently streamprocessor-io-cdc only supports the oracle database distributions 12 and above See parameter: mode for supported databases and change events.

Syntax

CREATE SOURCE <NAME> WITH (type="cdc", map.type="<STRING>", url="<STRING>", mode="<STRING>", jdbc.driver.name="<STRING>", username="<STRING>", password="<STRING>", pool.properties="<STRING>", datasource.name="<STRING>", table.name="<STRING>", polling.column="<STRING>", polling.interval="<INT>", operation="<STRING>", connector.properties="<STRING>", database.server.id="<STRING>", database.server.name="<STRING>", wait.on.missed.record="<BOOL>", missed.record.waiting.timeout="<INT>") 

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
urlThe connection URL to the database. F=The format used is: jdbc:mysql://<host>:<port>/<database_name>STRINGNoNo
modeMode to capture the change data. The type of events that can be received, and the required parameters differ based on the mode. The mode can be one of the following: polling: This mode uses a column named polling.column to monitor the given table. It captures change events of the RDBMS, INSERT, and UPDATE types. listening: This mode uses logs to monitor the given table. It currently supports change events only of the MySQL, INSERT, UPDATE, and DELETE types.listeningSTRINGYesNo
jdbc.driver.nameThe driver class name for connecting the database. It is required to specify a value for this parameter when the mode is polling.STRINGYesNo
usernameThe username to be used for accessing the database. This user needs to have the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENTprivileges for the change data capturing table (specified via the table.name parameter). To operate in the polling mode, the user needs SELECT privileges.STRINGNoNo
passwordThe password of the username you specified for accessing the database.STRINGNoNo
pool.propertiesThe pool parameters for the database connection can be specified as key-value pairs.STRINGYesNo
datasource.nameName of the gdn datasource to connect to the database. When datasource name is provided, the URL, username and password are not needed. A datasource based connection is given more priority over the URL based connection. This parameter is applicable only when the mode is set to polling, and it can be applied only when you use this extension with gdn Stream Processor.STRINGYesNo
table.nameThe name of the table that needs to be monitored for data changes.STRINGNoNo
polling.columnThe column name that is polled to capture the change data. It is recommended to have a TIMESTAMP field as the polling.column in order to capture the inserts and updates. Numeric auto-incremental fields and char fields can also be used as polling.column. However, note that fields of these types only support insert change capturing, and the possibility of using a char field also depends on how the data is input. It is required to enter a value for this parameter only when the mode is polling.STRINGYesNo
polling.intervalThe time interval (specified in seconds) to poll the given table for changes. This parameter is applicable only when the mode is set to polling.1INTYesNo
operationThe change event operation you want to carry out. Possible values are insert, update or delete. This parameter is not case sensitive. It is required to specify a value only when the mode is listening.STRINGNoNo
connector.propertiesHere, you can specify Debezium connector properties as a comma-separated string. The properties specified here are given more priority over the parameters. This parameter is applicable only for the listening mode.Empty_StringSTRINGYesNo
database.server.idAn ID to be used when joining MySQL database cluster to read the bin log. This should be a unique integer between 1 to 2\^32. This parameter is applicable only when the mode is listening.Random integer between 5400 and 6400STRINGYesNo
database.server.nameA logical name that identifies and provides a namespace for the database server. This parameter is applicable only when the mode is listening.{host}_{port}STRINGYesNo
wait.on.missed.recordIndicates whether the process needs to wait on missing/out-of-order records. When this flag is set to true the process will be held once it identifies a missing record. The missing recrod is identified by the sequence of the polling.column value. This can be used only with number fields and not recommended to use with time values as it will not be sequential. This should be enabled ONLY where the records can be written out-of-order, (eg. concurrent writers) as this degrades the performance.falseBOOLYesNo
missed.record.waiting.timeoutThe timeout (specified in seconds) to retry for missing/out-of-order record. This should be used along with the wait.on.missed.record parameter. If the parameter is not set, the process will indefinitely wait for the missing record.-1INTYesNo

EXAMPLE 1

CREATE SOURCE inputStream WITH (type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', username = 'cdcuser', password = 'pswd4cdc', table.name = 'students', operation = 'insert',
map.type='keyvalue', map.attributes="id='id', name='name'") (id string, name string);

In this example, the CDC source listens to the row insertions that are made in the students table with the column name, and the ID. This table belongs to the SimpleDB MySQL database that can be accessed via the given URL.

EXAMPLE 2

CREATE SOURCE inputStream WITH (type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', username = 'cdcuser', password = 'pswd4cdc', table.name = 'students', operation = 'update',
map.type='keyvalue', map.attributes="id='id', name='name', before_id='before_id', before_name='before_name'") (before_id string, id string, before_name string , name string);

In this example, the CDC source listens to the row updates that are made in the students table. This table belongs to the SimpleDB MySQL database that can be accessed via the given URL.

EXAMPLE 3

CREATE SOURCE inputStream WITH (type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', username = 'cdcuser', password = 'pswd4cdc', table.name = 'students', operation = 'delete',
map.type='keyvalue', map.attributes="before_id='before_id', before_name='before_name'") (before_id string, before_name string);

In this example, the CDC source listens to the row deletions made in the students table. This table belongs to the SimpleDB database that can be accessed via the given URL.

EXAMPLE 4

CREATE SOURCE inputStream WITH (type = 'cdc', mode='polling', polling.column = 'id', jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB', username = 'cdcuser', password = 'pswd4cdc', table.name = 'students', map.type='keyvalue', map.attributes="id='id', name='name'") (id int, name string);

In this example, the CDC source polls the students table for inserts. id that is specified as the polling column is an auto incremental field. The connection to the database is made via the URL, username, password, and the JDBC driver name.

EXAMPLE 5

CREATE SOURCE inputStream WITH (type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB', table.name = 'students', map.type='keyvalue', map.attributes="id='id', name='name'") (id int, name string);

In this example, the CDC source polls the students table for inserts. The given polling column is a char column with the S001, S002, ... . pattern. The connection to the database is made via a data source named SimpleDB. Note that the datasource.name parameter works only with the Stream Processor.

EXAMPLE 6

CREATE SOURCE inputStream WITH (type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB', table.name = 'students', map.type='keyvalue') (name string);

In this example, the CDC source polls the students table for inserts and updates. The polling column is a timestamp field.

EXAMPLE 7

CREATE SOURCE inputStream WITH (type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', url='jdbc:mysql://localhost:3306/SimpleDB', username='cdcuser', password='pswd4cdc', table.name='students', mode='polling', polling.column='id', operation='insert', wait.on.missed.record='true', missed.record.waiting.timeout='10', map.type='keyvalue', map.attributes="batch_no='batch_no', item='item', qty='qty'") (id int, name string);

In this example, the CDC source polls the students table for inserts. The polling column is a numeric field. This source expects the records in the database to be written concurrently/out-of-order so it waits if it encounters a missing record. If the record doesn't appear within 10 seconds it resumes the process.

EXAMPLE 8

CREATE SOURCE insertSweetProductionStream WITH (type = 'cdc', url = 'jdbc:oracle:thin://localhost:1521/ORCLCDB', username='c##xstrm', password='xs', table.name='DEBEZIUM.sweetproductiontable', operation = 'insert', connector.properties='oracle.outserver.name=DBZXOUT,oracle.pdb=ORCLPDB1' map.type='keyvalue') (ID int, NAME string, WEIGHT int);

In this example, the CDC source connect to an Oracle database and listens for insert queries of sweetproduction table

email (Source)

The Email source allows you to receive events via emails. An Email source can be configured using the imap or pop3 server to receive events. This allows you to filter the messages that satisfy the criteria specified under the search term option. The email source parameters can be defined in either the <SP_HOME>/conf/<PROFILE>/deployment yaml file or the stream definition. If the parameter configurations are not available in either place, the default values are considered (i.e., if default values are available). If you need to configure server system parameters that are not provided as options in the stream definition, they need to be defined in the deployment yaml file under email source properties. For more information about imap and pop3 server system parameters, see the following. [JavaMail Reference Implementation - IMAP Store](https://javaee.github.io/javamail/IMAP-Store) [JavaMail Reference Implementation - POP3 Store Store](https://javaee.github.io/javamail/POP3-Store)

Syntax

CREATE SOURCE <NAME> WITH (type="email", map.type="<STRING>", username="<STRING>", password="<STRING>", store="<STRING>", host="<STRING>", port="<INT>", folder="<STRING>", search.term="<STRING>", polling.interval="<LONG>", action.after.processed="<STRING>", folder.to.move="<STRING>", content.type="<STRING>", ssl.enable="<BOOL>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
usernameThe user name of the email account. e.g., gdnmail is the username of the [email protected] mail account.STRINGNoNo
passwordThe password of the email accountSTRINGNoNo
storeThe store type that used to receive emails. Possible values are imap and pop3.imapSTRINGYesNo
hostThe host name of the server (e.g., imap.gmail.com is the host name for a gmail account with an IMAP store.). The default value imap.gmail.com is only valid if the email account is a gmail account with IMAP enabled.If store type is imap, then the default value is imap.gmail.com. If the store type is pop3, then the default value is pop3.gmail.com.STRINGYesNo
portThe port that is used to create the connection.993, the default value is valid only if the store is imap and ssl-enabled.INTYesNo
folderThe name of the folder to which the emails should be fetched.INBOXSTRINGYesNo
search.termThe option that includes conditions such as key-value pairs to search for emails. In a string search term, the key and the value should be separated by a semicolon (;). Each key-value pair must be within inverted commas ( ). The string search term can define multiple comma-separated key-value pairs. This string search term currently supports only the subject, from, to, bcc, and cc keys. e.g., if you enter subject:DAS, from:carbon, bcc:gdn, the search term creates a search term instance that filters emails that contain DAS in the subject, carbon in the from address, and gdn in one of the bcc addresses. The string search term carries out sub string matching that is case-sensitive. If @ in included in the value for any key other than the subject key, it checks for an address that is equal to the value given. e.g., If you search for [email protected], the string search terms looks for an address that contains abc before the @ symbol.NoneSTRINGYesNo
polling.intervalThis defines the time interval in seconds at which th email source should poll the account to check for new mail arrivals.in seconds.600LONGYesNo
action.after.processedThe action to be performed by the email source for the processed mail. Possible values are as follows: FLAGGED: Sets the flag as flagged. SEEN: Sets the flag as read. ANSWERED: Sets the flag as answered. DELETE: Deletes tha mail after the polling cycle. MOVE: Moves the mail to the folder specified in the folder.to.move parameter. If the folder specified is pop3, then the only option available is DELETE.NONESTRINGYesNo
folder.to.moveThe name of the folder to which the mail must be moved once it is processed. If the action after processing is MOVE, it is required to specify a value for this parameter.STRINGNoNo
content.typeThe content type of the email. It can be either text/plain or text/html.text/plainSTRINGYesNo
ssl.enableIf this is set to true, a secure port is used to establish the connection. The possible values are true and false.trueBOOLYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
mail.imap.partialfetchThis determines whether the IMAP partial-fetch capability should be used.truetrue or false
mail.imap.fetchsizeThe partial fetch size in bytes.16Kvalue in bytes
mail.imap.peekIf this is set to true, the IMAP PEEK option should be used when fetching body parts to avoid setting the SEEN flag on messages. The default value is false. This can be overridden on a per-message basis by the setPeek method in IMAPMessage.falsetrue or false
mail.imap.connectiontimeoutThe socket connection timeout value in milliseconds. This timeout is implemented by java.net.Socket.infinity timeoutAny Integer value
mail.imap.timeoutThe socket read timeout value in milliseconds. This timeout is implemented by java.net.Socket.infinity timeoutAny Integer value
mail.imap.writetimeoutThe socket write timeout value in milliseconds. This timeout is implemented by using a java.util.concurrent.ScheduledExecutorService per connection that schedules a thread to close the socket if the timeout period elapses. Therefore, the overhead of using this timeout is one thread per connection.infinity timeoutAny Integer value
mail.imap.statuscachetimeoutThe timeout value in milliseconds for the cache of STATUS command response.1000msTime out in miliseconds
mail.imap.appendbuffersizeThe maximum size of a message to buffer in memory when appending to an IMAP folder.NoneAny Integer value
mail.imap.connectionpoolsizeThe maximum number of available connections in the connection pool.1Any Integer value
mail.imap.connectionpooltimeoutThe timeout value in milliseconds for connection pool connections.45000msAny Integer
mail.imap.separatestoreconnectionIf this parameter is set to true, it indicates that a dedicated store connection needs to be used for store commands.truetrue or false
mail.imap.auth.login.disableIf this is set to true, it is not possible to use the non-standard AUTHENTICATE LOGIN command instead of the plain LOGIN command.falsetrue or false
mail.imap.auth.plain.disableIf this is set to true, the AUTHENTICATE PLAIN command cannot be used.falsetrue or false
mail.imap.auth.ntlm.disableIf true, prevents use of the AUTHENTICATE NTLM command.falsetrue or false
mail.imap.proxyauth.userIf the server supports the PROXYAUTH extension, this property specifies the name of the user to act as. Authentication to log in to the server is carried out using the administrator's credentials. After authentication, the IMAP provider issues the PROXYAUTH command with the user name specified in this property.NoneValid string value
mail.imap.localaddressThe local address (host name) to bind to when creating the IMAP socket.Defaults to the address picked by the Socket class.Valid string value
mail.imap.localportThe local port number to bind to when creating the IMAP socket.Defaults to the port number picked by the Socket class.Valid String value
mail.imap.sasl.enableIf this parameter is set to true, the system attempts to use the javax.security.sasl package to choose an authentication mechanism for the login.falsetrue or false
mail.imap.sasl.mechanismsA list of SASL mechanism names that the system should to try to use. The names can be separated by spaces or commas.NoneValid string value
mail.imap.sasl.authorizationidThe authorization ID to use in the SASL authentication.If this parameter is not set, the authentication ID (username) is used.Valid string value
mail.imap.sasl.realmThe realm to use with SASL authentication mechanisms that require a realm, such as DIGEST-MD5.NoneValid string value
mail.imap.auth.ntlm.domainThe NTLM authentication domain.NoneValid string value
The NTLM authentication domain.NTLM protocol-specific flags.NoneValid integer value
mail.imap.socketFactoryIf this parameter is set to a class that implements the javax.net.SocketFactory interface, this class is used to create IMAP sockets.NoneValid SocketFactory
mail.imap.socketFactory.classIf this parameter is set, it specifies the name of a class that implements the javax.net.SocketFactory interface. This class is used to create IMAP sockets.NoneValid string
mail.imap.socketFactory.fallbackIf this parameter is set to true, failure to create a socket using the specified socket factory class results in the socket being created using the java.net.Socket class.truetrue or false
mail.imap.socketFactory.portThis specifies the port to connect to when using the specified socket factory. If this parameter is not set, the default port is used.143Valid Integer
mail.imap.ssl.checkserveridentityIf this parameter is set to true, the system checks the server identity as specified by RFC 2595.falsetrue or false
mail.imap.ssl.trustIf this parameter is set and a socket factory has not been specified, it enables the use of a MailSSLSocketFactory. If this parameter is set to *, all the hosts are trusted. If this parameter specifies list of hosts separated by white spaces, only those hosts are trusted. If the parameter is not set to any of the values mentioned above, trust depends on the certificate presented by the server.*Valid String
mail.imap.ssl.socketFactoryIf this parameter is set to a class that extends the javax.net.ssl.SSLSocketFactory class this class is used to create IMAP SSL sockets.NoneSSL Socket Factory
mail.imap.ssl.socketFactory.classIf this parameter is set, it specifies the name of a class that extends the javax.net.ssl.SSLSocketFactory class. This class is used to create IMAP SSL sockets.NoneValid String
mail.imap.ssl.socketFactory.portThis specifies the port to connect to when using the specified socket factory.the default port 993 is used.valid port number
mail.imap.ssl.protocolsThis specifies the SSL protocols that are enabled for SSL connections. The property value is a whitespace-separated list of tokens acceptable to the javax.net.ssl.SSLSocket.setEnabledProtocols method.NoneValid string
mail.imap.starttls.enableIf this parameter is set to true, it is possible to use the STARTTLS command (if supported by the server) to switch the connection to a TLS-protected connection before issuing any login commands.falsetrue or false
mail.imap.socks.hostThis specifies the host name of a SOCKS5 proxy server that is used to connect to the mail server.NoneValid String
mail.imap.socks.portThis specifies the port number for the SOCKS5 proxy server. This is needed if the proxy server is not using the standard port number 1080.1080Valid String
mail.imap.minidletimeThis property sets the delay in milliseconds.10 millisecondstime in seconds (Integer)
mail.imap.enableimapeventsIf this property is set to true, it enables special IMAP-specific events to be delivered to the ConnectionListener of the store. The unsolicited responses received during the idle method of the store are sent as connection events with IMAPStore.RESPONSE as the type. The event's message is the raw IMAP response string.falsetrue or false
mail.imap.folder.classThe class name of a subclass of com.sun.mail.imap.IMAPFolder. The subclass can be used to provide support for additional IMAP commands. The subclass must have public constructors of the form public MyIMAPFolder(String fullName, char separator, IMAPStore store, Boolean isNamespace) and public MyIMAPFolder(ListInfo li, IMAPStore store)NoneValid String
mail.pop3.connectiontimeoutThe socket connection timeout value in milliseconds.Infinite timeoutInteger value
mail.pop3.timeoutThe socket I/O timeout value in milliseconds.Infinite timeoutInteger value
mail.pop3.message.classThe class name of a subclass of com.sun.mail.pop3.POP3Message.NoneValid String
mail.pop3.localaddressThe local address (host name) to bind to when creating the POP3 socket.Defaults to the address picked by the Socket class.Valid String
mail.pop3.localportThe local port number to bind to when creating the POP3 socket.Defaults to the port number picked by the Socket class.Valid port number
mail.pop3.apop.enableIf this parameter is set to true, use APOP instead of USER/PASS to log in to the POP3 server (if the POP3 server supports APOP). APOP sends a digest of the password instead of clearing the text password.falsetrue or false
mail.pop3.socketFactoryIf this parameter is set to a class that implements the javax.net.SocketFactory interface, this class is used to create POP3 sockets.NoneSocket Factory
mail.pop3.socketFactory.classIf this parameter is set, it specifies the name of a class that implements the javax.net.SocketFactory interface. This class is used to create POP3 sockets.NoneValid String
mail.pop3.socketFactory.fallbackIf this parameter is set to true, failure to create a socket using the specified socket factory class results in the socket being created using the java.net.Socket class.falsetrue or false
mail.pop3.socketFactory.portThis specifies the port to connect to when using the specified socket factory.Default portValid port number
mail.pop3.ssl.checkserveridentityIf this parameter is set to true, check the server identity as specified by RFC 2595.falsetrue or false
mail.pop3.ssl.trustIf this parameter is set and a socket factory has not been specified, it is possible to use a MailSSLSocketFactory. If this parameter is set to *, all the hosts are trusted. If the parameter is set to a whitespace-separated list of hosts, only those hosts are trusted. If the parameter is not set to any of the values mentioned above, trust depends on the certificate presented by the server.*Valid String
mail.pop3.ssl.socketFactoryIf this parameter is set to a class that extends the javax.net.ssl.SSLSocketFactory class, this class is used to create POP3 SSL sockets.NoneSSL Socket Factory
mail.pop3.ssl.checkserveridentityIf this parameter is set to true, the system checks the server identity as specified by RFC 2595.falsetrue or false
mail.pop3.ssl.trustIf this parameter is set and a socket factory has not been specified, it is possible to use a MailSSLSocketFactory. If this parameter is set to *, all the hosts are trusted. If the parameter is set to a whitespace-separated list of hosts, only those hosts are trusted.Trust depends on the certificate presented by the server.Valid String
mail.pop3.ssl.socketFactoryIf this parameter is set to a class that extends the javax.net.ssl.SSLSocketFactory class, this class is used to create POP3 SSL sockets.NoneSSL Socket Factory
mail.pop3.ssl.socketFactory.classIf this parameter is set, it specifies the name of a class that extends the javax.net.ssl.SSLSocketFactory class. This class is used to create POP3 SSL sockets.NoneValid String
mail.pop3.ssl.socketFactory.pThis parameter pecifies the port to connect to when using the specified socket factory.995Valid Integer
mail.pop3.ssl.protocolsThis parameter specifies the SSL protocols that are enabled for SSL connections. The property value is a whitespace-separated list of tokens acceptable to the javax.net.ssl.SSLSocket.setEnabledProtocols method.NoneValid String
mail.pop3.starttls.enableIf this parameter is set to true, it is possible to use the STLS command (if supported by the server) to switch the connection to a TLS-protected connection before issuing any login commands.falsetrue or false
mail.pop3.starttls.requiredIf this parameter is set to true, it is required to use the STLS command. The connect method fails if the server does not support the STLS command or if the command fails.falsetrue or false
mail.pop3.socks.hostThis parameter specifies the host name of a SOCKS5 proxy server that can be used to connect to the mail server.NoneValid String
mail.pop3.socks.portThis parameter specifies the port number for the SOCKS5 proxy server.NoneValid String
mail.pop3.disabletopIf this parameter is set to true, the POP3 TOP command is not used to fetch message headers.falsetrue or false
mail.pop3.forgettopheadersIf this parameter is set to true, the headers that might have been retrieved using the POP3 TOP command is forgotten and replaced by the headers retrieved when the POP3 RETR command is executed.falsetrue or false
mail.pop3.filecache.enableIf this parameter is set to true, the POP3 provider caches message data in a temporary file instead of caching them in memory. Messages are only added to the cache when accessing the message content. Message headers are always cached in memory (on demand). The file cache is removed when the folder is closed or the JVM terminates.falsetrue or false
mail.pop3.filecache.dirIf the file cache is enabled, this property is used to override the default directory used by the JDK for temporary files.NoneValid String
mail.pop3.cachewritetoThis parameter controls the behavior of the writeTo method on a POP3 message object. If the parameter is set to true, the message content has not been cached yet, and the ignoreList is null, the message is cached before being written. If not, the message is streamed directly to the output stream without being cached.falsetrue or false
mail.pop3.keepmessagecontentIf this property is set to true, a hard reference to the cached content is retained, preventing the memory from being reused until the folder is closed, or until the cached content is explicitly invalidated (using the invalidate method).falsetrue or false

EXAMPLE 1

CREATE SOURCE inputStream WITH (type='email', map.type='xml', username='receiver.account', password='account.password') (name string, age int, country string);

This example illustrates how to receive events in xml format via the email source. In this example, only the required parameters are defined in the stream definition. The default values are taken for the other parameters. The search term is not defined, and therefore, all the new messages in the inbox folder are polled and taken.

EXAMPLE 2

CREATE SOURCE inputStream WITH (type='email', map.type='xml', username='receiver.account', password='account.password',store = 'imap',host = 'imap.gmail.com',port = '993',searchTerm = 'subject:Stream Processor, from: [email protected] , cc: cc.account',polling.interval='500',action.after.processed='DELETE',content.type='text/html') (name string, age int, country string);

This example illustrates how to receive events in xml format via the email source. The email source polls the mail account every 500 seconds to check whether any new mails have arrived. It processes new mails only if they satisfy the conditions specified for the email search term (the value for from of the email message should be [email protected]<host name>, and the message should contain cc.account in the cc receipient list and the word Stream Processor in the mail subject). in this example, the action after processing is DELETE. Therefore,after processing the event, corresponding mail is deleted from the mail folder.

grpc (Source)

This extension starts a grpc server during initialization time. The server listens to requests from grpc stubs. This source has a default mode of operation and custom user defined grpc service mode. By default this uses EventService. In the default mode this source will use EventService consume method. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the streamprocessor-tooling folder if we use it with streamprocessor-tooling). This method will receive requests and injects them into stream through a mapper.

Syntax

CREATE SOURCE <NAME> WITH (type="grpc", map.type="<STRING>", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
receiver.urlThe url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName> For example: grpc://0.0.0.0:9763/org.gdn.grpc.EventService/consumeSTRINGNoNo
max.inbound.message.sizeSets the maximum message size in bytes allowed to be received on the server.4194304INTYesNo
max.inbound.metadata.sizeSets the maximum size of metadata in bytes allowed to be received.8192INTYesNo
server.shutdown.waiting.timeThe time in seconds to wait for the server to shutdown, giving up if the timeout is reached.5LONGYesNo
truststore.filethe file path of truststore. If this is provided then server authentication is enabled-STRINGYesNo
truststore.passwordthe password of truststore. If this is provided then the integrity of the keystore is checked-STRINGYesNo
truststore.algorithmthe encryption algorithm to be used for server authentication-STRINGYesNo
tls.store.typeTLS store type-STRINGYesNo
keystore.filethe file path of keystore. If this is provided then client authentication is enabled-STRINGYesNo
keystore.passwordthe password of keystore-STRINGYesNo
keystore.algorithmthe encryption algorithm to be used for client authentication-STRINGYesNo
enable.sslto enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
mutual.auth.enabledto enable mutual authentication. If set to true and keystore.file or truststore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
threadpool.sizeSets the maximum size of threadpool dedicated to serve requests at the gRPC server100INTYesNo
threadpool.buffer.sizeSets the maximum size of threadpool buffer server100INTYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
keyStoreFilePath of the key store file\${carbon.home}/resources/security/gdncarbon.jksvalid path for a key store file
keyStorePasswordThis is the password used with key store filegdncarbonvalid password for the key store file
keyStoreAlgorithmThe encryption algorithm to be used for client authenticationSunX509-
trustStoreFileThis is the trust store file with the path\${carbon.home}/resources/security/client-truststore.jks-
trustStorePasswordThis is the password used with trust store filegdncarbonvalid password for the trust store file
trustStoreAlgorithmthe encryption algorithm to be used for server authenticationSunX509-

EXAMPLE 1

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.EventService/consume', map.type='json') (message String);

Here the port is given as 8888. So a grpc server will be started on port 8888 and the server will expose EventService. This is the default service packed with the source. In EventService the consume method is

EXAMPLE 2

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.EventService/consume', map.type='json', map.attributes="name='trp:name', age='trp:age', message='message'") (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: Name:John, Age:23

EXAMPLE 3

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.MyService/send', map.type='protobuf') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here the port is given as 8888. So a grpc server will be started on port 8888 and sever will keep listening to the send RPC method in the MyService service.

EXAMPLE 4

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.MyService/send', map.type='protobuf', attributes="a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue'") (a string ,c long,b int, d bool,e float,f double);

Here the port is given as 8888. So a grpc server will be started on port 8888 and sever will keep listening to the send method in the MyService service. Since we provide mapping in the stream we can use any names for stream attributes, but we have to map those names with correct protobuf message attributes' names. If we want to send metadata, we should map the attributes.

EXAMPLE 5

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.StreamService/clientStream', map.type='protobuf') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here we receive a stream of requests to the grpc source. Whenever we want to use streaming with grpc source, we have to define the RPC method as client streaming method, when we define a stream method stream processor will identify it as a stream RPC method and ready to accept stream of request from the client.

grpc-call-response (Source)

This grpc source receives responses received from gRPC server for requests sent from a grpc-call sink. The source will receive responses for sink with the same sink.id. For example if you have a gRPC sink with sink.id 15 then we need to set the sink.id as 15 in the source to receives responses. Sinks and sources have 1:1 mapping

Syntax

CREATE SOURCE <NAME> WITH (type="grpc-call-response", map.type="<STRING>", sink.id="<INT>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
sink.ida unique ID that should be set for each grpc-call source. There is a 1:1 mapping between grpc-call sinks and grpc-call-response sources. Each sink has one particular source listening to the responses to requests published from that sink. So the same sink.id should be given when writing the sink also.INTNoNo

EXAMPLE 1

CREATE SOURCE BarStream WITH (type='grpc-call-response', sink.id= '1') (message String);

CREATE SINK FooStream WITH (type='grpc-call', publisher.url = 'grpc://194.23.98.100:8080/EventService/process', sink.id= '1', map.type='json') (message String);

Here we are listening to responses for requests sent from the sink with sink.id 1 will be received here. The results will be injected into BarStream

grpc-service (Source)

This extension implements a grpc server for receiving and responding to requests. During initialization time a grpc server is started on the user specified port exposing the required service as given in the url. This source also has a default mode and a user defined grpc service mode. By default this uses EventService. In the default mode this will use the EventService process method. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the streamprocessor-tooling folder if we use it with streamprocessor-tooling). This accepts grpc message class Event as defined in the EventService proto. This uses GrpcServiceResponse sink to send reponses back in the same Event message format.

Syntax

CREATE SOURCE <NAME> WITH (type="grpc-service", map.type="<STRING>", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", service.timeout="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>")

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
receiver.urlThe url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName> For example: grpc://0.0.0.0:9763/org.gdn.grpc.EventService/consumeSTRINGNoNo
max.inbound.message.sizeSets the maximum message size in bytes allowed to be received on the server.4194304INTYesNo
max.inbound.metadata.sizeSets the maximum size of metadata in bytes allowed to be received.8192INTYesNo
service.timeoutThe period of time in milliseconds to wait for stream processor to respond to a request received. After this time period of receiving a request it will be closed with an error message.10000INTYesNo
server.shutdown.waiting.timeThe time in seconds to wait for the server to shutdown, giving up if the timeout is reached.5LONGYesNo
truststore.filethe file path of truststore. If this is provided then server authentication is enabled-STRINGYesNo
truststore.passwordthe password of truststore. If this is provided then the integrity of the keystore is checked-STRINGYesNo
truststore.algorithmthe encryption algorithm to be used for server authentication-STRINGYesNo
tls.store.typeTLS store type-STRINGYesNo
keystore.filethe file path of keystore. If this is provided then client authentication is en