Create a basic pub-sub streams workflow
This page shows you how to perform a basic pub-sub streams workflow using the Macrometa API. For more information about using Macrometa APIs, refer to APIs.
Prerequisites
- A Macrometa account with admin permissions.
- An API key with admin permissions. Here's how to Create an API Key.
REST API Streams Example
- Copy and paste the code block below in your favorite IDE.
- Update constants with your values, such as the API key.
- Run the code.
- JavaScript
- Python
const WebSocket = require('ws');
class APIRequest {
_headers = {
Accept: "application/json",
"Content-Type": "application/json"
};
constructor (httpUrl, apiKey) {
this._url = httpUrl;
this._headers.authorization = `apikey ${apiKey}`; // apikey keyword is needed here
}
_handleResponse (response, resolve, reject) {
if (response.ok) {
resolve(response.json());
} else {
reject(response);
}
}
req (endpoint, { body, ...options } = {}) {
const self = this;
return new Promise(function (resolve, reject) {
fetch(self._url + endpoint, {
headers: self._headers,
body: body ? JSON.stringify(body) : undefined,
...options
}).then((response) => self._handleResponse(response, resolve, reject));
});
}
}
const apiKey = "XXXXX" // Use your API key here
const globalUrl = "api-play.paas.macrometa.io";
const httpUrl = `https://${globalUrl}`;
const tenant = "XXXXX" // Use your tenant name here
const stream = "api_tutorial_streams";
const consumerName = "api_tutorial_streams_consumer";
const isGlobal = true;
const run = async function () {
try {
const connection = new APIRequest(httpUrl, apiKey);
/* ------------------------------ Create stream ----------------------------- */
try {
await connection.req(
`/_fabric/_system/streams/${stream}?global=${isGlobal}`,
{
body: { name: stream },
method: "POST"
}
);
console.log("Stream created successfully");
} catch (e) {
if (e.status === 409) {
console.log("Stream already exists, skipping creation of stream");
} else {
console.log("Error while creating stream");
throw e;
}
}
/* ----------------- Publish and subscribe message to stream ---------------- */
const region = isGlobal ? "c8global" : "c8local";
const streamName = `${region}s.${stream}`;
// Fetching local URL in case the stream is local
const localDcDetails = await connection.req(`/datacenter/local`, {
method: "GET"
});
const dcUrl = localDcDetails.tags.url;
const url = isGlobal
? globalUrl
: `api-${dcUrl}`;
const otpConsumer = await connection.req(`/apid/otp`, {
method: "POST"
});
const otpProducer = await connection.req(`/apid/otp`, {
method: "POST"
});
const consumerUrl = `wss://${url}/_ws/ws/v2/consumer/persistent/${tenant}/${region}._system/${streamName}/${consumerName}?otp=${otpConsumer.otp}`;
const producerUrl = `wss://${url}/_ws/ws/v2/producer/persistent/${tenant}/${region}._system/${streamName}?otp=${otpProducer.otp}`;
let consumer;
let producer;
let producerInterval;
/* -------------------------- Initialize consumer -------------------------- */
const initConsumer = () => {
return new Promise((resolve) => {
consumer = new WebSocket(consumerUrl);
consumer.onopen = function () {
console.log("WebSocket:Consumer is open now for " + streamName);
resolve();
};
consumer.onerror = function () {
console.log(
"Failed to establish WebSocket:Consumer connection for " +
streamName
);
};
consumer.onclose = function () {
console.log("Closed WebSocket:Consumer connection for " + streamName);
};
consumer.onmessage = function (message) {
const receivedMsg = message.data && JSON.parse(message.data);
console.log(
`WebSocket:Consumer message received at ${new Date()}`,
receivedMsg
);
const ackMsg = { messageId: receivedMsg.messageId };
consumer.send(JSON.stringify(ackMsg));
};
});
};
/* -------------------------- Initialize producer -------------------------- */
const initProducer = () => {
producer = new WebSocket(producerUrl);
producer.onopen = function () {
console.log("WebSocket:Producer is open now for " + streamName);
producerInterval = setInterval(function () {
console.log(`WebSocket:Producer message sent at ${new Date()}`);
producer.send(JSON.stringify({ payload: `test` }));
}, 10000);
};
producer.onclose = function (e) {
console.log("Closed WebSocket:Producer connection for " + streamName);
clearInterval(producerInterval);
};
producer.onerror = function (e) {
console.log(
"Failed to establish WebSocket:Producer connection for " + streamName
);
};
};
initConsumer().then(() => {
initProducer();
});
await new Promise((resolve) => setTimeout(resolve, 1 * 40 * 1000));
consumer.close();
console.log("CONSUMER CLOSING...");
producer.close();
console.log("PRODUCER CLOSING...");
await new Promise((resolve) => setTimeout(resolve, 5000));
/* ------------------------ Unsubscribe from stream ------------------------ */
const consumerUnsubscribe = await connection.req(
`/_fabric/_system/_api/streams/subscription/${consumerName}`,
{
method: "DELETE"
}
);
console.log(
`${consumerName} unsubscribed successfully`,
consumerUnsubscribe
);
} catch (e) {
console.error(e);
}
};
run();
import base64
import json
import requests
import six
import threading
import time
from websocket import create_connection
# Constants
URL = "api-play.paas.macrometa.io"
HTTP_URL = f"https://{URL}"
API_KEY = "XXXXX" # Use your API key here
AUTH_TOKEN = f"apikey {API_KEY}"
FABRIC = "_system"
STREAM_NAME = "teststream"
TENANT_NAME = "XXXXX" # Add your tenant name here
CONSUMER_NAME = "testconsumer"
IS_GLOBAL = True # For a global stream pass True and False for local stream
stream_type = ""
if IS_GLOBAL:
stream_type = "c8global"
else:
stream_type = "c8local"
# Create a HTTPS session
session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})
# Create a stream
url = f"{HTTP_URL}/_fabric/{FABRIC}/_api/streams/{STREAM_NAME}?global={IS_GLOBAL}"
resp = session.post(url)
print("\nStream Created: ", resp.text)
# Publish messages
# Send message in body
producerurl = f"wss://{URL}/_ws/ws/v2/producer/persistent/{TENANT_NAME}/{stream_type}.{FABRIC}/{stream_type}s.{STREAM_NAME}"
# Enter your message here
msg = "Hello World"
def create_producer():
ws = create_connection(producerurl, header=[f"Authorization: {AUTH_TOKEN}"])
payload = {
"payload": base64.b64encode(
six.b(msg)
).decode("utf-8")
}
ws.send(json.dumps(payload))
print(f"Message sent: {msg}")
time.sleep(3)
response = json.loads(ws.recv())
if response['result'] == 'ok':
print("Received acknowledgement that message was delivered successfully")
else:
print(f"Failed to publish message: {response}")
ws.close()
# Or
# Use publish message api to publish message
#url = f"{HTTP_URL}/_fabric/{FABRIC}/_api/streams/{stream_type}s.{STREAM_NAME}/publish?global={IS_GLOBAL}"
#resp = session.post(url, data="Hello")
#print("\nMessage Posted: ", resp.text)
# Subscribe to stream
consumerurl = f"wss://{URL}/_ws/ws/v2/consumer/persistent/{TENANT_NAME}/{stream_type}.{FABRIC}/{stream_type}s.{STREAM_NAME}/{CONSUMER_NAME}"
def create_consumer():
ws = create_connection(consumerurl, header=[f"Authorization: {AUTH_TOKEN}"])
while True:
msg = json.loads(ws.recv())
if msg:
print(f"Message received: {base64.b64decode(msg['payload']).decode('utf-8')}")
# Acknowledge successful processing
ws.send(json.dumps({'messageId': msg['messageId']}))
break
ws.close()
# Threading is added here to open both producer and consumer connections simultaneously (not needed when using publish api)
t1 = threading.Thread(target=create_producer)
t2 = threading.Thread(target=create_consumer)
t1.start(), t2.start()
t1.join(), t2.join()
# Delete subscription
url = f"{HTTP_URL}/_fabric/{FABRIC}/_api/streams/{stream_type}s.{STREAM_NAME}/subscriptions/{CONSUMER_NAME}?global=true"
resp = session.delete(url)
print("Subscription deleted: ", resp.text)