
http (Sink)

The HTTP sink publishes messages over HTTP or HTTPS using methods such as POST, GET, PUT, and DELETE, with payloads in text, XML, or JSON format. It can also publish to endpoints protected by basic authentication or OAuth 2.0.

Syntax

CREATE SINK <NAME> WITH (type="http", map.type="<STRING>", publisher.url="<STRING>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", https.truststore.file="<STRING>", https.truststore.password="<STRING>", oauth.username="<STRING>", oauth.password="<STRING>", consumer.key="<STRING>", consumer.secret="<STRING>", token.url="<STRING>", refresh.token="<STRING>", headers="<STRING>", method="<STRING>", socket.idle.timeout="<INT>", chunk.disabled="<BOOL>", ssl.protocol="<STRING>", ssl.verification.disabled="<BOOL>", tls.store.type="<STRING>", ssl.configurations="<STRING>", proxy.host="<STRING>", proxy.port="<STRING>", proxy.username="<STRING>", proxy.password="<STRING>", client.bootstrap.configurations="<STRING>", max.pool.active.connections="<INT>", min.pool.idle.connections="<INT>", max.pool.idle.connections="<INT>", min.evictable.idle.time="<STRING>", time.between.eviction.runs="<STRING>", max.wait.time="<STRING>", test.on.borrow="<BOOL>", test.while.idle="<BOOL>", exhausted.action="<INT>", hostname.verification.enabled="<BOOL>")

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|------|-------------|---------------|---------------------|----------|---------|
| publisher.url | The URL to which the outgoing events are published. Examples: `http://localhost:8080/endpoint`, `https://localhost:8080/endpoint` | | STRING | No | No |
| basic.auth.username | The username to include in the authentication header when calling endpoints protected by basic authentication. The `basic.auth.password` property must also be set when using this property. | - | STRING | Yes | No |
| basic.auth.password | The password to include in the authentication header when calling endpoints protected by basic authentication. The `basic.auth.username` property must also be set when using this property. | - | STRING | Yes | No |
| https.truststore.file | The file path of the client truststore used when sending messages over HTTPS. | `${carbon.home}/resources/security/client-truststore.jks` | STRING | Yes | No |
| https.truststore.password | The password for the client truststore. | gdncarbon | STRING | Yes | No |
| oauth.username | The username to include in the authentication header when calling endpoints protected by OAuth 2.0. The `oauth.password` property must also be set when using this property. | - | STRING | Yes | No |
| oauth.password | The password to include in the authentication header when calling endpoints protected by OAuth 2.0. The `oauth.username` property must also be set when using this property. | - | STRING | Yes | No |
| consumer.key | Consumer key used for calling endpoints protected by OAuth 2.0. | - | STRING | Yes | No |
| consumer.secret | Consumer secret used for calling endpoints protected by OAuth 2.0. | - | STRING | Yes | No |
| token.url | Token URL used to generate new access tokens when calling endpoints protected by OAuth 2.0. | - | STRING | Yes | No |
| refresh.token | Refresh token used for generating new access tokens when calling endpoints protected by OAuth 2.0. | - | STRING | Yes | No |
| headers | HTTP request headers in the format `"'<key>:<value>','<key>:<value>'"`. When the Content-Type header is not provided, the system derives it from the sink mapper: `map.type='xml'`: application/xml; `map.type='json'`: application/json; `map.type='text'`: text/plain; `map.type='keyvalue'`: application/x-www-form-urlencoded; all other cases: text/plain. The Content-Length header need not be provided, because the system sets it automatically from the size of the payload. | Content-Type and Content-Length headers | STRING | Yes | No |
| method | The HTTP method used for calling the endpoint. | POST | STRING | Yes | No |
| socket.idle.timeout | Socket timeout in milliseconds. | 6000 | INT | Yes | No |
| chunk.disabled | Disable chunked transfer encoding. | false | BOOL | Yes | No |
| ssl.protocol | SSL/TLS protocol. | TLS | STRING | Yes | No |
| ssl.verification.disabled | Disable SSL verification. | false | BOOL | Yes | No |
| tls.store.type | TLS store type. | JKS | STRING | Yes | No |
| ssl.configurations | SSL/TLS configurations in the format `"'<key>:<value>','<key>:<value>'"`. Some supported parameters: SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'`; list of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'`; enable session creation: `'client.enable.session.creation:true'`; supported server names: `'server.suported.server.names:server'`; add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'` | - | STRING | Yes | No |
| proxy.host | Proxy server host. | - | STRING | Yes | No |
| proxy.port | Proxy server port. | - | STRING | Yes | No |
| proxy.username | Proxy server username. | - | STRING | Yes | No |
| proxy.password | Proxy server password. | - | STRING | Yes | No |
| client.bootstrap.configurations | Client bootstrap configurations in the format `"'<key>:<value>','<key>:<value>'"`. Some supported configurations: client connect timeout in milliseconds: `'client.bootstrap.connect.timeout:15000'`; client socket timeout in seconds: `'client.bootstrap.socket.timeout:15'`; client socket reuse: `'client.bootstrap.socket.reuse:true'`; enable TCP no delay: `'client.bootstrap.nodelay:true'`; enable client keep alive: `'client.bootstrap.keepalive:true'`; send buffer size: `'client.bootstrap.sendbuffersize:1048576'`; receive buffer size: `'client.bootstrap.recievebuffersize:1048576'` | - | STRING | Yes | No |
| max.pool.active.connections | Maximum number of active connections per client pool. | -1 | INT | Yes | No |
| min.pool.idle.connections | Minimum number of idle connections that can exist per client pool. | 0 | INT | Yes | No |
| max.pool.idle.connections | Maximum number of idle connections that can exist per client pool. | 100 | INT | Yes | No |
| min.evictable.idle.time | Minimum time (in milliseconds) a connection may sit idle in the client pool before it becomes eligible for eviction. | 300000 | STRING | Yes | No |
| time.between.eviction.runs | Time (in milliseconds) between two eviction operations on the client pool. | 30000 | STRING | Yes | No |
| max.wait.time | Maximum time (in milliseconds) the pool waits, when no connections are available, for a connection to be returned to the pool. | 60000 | STRING | Yes | No |
| test.on.borrow | Validate connections before they are borrowed from the client pool. | true | BOOL | Yes | No |
| test.while.idle | Validate connections during the eviction operation (if any). | true | BOOL | Yes | No |
| exhausted.action | Action to take when the maximum number of active connections is in use. Specify the action as an int. Possible values: 0 - fail the request; 1 - block the request until a connection returns to the pool; 2 - grow the connection pool size. | 1 (Block when exhausted) | INT | Yes | No |
| hostname.verification.enabled | Enable hostname verification. | true | BOOL | Yes | No |
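
As a rough sketch of how several of the optional parameters combine, the following sink publishes JSON events with the PUT method to an endpoint protected by basic authentication and adds two custom headers. The sink name, attributes, credentials, header values, and timeout are hypothetical placeholders, not values from a real deployment, and the `--` comment line assumes the query language accepts SQL-style line comments.

```sql
-- Hypothetical sink: name, URL, credentials, and header values are placeholders.
CREATE SINK OrderStream WITH (
    type = 'http',
    map.type = 'json',
    publisher.url = 'https://localhost:8080/endpoint',
    method = 'PUT',
    basic.auth.username = 'admin',
    basic.auth.password = 'admin',
    headers = "'Accept:application/json','X-Request-Source:stream-worker'",
    socket.idle.timeout = '10000'
) (orderId string, price float, quantity long);
```

Because no Content-Type header is listed in `headers`, it should be derived from `map.type` as described for the `headers` parameter, and Content-Length is set automatically.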

System Parameters

| Name | Description | Default Value | Possible Parameters |
|------|-------------|---------------|---------------------|
| clientBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. | (Number of available processors) * 2 | Any positive integer |
| clientBootstrapBossGroupSize | Number of boss threads to accept incoming connections. | Number of available processors | Any positive integer |
| clientBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. | (Number of available processors) * 2 | Any positive integer |
| trustStoreLocation | The default truststore file path. | `${carbon.home}/resources/security/client-truststore.jks` | Path to client truststore `.jks` file |
| trustStorePassword | The default truststore password. | gdncarbon | Truststore password as string |

Example 1

CREATE SINK StockStream WITH (type = 'http', map.type = 'json', publisher.url = 'http://stocks.com/stocks') (symbol string, price float, volume long);

Events arriving on StockStream are published to the HTTP endpoint http://stocks.com/stocks using the POST method with Content-Type application/json. Each event is converted to the default JSON format as follows:

{
    "event": {
        "symbol": "FB",
        "price": 24.5,
        "volume": 5000
    }
}
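
If the same endpoint were protected by OAuth 2.0, the sink from Example 1 could be extended with the OAuth parameters from the table above. This is only a sketch: the token URL, consumer key, and consumer secret are hypothetical placeholders, and which of the OAuth parameters (`oauth.username`, `oauth.password`, `refresh.token`) are also needed is an assumption that depends on how the token endpoint is configured.

```sql
-- Hypothetical values: token URL, consumer key, and consumer secret are placeholders.
CREATE SINK StockStream WITH (
    type = 'http',
    map.type = 'json',
    publisher.url = 'https://stocks.com/stocks',
    consumer.key = '<consumer-key>',
    consumer.secret = '<consumer-secret>',
    token.url = 'https://stocks.com/token'
) (symbol string, price float, volume long);
```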

Example 2

CREATE SINK FooStream WITH (type='http', map.type='xml', publisher.url = 'http://localhost:8009/foo', client.bootstrap.configurations = "'client.bootstrap.socket.timeout:20'", max.pool.active.connections = '1', headers = "{{headers}}", map.payload="""<stock>{{payloadBody}}</stock>""") (payloadBody String, headers string);

Events arriving on FooStream are published to the HTTP endpoint http://localhost:8009/foo using the POST method with Content-Type application/xml. The payload and HTTP headers are taken from the payloadBody and headers attribute values. If payloadBody contains

<symbol>gdn</symbol>
<price>55.6</price>
<volume>100</volume>

and headers contains 'topic:foobar', then the system generates a request with the body:

<stock>
    <symbol>gdn</symbol>
    <price>55.6</price>
    <volume>100</volume>
</stock>

and HTTP headers: Content-Length:xxx, Content-Location:'xxx', Content-Type:'application/xml', HTTP_METHOD:'POST'
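
Connection pool and TLS settings can be combined in the same way as the bootstrap and pool settings in Example 2. The following is a minimal sketch under assumed values: the sink name, URL, and tuning numbers are hypothetical, while the truststore path and password are the defaults listed in the parameter table.

```sql
-- Hypothetical sink: name, URL, and tuning values are placeholders.
CREATE SINK AlertStream WITH (
    type = 'http',
    map.type = 'json',
    publisher.url = 'https://localhost:8080/endpoint',
    https.truststore.file = '${carbon.home}/resources/security/client-truststore.jks',
    https.truststore.password = 'gdncarbon',
    ssl.configurations = "'sslEnabledProtocols:TLSv1.2'",
    max.pool.active.connections = '20',
    exhausted.action = '1',
    max.wait.time = '5000'
) (message string);
```

With these values the pool allows at most 20 active connections. Once all of them are in use, `exhausted.action = '1'` blocks further requests, and `max.wait.time` limits the wait for a returned connection to 5000 milliseconds.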