avro
This extension is an Avro to Event input mapper. Transports that accept Avro messages can use this extension to convert the incoming Avro messages to stream worker events. The Avro schema to be used for creating Avro messages can be specified as a parameter in the stream definition.
If no Avro schema is specified, a flat Avro schema of the record
type is generated with the stream attributes as schema fields. The generated/specified Avro schema is used to convert Avro messages to stream worker events.
Syntax
CREATE SOURCE <NAME> WITH (map.type="avro", map.schema.def="<STRING>", map.schema.registry="<STRING>", map.schema.id="<STRING>", map.fail.on.missing.attribute="<BOOL>")
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
schema.def | This specifies the schema of the Avro message. The full schema used to create the Avro message needs to be specified as a quoted JSON string. | STRING | No | No | |
schema.registry | This specifies the URL of the schema registry. | STRING | No | No | |
schema.id | This specifies the ID of the Avro schema. This ID is the global ID that is returned from the schema registry when posting the schema to the registry. The schema is retrieved from the schema registry via the specified ID. | STRING | No | No | |
fail.on.missing.attribute | If this parameter is set to true , a JSON execution failing or returning a null value results in that message being dropped by the system. If this parameter is set to false , a JSON execution failing or returning a null value results in the system being prompted to send the event with a null value to Stream App so that the user can handle it as required (i.e., by assigning a default value. | true | BOOL | Yes | No |
Example 1
CREATE SOURCE UserStream WITH (type='stream', topic='user', map.type='avro', map.schema.def = """{"type":"record","name":"userInfo","namespace":"user.example","fields":[{"name":"name","type":"string"}, {"name":"age","type":"int"}]}""") (name string, age int );
The above stream worker query performs a default Avro input mapping. The input Avro message that contains user information is converted to a stream worker event. The expected input is a byte array or ByteBuffer.
Example 2
CREATE SOURCE userStream WITH (type='stream', topic='user', map.type='avro', map.schema.def = """{"type":"record","name":"userInfo","namespace":"avro.userInfo","fields":[{"name":"username","type":"string"}, {"name":"age","type":"int"}]}""", map.attributes="name="username",age="age"") (name string, age int );
The above stream worker query performs a custom Avro input mapping. The input Avro message that contains user information is converted to a stream worker event. The expected input is a byte array or ByteBuffer.
Example 3
CREATE SOURCE UserStream WITH (type='stream', topic='user', map.type='avro',schema.registry='http://192.168.2.5:9090', schema.id='1', map.attributes="name='username', age='age'") (name string, age int );
The above stream worker query performs a custom Avro input mapping. The input Avro message that contains user information is converted to a stream worker event via the schema retrieved from the given schema registry(localhost:8081). The expected input is a byte array or ByteBuffer.