Connectors

Connectors move data between external systems and Streamlio Cloud, in either direction. This section describes how to use connectors and which connectors are supported.

Sources and sinks

Pulsar IO connectors come in two types:

  • Sources feed data into Pulsar from other systems. Common sources include other messaging systems and "firehose"-style data pipeline APIs.

  • Sinks are fed data from Pulsar. Common sinks include other messaging systems and SQL and NoSQL databases.

This diagram illustrates the relationship between sources, sinks, and Pulsar:

Pulsar IO diagram

Sources supported by Streamlio Cloud

Sinks supported by Streamlio Cloud

Sources

Sources ingest data from external systems into Pulsar. This guide covers only the basics of using a Pulsar source. Refer to http://pulsar.apache.org/docs/en/io-overview/ for complete details.

Creating a Source

The minimum set of parameters required to create a Pulsar source with the Pulsar admin CLI is the following:

  • --name

    • The name of the source. The name must be unique across all submitted sources and may contain only lowercase letters, numbers, hyphens (-), and periods (.).

  • --source-type

    • The type of the source, e.g. aws-s3. The list of sources supported by Streamlio Cloud is found here

  • --destinationTopicName

    • The Pulsar topic to write to
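
The naming rule for --name can be checked locally before a source is submitted. The following is a minimal sketch in POSIX shell. The function name is_valid_connector_name is hypothetical and not part of the Pulsar CLI, and the check covers only the character rule stated above (it does not check uniqueness).

```shell
# Hypothetical helper, not part of pulsar-admin.
# Succeeds (exit 0) if NAME is non-empty and contains only
# lowercase letters, digits, '-' and '.'.
is_valid_connector_name() (
  LC_ALL=C    # keep the a-z range limited to ASCII lowercase
  case $1 in
    '' | *[!a-z0-9.-]*) exit 1 ;;   # empty, or contains a disallowed character
    *) exit 0 ;;
  esac
)
```

For example, is_valid_connector_name twitter-source succeeds, while is_valid_connector_name Twitter_Source fails because of the uppercase letters and the underscore.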

Optionally, source-specific configuration can be supplied in a file:

  • --source-config-file

Note that although the Pulsar CLI does not require this parameter, most sources need some configuration to run. Refer to this page for the configuration of each source.

  • --tenant

    • The tenant that the source should be under. If this parameter is not specified, the default tenant, "public", will be used.

  • --namespace

    • The namespace that the source should be under. If this parameter is not specified, the default namespace, "default", will be used

A Pulsar source is created with the Pulsar admin CLI using a command of the following format:

pulsar-admin source create \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME> \
--source-type <SOURCE_TYPE> \
--destinationTopicName <DESTINATION_TOPIC> \
--source-config-file <SOURCE_CONFIG_FILE>
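
As a sketch of how the tenant and namespace defaults fit into this format, the hypothetical helper below prints (without running) a source create command, falling back to the "public" tenant and "default" namespace when they are omitted. The helper name and its argument order are assumptions made for illustration; only flags that appear in the format above are used.

```shell
# Hypothetical helper: prints a `pulsar-admin source create` command, does not run it.
# Usage: source_create_cmd NAME SOURCE_TYPE TOPIC [TENANT] [NAMESPACE] [CONFIG_FILE]
source_create_cmd() (
  if [ "$#" -lt 3 ]; then
    echo "usage: source_create_cmd NAME SOURCE_TYPE TOPIC [TENANT] [NAMESPACE] [CONFIG_FILE]" >&2
    exit 1
  fi
  name=$1
  source_type=$2
  topic=$3
  tenant=${4:-public}       # default tenant when none is given
  namespace=${5:-default}   # default namespace when none is given
  config=${6:-}             # optional source config file

  cmd="pulsar-admin source create --tenant $tenant --namespace $namespace"
  cmd="$cmd --name $name --source-type $source_type --destinationTopicName $topic"
  if [ -n "$config" ]; then
    cmd="$cmd --source-config-file $config"
  fi
  printf '%s\n' "$cmd"
)
```

Printing the command first makes it easy to review before running it, for example source_create_cmd generator data-generator generator_test.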

The following example creates the data-generator source, which generates sample data:

pulsar-admin source create \
--tenant test_tenant \
--namespace test_namespace \
--name generator \
--destinationTopicName generator_test \
--source-type data-generator

Below is an example of creating a source with a config file, using the twitter source.

The following source config file will be submitted with the twitter source:

twitter_configs.yaml
configs:
  consumerKey: "<CONSUMER_KEY>"
  consumerSecret: "<CONSUMER_SECRET>"
  token: "<TOKEN>"
  tokenSecret: "<TOKEN_SECRET>"

In the example above, sensitive values are passed in plaintext. To pass sensitive information more securely, create secrets within Streamlio Cloud and reference them in the config file as follows:

twitter_configs.yaml
secrets:
  consumerKey:
    path: secret/<USER>/<PATH_TO_SECRET>
    key: <KEY>
  consumerSecret:
    path: secret/<USER>/<PATH_TO_SECRET>
    key: <KEY>
  token:
    path: secret/<USER>/<PATH_TO_SECRET>
    key: <KEY>
  tokenSecret:
    path: secret/<USER>/<PATH_TO_SECRET>
    key: <KEY>
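
Because each entry in the secrets section repeats the same path/key shape, the file can also be generated. The sketch below is one possible way to do that in POSIX shell. The helper names are hypothetical, the two-space nesting is an assumed YAML layout, and it assumes all four values live under a single secret path with a key named after each field, which may not match how your secrets are stored.

```shell
# Hypothetical helper: prints one entry of the secrets section.
# Usage: secret_entry CONFIG_FIELD SECRET_PATH SECRET_KEY
secret_entry() {
  printf '  %s:\n    path: %s\n    key: %s\n' "$1" "$2" "$3"
}

# Prints a secrets section for the four twitter fields.
# Assumption: every field is stored under SECRET_PATH with a key equal to the field name.
# Usage: twitter_secrets_yaml SECRET_PATH
twitter_secrets_yaml() {
  printf 'secrets:\n'
  for field in consumerKey consumerSecret token tokenSecret; do
    secret_entry "$field" "$1" "$field"
  done
}
```

The output can be redirected to the config file, for example twitter_secrets_yaml "secret/<USER>/<PATH_TO_SECRET>" > twitter_configs.yaml, with the placeholders replaced by real values.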

The source can then be submitted with the following command:

pulsar-admin source create \
--tenant test_tenant \
--namespace test_namespace \
--name twitter-source \
--source-type twitter \
--destinationTopicName tweets \
--source-config-file twitter_configs.yaml

Updating a Source

Once a Pulsar source is submitted, it can be updated at any time. Most of its parameters and configuration can be updated. The following example changes the destination topic of a source:

pulsar-admin source update \
--tenant test_tenant \
--namespace test_namespace \
--name twitter-source \
--destinationTopicName tweets-2

Deleting a Source

The command to delete a source has the following format:

pulsar-admin source delete \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example

pulsar-admin source delete \
--tenant test_tenant \
--namespace test_namespace \
--name twitter-source

Get Source info

The command to get the metadata of a source has the following format:

pulsar-admin source get \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example

pulsar-admin source get \
--tenant test_tenant \
--namespace test_namespace \
--name test-source

An example of the data returned

{
  "tenant": "public",
  "namespace": "default",
  "name": "test_source",
  "className": "org.apache.pulsar.io.datagenerator.DataGeneratorSource",
  "topicName": "output",
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
  },
  "archive": "builtin://data-generator"
}
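
The ram and disk values in this output appear to be byte counts. That is an assumption based on their magnitudes and should be confirmed against the Pulsar documentation. Under that assumption, the small sketch below converts a value to whole GiB; bytes_to_gib is a hypothetical helper name.

```shell
# Hypothetical helper: converts a byte count to whole GiB (integer division).
# Assumes the input is a non-negative integer number of bytes.
bytes_to_gib() {
  echo $(( $1 / 1073741824 ))   # 1 GiB = 1024 * 1024 * 1024 bytes
}
```

With the values shown above, bytes_to_gib 1073741824 prints 1 and bytes_to_gib 10737418240 prints 10.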

Get Source status

The command to get the status of a source has the following format:

pulsar-admin source status \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example

pulsar-admin source status \
--tenant test_tenant \
--namespace test_namespace \
--name test-source

An example of the data returned

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceivedFromSource" : 16537,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSourceExceptions" : 0,
      "latestSourceExceptions" : [ ],
      "numWritten" : 16537,
      "lastReceivedTime" : 1557899843532,
      "workerId" : "c-standalone-fw-MacBook-Pro.local-8080"
    }
  } ]
}
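
A status report like this can be turned into a simple health check by comparing numRunning with numInstances. The following is a rough sketch in POSIX shell that reads the status JSON on standard input. The function names are hypothetical, and the sed-based extraction assumes the pretty-printed layout shown above; a real JSON parser would be more robust.

```shell
# Hypothetical helper: prints the first integer value of the named top-level field.
# Usage: ... | json_int_field FIELD_NAME
json_int_field() {
  sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Succeeds (exit 0) only if at least one instance exists and all instances are running.
# Usage: ... | all_instances_running
all_instances_running() (
  status=$(cat)   # read the whole report once so it can be scanned twice
  total=$(printf '%s\n' "$status" | json_int_field numInstances)
  running=$(printf '%s\n' "$status" | json_int_field numRunning)
  [ -n "$total" ] && [ "$total" -gt 0 ] && [ "$total" = "$running" ]
)
```

It could be used as pulsar-admin source status --tenant test_tenant --namespace test_namespace --name test-source | all_instances_running, with the exit status indicating whether every instance is running.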

Sinks

Sinks deliver data from Pulsar to external systems. This guide covers only the basics of using a Pulsar sink. Refer to http://pulsar.apache.org/docs/en/io-overview/ for complete details.

Creating a Sink

The minimum set of parameters required to create a Pulsar sink with the Pulsar admin CLI is the following:

  • --name

    • The name of the sink. The name must be unique across all submitted sinks and may contain only lowercase letters, numbers, hyphens (-), and periods (.).

  • --sink-type

    • The type of the sink, e.g. aws-s3. The list of sinks supported by Streamlio Cloud is found here

  • --inputs

    • A comma-separated list of Pulsar topics to read from.
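
When the input topics are held as separate values, the comma-separated form expected by --inputs can be built in the shell. The following is a minimal sketch; join_topics is a hypothetical helper name, and it assumes topic names contain no commas.

```shell
# Hypothetical helper: joins its arguments with commas.
# The body runs in a subshell so the IFS change does not leak to the caller.
join_topics() (
  IFS=,
  printf '%s\n' "$*"   # "$*" joins the arguments using the first character of IFS
)
```

For example, join_topics topic-a topic-b prints topic-a,topic-b, which can be passed as --inputs "$(join_topics topic-a topic-b)".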

Optionally, sink-specific configuration can be supplied in a file:

  • --sink-config-file

Note that although the Pulsar CLI does not require this parameter, most sinks need some configuration to run. Refer to this page for the configuration of each sink.

  • --tenant

    • The tenant that the sink should be under. If this parameter is not specified, the default tenant, "public", will be used.

  • --namespace

    • The namespace that the sink should be under. If this parameter is not specified, the default namespace, "default", will be used

A Pulsar sink is created with the Pulsar admin CLI using a command of the following format:

pulsar-admin sink create \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME> \
--sink-type <SINK_TYPE> \
--inputs <INPUTS> \
--sink-config-file <SINK_CONFIG_FILE>

The following example creates the data-generator sink, which prints out the data produced by the data-generator source:

pulsar-admin sink create \
--tenant test_tenant \
--namespace test_namespace \
--name generator-print \
--sink-type data-generator \
--inputs generator_test

Below is an example of creating a sink with a config file, using the aws-s3 sink.

The following sink config file will be submitted with the aws-s3 sink:

s3_configs.yaml
configs:
  region: "US_WEST_2"
  bucketName: "my-bucket"
  keyPrefix: "my-key-pre"
  rollInterval: "5 min"
  rollSize: "500000"
  awsAccessKeyId: <AWS_ACCESS_KEY_ID>
  awsSecretAccessKey: <AWS_SECRET_ACCESS_KEY>

In the example above, sensitive values are passed in plaintext. To pass sensitive information more securely, create secrets within Streamlio Cloud and reference them in the config file as follows:

s3_configs.yaml
configs:
  region: "US_WEST_2"
  bucketName: "my-bucket"
  keyPrefix: "my-key-pre"
  rollInterval: "5 min"
  rollSize: "500000"
secrets:
  awsAccessKeyId:
    path: secret/<USER>/<PATH_TO_SECRET>
    key: <KEY>
  awsSecretAccessKey:
    path: secret/<USER>/<PATH_TO_SECRET>
    key: <KEY>

The sink can then be submitted with the following command:

pulsar-admin sink create \
--tenant test_tenant \
--namespace test_namespace \
--name s3-sink \
--sink-type aws-s3 \
--inputs my-topic \
--sink-config-file s3_configs.yaml

Updating a Sink

Once a Pulsar sink is submitted, it can be updated at any time. Most of its parameters and configuration can be updated. The following example changes the parallelism of a sink:

pulsar-admin sink update \
--tenant test_tenant \
--namespace test_namespace \
--name test-sink \
--parallelism 2

Deleting a Sink

The command to delete a sink has the following format:

pulsar-admin sink delete \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example

pulsar-admin sink delete \
--tenant test_tenant \
--namespace test_namespace \
--name test-sink

Get Sink info

The command to get the metadata of a sink has the following format:

pulsar-admin sink get \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example

pulsar-admin sink get \
--tenant test_tenant \
--namespace test_namespace \
--name test-sink

An example of the data returned

{
  "tenant": "public",
  "namespace": "default",
  "name": "test_sink",
  "className": "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
  "inputSpecs": {
    "output": {
      "isRegexPattern": false
    }
  },
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "autoAck": true,
  "archive": "builtin://data-generator"
}

Get Sink status

The command to get the status of a sink has the following format:

pulsar-admin sink status \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example

pulsar-admin sink status \
--tenant test_tenant \
--namespace test_namespace \
--name test-sink

An example of the data returned

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 147,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 147,
      "lastReceivedTime" : 1557900296150,
      "workerId" : "c-standalone-fw-MacBook-Pro.local-8080"
    }
  } ]
}
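
The counters numReadFromPulsar and numWrittenToSink can be compared to get a rough idea of how many messages a sink has read but not yet written. The sketch below sums both counters across all instances in a status report read from standard input and prints the difference. The function name sink_pending is hypothetical, the line-based awk parsing assumes the pretty-printed layout shown above, and treating the difference as "pending" messages is an interpretation rather than something the CLI reports.

```shell
# Hypothetical helper: prints (total numReadFromPulsar) - (total numWrittenToSink).
# Usage: ... | sink_pending
sink_pending() {
  awk -F'[:,]' '
    /"numReadFromPulsar"/ { nread    += $2 }   # $2 is the number after the colon
    /"numWrittenToSink"/  { nwritten += $2 }
    END { print (nread - nwritten) }
  '
}
```

For the sample status above the result is 0, since 147 messages were read and 147 were written. It could be used as pulsar-admin sink status --tenant test_tenant --namespace test_namespace --name test-sink | sink_pending.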

Copyright 2019 Streamlio, Inc. Apache, Apache BookKeeper, Apache Pulsar and associated open source project names are trademarks of the Apache Software Foundation.