rabbitmq (Sink)
The rabbitmq sink pushes events to a RabbitMQ broker using the AMQP protocol.
Syntax
CREATE SINK <NAME> WITH (type="rabbitmq", map.type="<STRING>", uri="<STRING>", heartbeat="<INT>", exchange.name="<STRING>", exchange.type="<STRING>", exchange.durable.enabled="<BOOL>", exchange.autodelete.enabled="<BOOL>", delivery.mode="<INT>", content.type="<STRING>", content.encoding="<STRING>", priority="<INT>", correlation.id="<STRING>", reply.to="<STRING>", expiration="<STRING>", message.id="<STRING>", timestamp="<STRING>", type="<STRING>", user.id="<STRING>", app.id="<STRING>", routing.key="<STRING>", headers="<STRING>", tls.enabled="<BOOL>", tls.truststore.path="<STRING>", tls.truststore.password="<STRING>", tls.truststore.type="<STRING>", tls.version="<STRING>")
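As a minimal sketch, a sink that sets only the two non-optional parameters (uri and exchange.name) plus a mapper might look like the following. The sink name OrdersSink, its attributes, and the exchange name orders are hypothetical; the broker URI is a placeholder for a local broker with default credentials.

```sql
-- Hypothetical sink: only uri and exchange.name are set explicitly.
-- Every other parameter falls back to its documented default value.
CREATE SINK OrdersSink WITH (type = 'rabbitmq', uri = 'amqp://guest:guest@localhost:5672', exchange.name = 'orders', map.type = 'xml') (orderId string, amount double);
```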
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
uri | The URI used to connect to an AMQP server. If no URI is specified, an error is logged in the CLI. e.g., amqp://guest:guest, amqp://guest:guest@localhost:5672 | | STRING | No | No |
heartbeat | The period of time (in seconds) after which the peer TCP connection should be considered unreachable (down) by RabbitMQ and client libraries. | 60 | INT | Yes | No |
exchange.name | The name of the exchange that decides what to do with the messages it receives. If the exchange.name already exists in the RabbitMQ server, then the system uses that exchange instead of redeclaring it. | | STRING | No | Yes |
exchange.type | The type of the exchange.name. The exchange types available are direct, fanout, topic and headers. For a detailed description of each type, see [RabbitMQ - AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html). | direct | STRING | Yes | Yes |
exchange.durable.enabled | If this is set to true, the exchange remains declared even if the broker restarts. | false | BOOL | Yes | Yes |
exchange.autodelete.enabled | If this is set to true, the exchange is automatically deleted when it is not used anymore. | false | BOOL | Yes | Yes |
delivery.mode | This determines whether published messages should be persistent or not. The value must be either 1 or 2. If delivery.mode = 1, messages are not persistent. If delivery.mode = 2, messages are persistent. | 1 | INT | Yes | No |
content.type | The message content type. This should be the MIME content type. | null | STRING | Yes | No |
content.encoding | The message content encoding. The value should be MIME content encoding. | null | STRING | Yes | No |
priority | Specify a value within the range 0 to 9 in this parameter to indicate the message priority. | 0 | INT | Yes | Yes |
correlation.id | The message correlated to the current message, e.g., the request to which this message is a reply. When a request arrives, the front-end server pushes a message describing the task to the queue and then blocks, waiting for a response message with the same correlation ID. A pool of worker machines listens on the queue; one of them picks up the task, performs it, and returns the result as a message. Once a message with the right correlation ID arrives, the front-end server returns the response to the caller. | null | STRING | Yes | Yes |
reply.to | This is an anonymous exclusive callback queue. When RabbitMQ receives a message with the reply.to property, it sends the response to the mentioned queue. This is commonly used to name a reply queue (or any other identifier that helps a consumer application to direct its response). | null | STRING | Yes | No |
expiration | The expiration time after which the message is deleted. The value of the expiration field describes the TTL (Time To Live) period in milliseconds. | null | STRING | Yes | No |
message.id | The message identifier. If applications need to identify messages, it is recommended that they use this attribute instead of putting it into the message payload. | null | STRING | Yes | Yes |
timestamp | The timestamp of the moment when the message was sent. If you do not specify a value for this parameter, the system automatically generates the current date and time as the timestamp value. The format of the timestamp value is dd/mm/yyyy. | current timestamp | STRING | Yes | No |
type | The type of the message. e.g., The type of the event or the command represented by the message. | null | STRING | Yes | No |
user.id | The user ID specified here is verified by RabbitMQ against the user name of the actual connection. This is an optional parameter. | null | STRING | Yes | No |
app.id | The identifier of the application that produced the message. | null | STRING | Yes | No |
routing.key | The key based on which the exchange determines how to route the message to the queue. The routing key is similar to an address for the message. | empty | STRING | Yes | Yes |
headers | The headers of the message. The attributes used for routing are taken from this parameter. A message is considered matching if the value of the header equals the value specified upon binding. | null | STRING | Yes | Yes |
tls.enabled | This parameter specifies whether an encrypted communication channel should be established or not. When this parameter is set to true, the tls.truststore.path and tls.truststore.password parameters are initialized. | false | BOOL | Yes | No |
tls.truststore.path | The file path to the location of the truststore of the client that sends the RabbitMQ events via the AMQP protocol. A custom client-truststore can be specified if required. If a custom truststore is not specified, then the system uses the default client-truststore in the ${carbon.home}/resources/security directory. | ${carbon.home}/resources/security/client-truststore.jks | STRING | Yes | No |
tls.truststore.password | The password for the client-truststore. A custom password can be specified if required. If no custom password is specified, then the system uses gdncarbon as the default password. | gdncarbon | STRING | Yes | No |
tls.truststore.type | The type of the truststore. | JKS | STRING | Yes | No |
tls.version | The version of the TLS/SSL protocol. | SSL | STRING | Yes | No |
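
To illustrate how several of the optional parameters above combine, the following sketch declares a durable topic exchange and publishes persistent messages that expire after 60 seconds. The sink name AlertSink, the exchange name alerts, the routing key alerts.critical, and the attribute list are hypothetical. Numeric and boolean values are quoted here on the assumption that they follow the quoted form shown in the Syntax section.

```sql
-- Hypothetical sink combining exchange, delivery, and expiry settings.
-- exchange.durable.enabled = 'true' : the exchange survives a broker restart
-- delivery.mode = '2'               : persistent delivery
-- expiration = '60000'              : message TTL in milliseconds
CREATE SINK AlertSink WITH (type = 'rabbitmq', uri = 'amqp://guest:guest@localhost:5672', exchange.name = 'alerts', exchange.type = 'topic', exchange.durable.enabled = 'true', delivery.mode = '2', routing.key = 'alerts.critical', expiration = '60000', map.type = 'xml') (symbol string, price float);
```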
Example 1
@App:name('TestExecutionPlan')
CREATE STREAM FooStream (symbol string, price float, volume long);
CREATE SINK BarStream WITH (type = 'rabbitmq', uri = 'amqp://guest:guest@localhost:5672', exchange.name = 'direct', routing.key = 'direct', map.type = 'xml') (symbol string, price float, volume long);
@info(name = 'query1')
INSERT INTO BarStream
SELECT symbol, price, volume FROM FooStream;
This query publishes events to the exchange named direct, which uses the default direct exchange type, with direct as the routing key.
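
A TLS-enabled variant of this example could be sketched as follows. The sink name SecureBarStream, the truststore path, and the password placeholder are hypothetical. Port 5671 is the conventional AMQP-over-TLS port; whether the broker in a given deployment listens there, and whether the URI scheme needs to change, are assumptions to verify against your environment.

```sql
-- Hypothetical TLS-enabled sink using a custom truststore.
-- If tls.truststore.path and tls.truststore.password are omitted,
-- the documented defaults apply.
CREATE SINK SecureBarStream WITH (type = 'rabbitmq', uri = 'amqp://guest:guest@localhost:5671', exchange.name = 'direct', routing.key = 'direct', tls.enabled = 'true', tls.truststore.path = '/path/to/custom-truststore.jks', tls.truststore.password = '<truststore-password>', tls.truststore.type = 'JKS', map.type = 'xml') (symbol string, price float, volume long);
```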