Getting Started with Streams
Before working with streams, you need to know the following concepts:
- Producers/Publishers: Producers publish messages to streams, which are stored once. However, consumers can consume these messages as many times as needed. The stream is the source of truth for consumption.
- Consumers: Streams group consumers together to consume messages. Each group of consumers is a subscription on a stream. Each consumer group can have its way of consuming the messages: exclusively, shared, or failover.
- Messages: GDN streams retain all messages after creating a subscription, even after the consumer disconnects from the server. Retained messages are only discarded when a consumer acknowledges successful processing.
- Streams: A stream is a named channel for sending messages. A distributed append-only log backs each stream and can be local (at one edge location only) or global (across all edge locations in the fabric).
This guide demonstrates how to get started using Streams with the pyC8 and jsC8 SDKs and GDN CLI.
Objectives
At the end of this guide, you would have achieved the following:
- Created your first stream
- Published messages to the stream
- Subscribed to messages in a stream
- Client SDK
- GDN CLI
Prerequisites
- A Macrometa account with admin permissions.
- An API key with admin permissions. Here's how you can create an API key.
- Install the SDK. Macrometa offers different SDKs to enable you work and interact with GDN.
Before proceeding
- Create a new JavaScript (.js) or Python (.py) file in your favorite IDE.
- Copy the code block below and paste it into your JavaScript or Python file.
- With each subsequent step, append the code block to the existing file and then run it.
Note: The code snippets in each step aren't the same as the code in the full demo file at the end.
If you want to skip the explanation and just run the code, here is the Full Demo File.
Step 1. Connect to GDN
To use streams with Macrometa Global Data Network (GDN), you must first establish a connection to a local region.
Running this code initializes a server connection to the specified region URL. Before attempting a connection, ensure to authenticate your account and have your details ready to enable a successful connection.
- Python SDK
- JavaScript SDK
# Import libraries
from c8 import C8Client
# Define constants. You can find these constants in the GUI
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "my API key" # Change this to your API key
print("--- Connecting to GDN")
# Choose one of the following methods to access the GDN. API key is recommended.
# Authenticate with API key
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)
# Authenticate with JWT
# client = C8Client(protocol='https', host=URL, port=443, token=<your token>, geofabric=GEO_FABRIC))
# Authenticate with email and password
# client = C8Client(protocol='https', host=URL, port=443, email=<your email id>, password=<your password>, geofabric=GEO_FABRIC)
const jsc8 = require("jsc8");
// Choose one of the following methods to access the GDN. API key is recommended.
// API key
const client = new jsc8({url: "https://play.paas.macrometa.io", apiKey: "XXXX", fabricName: '_system'});
// JSON Web Token
// const client = new jsc8({url: "https://play.paas.macrometa.io", token: "XXXX", fabricName: '_system'});
// Or use email and password to authenticate client instance
// const client = new jsc8("https://play.paas.macrometa.io");
// Replace values with your email and password (use it inside an async function).
// await client.login("nemo@nautilus.com", "xxxxxx");
Step 2. Get Fabric Details
Get fabric details, including the name and associated regions.
- Python SDK
- JavaScript SDK
# Importing stuff you'll need later
from operator import concat
import base64
import json
import warnings
warnings.filterwarnings("ignore")
# Get the fabric details
print("Getting fabric details...")
print(client.get_fabric_details())
async function getFabric() {
try {
await console.log("Getting the fabric details...");
let result = await client.get();
await console.log("result is: ", result);
} catch(e){
await console.log("Fabric details could not be fetched because "+ e);
}
}
getFabric();
Step 3. Create Global and Local Streams
The streams in GDN can be either local or globally geo-replicated. The code below allows you to create either or both and then get the stream details.
- Python SDK
- JavaScript SDK
prefix_text = ""
is_local = False # If false, then the stream created below is global
demo_stream = 'streamQuickstart'
# Get the right prefix for the streamName
if is_local:
prefix_text = "c8locals."
else:
prefix_text = "c8globals."
# Create the stream if it doesn't already exist
# To create both a global and local stream, run the code twice, once with is_local = True, once False
stream_name = {"stream-id": ""}
if client.has_stream(demo_stream, local = is_local):
print("Stream already exists")
stream_name["stream-id"] = concat(prefix_text, demo_stream)
print ("Old Producer =", stream_name["stream-id"])
else:
stream_name = client.create_stream(demo_stream, local=is_local)
print ("New Producer =", stream_name["stream-id"])
# Get and print stream details
print("Get streams: ", client.get_streams())
async function streams() {
try{
await console.log("Creating local stream...");
const stream_local = await client.createStream("testStream-local", true);
console.log('Local Stream:', stream_local.result["stream-id"]);
await console.log("\nCreating global stream...");
const stream_global = await client.createStream("testStream-global", false);
console.log('Global Stream:', stream_global.result["stream-id"]);
} catch(e){
await console.log("Streams could not be fetched because "+ e);
}
}
streams();
Step 4. Publish Messages
Publish messages to a stream with a Producer. The stream can be a local stream or could be a geo-replicated stream.
- Python SDK
- JavaScript SDK
producer = client.create_stream_producer(demo_stream, local=is_local)
for i in range(10):
msg1 = "Persistent Hello from " + "("+ str(i) +")"
print("Stream: ", msg1)
producer.send(msg1)
async function streams() {
try {
await console.log("Creating local stream...");
const stream = client.stream("my-stream", true);
await stream.createStream();
const producerOTP = await stream.getOtp();
const producer = await stream.producer("play.paas.macrometa.io", {
otp: producerOTP,
});
producer.on("open", () => {
// If your message is an object, convert the object to a string.
// e.g. const message = JSON.stringify({message:'Hello World'});
const message = "Hello World";
const payloadObj = { payload: Buffer.from(message).toString("base64") };
producer.send(JSON.stringify(payloadObj));
});
producer.on("message", (msg) => {
console.log(msg, "Sent Successfully");
});
} catch(e) {
await console.log("Publishing could not be done because "+ e);
}
}
streams()
Step 5. Subscribe to Stream
Subscribe and receive messages from a local or global stream.
- Python SDK
- JavaScript SDK
subscriber = client.subscribe(stream=demo_stream, local=is_local,
subscription_name="test-subscription-1")
for i in range(10):
print("In ",i)
m1 = json.loads(subscriber.recv()) # Listen on stream for any receiving messages
msg1 = base64.b64decode(m1["payload"]).decode('utf-8')
print(F"Received message '{msg1}' id='{m1['messageId']}'") # Print the received message
subscriber.send(json.dumps({'messageId': m1['messageId']})) # Acknowledge the received message
async function getDCList() {
const geo_fabric = "_system"
let dcListAll = await client.listUserFabrics();
let dcListObject = await dcListAll.find(function(o) { return o.name === geo_fabric; });
return dcListObject.options.dcList.split(",");
}
(async function() {
const dcList = await getDCList();
await console.log("dcList: ", dcList);
await client.createStream("my-stream", true);
//Here the last boolean value tells if the stream is local or global. false means that it is global.
const consumer = await client.createStreamReader("my-stream", "my-subscription", true);
consumer.on("message", (msg) => {
const { payload, messageId } = JSON.parse(msg);
// Received message payload
console.log(Buffer.from(payload, "base64").toString("ascii"));
// Send message acknowledgement
consumer.send(JSON.stringify({ messageId }));
});
})();
Full Demo File
It's time to see streams in action!
Replace the contents of your .js or .py file from above with the code block below.
Log in to the GDN console from your browser and click Data > Streams.
- Select your recently created stream (c8globals.streamQuickstart) to view the output of the message within the console.
Open two terminal windows and start the program in each window
- In one terminal, type 'r' to begin listening for messages, while in the other terminal, type 'w' to begin writing messages
- Upon each write, you should see the message received in the second terminal window, as well as the message displayed in the GDN console output
- Python SDK
- JavaScript SDK
""" This file is a demo to send data to/from a stream """
from operator import concat
import base64
import json
import warnings
from c8 import C8Client
warnings.filterwarnings("ignore")
# Connect to GDN.
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "xxxxx" # Change this to your API key
is_local = False
prefix_text = ""
demo_stream = 'streamQuickstart'
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)
# Get the right prefix for the stream.
if is_local:
prefix_text = "c8locals."
else:
prefix_text = "c8globals."
# Create global and local streams.
def createStream():
""" This function creates a stream """
stream_name = {"stream-id": ""}
if client.has_stream(demo_stream, local = is_local):
print("Stream already exists")
stream_name["stream-id"] = concat(prefix_text, demo_stream)
print ("Old Producer =", stream_name["stream-id"])
else:
stream_name = client.create_stream(demo_stream, local=is_local)
print ("New Producer =", stream_name["stream-id"])
# Create the producer and publish messages.
def sendData():
""" This function sends data through a stream """
producer = client.create_stream_producer(demo_stream, local=is_local)
while True:
user_input = input("Enter your message to publish: ")
if user_input == '0':
break
producer.send(user_input)
# Create the subscriber and receive data.
def receiveData():
""" This function receives data from a stream """
subscriber = client.subscribe(stream=demo_stream, local=is_local,
subscription_name="test-subscription-1")
while True:
print("\nListening for message...")
m1 = json.loads(subscriber.recv()) # Listen on stream for any receiving messages
msg1 = base64.b64decode(m1["payload"]).decode('utf-8')
print(F"Received message: '{msg1}'")
# Output the ID of the received message
# print(F"Message ID:'{m1['messageId']}'")
subscriber.send(json.dumps({'messageId': m1['messageId']})) # Acknowledge the received message
createStream()
# User enters choice.
# On one terminal use 'r' to start the subscriber to read data
# Then on another terminal use 'w' to start the producer and publish message
user_input = input("Type 'w' to write data, type 'r' read data, and type '0' to quit at any time: ")
if user_input == "w":
sendData()
elif user_input == "r":
receiveData()
else:
print ("Invalid user input. Stopping program")
// Connect to GDN.
const jsc8 = require("jsc8");
const readline = require("readline");
const globalUrl = "https://play.paas.macrometa.io";
const apiKey = "xxxx"; //Change this to your API Key
// Create an authenticated instance with an API key (recommended)
const client = new jsc8({
url: globalUrl,
apiKey: apiKey,
fabricName: "_system"
});
/* Authenticate via JSON Web Token (JWT)
const client = new jsc8({ url: globalUrl, token: "XXXX", fabricName: "_system" });
*/
/* Create an authenticated client instance via email and password
const client = new jsc8(globalUrl);
await client.login("your@email.com", "password");
*/
// Variables
const stream = "streamQuickstart";
let prefix_text = "";
const is_local = false; //For a local stream pass this variable as True, or False for a global stream
// Get the right prefix for the stream
if (is_local) {
prefix_text = "c8locals.";
} else {
prefix_text = "c8globals.";
}
async function createMyStream () {
let streamName = { "stream-id": "" };
if (await client.hasStream(stream, is_local)) {
console.log("This stream already exists!");
streamName["stream-id"] = prefix_text + stream;
console.log(`Old Producer = ${streamName["stream-id"]}`);
} else {
streamName = await client.createStream(stream, is_local);
console.log(`New Producer = ${streamName.result["stream-id"]}`);
}
}
async function sendData () {
console.log("\n ------- Publish Messages ------");
const producer = await client.createStreamProducer(stream, is_local);
producer.on("open", () => {
const input = readline.createInterface({
input: process.stdin,
output: process.stdout
});
// Repeatedly ask the user for message to be published to the stream. User can always exit by typing 0
var recursiveUserInput = () => {
input.question(
"Enter your message to publish or Type 0 to exit:\n",
(userInput) => {
if (userInput === "0") {
producer.close();
return input.close();
}
const data = {
payload: Buffer.from(userInput).toString("base64")
};
producer.send(JSON.stringify(data));
console.log(`Message sent: ${userInput}`);
recursiveUserInput();
}
);
}
recursiveUserInput();
});
producer.onclose = function () {
console.log("Closed WebSocket:Producer connection for " + stream);
};
}
async function receiveData () {
console.log("\n ------- Receive Messages ------");
const consumer = await client.createStreamReader(
stream,
"test-subscription-1",
is_local
);
// Close consumer connection when user types 0
const input = readline.createInterface({
input: process.stdin,
output: process.stdout
});
input.question(
"Type '0' to exit anytime:\n",
(userInput) => {
if (userInput === "0") {
consumer.close();
return input.close();
}
}
);
consumer.on("message", (msg) => {
const { payload, messageId } = JSON.parse(msg);
console.log(Buffer.from(payload, "base64").toString("ascii"));
// Send message acknowledgement
consumer.send(JSON.stringify({ messageId }));
});
consumer.onclose = function () {
console.log("Closed WebSocket:Consumer connection for " + stream);
};
}
async function selectAction () {
const input = readline.createInterface({
input: process.stdin,
output: process.stdout
});
input.question(
"Type 'w' to write data. Type 'r' to read data: ",
(userInput) => {
if (userInput === "w") {
sendData();
} else if (userInput === "r") {
receiveData();
} else {
console.log("Invalid user input. Stopping program.");
return false;
}
input.close();
}
);
}
(async function () {
await createMyStream();
await selectAction();
})();
To start creating, subscribing and receiving streams with the CLI, you need to install the gdn CLI.
After installing the CLI, you can then use any of the gdnsl stream commands to create, subscribe, and publish messages.