Common operations

Users routinely interact with Streamlio Cloud to get status about various topics in the system. This section describes a set of common operations that users can perform on the topics.

Topic Stats

Users can access the stats for a topic through the CLI tools. This will give insight on the current state of the topic:

pulsar-admin topics stats persistent://my-tenant/my-namespace/my-topic

The answer will look like:

{
"msgRateIn" : 999.2772588502835, // Topic publish rate (msg/s)
"msgThroughputIn" : 1048591.495435816, // Topic publish rate (bytes/s)
"msgRateOut" : 999.9622314969689, // Topic delivery rate (msg/s)
"msgThroughputOut" : 1049695.2695744513, // Topic delivery rate (bytes/s)
"averageMsgSize" : 1049.3499037917375, // Average message size (bytes)
"storageSize" : 101425, // Storage used by the topic (bytes)
// List of publishers currently connected
"publishers" : [ {
"msgRateIn" : 999.2772588502835, // Publish rate (msg/s)
"msgThroughputIn" : 1048591.495435816, // Publish rate (bytes/s)
"averageMsgSize" : 1049.0, // Average message size (bytes)
"producerId" : 0,
"metadata" : { }, // Eventual producer metadata set by application
"address" : "/127.0.0.1:59367", // TCP address of producer
"producerName" : "standalone-7-0", // Producer name
"connectedSince" : "2019-05-17T15:48:45.594-07:00", // Connected since
"clientVersion" : "2.3.1" // Pulsar client library version
} ],
// Map of subscriptions on the topic
"subscriptions" : {
// Subscription name
"my-subscription" : {
"msgRateOut" : 999.9622314969689, // Subscription delivery rate (msg/s)
"msgThroughputOut" : 1049695.2695744513, // Subscription delivery rate (bytes/s)
"msgRateRedeliver" : 0.0, // Subscription re-Delivery rate (msg/s)
"msgBacklog" : 38, // Number of messages that need to be processed
"blockedSubscriptionOnUnackedMsgs" : false, // Whether the subscription is
// blocked because of too many unacknowledged messages
"unackedMessages" : 0, // Number of unacknowledged messages (Messages pushed to
// consumers that were not yet acknowledged).
"type" : "Exclusive", // Subscription type (eg. Exclusive, Shared, Failover)
"activeConsumerName" : "9683d", // Name of the active consumer
"msgRateExpired" : 0.0, // Rate at which messages are being expired for TTL
// List of consumers attached to the subscription
"consumers" : [ {
"msgRateOut" : 999.9622314969689, // Consumer delivery rate (msg/s)
"msgThroughputOut" : 1049695.2695744513, // Consumer delivery rate (bytes/s)
"msgRateRedeliver" : 0.0, // Consumer re-Delivery rate (msg/s)
"consumerName" : "9683d", // Name of the consumer
"availablePermits" : 766, // Number of flow-control permits that Pulsar
// has currently from a consumer. When > 0, it
// means Pulsar can push more messages. When it's
// <= 0, the broker will pause the delivery to
// adjust to consumer processing speed
"unackedMessages" : 0, // Number of unacknowledged messages (Messages pushed to
// consumers that were not yet acknowledged).
"blockedConsumerOnUnackedMsgs" : false, // Whether the consumer is
// blocked because of too many unacknowledged messages
"metadata" : { }, // Eventual consumer metadata set by application
"address" : "/127.0.0.1:59388", // TCP address of consumer
"connectedSince" : "2019-05-17T15:49:01.838-07:00", // Connected since
"clientVersion" : "2.3.1" // Pulsar client library version
} ]
}
},
"replication" : { }, // When geo-replication is enabled, this section contains
// the stats of replication across clusters for the topic
"deduplicationStatus" : "Disabled" // Whether message de-duplication is enabled
}

Creating subscriptions

Normally, when time retention is not set, if a topic has no subscription created, any message publish on the topic will be dropped. A consumer connecting, will automatically create a subscription, with the given name.

Additionally, it is also possible to create subscriptions from the CLI:

pulsar-admin topics create-subscription \
persistent://my-tenant/my-namespace/my-topic \
--subscription my-subscription

Deleting subscriptions

In many cases it is useful to use the tools to delete subscriptions that are no longer needed, in order to avoid retaining the messages published on a particular topic.

pulsar-admin topics unsubscribe \
persistent://my-tenant/my-namespace/my-topic \
--subscription my-subscription

Skipping messages

If there is a problem with a particular message, that makes it impossible for consumers to skip it, it's possible to forcefully skipping it.

You can take a peek at the content for the oldest unacknowledged message:

pulsar-admin topics peek-messages \
persistent://my-tenant/my-namespace/my-topic \
--subscription my-subscription

And then skip it:

pulsar-admin topics skip persistent://my-tenant/my-namespace/my-topic \
--subscription my-subscription \
--count 1

Clearing backlog

You can drop completely the backlog on a given topic with:

pulsar-admin topics clear-backlog \
persistent://my-tenant/my-namespace/my-topic \
--subscription my-subscription

The same operation can also be done, for a given subscription, for all the topics present on a namespace:

pulsar-admin namespaces clear-backlog my-tenant/my-namespace \
-s my-subscription

_____ Copyright 2019 Streamlio, Inc. Apache, Apache BookKeeper, Apache Pulsar and associated open source project names are trademarks of the Apache Software Foundation.