
Quickstart with Stream Processing

Macrometa GDN allows you to integrate streaming data and take appropriate actions. Most stream processing use cases involve collecting, analyzing, and integrating or acting on data generated during business activities by various sources.

Stage | Description
--- | ---
Collect | Receive or capture data from various data sources.
Analyze | Analyze data to identify interesting patterns and extract information.
Act | Take actions based on the findings. For example, running simple code, calling an external service, or triggering a complex integration.
Integrate | Make the processed data available to consumers.
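The four stages can be pictured as a chain of small functions. The following plain-Python sketch is purely illustrative (it does not use GDN or the pyC8 driver); the event shape, the `heavy` flag, and the threshold are hypothetical:

```python
from typing import Callable, Iterable, Iterator

# Hypothetical event shape: a dict with a "weight" field, mirroring the
# Sample-Cargo-App used later on this page.
Event = dict


def collect(raw: Iterable[str]) -> Iterator[Event]:
    """Collect: parse raw weight readings into events."""
    for line in raw:
        yield {"weight": int(line)}


def analyze(events: Iterable[Event], threshold: int) -> Iterator[Event]:
    """Analyze: flag events whose weight exceeds a threshold."""
    for event in events:
        yield {**event, "heavy": event["weight"] > threshold}


def act(events: Iterable[Event], on_heavy: Callable[[Event], None]) -> Iterator[Event]:
    """Act: call a handler for flagged events and pass every event through."""
    for event in events:
        if event["heavy"]:
            on_heavy(event)
        yield event


def integrate(events: Iterable[Event]) -> list:
    """Integrate: materialize processed events for downstream consumers."""
    return list(events)


alerts: list = []
processed = integrate(act(analyze(collect(["1", "5", "3"]), threshold=2), alerts.append))
print(processed)
print(alerts)
```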

If you are new to Macrometa GDN, start by reading GDN Essentials.

You can process streams to perform the following actions with your data:

  • Transform data from one format to another. For example, from XML to JSON.
  • Enrich data received from a specific source by combining it with databases and services.
  • Correlate data by joining multiple streams to create an aggregate stream.
  • Clean data by filtering it and by modifying the content in messages. For example, obfuscating sensitive information.
  • Derive insights by identifying event patterns in data streams.
  • Summarize data with time windows and incremental aggregations.
  • Perform real-time ETL on collections, tail files, and scrape HTTP endpoints.
  • Integrate stream data and trigger actions based on it. An action can be a single service request or a complex enterprise integration flow.
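As a rough illustration of three of these actions (transform, clean, summarize), the following plain-Python sketch converts hypothetical XML shipment records to JSON, masks email addresses, and totals weights per tumbling time window. It runs locally and only approximates what a stream application would do; the record fields and window size are assumptions:

```python
import json
import xml.etree.ElementTree as ET
from collections import defaultdict

# Hypothetical input: shipment records as XML.
XML_INPUT = """<shipments>
  <shipment id="1" owner="alice@example.com" ts="3" weight="4"/>
  <shipment id="2" owner="bob@example.com" ts="7" weight="6"/>
  <shipment id="3" owner="carol@example.com" ts="12" weight="5"/>
</shipments>"""


def transform(xml_text: str) -> list:
    """Transform: convert XML elements into JSON-ready dicts."""
    root = ET.fromstring(xml_text)
    return [
        {
            "id": int(el.get("id")),
            "owner": el.get("owner"),
            "ts": int(el.get("ts")),
            "weight": int(el.get("weight")),
        }
        for el in root.iter("shipment")
    ]


def clean(records: list) -> list:
    """Clean: obfuscate the local part of each owner's email address."""
    cleaned = []
    for r in records:
        _, domain = r["owner"].split("@", 1)
        cleaned.append({**r, "owner": "***@" + domain})
    return cleaned


def summarize(records: list, window_size: int) -> dict:
    """Summarize: total weight per tumbling window of window_size time units."""
    totals = defaultdict(int)
    for r in records:
        window_start = (r["ts"] // window_size) * window_size
        totals[window_start] += r["weight"]
    return dict(totals)


records = clean(transform(XML_INPUT))
print(json.dumps(records))
print(summarize(records, window_size=10))
```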

For the following examples, assume these credentials:

Driver Download

Install the driver for your preferred language. To install the JavaScript driver (jsC8) with Yarn or npm:

yarn add jsc8
(or)
npm install jsc8

If you want to use the driver outside of the current directory, you can also install it globally using the `--global` flag:

npm install --global jsc8

To install from source:

git clone https://github.com/macrometacorp/jsc8.git
cd jsc8
npm install
npm run dist
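The Python examples on this page import `C8Client` from the `c8` package, which comes from the Python driver, pyC8 (`pip install pyC8`). A minimal sketch to check whether the driver is importable in your environment:

```python
import importlib.util


def driver_installed(module_name: str = "c8") -> bool:
    """Return True if the given top-level module can be found for import."""
    return importlib.util.find_spec(module_name) is not None


if driver_installed():
    print("pyC8 driver found")
else:
    print("pyC8 driver missing; install it with: pip install pyC8")
```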

Connect to GDN

Establish a connection to a local region. When this code runs, it initializes the server connection to the region URL you specified.

    from c8 import C8Client

print("--- Connecting to C8")
client = C8Client(protocol='https', host='gdn.paas.macrometa.io', port=443,
email='[email protected]', password='xxxxx',
geofabric='_system')
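Rather than hard-coding credentials, you might read them from environment variables. The following sketch only builds the keyword arguments used in the call above; the `GDN_*` variable names and the helper `c8_client_kwargs` are hypothetical:

```python
import os


def c8_client_kwargs(env=os.environ) -> dict:
    """Build C8Client keyword arguments from environment variables.

    The GDN_* names are hypothetical; the keys mirror the C8Client call above.
    """
    return {
        "protocol": "https",
        "host": env.get("GDN_HOST", "gdn.paas.macrometa.io"),
        "port": int(env.get("GDN_PORT", "443")),
        "email": env["GDN_EMAIL"],        # required: raises KeyError if unset
        "password": env["GDN_PASSWORD"],  # required: raises KeyError if unset
        "geofabric": env.get("GDN_FABRIC", "_system"),
    }


# Usage (requires pyC8 and network access):
# from c8 import C8Client
# client = C8Client(**c8_client_kwargs())
```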

Validate Stream Application

Validate the stream application for syntax errors before saving.

    stream_app_definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream application that demonstrates reading data from a collection and publishing it to a stream. The stream and collection will be created automatically if they do not already exist.')

/**
Testing the Stream Application:
1. Open the SampleCargoAppDestStream stream in the Console to monitor the output.

2. Upload the following data into the SampleCargoAppInputTable C8DB collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}

3. The following messages are shown on the SampleCargoAppDestStream stream console:
[1]
[2]
[3]
[4]
[5]
*/

-- Create a database source that reads documents with a weight attribute from the SampleCargoAppInputTable collection.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);


-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);


-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
"""

print("--- Validating Stream Application Definition")
print(client.validate_stream_app(data=stream_app_definition))
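Later calls such as `activate_stream_app` and `delete_stream_app` refer to the application by the name declared in `@App:name`. A small local helper (hypothetical, and no substitute for `validate_stream_app`) can extract that name from a definition so you don't have to repeat it by hand:

```python
import re

# Matches @App:name('...') or @App:name("...") and captures the name.
_APP_NAME = re.compile(r"""@App:name\(\s*(['"])(.+?)\1\s*\)""")


def app_name(definition: str) -> str:
    """Return the @App:name value from a stream application definition."""
    match = _APP_NAME.search(definition)
    if match is None:
        raise ValueError("definition has no @App:name annotation")
    return match.group(2)


definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
"""
print(app_name(definition))  # Sample-Cargo-App
```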

Save Stream Application

By default, the stream application is saved in the local region. Optionally, you can pass `dclist` (domain component list) to deploy the stream application to specific regions or to all regions.

    print("--- Creating Stream Application")
print(client.create_stream_app(data=stream_app_definition))

Enable or Disable Stream Application

    print("Activate", client.activate_stream_app('Sample-Cargo-App', True))

print("Deactivate", client.activate_stream_app('Sample-Cargo-App', False))

To operate on an existing application, for example to update or query it, first get a handle (an instance) for that stream application.

Example: Update Stream Application

In this example, we update the existing stream application Sample-Cargo-App with a new definition.

    from c8 import C8Client

client = C8Client(protocol='https', host='gdn.paas.macrometa.io', port=443, email='[email protected]', password='xxxxx', geofabric='_system')

# Get a handle (instance) for the existing app to operate on it
app = client.stream_app("Sample-Cargo-App")

# New definition for the app
data = """@App:name('Sample-Cargo-App') @App:qlVersion('2')

-- Stream
CREATE SOURCE STREAM srcCargoStream (weight int);

-- Table
CREATE TABLE destCargoTable (weight int, totalWeight long);

-- Data Processing
@info(name='Query')
INSERT INTO destCargoTable
SELECT weight, sum(weight) as totalWeight
FROM srcCargoStream;"""
# List of regions (dclist) for the updated app; left empty here
regions = []
result = app.update(data, regions)
print(result)
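The new definition writes `weight` and `sum(weight) as totalWeight` for each event. Assuming the query has no window, so the sum accumulates over all events received so far (typical for Siddhi-style stream queries; confirm against the GDN query reference), each stored row carries a running total. The following plain-Python sketch reproduces that arithmetic for five sample events; it is not how GDN executes the query:

```python
from itertools import accumulate

weights = [1, 2, 3, 4, 5]  # sample input events for srcCargoStream

# One output row per input event: the event's weight and the running total.
rows = [
    {"weight": w, "totalWeight": total}
    for w, total in zip(weights, accumulate(weights))
]
print(rows)
```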

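The following JavaScript (jsC8) example updates Sample-Cargo-App so that, in addition to publishing the input data to SampleCargoAppDestStream, it also stores the data in another collection called SampleCargoAppDestTable: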

    const updatedAppDefinition = `
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream application that demonstrates reading data from a collection, publishing it to a stream, and storing it in another collection. The stream and collections will be created automatically if they do not already exist.')

/**
Testing the Stream Application:
1. Open the SampleCargoAppDestStream stream in the Console to monitor the output.

2. Upload the following data into the SampleCargoAppInputTable C8DB collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}

3. The following messages are shown on the SampleCargoAppDestStream stream console:
[1]
[2]
[3]
[4]
[5]

4. The following documents are stored in SampleCargoAppDestTable:
{"weight":1}
{"weight":2}
{"weight":3}
{"weight":4}
{"weight":5}
*/

-- Define a database source that reads documents with a weight attribute from the SampleCargoAppInputTable collection.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Define Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);

CREATE STORE SampleCargoAppDestTable WITH (type = 'database', collection = "SampleCargoAppDestTable") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;

-- Data Processing
@info(name='Dump')
INSERT INTO SampleCargoAppDestTable
SELECT weight
FROM SampleCargoAppInputTable;`

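console.log("--- Updating Stream Application `Sample-Cargo-App`");
// Assumes `client` is a connected jsc8 client; run inside an async function or ES module
const app = client.streamApp("Sample-Cargo-App");
const result = await app.updateApplication([], updatedAppDefinition);
console.log(result);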
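In the updated definition, the `Query` and `Dump` queries both read from SampleCargoAppInputTable, so every input document is delivered to SampleCargoAppDestStream and to SampleCargoAppDestTable. A plain-Python sketch of that fan-out, with lists standing in for the stream and the store (the `fan_out` helper is hypothetical):

```python
from typing import Callable, Iterable


def fan_out(events: Iterable[dict], *handlers: Callable[[dict], None]) -> None:
    """Deliver every event to each handler, like two queries on one source."""
    for event in events:
        for handle in handlers:
            handle(event)


dest_stream: list = []  # stands in for SampleCargoAppDestStream
dest_table: list = []   # stands in for SampleCargoAppDestTable

inputs = [{"weight": w} for w in range(1, 6)]
fan_out(
    inputs,
    lambda e: dest_stream.append([e["weight"]]),           # 'Query': publish [weight]
    lambda e: dest_table.append({"weight": e["weight"]}),  # 'Dump': store document
)
print(dest_stream)
print(dest_table)
```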

Run an Ad Hoc Query

Ad hoc queries are available as an advanced operation in the Python driver. The complete example at the end of this page shows how to get a stream application handle for advanced operations.

    from c8 import C8Client

client = C8Client(protocol='https', host='gdn.paas.macrometa.io', port=443, email='[email protected]', password='xxxxx', geofabric='_system')

# Get a handle (instance) for the existing app to operate on it
app = client.stream_app("Sample-Cargo-App")

# Run an ad hoc query against the app's store
q = "select * from SampleCargoAppDestTable limit 3"
result = app.query(q)
print(result)
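If the store name or row limit in an ad hoc query comes from user input, it is safer to validate it before building the query string. The `select_query` helper below is a hypothetical sketch that accepts only simple identifiers and positive integer limits:

```python
import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def select_query(table: str, limit: int) -> str:
    """Build 'select * from <table> limit <n>' after validating both inputs."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"invalid table name: {table!r}")
    if not isinstance(limit, int) or limit <= 0:
        raise ValueError("limit must be a positive integer")
    return f"select * from {table} limit {limit}"


q = select_query("SampleCargoAppDestTable", 3)
print(q)  # select * from SampleCargoAppDestTable limit 3
# result = app.query(q)  # requires a live app handle, as above
```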

Delete Stream Application

    print("--- Deleting Stream Application `Sample-Cargo-App`")
result = client.delete_stream_app('Sample-Cargo-App')
print(result)

Get Sample Stream Applications

Several sample stream applications are preloaded and ready to run.

    print("--- You can try out several stream applications which are pre-loaded and ready to run.")
print("Samples", client.get_stream_app_samples())

Complete Example

The following example combines the code snippets from this tutorial.

    from c8 import C8Client

# Connect to GDN
print("--- Connecting to C8")
client = C8Client(protocol='https', host='gdn.paas.macrometa.io', port=443,
email='[email protected]', password='xxxxx',
geofabric='_system')

stream_app_definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream application that demonstrates reading data from a collection and publishing it to a stream. The stream and collection will be created automatically if they do not already exist.')

/**
Testing the Stream Application:
1. Open the SampleCargoAppDestStream stream in the Console to monitor the output.

2. Upload the following data into the SampleCargoAppInputTable C8DB collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}

3. The following messages are shown on the SampleCargoAppDestStream stream console:
[1]
[2]
[3]
[4]
[5]
*/

-- Create a database source that reads documents with a weight attribute from the SampleCargoAppInputTable collection.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
"""
# Validate a stream application before saving it
print(client.validate_stream_app(data=stream_app_definition))
# Create a stream application
print(client.create_stream_app(data=stream_app_definition))
# Retrieve all stream applications
print("Retrieve", client.retrieve_stream_app())
# Get a stream application handle for advanced operations
print("Get App", client.get_stream_app('Sample-Cargo-App'))
# Deactivate a stream application
print("Deactivate", client.activate_stream_app('Sample-Cargo-App', False))
# Activate a stream application
print("Activate", client.activate_stream_app('Sample-Cargo-App', True))
# Delete a stream application
print(client.delete_stream_app('Sample-Cargo-App'))
# Get stream application samples
print("Samples", client.get_stream_app_samples())