Skip to main content

Pub-Sub with Streams Example

This page describes how to create geo-replicated streams and set up queues and pub-sub messaging with local latencies across the globe.

Prerequisites

Pub-Sub with Streams Code

  1. Copy and paste the code block below into your favorite IDE.
  2. Update constants with your values, such as the API key.
  3. Run the code.
  4. (Optional) Log in to the Macrometa console to view the streams.
const jsc8 = require("jsc8");
const readline = require("readline");
// Base URL that the client below connects to.
const globalUrl = "https://play.paas.macrometa.io";

// Build an authenticated client for the "_system" fabric.
// An API key is the recommended credential; a JSON web token (JWT) also works.
const client = new jsc8({
  url: globalUrl,
  apiKey: "XXXX",
  fabricName: "_system",
});
// const client = new jsc8({ url: globalUrl, token: "XXXX", fabricName: "_system" });

// Or use email and password to authenticate a client instance
// const client = new jsc8(globalUrl);
// await client.login("your@email.com", "password");

// Variables
// Name of the stream that the example creates, publishes to, and reads from.
const stream = "streamQuickstart";
// Set to true for a local stream, or false for a global stream.
const is_local = false;

// Prefix that matches the stream's scope: "c8locals." for local streams,
// "c8globals." for global streams.
const prefix_text = is_local ? "c8locals." : "c8globals.";

/**
 * Logs the `dcList` entries of the "_system" fabric.
 *
 * Fetches the fabrics returned by `client.listUserFabrics()`, selects the one
 * named "_system", splits its comma-separated `options.dcList` string, and
 * prints the resulting array.
 *
 * @returns {Promise<void>} Resolves once the list has been logged.
 * @throws {Error} If no fabric named "_system" is returned.
 */
async function getDCList () {
  const fabrics = await client.listUserFabrics();
  // Array.prototype.find is synchronous, so there is nothing to await here.
  const systemFabric = fabrics.find((fabric) => fabric.name === "_system");
  if (!systemFabric) {
    // Fail with a clear message instead of a TypeError on `.options`.
    throw new Error('Fabric "_system" was not found in the user fabric list.');
  }
  const dcList = systemFabric.options.dcList.split(",");
  console.log("dcList: ", dcList);
}

/**
 * Makes sure the example stream exists and logs its stream id.
 *
 * Asks `client.hasStream` whether the stream is already present. If it is,
 * the id is assembled from the scope prefix and the stream name; otherwise
 * the stream is created and the id is read from the creation response.
 *
 * @returns {Promise<void>} Resolves after the stream id has been logged.
 */
async function createMyStream () {
  const alreadyExists = await client.hasStream(stream, is_local);

  if (!alreadyExists) {
    const created = await client.createStream(stream, is_local);
    console.log(`New Producer = ${created.result["stream-id"]}`);
    return;
  }

  console.log("Stream already exists");
  const existingId = prefix_text + stream;
  console.log(`Old Producer = ${existingId}`);
}

/**
 * Publishes ten sample messages to the stream.
 *
 * Creates a producer with `client.createStreamProducer` and, once it reports
 * "open", sends ten messages. Each message is a JSON string whose `payload`
 * field holds the base64-encoded text. A close handler logs when the
 * producer connection ends.
 *
 * @returns {Promise<void>} Resolves once the producer exists and its handlers
 *   are registered; the messages themselves are sent later, on "open".
 */
async function sendData () {
  console.log("\n ------- Publish Messages ------");
  const producer = await client.createStreamProducer(stream);

  // Sending is deferred until the connection reports that it is open.
  producer.on("open", () => {
    for (let i = 0; i < 10; i++) {
      const text = `Persistent hello from (${i})`;
      // The text is sent base64-encoded inside a JSON envelope.
      const envelope = { payload: Buffer.from(text).toString("base64") };

      console.log(`Stream: ${text}`);
      producer.send(JSON.stringify(envelope));
    }
  });
  producer.onclose = function () {
    // Report the module-level `stream` name; no `streamName` binding exists
    // in this scope, so referencing it would throw when the socket closes.
    console.log(`Closed WebSocket:Producer connection for ${stream}`);
  };
}

/**
 * Subscribes to the stream and prints every message it receives.
 *
 * Creates a reader with `client.createStreamReader` using the subscription
 * name "test-subscription-1". Each incoming message is parsed as JSON, its
 * base64 `payload` is decoded and logged, and an acknowledgement carrying the
 * message's `messageId` is sent back. A close handler logs when the consumer
 * connection ends.
 *
 * @returns {Promise<void>} Resolves once the reader exists and its handlers
 *   are registered.
 */
async function receiveData () {
  console.log("\n ------- Receive Messages ------");
  const consumer = await client.createStreamReader(
    stream,
    "test-subscription-1"
  );

  consumer.on("message", (msg) => {
    const { payload, messageId } = JSON.parse(msg);
    // Decode as UTF-8, which is Buffer's default encoding for text; decoding
    // as "ascii" would garble any non-ASCII characters in the message.
    console.log(Buffer.from(payload, "base64").toString("utf8"));
    // Send message acknowledgement
    consumer.send(JSON.stringify({ messageId }));
  });
  consumer.onclose = function () {
    console.log(`Closed WebSocket:Consumer connection for ${stream}`);
  };
}

/**
 * Asks the user on stdin whether to publish or to consume, then starts the
 * matching action.
 *
 * "w" or "1" starts `sendData`; "r" or "0" starts `receiveData`; anything
 * else prints an error message. The readline interface is closed after any
 * answer so that stdin does not keep the process alive.
 *
 * @returns {Promise<void>} Resolves as soon as the question has been asked;
 *   it does not wait for the user's answer or for the chosen action.
 */
async function selectAction () {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });

  // Surface failures from the chosen action instead of leaving its promise
  // unhandled, and make the process exit with a failing status.
  const reportFailure = (err) => {
    console.error("Action failed:", err);
    process.exitCode = 1;
  };

  rl.question(
    "Type 'w' or '1' to write data. Type 'r' or '0' to read data: ",
    (userInput) => {
      if (userInput === "w" || userInput === "1") {
        sendData().catch(reportFailure);
      } else if (userInput === "r" || userInput === "0") {
        receiveData().catch(reportFailure);
      } else {
        console.log("Invalid user input. Stopping program.");
      }
      // Close on every path, including invalid input; otherwise the open
      // stdin handle prevents the program from actually stopping.
      rl.close();
    }
  );
}

// Entry point: log the dcList, make sure the stream exists, then ask the user
// whether to publish or consume.
(async function () {
  try {
    await getDCList();
    await createMyStream();
    await selectAction();
  } catch (err) {
    // Report setup failures explicitly rather than leaving the rejection
    // unhandled, and signal the failure through the exit code.
    console.error("Pub-sub example failed:", err);
    process.exitCode = 1;
  }
})();