http-call-response
The http-call-response source receives the responses for the calls made by its corresponding http-call sink, and maps them from formats such as text
and JSON
.
To handle messages with different HTTP status codes having different formats, multiple http-call-response sources are
allowed to associate with a single http-call sink. It allows accessing the attributes of the event that initiated the call, and the response headers and properties via transport properties in the format trp:<attribute name>
and trp:<header/property>
respectively.
Syntax
CREATE SOURCE <NAME> WITH (type="http-call-response", map.type="<STRING>", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>")
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
sink.id | Identifier to correlate the http-call-response source with its corresponding http-call sink that published the messages. | STRING | No | No | |
http.status.code | The matching http responses status code regex, that is used to filter the the messages which will be processed by the source. Eg: http.status.code = '200' , http.status.code = '4\\d+' | 200 | STRING | Yes | No |
allow.streaming.responses | Enable consuming responses on a streaming manner. | false | BOOL | Yes | No |
Example 1
CREATE SINK EmployeeRequestStream WITH (type='http-call', method='POST', publisher.url='http://localhost:8005/registry/employee', sink.id='employee-info', map.type='json') (name string, id int);
CREATE SOURCE EmployeeResponseStream WITH (type='http-call-response', sink.id='employee-info', http.status.code='2\\d+', map.type='json', map.attributes="name='trp:name', id='trp:id', location='$.town', age='$.age'") (name string, id int, location string, age int);
CREATE SOURCE EmployeeErrorStream WITH (type='http-call-response', sink.id='employee-info', http.status.code='4\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="error='A[1]'") (error string);
When events arrive in EmployeeRequestStream
, http-call sink makes calls to endpoint on URL http://localhost:8005/registry/employee
with
POST
method and Content-Type application/json
.
If the arriving event has attributes name
:John
and id
:1423
it will send a message with default JSON mapping as follows:
{
"event": {
"name": "John",
"id": 1423
}
}
When the endpoint responds with status code in the range of 200 the message will be received by the http-call-response source associated
with the EmployeeResponseStream
stream, because it is correlated with the sink by the same sink.id
employee-info
and as that expects
messages with http.status.code
in regex format 2\\d+
. If the response message is in the format:
{
"town": "NY",
"age": 24
}
the source maps the location
and age
attributes by executing JSON path on the message and maps the name
and id
attributes by
extracting them from the request event via as transport properties.
If the response status code is in the range of 400 then the message are received by the http-call-response source associated with the
EmployeeErrorStream
stream, because it is correlated with the sink by the same sink.id
employee-info
and it expects messages with
http.status.code
in regex format 4\\d+
, and maps the error response to the error
attribute of the event.