Go client

Obtaining the Go client library

The Pulsar Go client library is based on the C++ library implementation. In order to use it, you will need to have libpulsar installed in your system.

You can follow the instructions on how to install libpulsar from the C++ Client page

Since the Go library is The version number of the Go client must match the version number of the Pulsar C++ client library.

You can use any dependency tool for Go to fetch the dependency. For example, with Dep you can fetch a specific version of the Pulsar Go client library with:

dep ensure -add github.com/apache/pulsar/pulsar-client-go/pulsar@v2.3.1

Similarly, you can fetch the dependency with Go Mod tool.

Reference documentation

You can find more in-depth reference for the Pulsar Go client library at https://pulsar.apache.org/docs/en/client-libraries-go/ and the API reference is available at https://godoc.org/github.com/apache/pulsar/pulsar-client-go/pulsar.

Producer Example

producer.go
import ( "context"
"log"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar+ssl://prod.my-company.cloud.streamlio.com:6651",
Authentication: pulsar.NewAuthenticationToken("s.JzdWIiOiJKb2UifQ"),
})
if err != nil {
log.Fatalf("could not instantiate Pulsar client: %v", err)
}
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-tenant/my-namespace/my-topic",
})
if err != nil {
log.Fatalf("could not create producer: %v", err)
}
defer producer.Close()
err = producer.Send(context.Background(), pulsar.ProducerMessage{
Payload: []byte("Hello"),
})
if err != nil {
log.Fatalf("failed to publish message: %v", err)
}
}

Consumer Example

consumer.go
import (
"log"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar+ssl://prod.my-company.cloud.streamlio.com:6651",
Authentication: pulsar.NewAuthenticationToken("s.JzdWIiOiJKb2UifQ"),
})
if err != nil {
log.Fatalf("could not instantiate Pulsar client: %v", err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-tenant/my-namespace/my-topic",
SubscriptionName: "my-subscription",
SubscriptionType: pulsar.Shared,
})
if err != nil {
log.Fatalf("could not create consumer: %v", err)
}
defer consumer.Close()
// Listen indefinitely on the topic
for {
msg, err := consumer.Receive(contex.Background())
if err != nil { log.Fatal(err) }
// Do something with the message
err = processMessage(msg)
if err == nil {
// Message processed successfully
consumer.Ack(msg)
} else {
// Failed to process messages
consumer.Nack(msg)
}
}
}

_____ Copyright 2019 Streamlio, Inc. Apache, Apache BookKeeper, Apache Pulsar and associated open source project names are trademarks of the Apache Software Foundation.