Go C#

Cluster Pub-Sub

Proto.Actor has a feature that enables you to broadcast messages from a publisher to a number of subscribers. On the publisher side, the message is published to a “topic”, which is identified by its unique name. On the subscriber side, you subscribe to a topic by its name. Proto.Actor routes the published message to all topic subscribers, sending a copy of the message to each one.

flowchart LR
    Publisher1(Publisher <br /> device-1-building-123)
    Publisher2(Publisher <br /> device-2-building-123)
    Topic(Topic <br /> building-123)
    Subscriber1(Subscriber <br /> building-123-analytics)
    Subscriber2(Subscriber <br /> building-123-alerts)
    Subscriber3(Subscriber <br /> building-123-aggregations)

    class Topic green
    class Publisher1 yellow
    class Publisher2 yellow
    class Subscriber1 blue
    class Subscriber2 blue
    class Subscriber3 blue

    Publisher1 & Publisher2 --> Topic
    Topic --> Subscriber1 & Subscriber2 & Subscriber3

Both the publishers and the subscribers can be distributed across the cluster. This way it is possible to leverage the full power of virtual actors while while ensuring the communication is loosely coupled.

Publishing a message

The IPublisher interface can be used to publish a single message or a batch of messages.

var publisher = context.Cluster().Publisher();

// publish single message
await publisher.Publish("my-topic", new ChatMessage { Message = "hello" });

// publish a batch in one go
await publisher.PublishBatch("my-topic", ChatTopic, new ChatMessage[]
    {
        new() {Message = "hello"},
        new() {Message = "world"}
    });

The task returned by Publish will complete when the message is delivered and processed by all the topic subscribers. If you are interested in a “fire and forget” scenario, do not await the task, but be aware that this may lead to unbounded queueing in memory.

Subscribing to a topic

There are different options on how to subscribe to a topic, depending on whether you are doing this from an actor or from outside code. The simplest subscription could look like this:

var pid = await cluster.Subscribe("my-topic", context => {
        if (context.Message is ChatMessage)
        {
            // process
        }

        return Task.CompletedTask;
    }
);

This code will spawn a new actor that subscribes to specified topic. Alternatively, you can subscribe an existing actor, that will get the published messages in its Receive handler:

await cluster.Subscribe("my-topic", actorPID);

To subscribe from a code generated grain, you can use the variant of Subscribe that accepts cluster identity.

public class User : UserActorBase
{
    private const string ChatTopic = "chat";
    
    public User(IContext context) : base(context) { }

    public override Task OnStarted()
        => Context.Cluster().Subscribe("my-topic", Context.ClusterIdentity()!);

    public override Task OnStopping()
        => Context.Cluster().Unsubscribe("my-topic", Context.ClusterIdentity()!);

    // there is no code generated method to receive the published messages, 
    // so we have to use the generic `Receive` handler
    public override Task OnReceive()
    {
        if (Context.Message is ChatMessage msg)
            Console.WriteLine($"Received '{msg.Message}'");

        return Task.CompletedTask;
    }
}

Unsubscribing from a topic

Use a proper Unsubscribe overload to cancel topic subscription.

// with regular actor
cluster.Unsubscribe("my-topic", actorPID);

// with virtual ator
cluster.Unsubscribe("my-topic", ClusterIdentity.Create("my-id", "my-kind"));

What if I forget to unsubscribe?

If you stop the subscriber without first unsubscribing from the topic, two different situations may occur depending on whether virtual or regular actors are used:

  • When using virtal actors, they are assumed to always exist. If there is no activation available currently, the framework will spawn one when a message arrives to this actor. So a message published to the topic will activate the virtual actor again.
  • Regular actors have an explicit lifecycle. If you stop such actor, all messages directed to it will end up in the deadletter process. When this situation is detected, the subscriber will get automatically unsubscribed. It is still recommended to unsubscribe explicitly.

Persistent subscriptions

Subscriptions are collected in the TopicActor. By default they are not persisted. If this actor needs to restart, the subscriptions are lost.

You can provide an implementation of IKeyValueStore<Subscribers> to persist the subscriptions externally, e.g. in Redis.

var clusterConfig = ClusterConfig
    .Setup("MyCluster", 
        new TestProvider(
            new TestProviderOptions(), 
            new InMemAgent()), 
            new PartitionIdentityLookup())
    .WithClusterKind(
        TopicActor.Kind, 
        Props.FromProducer(() => new TopicActor(new MyRedisKVStore())));

This topic kind registration overrides the default one in the cluster.

BatchingProducer

The delivery of the messages can be optimized by sending them in batches. BatchingProducer will help you create message batches in scenarios, where you have multiple publishers publishing to single topic. The batching producer will collect messages from different sources, group them in batches, and send to subscribers.

flowchart LR
    Publisher1
    Publisher2
    Topic
    Subscriber1
    Subscriber2
    Subscriber3

    class Topic green
    class Publisher1 yellow
    class Publisher2 yellow
    class Subscriber1 blue
    class Subscriber2 blue
    class Subscriber3 blue

    subgraph BatchingProducer[Batching producer]
        direction RL
        Queue(Queue)
        InternalLoop(Internal Loop)

        class Queue green
        class InternalLoop green

        InternalLoop --> Queue
    end

    Publisher1 & Publisher2 --> BatchingProducer
    BatchingProducer ==> Topic
    Topic --> Subscriber1 & Subscriber2 & Subscriber3

Note: There is no delay waiting for the producer queue to reach certain threshold. All messages are published as soon as possible, with each iteration of the loop taking min 1, max MaxBatchSize messages from the queue.

await using var producer = cluster.BatchingProducer("my-topic");
var t1 = producer.ProduceAsync(new ChatMessage { Message = "Hello" });
var t2 = producer.ProduceAsync(new ChatMessage { Message = "Hi" });

await Task.WhenAll(t1, t2);

Notice how the produce tasks are not awaited in a sequence, but rather all at once. This is the pattern you should follow when using the BatchingProducer. The tasks will complete when the message is actually processed by all the subscribers, not when added to the internal producer queue.

Note: you can also decide to not await the task if you are interested in the “fire and forget” scenario.

Cancel a message

You can pass a cancellation token to the ProduceAsync method. It gives you the possibility to cancel the message. Cancellation can only happen as long as the message is in the internal queue of the producer. When actual publishing starts, the cancellation token is ignored.

var cts = new CancellationTokenSource();
var t1 = producer.ProduceAsync(new ChatMessage { Message = "Hello" }, cts.Token);
cts.Cancel();

// awaiting t1 will throw OperationCancelledException if the message was not yet published

Retry policy

By default, the BatchingProducer will fail and stop on publishing error. You can override this behavior by providing implementation of the PublishingErrorHandler in the producer config.

public delegate Task<PublishingErrorDecision> PublishingErrorHandler(
    int retries, 
    Exception e, 
    PubSubBatch batch
);

You can return following decisions from the handler:

  • FailBatchAndStop - the default policy, marks all pending tasks as failed and stops the producer
  • FailBatchAndContinue - fails current batch, and continues with the next one
  • RetryBatchAfter(TimeSpan delay) - retry publishing the batch after a delay
  • RetryBatchImmediately - immediately retries publishing

If the decision is to retry, the message batch will be sent to all the subscribers again, event if previously it was processed correctly by some of them.

Under the hood

The pub sub functionality is implemented using two actors: the TopicActor and the PubSubMemberDeliveryActor. The TopicActor knows about all the subscribers. It is a virtual actor, so there is no need to explicitly instantiate it. The delivery of messages is optimized by TopicActor, which sends only single over-the-network message to each of the members, that contain subscribers. Then PubSubMemberDeliveryActor (running on each member) distributes the messages locally within the member, to each of the subscribers.

flowchart LR
    Publisher
    TopicActor
    Subscriber1
    Subscriber2


    class Publisher yellow
    class Subscriber1 blue
    class Subscriber2 blue

    subgraph TopicActorMember[Member with topic actor]
        direction TB
        TopicActor(TopicActor <br /> contains subscriber list)

        class TopicActor green


    end

    subgraph SubscriberMember1[Member with subscriber 1]
        direction TB
        PubSubDeliveryActor1(PubSubDeliveryActor)

        class PubSubDeliveryActor1 green

        PubSubDeliveryActor1 --deliver--> Subscriber1
    end

    subgraph SubscriberMember2[Member with subscriber 2]
        direction TB
        PubSubDeliveryActor2(PubSubDeliveryActor)

        class PubSubDeliveryActor2 green

        PubSubDeliveryActor2 --deliver--> Subscriber2
    end

    subgraph PublisherMember[Member with publisher]

        Publisher(Topic Publisher) --publish--> TopicActor
        
    end

    TopicActor --deliver--> PubSubDeliveryActor1
    TopicActor --deliver--> PubSubDeliveryActor2


    Subscriber1 -.subscribe.-> TopicActor
    Subscriber2 -.subscribe.-> TopicActor
Icon