The publish/subscribe (pub/sub) model is a messaging pattern in which publishers send messages to a message broker and subscribers register their interest in certain messages. The broker is responsible for delivering each message to the clients that subscribed to it.
Publishers and subscribers are decoupled in the pub/sub paradigm, so they can communicate without being aware of one another’s existence. Either side can be added or removed without affecting the other, which makes the system more flexible and easier to scale.
Message queues, event buses, and topic-based publish/subscribe systems are just a few of the technologies that can be used to construct pub/sub systems. They are frequently employed in event-driven, distributed, and microservices designs.
In this tutorial, we will learn by example how to implement pub/sub in Python.
Topics in Pub/Sub:
In a publish/subscribe system, topics are used to categorize messages and allow subscribers to express interest in specific types of messages.
In a topic-based publish/subscribe system, publishers send each message to a specific topic, and subscribers register for one or more topics. The message broker delivers each message only to the clients subscribed to its topic.
For example, in a news publishing system, there may be topics for different categories of news, such as politics, sports, and entertainment. Subscribers can express interest in specific topics, such as only sports and entertainment. When a publisher sends a message on the sports topic, it is delivered to all subscribers interested in that topic.
Using topics allows for more fine-grained control over the messages that are delivered to subscribers, as they can choose to receive only the messages that are relevant to them. It also allows for a more scalable system, as publishers and subscribers do not need to be aware of each other’s specific identities.
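The news example can be sketched in a few lines. This is a minimal illustration, not a production broker: the Broker class, the reader callback, and the inbox list are hypothetical names chosen for this sketch, and delivery is a plain synchronous function call.

```python
from collections import defaultdict


class Broker:
    """Hypothetical in-memory broker that routes messages by topic."""

    def __init__(self):
        # Maps a topic name to the callbacks subscribed to it.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Deliver only to callbacks registered for this topic.
        # .get avoids creating empty entries for topics nobody follows.
        for callback in self._subscribers.get(topic, []):
            callback(topic, message)


broker = Broker()
inbox = []  # collects everything the single subscriber receives


def reader(topic, message):
    inbox.append((topic, message))


# The reader follows sports and entertainment, but not politics.
broker.subscribe("sports", reader)
broker.subscribe("entertainment", reader)

broker.publish("sports", "Cup final tonight")
broker.publish("politics", "Election results")
broker.publish("entertainment", "New film released")

print(inbox)
# [('sports', 'Cup final tonight'), ('entertainment', 'New film released')]
```

The politics message is dropped for this reader because no callback was registered under that topic, and the publisher never refers to the reader directly.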
Events in Pub/Sub:
Events are messages that are published by a publisher and delivered to subscribed clients. An event is typically a notification of something that has happened or is about to happen. It can contain data about the event, such as the event type, a description, and any relevant details.
In an event-driven architecture, events are used to trigger actions or behaviours in other parts of the system. For example, an event might notify a service that a new user has signed up, or that a payment has been processed. The service can then respond by performing some action, such as sending a welcome email or updating a database.
Using events makes the system more flexible and scalable, because publishers and subscribers are not directly coupled and can communicate asynchronously. It also keeps the system modular, because components can respond to events without knowing which publisher produced them.
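As a rough sketch of the sign-up example, an event can be modelled as a small record and handlers can be registered per event type. The Event dataclass, the on and emit helpers, and the sent_emails list (standing in for a real email service) are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical event record: a type, a description, and details."""
    event_type: str
    description: str
    details: dict = field(default_factory=dict)


handlers = {}     # event type -> list of handler functions
sent_emails = []  # stands in for a real email service


def on(event_type, handler):
    # Register a handler for one event type.
    handlers.setdefault(event_type, []).append(handler)


def emit(event):
    # Every handler registered for this event type reacts to it.
    for handler in handlers.get(event.event_type, []):
        handler(event)


def send_welcome_email(event):
    sent_emails.append(f"Welcome, {event.details['email']}!")


on("user_signed_up", send_welcome_email)
emit(Event("user_signed_up", "A new user registered", {"email": "ada@example.com"}))
emit(Event("payment_processed", "Payment received", {"amount": 20}))

print(sent_emails)
# ['Welcome, ada@example.com!']
```

The payment event has no registered handler here, so it triggers nothing. The code that emits it does not need to know that.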
Event Bus:
An event bus is a messaging system that allows publishers to send events to subscribers. It is a type of publish/subscribe system that is commonly used in event-driven architectures.
In an event bus, events are published to a central message broker, which is responsible for delivering the events to the subscribed clients. Subscribers express interest in receiving events by registering with the event bus and specifying a list of events that they want to receive.
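The registration step described above might look like the following. This is a sketch of one possible in-process design. The EventBus class, its register and publish methods, and the event names are hypothetical and do not come from any particular library.

```python
class EventBus:
    """Hypothetical in-process event bus."""

    def __init__(self):
        # Each entry pairs a subscriber callback with the event names it wants.
        self._registrations = []

    def register(self, subscriber, event_names):
        self._registrations.append((subscriber, set(event_names)))

    def publish(self, event_name, payload=None):
        # Deliver to every subscriber that listed this event name.
        delivered = 0
        for subscriber, wanted in self._registrations:
            if event_name in wanted:
                subscriber(event_name, payload)
                delivered += 1
        return delivered  # number of subscribers reached


bus = EventBus()
audit_log = []


def audit(event_name, payload):
    audit_log.append(event_name)


# The audit subscriber registers with a list of events it cares about.
bus.register(audit, ["user_created", "user_deleted"])

reached = bus.publish("user_created", {"id": 1})
ignored = bus.publish("order_placed", {"id": 7})

print(audit_log, reached, ignored)
# ['user_created'] 1 0
```

Returning the delivery count from publish is a design choice of this sketch. It makes it easy to notice events that nobody is listening for.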
Examples:
Example 1:
Basic publish/subscribe model implemented using Python’s built-in queue module:
Here the Publisher class has a message queue and a list of subscribers. When a message is published using the publish method, it is added to the queue and delivered to all subscribed clients by calling their receive method. The Subscriber class has a receive method that simply prints the received message.
Python3
import queue


class Publisher:
    def __init__(self):
        # Records every published message; nothing reads the queue in this basic example.
        self.message_queue = queue.Queue()
        self.subscribers = []

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def publish(self, message):
        # Record the message, then deliver it synchronously to every subscriber.
        self.message_queue.put(message)
        for subscriber in self.subscribers:
            subscriber.receive(message)


class Subscriber:
    def __init__(self, name):
        self.name = name

    def receive(self, message):
        print(f"{self.name} received message: {message}")


publisher = Publisher()
subscriber_1 = Subscriber("Subscriber 1")
subscriber_2 = Subscriber("Subscriber 2")
publisher.subscribe(subscriber_1)
publisher.subscribe(subscriber_2)
publisher.publish("Hello World")

Output:
Subscriber 1 received message: Hello World
Subscriber 2 received message: Hello World
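In Example 1 the queue is filled but never read, because publish delivers each message directly. One way to put the queue to work is sketched below: publish only enqueues, and a worker thread takes messages off the queue and delivers them. The QueuedPublisher class, its _STOP sentinel, and the close method are assumptions made for this sketch, and subscribers are plain callables rather than Subscriber objects.

```python
import queue
import threading


class QueuedPublisher:
    """Hypothetical variant of Publisher that delivers from a worker thread."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self):
        self.message_queue = queue.Queue()
        self.subscribers = []
        self._worker = threading.Thread(target=self._dispatch, daemon=True)
        self._worker.start()

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def publish(self, message):
        # Only enqueue; the caller does not wait for delivery.
        self.message_queue.put(message)

    def _dispatch(self):
        while True:
            message = self.message_queue.get()  # blocks until a message arrives
            if message is self._STOP:
                break
            for subscriber in list(self.subscribers):
                subscriber(message)

    def close(self):
        # Queue the sentinel behind any pending messages, then wait for the worker.
        self.message_queue.put(self._STOP)
        self._worker.join()


received = []
publisher = QueuedPublisher()
publisher.subscribe(received.append)
publisher.publish("Hello World")
publisher.publish("Second message")
publisher.close()

print(received)
# ['Hello World', 'Second message']
```

Because queue.Queue is first-in, first-out and there is a single worker, messages arrive in the order they were published. Calling close before reading the results ensures that the worker has finished delivering.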
Example 2:
Publish/subscribe model implemented using Python’s threading module:
In this example, the Publisher class has a dictionary of subscribers, where the keys are topics and the values are lists of subscribers. The subscribe method adds a subscriber to the list for the specified topic. The publish method checks whether there are any subscribers for the specified topic and, if there are, stores the message for each subscriber and sets its event. Unlike in the previous example, each Subscriber holds a threading.Event and a message attribute; its receive method waits for the event, prints the stored message, and then clears the event.
Python3
import threading


class Publisher:
    def __init__(self):
        # Maps each topic to the list of subscribers interested in it.
        self.subscribers = {}

    def subscribe(self, subscriber, topic):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(subscriber)

    def publish(self, message, topic):
        if topic in self.subscribers:
            for subscriber in self.subscribers[topic]:
                # Store the message before setting the event, so a waiting
                # subscriber never wakes up to a stale message.
                subscriber.message = message
                subscriber.event.set()


class Subscriber:
    def __init__(self, name):
        self.name = name
        self.event = threading.Event()
        self.message = None

    def receive(self):
        # Block until a publisher sets the event.
        self.event.wait()
        print(f"{self.name} received message: {self.message}")
        self.event.clear()


publisher = Publisher()
subscriber_1 = Subscriber("Subscriber 1")
subscriber_2 = Subscriber("Subscriber 2")
subscriber_3 = Subscriber("Subscriber 3")
publisher.subscribe(subscriber_1, "sports")
publisher.subscribe(subscriber_2, "entertainment")
publisher.subscribe(subscriber_3, "sports")
publisher.publish("Soccer match result", "sports")
subscriber_1.receive()

Output:
Subscriber 1 received message: Soccer match result
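Example 2 calls receive only after publish, all in the main thread, so nothing ever actually waits. The sketch below runs each subscriber in its own thread so that it blocks on its event until a message arrives. The timeout parameter and the received list are additions made for this sketch, so that a subscriber on an unused topic eventually gives up instead of blocking forever.

```python
import threading


class Subscriber:
    def __init__(self, name):
        self.name = name
        self.event = threading.Event()
        self.message = None
        self.received = []  # added for this sketch: keeps delivered messages

    def receive(self, timeout=None):
        # Block until a publisher sets the event, or until the timeout expires.
        if not self.event.wait(timeout):
            return False
        self.received.append(self.message)
        self.event.clear()
        return True


class Publisher:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, subscriber, topic):
        self.subscribers.setdefault(topic, []).append(subscriber)

    def publish(self, message, topic):
        for subscriber in self.subscribers.get(topic, []):
            subscriber.message = message  # store the message first
            subscriber.event.set()        # then wake the waiting thread


publisher = Publisher()
sports_fan = Subscriber("Subscriber 1")
film_fan = Subscriber("Subscriber 2")
publisher.subscribe(sports_fan, "sports")
publisher.subscribe(film_fan, "entertainment")

# Each subscriber waits in its own thread before anything is published.
threads = [
    threading.Thread(target=sports_fan.receive, kwargs={"timeout": 5}),
    threading.Thread(target=film_fan.receive, kwargs={"timeout": 0.2}),
]
for thread in threads:
    thread.start()

publisher.publish("Soccer match result", "sports")

for thread in threads:
    thread.join()

print(sports_fan.received)  # ['Soccer match result']
print(film_fan.received)    # []
```

Each subscriber has a single message attribute, so a second publish before receive runs would overwrite the first message. A per-subscriber queue.Queue would be one way to avoid that.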