Skip to main content

Create Stream Application

Introduction

Stream applications are declarative specs that define the processing logic to process the events sent to the stream processor. A stream app definition contains the following configurations:

ConfigurationDescription
StreamA logical series of events ordered in time with a uniquely identifiable name, and set of defined attributes with specific data types defining its schema.
SourceThis consumes data from external sources (such as `TCP` , ` Kafka ` , ` HTTP ` , etc) in the form of events, then converts each event (that can be in `XML` , ` JSON` , ` binary` , etc. format) to a stream event, and passes that to a stream for processing.
SinkThis takes events arriving at a stream, maps them to a predefined data format (such as ` XML ` , `JSON,` `binary` , etc), and publishes them to external endpoints (such as ` E-mail ` , ` TCP ` , ` Kafka ` , `HTTP ` , etc).
TableA structured representation of data stored with a defined schema. Stored data can be backed by In-Memory, or external data stores such as RDBMS, MongoDB, etc. The tables can be accessed and manipulated at runtime.
Executional Element

An executional element can be one of the following:

  • Stateless query: Queries that only consider currently incoming events when generating an output. e.g., filters
  • Stateful query: Queries that consider both currently incoming events as well as past events when generating an output. e.g., windows, sequences, patterns, etc.
  • Partitions: Collections of stream definitions and queries separated from each other within a Stream application for the purpose of processing events in parallel and in isolation

Macrometa provide in-build source, sink and store explained in the later section of this document.

Creating a Stream Application

To create a stream application follow the steps below:

  1. Open the GUI. Click the Stream Apps tab.
  2. Click New to define a new stream application.
  3. Type a Name for the stream application. For example, SweetProductionAnalysis.
  4. Type a Description.
  5. Add the following sample stream application.
CREATE SOURCE SweetProductionStream WITH (type = 'database', collection='SweetProductionData', map.type='json') (name string, amount double);

CREATE SINK ProductionAlertStream WITH (type= 'stream', stream='ProductionAlertStream', map.type='json'

INSERT INTO ProductionAlertStream
SELECT *
FROM SweetProductionStream;
  1. Click Save to save the stream app.
  2. Select all the regions to deploy your application in.
  3. Click on Save.

Source

C8Streams

Syntax:

CREATE SOURCE SourceName WITH (type="stream", stream.list="STRING", replication.type="STRING", map.type='type') (strings);

Example:

CREATE SOURCE OrderStream WITH (type="stream", stream.list="OrderStream", replication.type="local", map.type='json') (product_id string, quantity integer);

Stream application will use the stream with the default query parameters explained in the chart below.

Query Parameters:

NameDescriptionDefault ValuePossible Data TypesOptional
stream.listThis specifies the list of streams to which the source must listen. This list can be provided as a set of comma-separated values e.g. `stream_one,stream_two`STRINGNo
replication.typeSpecifies if the replication type of the streams. Possible values can be `local` and `global`localSTRINGYes

C8DB

Syntax:

CREATE SOURCE SourceName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type') (strings);

Example:

CREATE SOURCE SweetProductionStream WITH (type="database", map.type='json') (name string, amount double);

Query Parameters:

NameDescriptionDefault ValuePossible Data TypesOptional
collectionThis specifies the name of the c8db collection to which the source must listen.STRINGNo
replication.typeSpecifies if the replication type of the c8db collection. Possible values can be `local` and `global`localSTRINGYes
collection.typeThis specifies the type of the data collection contains. Possible values can be `doc` and `edge`.docSTRINGYes

Sink

C8Streams

Syntax:

CREATE SINK SinkName WITH (type="stream", stream="STRING", replication.type="STRING", map.type='type') (strings);
@sink(type="c8streams", stream="<STRING>", replication.type="<STRING>", @map(...)))

Example:

CREATE SINK ProductionAlertStream WITH (type="stream", stream='ProductionAlertStream', map.type='json`) (name string, amount double);

Query Parameters:

NameDescriptionDefault ValuePossible Data TypesOptional
streamThe streams to which the C8Stream sink needs to publish events.STRINGNo
replication.typeSpecifies if the replication type of the stream. Possible values can be `local` and `global`localSTRINGYes

Table

C8DB

Syntax:

CREATE STORE StoreName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type', from="STRING", to="STRING") (strings);

Example:

CREATE STORE SweetProductionCollection WITH (type="database", collection="SweetProductionCollection", replication.type="local", map.type='json') (strings);

Stream applications will use the c8db with the default query parameters explained in the chart below.

NameDescriptionDefault ValuePossible Data TypesOptional
collectionThis specifies the name of the c8db collection to which events must written.STRINGNo
replication.typeSpecifies if the replication type of the c8db collection. Possible values can be `local` and `global`localSTRINGYes
collection.typeThis specifies the type of the data collection contains. Possible values can be `doc` and `edge`.docSTRINGYes
fromIf `collection.type` is specified as `edge`, this field indicates which field to be considered as a source node of the edge._fromSTRINGYes
toIf `collection.type` is specified as `edge`, this field indicates which field to be considered as a destination node of the edge._toSTRINGYes

Tutorials

Following tutorials cover various user scenarios using Macrometa Stream Processing.

Refer to Reference for additional stream processing examples.