Sink Mapping
Each sink.type configuration can have a mapping, denoted by the map.type annotation, that defines how to convert stream events to outgoing messages in the defined format. The type parameter of map.type selects the mapper used to convert the outgoing events. The other parameters of the map.type annotation depend on the mapper selected, and some of them can be optional and/or dynamic. For detailed information about these parameters, see the documentation of the relevant mapper.
Map Payload
map.payload is an optional annotation used with map.type to define a custom mapping. When the map.payload annotation is not provided, each mapper maps the outgoing events to its own default event format. The map.payload annotation allows users to configure mappers to produce the output payload of their choice, and by using dynamic properties within the payload they can selectively extract and add data from the published stream events.
There are two ways to configure the map.payload annotation.
Some mappers, such as JSON and Text, accept only one output payload: map.payload='This is a test message from {{user}}.'
Some mappers, such as key-value, accept a series of mapping values: map.payload=key1='mapping_1', 'key2'='user : {{user}}'
Here, the keys of the payload mapping can be defined using dot notation, such as a.b.c, or using any constant string value, such as '$abc'.
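As a sketch of a custom single-payload mapping (the stream name, endpoint URL, and attribute names here are hypothetical), a Text mapper with map.payload might be configured as follows:

```
CREATE SINK AlertStream WITH (sink.type='http', publisher.url='http://localhost:8005/alerts', method='POST', map.type='text', map.payload='User {{user}} logged in from {{country}}.') (user string, country string);
```

For each outgoing event, the {{user}} and {{country}} placeholders are replaced with the values of the corresponding stream attributes before the message is sent.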
Supported Sink Mapping Types
The following is a list of sink mapping types supported by stream workers:
Sink Mapping Type | Description |
---|---|
CSV | Converts stream events to CSV-like delimiter separated messages. |
JSON | Converts stream events to JSON messages. |
Key-Value | Converts stream events to key-value hash maps. |
PassThrough | Omits data conversion on stream events. |
Text | Converts stream events to plain text messages. |
When the map.type annotation is not provided, map.type='passThrough' is used by default, which passes the outgoing events directly to the sinks without any data conversion.
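For illustration (the stream name and endpoint URL here are hypothetical), a sink defined without any map.type annotation falls back to the passThrough mapper and forwards events without conversion:

```
CREATE SINK RawStream WITH (sink.type='http', publisher.url='http://localhost:8005/raw') (name string, age int);
```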
Example 1
Publishes OutputStream events by converting them to JSON messages with the default format, and sends them to the HTTP endpoint http://localhost:8005/endpoint using the POST method, an Accept-Date header, and basic authentication with admin as both the username and the password.
The configuration of the HTTP
sink and JSON
sink mapper to achieve the above is as follows.
CREATE SINK OutputStream WITH (sink.type='http', publisher.url='http://localhost:8005/endpoint', method='POST', headers='Accept-Date:20/02/2017', basic.auth.enabled='true', basic.auth.username='admin', basic.auth.password='admin', map.type='json') (name string, age int, country string);
This publishes a JSON message in the following format:
{
"event":{
"name":"Paul",
"age":20,
"country":"UK"
}
}
Example 2
Publishes StockStream events by converting them to user-defined JSON messages, and sends them to the HTTP endpoint http://localhost:8005/stocks.
The configuration of the HTTP
sink and custom JSON
sink mapping to achieve the above is as follows.
CREATE SINK StockStream WITH (sink.type='http', publisher.url='http://localhost:8005/stocks', map.type='json', validate.json='true', enclosing.element='$.Portfolio', map.payload="""{"StockData":{ "Symbol":"{{symbol}}", "Price":{{price}} }}""") (symbol string, price float, volume long);
This publishes a single event as a JSON message in the following format:
{
"Portfolio":{
"StockData":{
"Symbol":"GOOG",
"Price":55.6
}
}
}
This can also publish multiple events together as a JSON message in the following format:
{
"Portfolio":[
{
"StockData":{
"Symbol":"GOOG",
"Price":55.6
}
},
{
"StockData":{
"Symbol":"FB",
"Price":57.0
}
}
]
}
Example 3
Publishes events from the OutputStream stream to multiple HTTP endpoints using a partitioning strategy. Here, the events are sent to either http://localhost:8005/endpoint1 or http://localhost:8006/endpoint2 based on the partitioning key country. It uses the default JSON mapping, the POST method, and admin as both the username and the password when publishing to both endpoints.
The configuration of the distributed HTTP
sink and JSON
sink mapper to achieve the above is:
CREATE SINK OutputStream WITH (sink.type='http', method='POST', basic.auth.enabled='true', basic.auth.username='admin', basic.auth.password='admin', map.type='json', distribution.strategy='partitioned', partitionKey='country', destination.publisher.url='http://localhost:8005/endpoint1', destination.publisher.url='http://localhost:8006/endpoint2') (name string, age int, country string);
This partitions the outgoing events and publishes all events with the same country attribute value to the same endpoint. The JSON message published will be in the following format:
{
"event":{
"name":"Paul",
"age":20,
"country":"UK"
}
}