PubSub Chat Demo
This example demonstrates how to create a chat application using libp2p’s PubSub implementation with the GossipSub protocol.
$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ pubsub-demo
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Running pubsub chat example...
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-06 23:59:17,472 - pubsub-demo - INFO - Using random available port: 33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Node started with peer ID: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Listening on: /ip4/127.0.0.1/tcp/33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub ready.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Subscribed to topic: pubsub-chat
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Run this script in another console with:
pubsub-demo -d /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Waiting for peers...
Type messages to send (press Enter to send):
Copy the line that starts with pubsub-demo -d, open a new terminal and paste it in:
$ pubsub-demo -d /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-07 00:00:59,845 - pubsub-demo - INFO - Running pubsub chat example...
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Using random available port: 51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Node started with peer ID: QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Listening on: /ip4/127.0.0.1/tcp/51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Pubsub ready.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Subscribed to topic: pubsub-chat
2025-04-07 00:00:59,866 - pubsub-demo - INFO - Connecting to peer: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7 using protocols: MultiAddrKeys(<Multiaddr /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7>)
2025-04-07 00:00:59,866 - pubsub-demo - INFO - Run this script in another console with:
pubsub-demo -d /ip4/127.0.0.1/tcp/51977/p2p/QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ
2025-04-07 00:00:59,881 - pubsub-demo - INFO - Connected to peer: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
Type messages to send (press Enter to send):
You can then start typing messages in either terminal and see them relayed to the other terminal. The messages will be distributed using the GossipSub protocol to all peers subscribed to the same topic. To exit the demo, type “quit” or send a keyboard interrupt (Ctrl+C) in either terminal.
Command Line Options
-t, --topic: Specify the topic name to subscribe to (default: “pubsub-chat”)-d, --destination: Address of peer to connect to-p, --port: Port to listen on (default: random available port)-v, --verbose: Enable debug logging
The full source code for this example is below:
1import argparse
2import logging
3
4import multiaddr
5import trio
6
7from libp2p import (
8 new_host,
9)
10from libp2p.crypto.rsa import (
11 create_new_key_pair,
12)
13from libp2p.custom_types import (
14 TProtocol,
15)
16from libp2p.peer.peerinfo import (
17 info_from_p2p_addr,
18)
19from libp2p.pubsub.gossipsub import (
20 GossipSub,
21)
22from libp2p.pubsub.pubsub import (
23 Pubsub,
24)
25from libp2p.stream_muxer.mplex.mplex import (
26 MPLEX_PROTOCOL_ID,
27 Mplex,
28)
29from libp2p.tools.async_service.trio_service import (
30 background_trio_service,
31)
32from libp2p.utils.address_validation import (
33 find_free_port,
34)
35
36# Configure logging
37logging.basicConfig(
38 level=logging.INFO, # Set default to DEBUG for more verbose output
39 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
40)
41logger = logging.getLogger("pubsub-demo")
42CHAT_TOPIC = "pubsub-chat"
43GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
44
45# Generate a key pair for the node
46key_pair = create_new_key_pair()
47
48
49async def receive_loop(subscription, termination_event):
50 logger.debug("Starting receive loop")
51 while not termination_event.is_set():
52 try:
53 message = await subscription.get()
54 from libp2p.peer.id import ID
55
56 logger.info(f"From peer: {ID(message.from_id).to_base58()}")
57 print(f"Received message: {message.data.decode('utf-8')}")
58 except Exception:
59 logger.exception("Error in receive loop")
60 await trio.sleep(1)
61
62
63async def publish_loop(pubsub, topic, termination_event):
64 """Continuously read input from user and publish to the topic."""
65 logger.debug("Starting publish loop...")
66 print("Type messages to send (press Enter to send):")
67 while not termination_event.is_set():
68 try:
69 # Use trio's run_sync_in_worker_thread to avoid blocking the event loop
70 message = await trio.to_thread.run_sync(input)
71 if message.lower() == "quit":
72 termination_event.set() # Signal termination
73 break
74 if message:
75 logger.debug(f"Publishing message: {message}")
76 await pubsub.publish(topic, message.encode())
77 print(f"Published: {message}")
78 except Exception:
79 logger.exception("Error in publish loop")
80 await trio.sleep(1) # Avoid tight loop on error
81
82
83async def monitor_peer_topics(pubsub, nursery, termination_event):
84 """
85 Monitor for new topics that peers are subscribed to and
86 automatically subscribe the server to those topics.
87 """
88 # Keep track of topics we've already subscribed to
89 subscribed_topics = set()
90
91 while not termination_event.is_set():
92 # Check for new topics in peer_topics
93 for topic in pubsub.peer_topics.keys():
94 if topic not in subscribed_topics:
95 logger.info(f"Auto-subscribing to new topic: {topic}")
96 subscription = await pubsub.subscribe(topic)
97 subscribed_topics.add(topic)
98 # Start a receive loop for this topic
99 nursery.start_soon(receive_loop, subscription, termination_event)
100
101 # Check every 2 seconds for new topics
102 await trio.sleep(2)
103
104
105async def run(topic: str, destination: str | None, port: int | None) -> None:
106 from libp2p.utils.address_validation import (
107 get_available_interfaces,
108 get_optimal_binding_address,
109 )
110
111 if port is None or port == 0:
112 port = find_free_port()
113 logger.info(f"Using random available port: {port}")
114
115 listen_addrs = get_available_interfaces(port)
116
117 # Create a new libp2p host
118 host = new_host(
119 key_pair=key_pair,
120 muxer_opt={MPLEX_PROTOCOL_ID: Mplex},
121 )
122 # Log available protocols
123 logger.debug(f"Host ID: {host.get_id()}")
124 logger.debug(
125 f"Host multiselect protocols: "
126 f"{host.get_mux().get_protocols() if hasattr(host, 'get_mux') else 'N/A'}"
127 )
128 # Create and start gossipsub with optimized parameters for testing
129 gossipsub = GossipSub(
130 protocols=[GOSSIPSUB_PROTOCOL_ID],
131 degree=3, # Number of peers to maintain in mesh
132 degree_low=2, # Lower bound for mesh peers
133 degree_high=4, # Upper bound for mesh peers
134 direct_peers=None, # Direct peers
135 time_to_live=60, # TTL for message cache in seconds
136 gossip_window=2, # Smaller window for faster gossip
137 gossip_history=5, # Keep more history
138 heartbeat_initial_delay=2.0, # Start heartbeats sooner
139 heartbeat_interval=5, # More frequent heartbeats for testing
140 )
141
142 pubsub = Pubsub(host, gossipsub)
143 termination_event = trio.Event() # Event to signal termination
144 async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
145 # Start the peer-store cleanup task
146 nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
147
148 logger.info(f"Node started with peer ID: {host.get_id()}")
149 logger.info("Initializing PubSub and GossipSub...")
150 async with background_trio_service(pubsub):
151 async with background_trio_service(gossipsub):
152 logger.info("Pubsub and GossipSub services started.")
153 await pubsub.wait_until_ready()
154 logger.info("Pubsub ready.")
155
156 # Subscribe to the topic
157 subscription = await pubsub.subscribe(topic)
158 logger.info(f"Subscribed to topic: {topic}")
159
160 if not destination:
161 # Server mode
162 # Get all available addresses with peer ID
163 all_addrs = host.get_addrs()
164
165 logger.info("Listener ready, listening on:")
166 for addr in all_addrs:
167 logger.info(f"{addr}")
168
169 # Use optimal address for the client command
170 optimal_addr = get_optimal_binding_address(port)
171 optimal_addr_with_peer = (
172 f"{optimal_addr}/p2p/{host.get_id().to_string()}"
173 )
174 logger.info(
175 f"\nRun this from the same folder in another console:\n\n"
176 f"pubsub-demo -d {optimal_addr_with_peer}\n"
177 )
178 logger.info("Waiting for peers...")
179
180 # Start topic monitoring to auto-subscribe to client topics
181 nursery.start_soon(
182 monitor_peer_topics, pubsub, nursery, termination_event
183 )
184
185 # Start message publish and receive loops
186 nursery.start_soon(receive_loop, subscription, termination_event)
187 nursery.start_soon(publish_loop, pubsub, topic, termination_event)
188 else:
189 # Client mode
190 maddr = multiaddr.Multiaddr(destination)
191 protocols_in_maddr = maddr.protocols()
192 info = info_from_p2p_addr(maddr)
193 logger.debug(f"Multiaddr protocols: {protocols_in_maddr}")
194 logger.info(
195 f"Connecting to peer: {info.peer_id} "
196 f"using protocols: {protocols_in_maddr}"
197 )
198 try:
199 await host.connect(info)
200 logger.info(f"Connected to peer: {info.peer_id}")
201 if logger.isEnabledFor(logging.DEBUG):
202 await trio.sleep(1)
203 logger.debug(
204 f"After connection, pubsub.peers: {pubsub.peers}"
205 )
206 peer_protocols = [
207 gossipsub.peer_protocol.get(p)
208 for p in pubsub.peers.keys()
209 ]
210 logger.debug(f"Peer protocols: {peer_protocols}")
211
212 # Start the loops
213 nursery.start_soon(
214 receive_loop, subscription, termination_event
215 )
216 nursery.start_soon(
217 publish_loop, pubsub, topic, termination_event
218 )
219 except Exception:
220 logger.exception(f"Failed to connect to peer: {info.peer_id}")
221 return
222
223 await termination_event.wait() # Wait for termination signal
224
225 # Ensure all tasks are completed before exiting
226 nursery.cancel_scope.cancel()
227
228 print("Application shutdown complete") # Print shutdown message
229
230
231def main() -> None:
232 description = """
233 This program demonstrates a pubsub p2p chat application using libp2p with
234 the gossipsub protocol as the pubsub router.
235 To use it, first run 'python pubsub.py -p <PORT> -t <TOPIC>',
236 where <PORT> is the port number,
237 and <TOPIC> is the name of the topic you want to subscribe to.
238 Then, run another instance with 'python pubsub.py -p <ANOTHER_PORT> -t <TOPIC>
239 -d <DESTINATION>', where <DESTINATION> is the multiaddress of the previous
240 listener host. Messages typed in either terminal will be received by all peers
241 subscribed to the same topic.
242 """
243
244 parser = argparse.ArgumentParser(description=description)
245 parser.add_argument(
246 "-t",
247 "--topic",
248 type=str,
249 help="topic name to subscribe",
250 default=CHAT_TOPIC,
251 )
252
253 parser.add_argument(
254 "-d",
255 "--destination",
256 type=str,
257 help="Address of peer to connect to",
258 default=None,
259 )
260
261 parser.add_argument(
262 "-p",
263 "--port",
264 type=int,
265 help="Port to listen on",
266 default=None,
267 )
268
269 parser.add_argument(
270 "-v",
271 "--verbose",
272 action="store_true",
273 help="Enable debug logging",
274 )
275
276 args = parser.parse_args()
277
278 # Set debug level if verbose flag is provided
279 if args.verbose:
280 logger.setLevel(logging.DEBUG)
281 logger.debug("Debug logging enabled")
282
283 logger.info("Running pubsub chat example...")
284 logger.info(f"Your selected topic is: {args.topic}")
285
286 try:
287 trio.run(run, *(args.topic, args.destination, args.port))
288 except KeyboardInterrupt:
289 logger.info("Application terminated by user")
290
291
292if __name__ == "__main__":
293 main()