Skip to main content

Using Rest APIs

Modern applications need to be highly responsive, always online, and able to access data instantly across the globe. At the same time, they need to be deployed on datacenters close to their users. Macrometa global data network (GDN) is a real-time materialized view engine that provides instant data to applications and APIs in a simple interface.

If you are new to Macrometa GDN, we strongly recommend reading What is Macrometa.

Prerequisites:

A Macrometa GDN tenant account and credentials.

API Browser

Your main tool for using REST APIs is the API reference in the GDN web browser interface. Use the built-in API reference to run various calls and view their input and output.

GDN API Browser

Working with Documents

A document is a JSON-serializable dictionary object with the following properties:

  • _key identifies a document within a collection.
  • _id identifies a document across all collections in a fabric with the following format: {collection name}/{document key}. This is also known as a handle.
  • _rev indicates the latest revision of a document. GDN supports MVCC (Multiple Version Concurrency Control) and stores each document in multiple revisions. This field is automatically populated, but you can use it to validate a document against its current revision.

For example:

{
"_id": "students/bruce",
"_key": "bruce",
"_rev": "_Wm3dzEi--_",
"first_name": "Bruce",
"last_name": "Wayne",
"address": {
"street": "1007 Mountain Dr.",
"city": "Gotham",
"state": "NJ"
},
"is_rich": True,
"friends": ["robin", "gordon"]
}

Tutorial

 import json
import requests

# Set constants
FEDERATION = "api-gdn.paas.macrometa.io"
FED_URL = f"https://{FEDERATION}"
COLLECTION_NAME = 'testcollection'
EMAIL = "[email protected]"
PASSWORD = "xxxxx"
AUTH_TOKEN = "bearer "
FABRIC = "_system"
URL = f"{FED_URL}/_open/auth"
payload = {
'email': EMAIL,
'password': PASSWORD,

}
headers = {
'content-type': 'application/json'
}

response = requests.post(URL, data=json.dumps(payload), headers=headers)

if response.status_code == 200:
resp_body = json.loads(response.text)
AUTH_TOKEN += resp_body["jwt"]
TENANT = resp_body["tenant"]
else:
raise Exception(f"Error while getting auth token. "
f"Code:{response.status_code}, Reason:{response.reason}")

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

# Get list of all regions
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/datacenter/all"
dcl_resp = session.get(URL)
dcl_list = json.loads(dcl_resp.text)
regions = []
for dcl in dcl_list:
dcl_url = dcl['tags']['url']
regions.append(dcl_url)
print("\nList of regions: ", regions)

# Create a document collection
# Note: Create a test collection. Set "type" to 2 for documents or 3 for edges
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/collection"
payload = {
"name": COLLECTION_NAME,
"type": 2
}
resp = session.post(URL, data=json.dumps(payload))
resp = json.loads(resp.text)
if 'error' in resp and resp['error']:
print("ERROR: " + resp["errorMessage"])
else:
print("\nCollection created: ", resp['name'])

# Insert a document into a collection
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}"
payload = {'GPA': 3.5, 'first': 'Lola', 'last': 'Martin', '_key': 'Lola'}
resp = session.post(URL, data=json.dumps(payload))
print("\nDocument inserted: ", resp.text)

# Data can either be a single document or a list of documents
# Insert multiple documents
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}"
data = [
{'GPA': 3.2, 'first': 'Abby', 'last': 'Page', '_key': 'Abby'},
{'GPA': 3.6, 'first': 'John', 'last': 'Kim', '_key': 'John'},
{'GPA': 4.0, 'first': 'Emma', 'last': 'Park', '_key': 'Emma'}
]
resp = session.post(URL, data=json.dumps(data))
print("\nMultiple documents inserted: ", resp.text)

# Read a document with its ID
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}" + "/Lola"
resp = session.get(URL)
print("\nDocument with ID Lola is: ", resp.text)

# Read multiple documents
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/simple/lookup-by-keys"
payload = {"collection": COLLECTION_NAME,
"keys": ["Abby", "John", "Emma"]}
resp = session.put(URL, data=json.dumps(payload))
resp = json.loads(resp.text)
print("\nDocuments: ", resp["documents"])

# Update a single document with its ID
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}/John"
payload = {'GPA': 3.6, 'first': 'John', 'last': 'Andrews', '_key': 'John'}
resp = session.patch(URL, data=json.dumps(payload))
print("\nUpdated document with ID John: ", resp.text)

# Update documents
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}"
payload = [
{'GPA': 4.6, 'first': 'Lola', 'last': 'Martin', '_key': 'Lola'},
{'GPA': 3.2, 'first': 'Abby', 'last': 'Stutguard', '_key': 'Abby'}
]
resp = session.patch(URL, data=json.dumps(payload))
print("\nUpdated documents: ", resp.text)

# Remove single document with its ID
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}/John"
resp = session.delete(URL)
print("\nDeleted document with ID John: ", resp.text)

# Remove a multiple document
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME}"
payload = [
{'GPA': 4.6, 'first': 'Lola', 'last': 'Martin', '_key': 'Lola'},
{'GPA': 3.2, 'first': 'Abby', 'last': 'Stutguard', '_key': 'Abby'},
{'GPA': 4.0, 'first': 'Emma', 'last': 'Park', '_key': 'Emma'}
]
resp = session.delete(URL, data=json.dumps(payload))
print("\nDeleted Documents: ", resp.text)

Query using C8QL

You can use C8QL to run CRUD Operations.

Tutorial

#Using C8QL
import json
import requests

# Constants
FEDERATION = "api-gdn.paas.macrometa.io"
FED_URL = f"https://{FEDERATION}"
EMAIL = "[email protected]"
PASSWORD = "xxxxx"
AUTH_TOKEN = "bearer "


# Create HTTPS session
url = f"{FED_URL}/_open/auth"
payload = {
'email': EMAIL,
'password': PASSWORD
}
headers = {
'content-type': 'application/json'
}

response = requests.post(url, data=json.dumps(payload), headers=headers)
if response.status_code == 200:
resp_body = json.loads(response.text)
AUTH_TOKEN += resp_body["jwt"]
TENANT = resp_body["tenant"]
else:
raise Exception(f"Error while getting auth token. "
f"Code:{response.status_code}, Reason:{response.reason}")

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})
url = f"{FED_URL}/_fabric/_system/_api/cursor"


# Insert documents to the collection
resp = session.post(url, json={
"query": "INSERT{'name' : 'Julie', 'company' : 'ABC', '_key' : 'Julie'}" \
"INTO testcollection"
})

# Read from the collection
resp = session.post(url, json={
"query": "FOR doc IN testcollection RETURN doc"
})
print(resp.text)

# Update documents in the collection
resp = session.post(url, json={
"query": "FOR c IN testcollection UPDATE c WITH{'company':'XYZ'} IN testcollection"
})
print(resp.text)
# Upsert documents in the collection
resp = session.post(url, json={
"query": "UPSERT {name: 'John'} INSERT "
"{_key:'John', name: 'John', logins:1, updatedAt: DATE_NOW()}"
" UPDATE {'logins': OLD.logins + 1, updatedAt: DATE_NOW()} IN testcollection"
})
print(resp.text)
# Delete documents in the collection
resp = session.post(url, json={
"query": "FOR c IN testcollection REMOVE c IN testcollection"
})
print(resp.text)

Publish-Subscribe with Streams

GDN streams are a high-performance solution for server-to-server messaging. Streams are built on the publish-subscribe (pub-sub) pattern in which producers publish messages to streams, and consumers can subscribe to those streams, process incoming messages, and send an acknowledgment to the producer when finished.

Streams provide:

  • Seamless geo-replication of messages across regions.
  • Low publish and end-to-end latency.
  • Seamless scalability to over a million topics.
  • Multiple subscription modes (exclusive, shared, and failover) for streams.
  • Guaranteed message delivery with persistent message storage.

Tutorial

import json
import base64
import requests
from websocket import create_connection
import six

# Constants
FEDERATION = "api-gdn.paas.macrometa.io"
FED_URL = f"https://{FEDERATION}"
EMAIL = "[email protected]"
PASSWORD = "xxxxx"
FABRIC = "_system"
STREAM_NAME = "teststream"
AUTH_TOKEN = "bearer "
TENANT_NAME = ""
CONSUMER_NAME = "testconsumer"
STREAM_TYPE = "c8global"

# Create HTTPS session with auth endpoint
url = f"{FED_URL}/_open/auth"
payload = {
'email': EMAIL,
'password': PASSWORD
}
headers = {
'content-type': 'application/json'
}
response = requests.post(url, data=json.dumps(payload), headers=headers)
if response.status_code == 200:
resp_body = json.loads(response.text)
AUTH_TOKEN += resp_body["jwt"]
TENANT = resp_body["tenant"]
else:
raise Exception(f"Error while getting auth token. "
f"Code:{response.status_code}, Reason:{response.reason}")

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

# Create a stream
# Set global=true for a global stream or global=false for a local stream
url = f"{FED_URL}/_fabric/{FABRIC}/_api/streams/{STREAM_NAME}?global=true"
resp = session.post(url)
print("\nStream created: ", resp.text)

# Publish messages, Send message in body
url = f"{FED_URL}/_fabric/{FABRIC}/_api/streams/{STREAM_TYPE}s.{STREAM_NAME}/publish?global=true"
resp = session.post(url, data="Message")
print("\nStream received message: ", resp.text)

# or using web sockets
PRODUCER_URL = f"wss://{FEDERATION}/_ws/ws/v2/producer/persistent/{TENANT_NAME}/{STREAM_TYPE}.{FABRIC}/{STREAM_TYPE}s.{STREAM_NAME}"
ws = create_connection(PRODUCER_URL, header=[f"authorization: {AUTH_TOKEN}"])

payload = {
"payload": base64.b64encode(
six.b("Hello World")
).decode("utf-8")
}
ws.send(json.dumps(payload))
response = json.loads(ws.recv())

if response['result'] == 'ok':
print('Message published successfully')
else:
print('Failed to publish message:', response)
ws.close()

# Subscribe
CONSUMER_URL = f"wss://{FEDERATION}/_ws/ws/v2/producer/persistent/{TENANT_NAME}/{STREAM_TYPE}.{FABRIC}/{STREAM_TYPE}s.{STREAM_NAME}/{CONSUMER_NAME}"
ws = create_connection(CONSUMER_URL, header=[f"authorization: {AUTH_TOKEN}"])
while True:
msg = json.loads(ws.recv())
if msg:
print(f"received: {base64.b64decode(msg['payload'])}")
# Acknowledge successful processing
ws.send(json.dumps({'messageId': msg['messageId']}))
break
ws.close()

# Delete Subscription (unsubscribe)
url = f"{FED_URL}/_api/streams/subscription/{CONSUMER_NAME}"
resp = session.delete(url)
print("Subscription deleted: ", resp.text)

Query as API (RestQL)

Globally distributed applications need a fast data platform that can transparently replicate data anywhere in the world, enabling users to access applications on the closest copy of their data. Additionally, these applications need both geo-replicated and local streams to handle pub-sub, ETL, and real-time updates.

Macrometa GDN provides turnkey global distribution and transparent multi-master replication. You can run globally distributed, low-latency workloads with GDN.

Tutorial

# Using RESTQL
import json
import time

import requests

# Set constants
FEDERATION = "api-gdn.paas.macrometa.io"
FED_URL = f"https://{FEDERATION}"
EMAIL = "[email protected]"
PASSWORD = "xxxxx"
FABRIC = "_system"
AUTH_TOKEN = "bearer "
READ_QUERY = "FOR doc IN @@collection RETURN doc"
QUERY_NAME = "read"
COLLECTION_NAME = "api_query_tutorial"
QUERY_PARAMS = {"@collection": f"{COLLECTION_NAME}"}
INSERT_QUERY = "FOR i IN 1..100 INSERT { result: i } INTO @@collection"
UPDATE_QUERY = "FOR doc IN @@collection FILTER doc.result >= 35 " \
"UPDATE doc._key WITH { qualified :true } IN @@collection"
DELETE_QUERY = "FOR c IN @@collection REMOVE c IN @@collection"
UPDATE_READ_QUERY = "FOR doc IN @@collection FILTER doc.result < 10 RETURN doc"

# Create HTTPS session

URL = f"{FED_URL}/_open/auth"
payload = {
'email': EMAIL,
'password': PASSWORD
}
headers = {
'content-type': 'application/json'
}

response = requests.post(URL, data=json.dumps(payload), headers=headers)

if response.status_code == 200:
resp_body = json.loads(response.text)
AUTH_TOKEN += resp_body["jwt"]
TENANT = resp_body["tenant"]
else:
raise Exception(f"Error while getting auth token. Code:"
f"{response.status_code}, Reason:{response.reason}")

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

# Create a document collection
# Note: Create a test collection. Set "type" to 2 for documents or 3 for edges
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/collection"
payload = {
"name": COLLECTION_NAME,
"type": 2
}
resp = session.post(URL, data=json.dumps(payload))
resp = json.loads(resp.text)
if 'error' in resp and resp['error']:
print("ERROR: " + resp["errorMessage"])
else:
print("\nCollection created: ", resp['name'])

# Create RESTQL
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql"

# Save read query
payload = {
"query": {
"name": QUERY_NAME,
"parameter": QUERY_PARAMS,
"value": READ_QUERY
}
}

resp = session.post(URL, data=json.dumps(payload))
print("\nRead query saved: ", resp.text)
time.sleep(1)
# Save insert query
payload = {
"query": {
"name": "insert",
"value": INSERT_QUERY,
"parameter": QUERY_PARAMS,

}
}

resp = session.post(URL, data=json.dumps(payload))
print("\nInsert query saved: ", resp.text)
time.sleep(1)
# Save update query
payload = {
"query": {
"name": "update",
"value": UPDATE_QUERY,
"parameter": QUERY_PARAMS,

}
}

resp = session.post(URL, data=json.dumps(payload))
print("\nUpdate query saved: ", resp.text)
time.sleep(1)
# Save delete query
payload = {
"query": {
"name": "delete",
"value": DELETE_QUERY,
"parameter": QUERY_PARAMS,

}
}
resp = session.post(URL, data=json.dumps(payload))
print("\nDelete query saved: ", resp.text)
time.sleep(1)
# Execute insert query
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/execute/insert"
payload = {
"bindVars": QUERY_PARAMS,
}
resp = session.post(URL, data=json.dumps(payload))
print("\nInsert query executed: ", resp.text)
time.sleep(1)
# Execute read query
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/execute/" + QUERY_NAME
payload = {
"bindVars": QUERY_PARAMS,
}
resp = session.post(URL, data=json.dumps(payload))
print("\nRead query executed: ", resp.text)
time.sleep(1)
# Execute update query
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/execute/update"
payload = {
"bindVars": QUERY_PARAMS,
}
resp = session.post(URL, data=json.dumps(payload))
print("\nUpdate query executed: ", resp.text)
time.sleep(1)
# Update saved query
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/" + QUERY_NAME
payload = {
"query": {
"parameter": QUERY_PARAMS,
"value": UPDATE_READ_QUERY
}
}
resp = session.put(URL, data=json.dumps(payload))
print("Query updated: ", resp.text)
time.sleep(1)
# Execute delete query
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/execute/delete"
payload = {
"bindVars": QUERY_PARAMS,
}
resp = session.post(URL, data=json.dumps(payload))
print("\nDelete query executed: ", resp.text)
time.sleep(1)
# Delete saved queries
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/{QUERY_NAME}"
resp = session.delete(URL)
print("Read query deleted: ", resp.text)
time.sleep(1)
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/insert"
resp = session.delete(URL)
print("Insert query deleted: ", resp.text)
time.sleep(1)
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/update"
resp = session.delete(URL)
print("Update query deleted: ", resp.text)
time.sleep(1)
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/restql/delete"
resp = session.delete(URL)
print("Delete query deleted: ", resp.text)

Working with Graphs

Graphs enable you to do the following:

  • Structure your data models to make them consistent with your domain.
  • Group your data models into logical collections and query them.

A graph consists of vertices and edges that are all stored as documents in collections. You can store vertices in document collections or edge collections, enabling you to use an edge as a vertex. Edges can only be stored in edge collections. An edge collection is similar to a relation table that stores many-to-many relationships between two data tables, and a vertex collection is like one of the data tables with connected objects. A graph can use one or more edge definitions to determine which collections are used.

An edge collection contains edge documents and shares its namespace with all other collection types. You can manage edge documents with REST API wrappers for regular documents, but edge collection API wrappers provide the following benefits:

  • Perform modifications as transactions.
  • When inserting edge documents, check them against edge definitions.

Edge documents have two additional required fields:

  • _from
  • _to

Edges use these fields to point from one document to another stored in vertex collections. The values of _from and _to must match the IDs of vertex documents linked by the edge document. For example:

{
"_id": "friends/001",
"_key": "001",
"_rev": "_Wm3dyle--_",
"_from": "students/john", // This value must match the ID of the "from" document.
"_to": "students/jane", // This value must match the ID of the "to" document.
"closeness": 9.5
}

In queries, you can define permissions for the direction of edge relations. For example:

  • OUTBOUND: _from_to
  • INBOUND: _from_to
  • ANY: _from_to

While simple graph queries with a fixed number of hops via the relation table may be doable in SQL with several nested joins, graph databases can handle an arbitrary number of these hops over edge collections - this is called traversal. Also edges in one edge collection may point to several vertex collections. It is common to have attributes attached to edges, i.e. a label naming this interconnection.

Tutorial

To create edge collection use same endpoint /_fabric/{fabric_name}/_api/collection and pass type:3 in payload.

import json
import requests

# Constants

FEDERATION = "api-gdn.paas.macrometa.io"
FED_URL = f"https://{FEDERATION}"
EMAIL = "[email protected]"
PASSWORD = "xxxxx"
FABRIC = "_system"
AUTH_TOKEN = "bearer "
TENANT_NAME = ""
COLLECTION_NAME_1 = "teachers"
COLLECTION_NAME_2 = "lectures"
EDGE_COLL_NAME = "teach"
GRAPH_NAME = "lectureteacher"

# Create HTTPS session

URL = f"{FED_URL}/_open/auth"
payload = {
'email': EMAIL,
'password': PASSWORD
}
headers = {
'content-type': 'application/json'
}

response = requests.post(URL, data=json.dumps(payload), headers=headers)

if response.status_code == 200:
resp_body = json.loads(response.text)
AUTH_TOKEN += resp_body["jwt"]
TENANT = resp_body["tenant"]
else:
raise Exception(f"Error while getting auth token. "
f"Code:{response.status_code}, Reason:{response.reason}")

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

# Create document collections and insert data


URL = f"{FED_URL}/_fabric/{FABRIC}/_api/collection"
payload = {'name': COLLECTION_NAME_1}

resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nDocument collection 1 created: ", result)

payload = {'name': COLLECTION_NAME_2}

resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nDocument collection 2 created: ", result)

payload = [
{
'_key': 'Jean',
'firstname': 'Jean',
'lastname': 'Picard',
'email': '[email protected]'
},
{
'_key': 'James',
'firstname': 'James',
'lastname': 'Kirk',
'email': '[email protected]'
},
{
'_key': 'Han',
'firstname': 'Han',
'lastname': 'Solo',
'email': '[email protected]'
},
{
'_key': 'Bruce',
'firstname': 'Bruce',
'lastname': 'Wayne',
'email': '[email protected]'
}
]

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME_1}"
resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nDocuments inserted: ", result)

payload = [
{'_id': 'lectures/CSC101', 'difficulty': 'easy', '_key': 'CSC101', 'firstname': 'Jean'},
{'_id': 'lectures/CSC102', 'difficulty': 'hard', '_key': 'CSC102', 'firstname': 'Jean'},
{'_id': 'lectures/CSC103', 'difficulty': 'hard', '_key': 'CSC103', 'firstname': 'Jean'},
{'_id': 'lectures/CSC104', 'difficulty': 'moderate', '_key': 'CSC104', 'firstname': 'Jean'}

]

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{COLLECTION_NAME_2}"
resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nDocuments inserted: ", result)

# Create edge collection
payload = {'name': EDGE_COLL_NAME, "type": 3}

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/collection"
resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nEdge collection created: ", result)
payload = [
{
'_key': 'Jean-CSC101',
'_from': 'teachers/Jean',
'_to': 'lectures/CSC101',
'online': False
},
{
'_key': 'Jean-CSC102',
'_from': 'teachers/Jean',
'_to': 'lectures/CSC102',
'online': True
},
{
'_key': 'Jean-CSC103',
'_from': 'teachers/Jean',
'_to': 'lectures/CSC103',
'online': False
},
{
'_key': 'Bruce-CSC101',
'_from': 'teachers/Bruce',
'_to': 'lectures/CSC101',
'online': True
}

]

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/document/{EDGE_COLL_NAME}"
resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nDocuments inserted: ", result)

# Create a graph
payload = {
"edgeDefinitions": [
{
"collection": EDGE_COLL_NAME,
"from": [
"teachers"
],
"to": [
"lectures"
]
}
],
"name": GRAPH_NAME,
"options": {}
}

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/graph"
resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nGraph created: ", result)

# Graph traversal
# To use outbound traversal, set direction to `out`.
# To use inbound traversal, set direction to `in`.
params = {
"vertex": "teachers/Jean",
"direction": "out"
}

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/edges/{EDGE_COLL_NAME}"

resp = session.get(URL, params=params)
result = json.loads(resp.text)
print("\nGraph traversal: ", result)

# Delete graph and collections
# To delete the graph and save the collections, set dropCollection to `false`.
params = {"dropCollections": True}
URL = f"{FED_URL}/_fabric/{FABRIC}/_api/graph/{GRAPH_NAME}"
resp = session.delete(URL, params=params)
result = json.loads(resp.text)
print("Graph and collections deleted: ", result)

Stream Processing

Macrometa Stream processing enables you to integrate streaming data into your tenant and enables you to automatically respond to events. A stream processing engine must collect and analyze data generated by business activities, then integrate or act on the data.

  • Collect: Capture or receive data from various data sources.

  • Analyze: Analyze data to identify interesting patterns and extract information.

  • Act: Take actions based on processing results. For example, you can execute code, call an external service, or trigger a complex integration.

  • Integrate: Make processed data globally available for consumers in the correct format with low latency.

import json
import time
import base64
import requests
from websocket import create_connection
import six

# Set constants

FEDERATION = "api-gdn.paas.macrometa.io"
FED_URL = f"https://{FEDERATION}"
EMAIL = "[email protected]"
PASSWORD = "xxxxx"
FABRIC = "_system"
AUTH_TOKEN = "bearer "
TENANT_NAME = ""
STREAM_NAME = "tutorialAppInputStream"
STREAM_APP_NAME = "stream_app_tutorial"
STREAM_APP = '''
@App:name('stream_app_tutorial')
@App:qlVersion("2")
CREATE FUNCTION concatFn[javascript] return string {
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var response = str1 + str2 + str3;
return response;
};
-- Stream
CREATE SOURCE STREAM tutorialAppInputStream (deviceID string, roomNo int, temperature double);
-- Table
CREATE TABLE tutorialAppOutputTable (id string, temperature double);
@info(name='Query')
INSERT INTO tutorialAppOutputTable
SELECT concatFn(roomNo,'-',deviceID) as id, temperature
FROM tutorialAppInputStream;
'''
INPUT_DATA = [
{
"deviceID": "AD11",
"roomNo": 200,
"temperature": 18,
},
{"deviceID": "AD11",
"roomNo": 201,
"temperature": 47},
]
SELECT_QUERY = "FOR doc IN tutorialAppOutputTable return doc"

# Create HTTPS session

URL = f"{FED_URL}/_open/auth"
payload = {
'email': EMAIL,
'password': PASSWORD
}
headers = {
'content-type': 'application/json'
}

response = requests.post(URL, data=json.dumps(payload), headers=headers)

if response.status_code == 200:
resp_body = json.loads(response.text)
AUTH_TOKEN += resp_body["jwt"]
TENANT = resp_body["tenant"]
else:
raise Exception(f"Error while getting auth token. "
f"Code:{response.status_code}, Reason:{response.reason}")

session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})

# Create stream application

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/streamapps"
payload = {
"definition": STREAM_APP,
"regions": ["devsuccess2-us-east", "devsuccess2-us-central", "devsuccess2-ap-west"]

}

resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nStream application created: ", result)

# Activate stream application

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/streamapps/{STREAM_APP_NAME}/active?active=true"
resp = session.patch(URL)
result = json.loads(resp.text)
print("\nStream application activated: ", result)

# Wait for all inputs and outputs to initialize
time.sleep(20)

# Publish messages to the input stream
STREAM_TYPE = "c8local"
PRODUCER_URL = f"wss://{FEDERATION}/_ws/ws/v2/producer/persistent/{TENANT_NAME}/{STREAM_TYPE}.{FABRIC}/{STREAM_TYPE}s.{STREAM_NAME}"

ws = create_connection(PRODUCER_URL, header=[f"Authorization:{AUTH_TOKEN}"])
payload = {
"payload": base64.b64encode(
six.b(json.dumps(INPUT_DATA[0]))
).decode("utf-8")
}
ws.send(json.dumps(payload))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print('Message published successfully')
else:
print('Failed to publish message:', response)

payload = {
"payload": base64.b64encode(
six.b(json.dumps(INPUT_DATA[1]))
).decode("utf-8")
}
ws.send(json.dumps(payload))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print('Message published successfully')
else:
print('Failed to publish message:', response)

ws.close()

# Verify results from the collection

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/cursor"
payload = {
"id": "tutorialStreamAppQuery",
"query": SELECT_QUERY,
"bindVars": {},
}
resp = session.post(URL, data=json.dumps(payload))
result = json.loads(resp.text)
print("\nStream application results: ", result)

# Delete stream application

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/streamapps/{STREAM_APP_NAME}"
resp = session.delete(URL)
result = json.loads(resp.text)
print("\nStream application deleted: ", result)

# Delete stream

URL = f"{FED_URL}/_fabric/{FABRIC}/_api/streams/{STREAM_TYPE}s.{STREAM_NAME}"
resp = session.delete(URL)
result = json.loads(resp.text)
print("\nStream deleted: ", result)