Functions

Functions are used to describe computation in the Streamlio Cloud. This section describes some basic operations for managing and running functions.

What is a Pulsar Function?

Pulsar Functions are lightweight compute processes that

  • consume messages from one or more Pulsar topics,

  • apply user-supplied processing logic to each message, and

  • publish the results of the computation to another topic.

For more information please visit: http://pulsar.apache.org/docs/en/functions-overview
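
The three steps above can be illustrated without a broker. The following is a minimal sketch in plain Python that simulates the model in memory. It does not use the Pulsar client or the Functions runtime, and the names run_function, exclaim, and the topic names are hypothetical.

```python
from typing import Callable, Dict, List


def exclaim(message: str) -> str:
    """User-supplied processing logic: append an exclamation mark."""
    return message + "!"


def run_function(
    inputs: Dict[str, List[str]],
    logic: Callable[[str], str],
) -> List[str]:
    """Simulate a function over in-memory lists that stand in for topics."""
    output_topic: List[str] = []
    for messages in inputs.values():      # consume from one or more topics
        for message in messages:
            result = logic(message)       # apply the processing logic
            output_topic.append(result)   # publish the result
    return output_topic


if __name__ == "__main__":
    # Hypothetical topic names and messages, for illustration only.
    simulated_inputs = {
        "input-a": ["hello", "world"],
        "input-b": ["pulsar"],
    }
    print(run_function(simulated_inputs, exclaim))
```

In a real deployment only the processing logic is written by the user; consuming and publishing are handled by the Functions runtime.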

Languages supported

  • Java

  • Python

Creating a function

Creating a Pulsar Function with the Pulsar admin CLI requires at least the following parameters:

  • --name The name of the function. The name must be unique across all functions submitted.

  • --output The Pulsar topic that the output of the function will be written to.

  • --inputs A comma-delimited list of Pulsar topics that will be read and delivered as inputs to the function.

When submitting a Java function, the following additional parameters need to be passed:

  • --jar The path to the JAR file that contains the Java function.

  • --classname The fully-qualified class name of the Java function.

When submitting a Python function, the following additional parameters need to be passed:

  • --py The path to the Python file that contains the Python function.

  • --classname The fully-qualified class name of the Python function.

Optionally, the tenant and namespace of the function can be specified:

  • --tenant The tenant that the function should be under. If this parameter is not specified, the default tenant, public, will be used.

  • --namespace The namespace that the function should be under. If this parameter is not specified, the default namespace, default, will be used.

The general format of the command to submit a function is as follows (pass either --jar or --py, not both):

pulsar-admin functions create \
--jar <JAR_PATH> or --py <PY_PATH> \
--classname <FUNCTION_CLASSNAME> \
--inputs <INPUT_TOPIC> \
--output <OUTPUT_TOPIC> \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>
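
When functions are submitted from scripts, it can help to assemble the command programmatically. The following is a minimal sketch, assuming only the flags listed above. The helper name build_create_command is hypothetical, and the sketch only builds the argument list; it does not run it.

```python
import shlex
from typing import List, Optional


def build_create_command(
    name: str,
    inputs: List[str],
    output: str,
    classname: str,
    jar: Optional[str] = None,
    py: Optional[str] = None,
    tenant: Optional[str] = None,
    namespace: Optional[str] = None,
) -> List[str]:
    """Return the argument list for `pulsar-admin functions create`."""
    # Exactly one of jar / py must be given; it selects Java or Python.
    if (jar is None) == (py is None):
        raise ValueError("specify exactly one of jar or py")
    cmd = ["pulsar-admin", "functions", "create"]
    cmd += ["--jar", jar] if jar is not None else ["--py", py]
    cmd += ["--classname", classname]
    cmd += ["--inputs", ",".join(inputs)]  # comma-delimited topic list
    cmd += ["--output", output]
    # When omitted, the CLI falls back to the public tenant and the
    # default namespace, so the flags are only added when set.
    if tenant is not None:
        cmd += ["--tenant", tenant]
    if namespace is not None:
        cmd += ["--namespace", namespace]
    cmd += ["--name", name]
    return cmd


if __name__ == "__main__":
    command = build_create_command(
        name="exclamation",
        inputs=["persistent://public/default/input"],
        output="persistent://public/default/output",
        classname="exclamation_function.ExclamationFunction",
        py="examples/python-examples/exclamation_function.py",
    )
    print(shlex.join(command))
```

The returned list can be passed to subprocess.run(command, check=True) on a machine where pulsar-admin is installed and configured.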

Below is an example of creating a Java function:

pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/input \
--output persistent://public/default/output \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation

Below is an example of creating a Python function:

pulsar-admin functions create \
--py examples/python-examples/exclamation_function.py \
--classname exclamation_function.ExclamationFunction \
--inputs persistent://public/default/input \
--output persistent://public/default/output \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation
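
For context, the Python file passed to --py could look like the following minimal sketch. It assumes the Pulsar Functions Python SDK, in which a function subclasses pulsar.Function and implements process(self, input, context). The actual contents of examples/python-examples/exclamation_function.py are not reproduced in this document, so the body is an assumption. The fallback Function class is a hypothetical stand-in that exists only so the sketch can be tried without the SDK installed.

```python
try:
    from pulsar import Function  # Pulsar Functions Python SDK base class
except ImportError:
    class Function:  # hypothetical stand-in, used only when the SDK is absent
        def process(self, input, context):
            raise NotImplementedError


class ExclamationFunction(Function):
    """Appends an exclamation mark to every incoming message."""

    def process(self, input, context):
        # `input` is one message from an input topic; the returned value
        # is what gets published to the output topic.
        return input + "!"


if __name__ == "__main__":
    # Local check only; `context` is not used here, so None is passed.
    print(ExclamationFunction().process("hello", None))
```

With a file like this named exclamation_function.py, the value given to --classname is the module name followed by the class name, as in the command above.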

Updating a function

Once a Pulsar Function is submitted, it can be updated at any time. Most of the parameters in the configuration can be updated. The following example changes the parallelism of a function:

pulsar-admin functions update \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation \
--parallelism 2

Deleting a function

The format of the command to delete a function is as follows:

pulsar-admin functions delete \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example, to delete a function named exclamation, do the following:

pulsar-admin functions delete \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation

Get information about a function

The format of the command to get the metadata of a function is as follows:

pulsar-admin functions get \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example, to get the metadata of a function named exclamation, do the following:

pulsar-admin functions get \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation

An example of the data returned:

{
  "tenant": "public",
  "namespace": "default",
  "name": "exclamation",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "inputSpecs": {
    "persistent://public/default/exclamation-input": {
      "isRegexPattern": false
    }
  },
  "output": "persistent://public/default/exclamation-output",
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "userConfig": {},
  "runtime": "JAVA",
  "autoAck": true,
  "parallelism": 1,
  "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
  },
  "cleanupSubscription": true
}
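
The returned metadata is JSON, so it can be inspected from a script. The following is a minimal sketch that parses an abbreviated copy of the sample response and extracts a few fields. The helper name summarize is hypothetical, and the conversion assumes that ram and disk are expressed in bytes, which this document does not state explicitly.

```python
import json

# Abbreviated copy of the sample response shown above.
SAMPLE = """
{
  "name": "exclamation",
  "inputSpecs": {
    "persistent://public/default/exclamation-input": {"isRegexPattern": false}
  },
  "output": "persistent://public/default/exclamation-output",
  "parallelism": 1,
  "resources": {"cpu": 1.0, "ram": 1073741824, "disk": 10737418240}
}
"""

GIB = 1024 ** 3  # bytes per GiB; assumes ram and disk are byte counts


def summarize(config_json: str) -> dict:
    """Pick out the fields most often checked after creating a function."""
    config = json.loads(config_json)
    resources = config.get("resources", {})
    return {
        "name": config["name"],
        # The keys of inputSpecs are the input topic names.
        "inputs": sorted(config.get("inputSpecs", {})),
        "output": config.get("output"),
        "parallelism": config.get("parallelism"),
        "ram_gib": resources.get("ram", 0) / GIB,
        "disk_gib": resources.get("disk", 0) / GIB,
    }


if __name__ == "__main__":
    for key, value in summarize(SAMPLE).items():
        print(f"{key}: {value}")
```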

Get status about a function

The format of the command to get the status of a function is as follows:

pulsar-admin functions status \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example, to get the status of a function named exclamation do the following:

pulsar-admin functions status \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation

An example of the data returned:

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-standalone-fw-MacBook-Pro.local-8080"
    }
  } ]
}
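
A status response like the one above can be turned into a simple health check. The following is a minimal sketch that uses only field names from the sample. The helper names unhealthy_instances and is_healthy are hypothetical, and treating a non-empty error string as unhealthy is an assumption about how the field is used.

```python
import json
from typing import List

# Abbreviated copy of the sample status shown above.
SAMPLE_STATUS = """
{
  "numInstances": 1,
  "numRunning": 1,
  "instances": [
    {"instanceId": 0,
     "status": {"running": true, "error": "", "numRestarts": 0}}
  ]
}
"""


def unhealthy_instances(status_json: str) -> List[int]:
    """Return the ids of instances that are stopped or report an error."""
    status = json.loads(status_json)
    bad = []
    for instance in status.get("instances", []):
        state = instance.get("status", {})
        # Assumption: a non-empty "error" string signals a problem.
        if not state.get("running", False) or state.get("error"):
            bad.append(instance["instanceId"])
    return bad


def is_healthy(status_json: str) -> bool:
    """True when every instance is running and none reports an error."""
    status = json.loads(status_json)
    all_running = status["numRunning"] == status["numInstances"]
    return all_running and not unhealthy_instances(status_json)


if __name__ == "__main__":
    print("healthy:", is_healthy(SAMPLE_STATUS))
    print("unhealthy instances:", unhealthy_instances(SAMPLE_STATUS))
```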

Get statistics about a function

The format of the command to get the statistics of a function is as follows:

pulsar-admin functions stats \
--tenant <TENANT> \
--namespace <NAMESPACE> \
--name <NAME>

For example, to get statistics about a function named exclamation, do the following:

pulsar-admin functions stats \
--tenant test_tenant \
--namespace test_namespace \
--name exclamation

An example of the data returned:

{
  "receivedTotal" : 0,
  "processedSuccessfullyTotal" : 0,
  "systemExceptionsTotal" : 0,
  "userExceptionsTotal" : 0,
  "avgProcessLatency" : null,
  "1min" : {
    "receivedTotal" : 0,
    "processedSuccessfullyTotal" : 0,
    "systemExceptionsTotal" : 0,
    "userExceptionsTotal" : 0,
    "avgProcessLatency" : null
  },
  "lastInvocation" : null,
  "instances" : [ {
    "instanceId" : 0,
    "metrics" : {
      "receivedTotal" : 0,
      "processedSuccessfullyTotal" : 0,
      "systemExceptionsTotal" : 0,
      "userExceptionsTotal" : 0,
      "avgProcessLatency" : null,
      "1min" : {
        "receivedTotal" : 0,
        "processedSuccessfullyTotal" : 0,
        "systemExceptionsTotal" : 0,
        "userExceptionsTotal" : 0,
        "avgProcessLatency" : null
      },
      "lastInvocation" : null,
      "userMetrics" : { }
    }
  } ]
}
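
As the sample shows, a function that has not yet received messages reports zero counters and null latency, so scripts that read these statistics need to handle both cases. The following is a minimal sketch that uses only top-level field names from the sample; the helper names success_ratio and describe are hypothetical.

```python
import json
from typing import Optional

# Top-level counters copied from the sample response shown above.
SAMPLE_STATS = """
{
  "receivedTotal": 0,
  "processedSuccessfullyTotal": 0,
  "systemExceptionsTotal": 0,
  "userExceptionsTotal": 0,
  "avgProcessLatency": null
}
"""


def success_ratio(stats: dict) -> Optional[float]:
    """Fraction of received messages processed successfully, or None
    when nothing has been received yet (avoids dividing by zero)."""
    received = stats.get("receivedTotal", 0)
    if not received:
        return None
    return stats.get("processedSuccessfullyTotal", 0) / received


def describe(stats_json: str) -> str:
    """Render a one-line summary, printing n/a for missing values."""
    stats = json.loads(stats_json)
    ratio = success_ratio(stats)
    latency = stats.get("avgProcessLatency")  # JSON null becomes None
    ratio_text = "n/a" if ratio is None else f"{ratio:.2%}"
    latency_text = "n/a" if latency is None else str(latency)
    return (
        f"received={stats.get('receivedTotal', 0)} "
        f"success={ratio_text} latency={latency_text}"
    )


if __name__ == "__main__":
    print(describe(SAMPLE_STATS))
```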

_____ Copyright 2019 Streamlio, Inc. Apache, Apache BookKeeper, Apache Pulsar and associated open source project names are trademarks of the Apache Software Foundation.