Skip to main content

Stream Workers Example

Assume the following credentials:

  • Tenant name: nemo@nautilus.com
  • Password: xxxxxx

SDK Download

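Download the appropriate SDK for your preferred language. The Python examples on this page import `C8Client` from the `c8` package, which is provided by the pyC8 SDK (`pip install pyC8`); the commands below install the JavaScript SDK, jsc8.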

    With Yarn or NPM

yarn add jsc8
(or)
npm install jsc8

If you want to use the SDK outside of the current directory, you can also install it globally using the `--global` flag:

npm install --global jsc8

From source,

# Clone the SDK repository; git names the checkout directory after the repo (jsc8).
git clone https://github.com/macrometacorp/jsc8.git
# Directory names are case-sensitive on most systems, so use the exact name.
cd jsc8
# Install dependencies, then run the package's `dist` script.
npm install
npm run dist

Connect to GDN

Establish a connection to a local region. When this code runs, it initializes the server connection to the region URL you specified.

from c8 import C8Client
# Open a client connection to the GDN region over HTTPS, signing in with the
# example credentials (email and password) against the `_system` geofabric.
print("--- Connecting to C8")
client = C8Client(protocol='https', host='gdn.paas.macrometa.io', port=443,
                  email='nemo@nautilus.com', password='xxxxxx',
                  geofabric='_system')

Validate Stream Application

Validate the stream application for syntax errors before saving.

# Stream application definition: reads documents from the
# SampleCargoAppInputTable collection and forwards the `weight` field to the
# SampleCargoAppDestStream stream. The text inside the triple quotes is sent to
# the server as-is, so `--` and `/** */` comments in it belong to the definition.
stream_app_definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic Stream application to demonstrate reading data from input stream and store it in the collection. The stream and collections are automatically created if they do not already exist.')
/**
Testing the Stream Application:
1. Open Stream SampleCargoAppDestStream in Console. The output can be monitored here.
2. Upload following data into SampleCargoAppInputTable C8DB Collection
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. Following messages would be shown on the SampleCargoAppDestStream Stream Console
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
*/

-- Create Table SampleCargoAppInputTable to process events.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection ="SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream ="SampleCargoAppDestStream", replication.type="local") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
"""

# Submit the definition for validation and show what the server returns.
print("--- Validating Stream Application Definition")
validation_result = client.validate_stream_app(data=stream_app_definition)
print(validation_result)

Save Stream Application

By default, the stream application saves in the local region. Optionally, you can use dclist (domain component list) to deploy the stream application in other specified regions or all regions.

# Save the stream application from its definition and show the response.
print("--- Creating Stream Application")
creation_result = client.create_stream_app(data=stream_app_definition)
print(creation_result)

Enable or Disable Stream Application

# The second argument selects the target state: True activates the
# application, False deactivates it.
activation_result = client.activate_stream_app('Sample-Cargo-App', True)
print("Activate", activation_result)

deactivation_result = client.activate_stream_app('Sample-Cargo-App', False)
print("Deactivate", deactivation_result)

To operate on created applications, you need to create an instance of the stream application.

Example: Update Stream Application

In this example, we update the stream application so that, in addition to publishing the input data to the SampleCargoAppDestStream stream, it also stores the data in another collection called SampleCargoAppDestTable.

from c8 import C8Client
from c8.fabric import StandardFabric

print("--- Connecting to C8")
client = C8Client(
    protocol='https',
    host='play.paas.macrometa.io',
    port=443,
    email='nemo@nautilus.com',
    password='xxxxxx',
    geofabric='_system',
)

# Obtain a handle for the existing application; per-app operations such as
# update() are called on this handle rather than on the client.
app = client._fabric.stream_app("Sample-Cargo-App")

# Updated definition for the app. Compared with the original it adds the
# SampleCargoAppDestTable store and a second query ('Dump') that writes the
# same `weight` values into that store. The text inside the triple quotes is
# sent to the server as-is.
data = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream application to demonstrate reading data from input stream and store it in the collection. The stream and collections are automatically created if they do not already exist.')
/**
Testing the Stream Application:
1. Open Stream SampleCargoAppDestStream in Console. The output can be monitored here.
2. Upload following data into SampleCargoAppInputTable C8DB Collection
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. Following messages would be shown on the `SampleCargoAppDestStream` Stream Console.
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
4. Following messages would be stored into SampleCargoAppDestTable
{"weight":1}
{"weight":2}
{"weight":3}
{"weight":4}
{"weight":5}
*/

-- Defines Table SampleCargoAppInputTable
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Define Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);

-- Defining a Destination table to dump the data from the stream
CREATE STORE SampleCargoAppDestTable WITH (type = 'database', collection = "SampleCargoAppDestTable") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;

-- Data Processing
@info(name='Dump')
INSERT INTO SampleCargoAppDestTable
SELECT weight
FROM SampleCargoAppInputTable;
"""

# Optional list of regions where the stream application should be deployed;
# it is left empty in this example.
regions = []
print("--- Updating Stream Application `Sample-Cargo-App`")
result = app.update(data, regions)

# Enable the stream application after the update.
activation_result = client.activate_stream_app('Sample-Cargo-App', True)
print("Activate", activation_result)

Run an Ad Hoc Query

In this example, we run an ad hoc query on the store SampleCargoAppDestTable used in the stream application. The query should return records that you inserted into SampleCargoAppInputTable.

from c8 import C8Client
from c8.fabric import StandardFabric

print("--- Connecting to C8")
client = C8Client(
    protocol='https',
    host='play.paas.macrometa.io',
    port=443,
    email='nemo@nautilus.com',
    password='xxxxxx',
    geofabric='_system',
)

# Obtain a handle for the existing application; queries run through it.
app = client._fabric.stream_app("Sample-Cargo-App")

# Ad hoc query against the application's store, limited to two rows.
q = "select * from SampleCargoAppDestTable limit 2"
result = app.query(q)
print(result)

Delete Stream Application

# Delete the stream application by name; the response is kept in `result`.
print("--- Deleting Stream Application `Sample-Cargo-App`")
app_name = 'Sample-Cargo-App'
result = client.delete_stream_app(app_name)

Get Sample Stream Applications

You can try out several sample stream applications that are preloaded and ready to run.

# Fetch the sample stream applications and print them.
print("--- You can try out several stream applications which are pre-loaded and ready to run")
samples = client.get_stream_app_samples()
print("Samples", samples)

Complete Example

The following example uses the code snippets provided in this tutorial.

import time
import traceback
from c8 import C8Client
# Simple approach: connect over HTTPS with the example email and password,
# using the `_system` geofabric.
print("--- Connecting to C8")
client = C8Client(protocol='https', host='play.paas.macrometa.io', port=443,
                  email='nemo@nautilus.com', password='xxxxxx',
                  geofabric='_system')
# Stream application definition used by the calls below: it reads documents
# from the SampleCargoAppInputTable collection and forwards the `weight` field
# to the SampleCargoAppDestStream stream. The text inside the triple quotes is
# passed to the server as-is.
stream_app_definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream application to demonstrate reading data from input stream and store it in the collection. The stream and collections are automatically created if they do not already exist.')
/**
Testing the Stream Application:
1. Open Stream SampleCargoAppDestStream in Console. The output can be monitored here.
2. Upload following data into SampleCargoAppInputTable C8DB Collection
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. Following messages would be shown on the SampleCargoAppDestStream Stream Console
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
*/
-- Create Table SampleCargoAppInputTable to process events.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
"""
# Walk through the stream application lifecycle against the connected client.
# Each call's return value is printed so the server response is visible.

# Validate a stream application
print(client.validate_stream_app(data=stream_app_definition))
# Create a stream application
print(client.create_stream_app(data=stream_app_definition))
# Retrieve a stream application
print("Retrieve", client.retrieve_stream_app())
# Get a stream application handle for advanced operations
print("Get App", client.get_stream_app('Sample-Cargo-App'))
# Deactivate a stream application
print("Deactivate", client.activate_stream_app('Sample-Cargo-App', False))
# Activate a stream application
print("Activate", client.activate_stream_app('Sample-Cargo-App', True))
# Delete a stream application
print(client.delete_stream_app('Sample-Cargo-App'))
# Get stream application samples
print("Samples", client.get_stream_app_samples())