Pub-Sub with Streams Example
This page describes how to create geo-replicated streams and set up queues and pub-sub messaging with local latencies across the globe.
Prerequisites
- A Macrometa account with sufficient permissions to create streams.
- Appropriate SDK installed. For more information, refer to Install SDKs.
Pub-Sub with Streams Code
- Copy and paste the code block below in your favorite IDE.
- Update constants with your values, such as the API key.
- Run the code.
- (Optional) Log in to the Macrometa console to view the streams.
- Javascript
- Python
// Connect to GDN.
const jsc8 = require("jsc8");
const readline = require("readline");
const globalUrl = "https://play.paas.macrometa.io";
const apiKey = "xxxx"; //Change this to your API Key
// Create an authenticated instance with an API key (recommended)
const client = new jsc8({
url: globalUrl,
apiKey: apiKey,
fabricName: "_system"
});
/* Authenticate via JSON Web Token (JWT)
const client = new jsc8({ url: globalUrl, token: "XXXX", fabricName: "_system" });
*/
/* Create an authenticated client instance via email and password
const client = new jsc8(globalUrl);
await client.login("your@email.com", "password");
*/
// Variables
const stream = "streamQuickstart";
let prefix_text = "";
const is_local = false; //For a local stream pass this variable as True, or False for a global stream
// Get the right prefix for the stream
if (is_local) {
prefix_text = "c8locals.";
} else {
prefix_text = "c8globals.";
}
async function createMyStream () {
let streamName = { "stream-id": "" };
if (await client.hasStream(stream, is_local)) {
console.log("This stream already exists!");
streamName["stream-id"] = prefix_text + stream;
console.log(`Old Producer = ${streamName["stream-id"]}`);
} else {
streamName = await client.createStream(stream, is_local);
console.log(`New Producer = ${streamName.result["stream-id"]}`);
}
}
async function sendData () {
console.log("\n ------- Publish Messages ------");
const producer = await client.createStreamProducer(stream, is_local);
producer.on("open", () => {
const input = readline.createInterface({
input: process.stdin,
output: process.stdout
});
// Repeatedly ask the user for message to be published to the stream. User can always exit by typing 0
var recursiveUserInput = () => {
input.question(
"Enter your message to publish or Type 0 to exit:\n",
(userInput) => {
if (userInput === "0") {
producer.close();
return input.close();
}
const data = {
payload: Buffer.from(userInput).toString("base64")
};
producer.send(JSON.stringify(data));
console.log(`Message sent: ${userInput}`);
recursiveUserInput();
}
);
}
recursiveUserInput();
});
producer.onclose = function () {
console.log("Closed WebSocket:Producer connection for " + stream);
};
}
async function receiveData () {
console.log("\n ------- Receive Messages ------");
const consumer = await client.createStreamReader(
stream,
"test-subscription-1",
is_local
);
// Close consumer connection when user types 0
const input = readline.createInterface({
input: process.stdin,
output: process.stdout
});
input.question(
"Type '0' to exit anytime:\n",
(userInput) => {
if (userInput === "0") {
consumer.close();
return input.close();
}
}
);
consumer.on("message", (msg) => {
const { payload, messageId } = JSON.parse(msg);
console.log(Buffer.from(payload, "base64").toString("ascii"));
// Send message acknowledgement
consumer.send(JSON.stringify({ messageId }));
});
consumer.onclose = function () {
console.log("Closed WebSocket:Consumer connection for " + stream);
};
}
async function selectAction () {
const input = readline.createInterface({
input: process.stdin,
output: process.stdout
});
input.question(
"Type 'w' to write data. Type 'r' to read data: ",
(userInput) => {
if (userInput === "w") {
sendData();
} else if (userInput === "r") {
receiveData();
} else {
console.log("Invalid user input. Stopping program.");
return false;
}
input.close();
}
);
}
(async function () {
await createMyStream();
await selectAction();
})();
""" This file is a demo to send data to/from a stream """
from operator import concat
import base64
import json
import warnings
from c8 import C8Client
warnings.filterwarnings("ignore")
# Connect to GDN.
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "xxxxx" # Change this to your API key
is_local = False
prefix_text = ""
demo_stream = 'streamQuickstart'
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)
# Get the right prefix for the stream.
if is_local:
prefix_text = "c8locals."
else:
prefix_text = "c8globals."
# Create global and local streams.
def createStream():
""" This function creates a stream """
stream_name = {"stream-id": ""}
if client.has_stream(demo_stream, local = is_local):
print("Stream already exists")
stream_name["stream-id"] = concat(prefix_text, demo_stream)
print ("Old Producer =", stream_name["stream-id"])
else:
stream_name = client.create_stream(demo_stream, local=is_local)
print ("New Producer =", stream_name["stream-id"])
# Create the producer and publish messages.
def sendData():
""" This function sends data through a stream """
producer = client.create_stream_producer(demo_stream, local=is_local)
while True:
user_input = input("Enter your message to publish: ")
if user_input == '0':
break
producer.send(user_input)
# Create the subscriber and receive data.
def receiveData():
""" This function receives data from a stream """
subscriber = client.subscribe(stream=demo_stream, local=is_local,
subscription_name="test-subscription-1")
while True:
print("\nListening for message...")
m1 = json.loads(subscriber.recv()) # Listen on stream for any receiving messages
msg1 = base64.b64decode(m1["payload"]).decode('utf-8')
print(F"Received message: '{msg1}'")
# Output the ID of the received message
# print(F"Message ID:'{m1['messageId']}'")
subscriber.send(json.dumps({'messageId': m1['messageId']})) # Acknowledge the received message
createStream()
# User enters choice.
# On one terminal use 'r' to start the subscriber to read data
# Then on another terminal use 'w' to start the producer and publish message
user_input = input("Type 'w' to write data, type 'r' read data, and type '0' to quit at any time: ")
if user_input == "w":
sendData()
elif user_input == "r":
receiveData()
else:
print ("Invalid user input. Stopping program")