Concepts

Before using Streamlio Cloud, you need to understand some basic concepts of messaging, storage, and processing. This section describes the key concepts for using Streamlio Cloud.

Messaging Concepts

The service provides the semantics of persistent publish-subscribe (pub-sub) messaging. In this pattern, producers publish messages to topics. Consumers can then subscribe to those topics, process incoming messages, and send an acknowledgement when processing is complete.

Messages

Messages are the basic "unit" of the Streamlio service. They're what producers publish to topics and what consumers then consume from topics (and acknowledge when the message has been processed).

Messages contain various fields, including the payload data, a key, metadata such as the publish timestamp and producer name, and optional property tags. Common forms of compression and batching are also supported.

Producers

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker for processing. It can send data either synchronously (sync) or asynchronously (async).
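
For illustration, here's a minimal Java sketch of both send styles, assuming an existing PulsarClient named client; the topic name is a placeholder chosen for this example:

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;

Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/my-topic")
        .create();

// Synchronous send: blocks until the broker acknowledges the message
MessageId msgId = producer.send("sync-payload".getBytes());

// Asynchronous send: returns immediately with a future that completes
// when the broker acknowledges the message
CompletableFuture<MessageId> future = producer.sendAsync("async-payload".getBytes());
future.thenAccept(id -> System.out.println("Published message " + id));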

Consumers

A consumer is a process that attaches to a topic via a subscription and then receives messages. It can receive messages either synchronously (sync) or asynchronously (async). Messages can be acknowledged either one by one or cumulatively. With cumulative acknowledgement, the consumer only needs to acknowledge the last message it received; all messages in the stream up to (and including) that message will not be re-delivered to that consumer.
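
As a sketch, both acknowledgement styles look like this in Java (assuming an existing PulsarClient named client; the topic and subscription names are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

// Acknowledge each message individually
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);

// Cumulative acknowledgement: acks this message and all messages before it
Message<byte[]> last = consumer.receive();
consumer.acknowledgeCumulative(last);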

Topics

In the Streamlio service, topics are the named channels for transmitting messages from producers to consumers. Topic names are URLs that have a well-defined structure:

{persistent|non-persistent}://tenant/namespace/topic

No need to explicitly create new topics

If a client attempts to write or receive messages to/from a topic that does not yet exist, the topic will automatically be created under the namespace provided in the topic name.

Namespaces

A namespace is a logical nomenclature within a tenant. A tenant can hold multiple namespaces. For instance, a tenant with different applications can have a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For example, my-tenant/app1 is the namespace for the application app1 owned by my-tenant.

Subscription modes

A subscription is a named configuration rule that determines how messages are delivered to consumers. There are three available subscription modes: exclusive, shared, and failover, with exclusive being the default.

Exclusive

In exclusive mode, only a single consumer is allowed to attach to the subscription. If more than one consumer attempts to subscribe to a topic using the same subscription, the consumer receives an error.

Shared

In shared (or round-robin) mode, multiple consumers can attach to the same subscription. Messages are delivered in a round-robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged will be rescheduled for delivery to the remaining consumers.

Failover

In failover mode, multiple consumers can attach to the same subscription. The consumers are lexically sorted by name, and the first consumer is initially the only one receiving messages. This consumer is called the master consumer.

When the master consumer disconnects, all (non-acked and subsequent) messages will be delivered to the next consumer in line.
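
A consumer selects one of these modes when it subscribes. Here's a minimal Java sketch (client is an existing PulsarClient; the topic and subscription names are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.SubscriptionType;

// Attach in shared mode; Exclusive and Failover are selected the same way
Consumer<byte[]> sharedConsumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("shared-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();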

Multi-topic subscriptions

When a consumer subscribes to a topic, by default it subscribes to one specific topic, such as persistent://public/default/my-topic. Consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways:

  • On the basis of a regular expression (regex), for example persistent://public/default/finance-.*

  • By explicitly defining a list of topics

When subscribing to multiple topics by regex, all topics must be in the same namespace.

When subscribing to multiple topics, the client will discover the topics that match the regex pattern/list and then subscribe to all of them. If any of the topics don't currently exist, the consumer will auto-subscribe to them once the topics are created.

No ordering guarantees are provided when subscribing to multiple topics.

Here are some multi-topic subscription examples for Java:

import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient pulsarClient = // Instantiate Pulsar client object

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
        .topicsPattern(allTopicsInNamespace)
        .subscriptionName("subscription-1")
        .subscribe();

// Subscribe to a subset of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
        .topicsPattern(someTopicsInNamespace)
        .subscriptionName("subscription-1")
        .subscribe();
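
To subscribe by an explicit list of topics instead of a regex, the consumer builder also accepts a topic list. A minimal sketch, continuing with the same pulsarClient (the topic names are placeholders):

import java.util.Arrays;
import java.util.List;

// Subscribe to an explicit list of topics
List<String> topics = Arrays.asList(
        "persistent://public/default/topic-1",
        "persistent://public/default/topic-2");
Consumer<byte[]> multiTopicConsumer = pulsarClient.newConsumer()
        .topics(topics)
        .subscriptionName("subscription-1")
        .subscribe();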

Partitioned topics

Normal topics can be served only by a single broker, which limits the topic's maximum throughput. Partitioned topics are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.

Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of several brokers. The distribution of partitions across brokers is handled automatically by Pulsar.

The diagram below illustrates this:

Here, the topic Topic1 has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers each handle two partitions, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).

Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to, while the subscription mode determines which messages go to which consumers.

Decisions about routing and subscription modes can be made separately in most cases. In general, throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.

There is no difference between partitioned topics and normal topics in terms of how subscription modes work, as partitioning only determines what happens between the time a message is published by a producer and the time it is processed and acknowledged by a consumer.

The number of partitions can be specified when creating the topic.
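
For example, a partitioned topic with four partitions can be created with the admin API. A sketch, assuming the admin service is reachable at http://localhost:8080 and a placeholder topic name:

import org.apache.pulsar.client.admin.PulsarAdmin;

// Create a partitioned topic with 4 partitions
PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();
admin.topics().createPartitionedTopic(
        "persistent://public/default/partitioned-topic", 4);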

Routing modes

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition---that is, which internal topic---each message should be published to.

Three MessageRoutingMode options are available:

RoundRobinPartition (default)

If no key is provided, the producer publishes messages across all partitions in round-robin fashion to achieve maximum throughput. Note that round-robin is not applied per individual message; instead, the same partition is used for the duration of the batching delay so that batching remains effective. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition.

SinglePartition

If no key is provided, the producer randomly picks a single partition and publishes all messages to it. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition.

CustomPartition

Uses a custom message router implementation that is called to determine the partition for each message. You can create a custom routing mode by using the Java client and implementing the MessageRouter interface, as shown in the sketch below.
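
Here's a minimal sketch of a custom router in Java (the routing policy and topic name are illustrative; client is an existing PulsarClient):

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TopicMetadata;

// Route keyless messages to partition 0; hash everything else by key
MessageRouter customRouter = new MessageRouter() {
    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        if (!msg.hasKey()) {
            return 0;
        }
        return Math.floorMod(msg.getKey().hashCode(), metadata.numPartitions());
    }
};

Producer<byte[]> routedProducer = client.newProducer()
        .topic("persistent://public/default/partitioned-topic")
        .messageRouter(customRouter)
        .create();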

Ordering guarantee

The ordering of messages is related to the MessageRoutingMode and the message key. Usually, users want a per-key-partition ordering guarantee.

If a key is attached to a message, the message is routed to the corresponding partition based on the hashing scheme specified by HashingScheme in ProducerBuilder, when using either SinglePartition or RoundRobinPartition mode.

Per-key-partition

All messages with the same key are in order and placed in the same partition. Use either SinglePartition or RoundRobinPartition mode, and provide a key with each message.

Per-producer

All messages from the same producer are in order. Use SinglePartition mode, and provide no key with the messages.

Hashing scheme

HashingScheme is an enum that represents the set of standard hashing functions available when choosing the partition for a particular message.

Two standard hashing functions are available: JavaStringHash and Murmur3_32Hash. The default hashing function for a producer is JavaStringHash. Note that JavaStringHash is not useful when producers are written in multiple language clients; in that case, it is recommended to use Murmur3_32Hash.
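
Switching a producer to Murmur3_32Hash is a one-line builder change. A sketch, assuming an existing PulsarClient named client and a placeholder topic:

import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Producer;

// Hash keys with Murmur3 so clients in other languages compute the same mapping
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/partitioned-topic")
        .hashingScheme(HashingScheme.Murmur3_32Hash)
        .create();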

Non-persistent topics

By default, Streamlio persistently stores all unacknowledged messages on multiple BookKeeper bookies (storage nodes). Data for messages on persistent topics can thus survive broker restarts and subscriber failover.

Streamlio also, however, supports non-persistent topics, which are topics on which messages are never persisted to disk and live only in memory. When using non-persistent delivery, killing a Streamlio broker or disconnecting a subscriber from a topic means that all in-transit messages are lost on that (non-persistent) topic, so clients may see message loss.

Non-persistent topics have names of this form (note the non-persistent in the name):

non-persistent://tenant/namespace/topic

Client API

Producers and consumers can connect to non-persistent topics in the same way as persistent topics, with the crucial difference that the topic name must start with non-persistent. All three subscription modes---exclusive, shared, and failover---are supported for non-persistent topics.

Here's an example Java consumer for a non-persistent topic:

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";
Consumer<byte[]> consumer = client.newConsumer()
        .topic(npTopic)
        .subscriptionName(subscriptionName)
        .subscribe();

Here's an example Java producer for the same non-persistent topic:

Producer<byte[]> producer = client.newProducer().topic(npTopic).create();

Message retention and expiry

By default, the Streamlio service

  • immediately deletes all messages that have been acknowledged by a consumer, and

  • persistently stores all unacknowledged messages in a message backlog.

This behavior, however, can be overridden via the following properties:

  • Message retention enables you to store messages that have been acknowledged by a consumer

  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

All message retention and expiry is managed at the namespace level. For a how-to, see the Message retention and expiry cookbook.
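
As a sketch of what these namespace-level settings look like via the Java admin client (admin is an existing PulsarAdmin instance; the namespace and limits are placeholders):

import org.apache.pulsar.common.policies.data.RetentionPolicies;

// Retain acknowledged messages for up to 60 minutes or 1024 MB per topic
admin.namespaces().setRetention("public/default",
        new RetentionPolicies(60, 1024));

// Expire unacknowledged messages after a 5-minute TTL (300 seconds)
admin.namespaces().setNamespaceMessageTTL("public/default", 300);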

The diagram below illustrates both concepts:

[Diagram: Message retention and expiry]

With message retention, shown at the top, a retention policy applied to all topics in a namespace dictates that some messages are durably stored in Streamlio even though they've already been acknowledged. Acknowledged messages that are not covered by the retention policy are deleted. Without a retention policy, all of the acknowledged messages would be deleted.

With message expiry, shown at the bottom, some messages are deleted, even though they haven't been acknowledged, because they've expired according to the TTL applied to the namespace (for example because a TTL of 5 minutes has been applied and the messages haven't been acknowledged but are 10 minutes old).

Message deduplication

Message duplication occurs when a message is persisted by Pulsar more than once. Message deduplication is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, even if the message is received more than once.

The following diagram illustrates what happens when message deduplication is disabled vs. enabled:

[Diagram: Pulsar message deduplication]

Message deduplication is disabled in the scenario shown at the top. Here, a producer publishes message 1 on a topic; the message reaches a Pulsar broker and is persisted to BookKeeper. The producer then sends message 1 again (in this case due to some retry logic), and the message is received by the broker and stored in BookKeeper again, which means that duplication has occurred.

In the second scenario at the bottom, the producer publishes message 1, which is received by the broker and persisted, as in the first scenario. When the producer attempts to publish the message again, however, the broker knows that it has already seen message 1 and thus does not persist the message.

Message deduplication is handled at the namespace level. For more instructions, see the message deduplication cookbook.
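
A sketch of enabling deduplication for a namespace via the Java admin client (admin is an existing PulsarAdmin instance, client an existing PulsarClient; all names are placeholders):

import org.apache.pulsar.client.api.Producer;

// Enable broker-side deduplication for every topic in the namespace
admin.namespaces().setDeduplicationStatus("public/default", true);

// Producers participating in deduplication should use a stable producer name
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/my-topic")
        .producerName("dedup-producer-1")
        .create();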

Producer idempotency

The other available approach to message deduplication is to ensure that each message is only produced once. This approach is typically called producer idempotency. The drawback is that it defers the work of deduplication to the application. In Pulsar, deduplication is instead handled at the broker level, which means that you don't need to modify your Pulsar client code. You only need to make administrative changes (see the Managing message deduplication cookbook for a guide).

Deduplication and effectively-once semantics

Message deduplication makes Pulsar an ideal messaging system to be used in conjunction with stream processing engines (SPEs) and other systems seeking to provide effectively-once processing semantics. Messaging systems that don't offer automatic message deduplication require the SPE or other system to guarantee deduplication, which means that strict message ordering comes at the cost of burdening the application with the responsibility of deduplication. With Pulsar, strict ordering guarantees come at no application-level cost.

Client library

Streamlio exposes a Pulsar-compatible client API with language bindings for Java, Go, Python, and C++. The client libraries support transparent reconnection and/or connection failover to the service, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.

Reader interface

In Streamlio, the "standard" consumer interface involves using consumers to listen on topics, process incoming messages, and finally acknowledge those messages when they've been processed. Whenever a consumer connects to a topic, it automatically begins reading from the earliest un-acked message onward because the topic's cursor is automatically managed by the service.

The reader interface for Streamlio enables applications to manually manage cursors. When you use a reader to connect to a topic---rather than a consumer---you need to specify which message the reader begins reading from when it connects to a topic. When connecting to a topic, the reader interface enables you to begin with:

  • The earliest available message in the topic

  • The latest available message in the topic

  • Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.

The reader interface is helpful for use cases like using Streamlio to provide effectively-once processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Streamlio clients with the low-level abstraction necessary to "manually position" themselves within a topic.

Non-partitioned topics only

The reader interface for Pulsar cannot currently be used with partitioned topics.

Here's a Java example that begins reading from the earliest available message on a topic:

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

// Create a reader on a topic, starting from a specific message (and onward)
Reader<byte[]> reader = pulsarClient.newReader()
        .topic("reader-api-test")
        .startMessageId(MessageId.earliest)
        .create();

while (true) {
    Message<byte[]> message = reader.readNext();
    // Process the message
}

To create a reader that will read from the latest available message:

Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(MessageId.latest)
        .create();

To create a reader that will read from some message between earliest and latest:

byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();

Multi-Tenancy

Tenants and namespaces are the two key concepts that Streamlio uses to support multi-tenancy. Tenants can be spread across clusters and can each have their own authentication and authorization scheme applied to them. They are also the administrative unit at which storage quotas, message TTL, and isolation policies can be managed.

A namespace is the administrative unit within a tenant. The configuration policies set on a namespace apply to all the topics created in that namespace. A tenant may create multiple namespaces.

The multi-tenant nature of Streamlio is reflected most visibly in topic URLs, which have this structure:

persistent://tenant/namespace/topic

Names for topics in the same namespace will look like this:

persistent://tenant/namespace/topic-1
persistent://tenant/namespace/topic-2
persistent://tenant/namespace/topic-3
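
A sketch of creating this hierarchy with the Java admin client (admin is an existing PulsarAdmin instance; the tenant, role, and cluster names are placeholders, and the TenantInfo constructor shown here reflects the 2.x-era admin API):

import java.util.Collections;
import java.util.HashSet;
import org.apache.pulsar.common.policies.data.TenantInfo;

// Create a tenant restricted to one cluster, then a namespace under it
TenantInfo tenantInfo = new TenantInfo(
        new HashSet<>(Collections.singleton("admin-role")),
        new HashSet<>(Collections.singleton("my-cluster")));
admin.tenants().createTenant("my-tenant", tenantInfo);
admin.namespaces().createNamespace("my-tenant/my-namespace");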

Topic Compaction

Pulsar was built with highly scalable persistent storage of message data as a primary objective. Pulsar topics enable you to persistently store as many unacknowledged messages as you need while preserving message ordering. By default, Pulsar stores all unacknowledged/unprocessed messages produced on a topic. Accumulating many unacknowledged messages on a topic is necessary for many Pulsar use cases but it can also be very time intensive for Pulsar consumers to "rewind" through the entire log of messages.

For a more practical guide to topic compaction, see the Topic compaction cookbook.

For some use cases consumers don't need a complete "image" of the topic log. They may only need a few values to construct a more "shallow" image of the log, perhaps even just the most recent value. For these kinds of use cases Pulsar offers topic compaction. When you run compaction on a topic, Pulsar goes through a topic's backlog and removes messages that are obscured by later messages, i.e. it goes through the topic on a per-key basis and leaves only the most recent message associated with that key.

Pulsar's topic compaction feature:

  • Allows for faster "rewind" through topic logs

  • Applies only to persistent topics

  • Is triggered automatically when the backlog reaches a certain size, or can be triggered manually via the command line (see the Topic compaction cookbook and the sketch after this list)

  • Is conceptually and operationally distinct from retention and expiry. Topic compaction does, however, respect retention: if retention has removed a message from the message backlog of a topic, the message will also not be readable from the compacted topic ledger.
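
As referenced in the list above, a manual compaction trigger via the Java admin client looks like this (a sketch; admin is an existing PulsarAdmin instance and the topic name is a placeholder):

// Manually trigger compaction on a topic
admin.topics().triggerCompaction("persistent://public/default/stock-ticker");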

Topic compaction example: the stock ticker

An example use case for a compacted Pulsar topic would be a stock ticker topic. On a stock ticker topic, each message bears a timestamped dollar value for stocks for purchase (with the message key holding the stock symbol, e.g. AAPL or GOOG). With a stock ticker you may care only about the most recent value(s) of the stock and have no interest in historical data (i.e. you don't need to construct a complete image of the topic's sequence of messages per key). Compaction would be highly beneficial in this case because it would keep consumers from needing to rewind through obscured messages.

Schema Registry

Type safety is extremely important in any application built around a message bus like Pulsar. Producers and consumers need some kind of mechanism for coordinating types at the topic level lest a wide variety of potential problems arise (for example serialization and deserialization issues). Applications typically adopt one of two basic approaches to type safety in messaging:

  1. A "client-side" approach in which message producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. If a producer is sending temperature sensor data on the topic topic-1, consumers of that topic will run into trouble if they attempt to parse that data as, say, moisture sensor readings.

  2. A "server-side" approach in which producers and consumers inform the system which data types can be transmitted via the topic. With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced.

Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis.

  1. For the "client-side" approach, producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis.

  2. For the "server-side" approach, Pulsar has a built-in schema registry that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic.

Schemas are automatically uploaded when you create a typed Producer with a Schema. Additionally, Schemas can be manually uploaded to, fetched from, and updated via the REST API.

Streamlio schemas are applied and enforced at the topic level (schemas cannot be applied at the namespace or tenant level). Producers and consumers upload schemas to the Streamlio service.

Schema versions

Below is a sample of how to build a type-safe client in Java:

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
        .topic("sensor-data")
        .sendTimeout(3, TimeUnit.SECONDS)
        .create();

The table below lists the possible scenarios when this connection attempt occurs and what happens in each:

No schema exists for the topic

The producer is created using the given schema. The schema is transmitted to the broker and stored (since no existing schema is "compatible" with the SensorReading schema). Any consumer created using the same schema/topic can consume messages from the sensor-data topic.

A schema already exists; the producer connects using the same schema that's already stored

The schema is transmitted to the Streamlio service. The service determines that the schema is compatible. It attempts to store the schema in BookKeeper but determines that it's already stored, so the existing schema is used to tag produced messages.

A schema already exists; the producer connects using a new schema that is compatible

The producer transmits the schema to the broker. The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number).

Schemas are versioned in succession. Schema storage happens in the broker that handles the associated topic so that version assignments can be made. Once a version is assigned to or fetched for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
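
To complement the producer sample above, a typed consumer using the same schema might look like this (a sketch, assuming the same SensorReading class, the sensor-data topic, and an existing PulsarClient named client):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;

Consumer<SensorReading> consumer = client.newConsumer(JSONSchema.of(SensorReading.class))
        .topic("sensor-data")
        .subscriptionName("sensor-subscription")
        .subscribe();

// getValue() returns a deserialized SensorReading rather than raw bytes
Message<SensorReading> msg = consumer.receive();
SensorReading reading = msg.getValue();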

Supported schema formats

The following formats are supported by the Streamlio schema registry:

  • None (raw bytes)

  • String (used for UTF-8-encoded strings)

  • JSON

  • Protobuf

  • Avro

Copyright 2019 Streamlio, Inc. Apache, Apache BookKeeper, Apache Pulsar and associated open source project names are trademarks of the Apache Software Foundation.