Skip to main content

Pub-Sub with Streams

This tutorial shows how to create geo-replicated streams and use them for queue and pub-sub messaging with local latencies across the globe.

Pre-requisite

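This tutorial assumes you already have a Macrometa GDN account and an API key (or an email and password) to authenticate with.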

Installation

With Yarn or NPM

yarn add jsc8
(or)
npm install jsc8

If you want to use the SDK outside of the current directory, you can also install it globally using the `--global` flag:

npm install --global jsc8

From source:

git clone https://github.com/macrometacorp/jsc8.git
cd jsc8
npm install
npm run dist

Code Sample

const jsc8 = require("jsc8");
const readline = require("readline");
// Base URL handed to the jsc8 client below.
const globalUrl = "https://gdn.paas.macrometa.io";

// Authenticated client instance for the "_system" fabric, using an API key.
// Replace "XXXX" with your own key. To authenticate with a token instead:
//   const client = new jsc8({ url: globalUrl, token: "XXXX", fabricName: "_system" });
const client = new jsc8({ url: globalUrl, apiKey: "XXXX", fabricName: "_system" });

// Alternatively, authenticate a client instance with an email and password:
//   const client = new jsc8(globalUrl);
//   await client.login("<email>", "<password>");

// Stream settings shared by the functions below.
const stream = "streamQuickstart";
const prefixBool = false;

// Stream-name prefix that goes with prefixBool:
// "c8locals." when it is true, "c8globals." otherwise.
const prefixText = prefixBool ? "c8locals." : "c8globals.";

/**
 * Logs the data centers listed for the "_system" fabric.
 *
 * Fetches the fabric list through `client.listUserFabrics()`, picks the entry
 * whose `name` is "_system", splits its comma-separated `options.dcList`
 * string and prints the resulting array.
 *
 * @returns {Promise<void>}
 * @throws {Error} If the fabric list contains no entry named "_system".
 */
async function getDCList () {
  const fabrics = await client.listUserFabrics();
  // Array.prototype.find is synchronous, so it is not awaited.
  const systemFabric = fabrics.find((fabric) => fabric.name === "_system");
  if (systemFabric === undefined) {
    // Fail with a clear message instead of a TypeError on `undefined.options`.
    throw new Error("Fabric \"_system\" was not found in the user's fabric list");
  }
  const dcList = systemFabric.options.dcList.split(",");
  console.log("dcList: ", dcList);
}

/**
 * Makes sure the quickstart stream exists and logs its id.
 *
 * Asks the client whether `stream` already exists (passing `prefixBool`
 * through). If it does, the id is built locally from `prefixText + stream`;
 * otherwise the stream is created and the id is read from the response's
 * `result["stream-id"]`.
 *
 * @returns {Promise<void>}
 */
async function createMyStream () {
  const alreadyExists = await client.hasStream(stream, prefixBool);

  if (!alreadyExists) {
    const created = await client.createStream(stream, prefixBool);
    console.log(`NEW Producer = ${created.result["stream-id"]}`);
    return;
  }

  console.log("Stream already exists");
  const existingId = prefixText + stream;
  console.log(`OLD Producer = ${existingId}`);
}

/**
 * Publishes ten sample messages to the stream.
 *
 * Creates a producer for `stream` through the client. Once the producer
 * emits "open", it sends ten messages; each one is a JSON string whose
 * `payload` field holds the base64-encoded message text. A line is logged
 * when the producer connection closes.
 *
 * @returns {Promise<void>} Resolves once the producer is created and the
 *   handlers are attached (not once the messages have been sent).
 */
async function sendData () {
  console.log("\n ------- Publish Messages ------");
  const producer = await client.createStreamProducer(stream);

  producer.on("open", () => {
    for (let i = 0; i < 10; i++) {
      const text = `Persistent hello from (${i})`;
      // The message text travels base64-encoded inside a JSON envelope.
      const envelope = { payload: Buffer.from(text).toString("base64") };

      console.log(`Stream: ${text}`);
      producer.send(JSON.stringify(envelope));
    }
  });
  producer.onclose = function () {
    // Report the stream this producer was created for.
    console.log("Closed WebSocket:Producer connection for " + stream);
  };
}

/**
 * Reads messages from the stream and acknowledges each one.
 *
 * Creates a reader for `stream` under the subscription name
 * "test-subscription-1". Every incoming message is parsed as JSON, its
 * base64 `payload` is decoded and printed, and an acknowledgement carrying
 * the same `messageId` is sent back. A line is logged when the consumer
 * connection closes.
 *
 * @returns {Promise<void>} Resolves once the reader is created and the
 *   handlers are attached.
 */
async function receiveData () {
  console.log("\n ------- Receive Messages ------");
  const subscriptionName = "test-subscription-1";
  const reader = await client.createStreamReader(stream, subscriptionName);

  function handleMessage (rawMessage) {
    const parsed = JSON.parse(rawMessage);
    const decodedText = Buffer.from(parsed.payload, "base64").toString("ascii");
    console.log(decodedText);
    // Acknowledge the message so it is marked as received.
    const ack = { messageId: parsed.messageId };
    reader.send(JSON.stringify(ack));
  }

  reader.on("message", handleMessage);
  reader.onclose = () => {
    console.log("Closed WebSocket:Consumer connection for " + stream);
  };
}

/**
 * Asks the user whether to write or read data and starts that action.
 *
 * Opens a readline interface on stdin/stdout and asks one question.
 * "w" or "1" starts `sendData()`, "r" or "0" starts `receiveData()`;
 * any other answer logs an error message. The readline interface is closed
 * after the answer is read, whatever the answer was.
 *
 * @returns {Promise<void>} Resolves as soon as the question has been asked
 *   (not once the user has answered).
 */
async function selectAction () {
  const input = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });

  input.question(
    "Type 'w' or '1' to write data. Type 'r' or '0' to read data: ",
    (userInput) => {
      // Only one answer is needed. Close on every path so an invalid choice
      // does not leave stdin open and keep the process waiting.
      input.close();

      if (userInput === "w" || userInput === "1") {
        sendData();
      } else if (userInput === "r" || userInput === "0") {
        receiveData();
      } else {
        console.log("Invalid user input. Stopping program.");
      }
    }
  );
}

// Entry point: list the data centers, make sure the stream exists, then ask
// the user whether to publish or consume. The steps run strictly in order.
(async function () {
  try {
    await getDCList();
    await createMyStream();
    await selectAction();
  } catch (err) {
    // Report the failure explicitly instead of leaving an unhandled promise
    // rejection, and make the process exit with a failing status.
    console.error("Stream quickstart failed:", err);
    process.exitCode = 1;
  }
})();