PubSub Chat Demo

This example demonstrates how to create a chat application using libp2p’s PubSub implementation with the GossipSub protocol.

$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ pubsub-demo
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Running pubsub chat example...
2025-04-06 23:59:17,471 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-06 23:59:17,472 - pubsub-demo - INFO - Using random available port: 33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Node started with peer ID: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Listening on: /ip4/127.0.0.1/tcp/33269
2025-04-06 23:59:17,490 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Pubsub ready.
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Subscribed to topic: pubsub-chat
2025-04-06 23:59:17,491 - pubsub-demo - INFO - Run this script in another console with:
pubsub-demo -d /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7

2025-04-06 23:59:17,491 - pubsub-demo - INFO - Waiting for peers...
Type messages to send (press Enter to send):

Copy the line that starts with pubsub-demo -d, open a new terminal and paste it in:

$ pubsub-demo -d /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
2025-04-07 00:00:59,845 - pubsub-demo - INFO - Running pubsub chat example...
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Your selected topic is: pubsub-chat
2025-04-07 00:00:59,846 - pubsub-demo - INFO - Using random available port: 51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Node started with peer ID: QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Listening on: /ip4/127.0.0.1/tcp/51977
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Initializing PubSub and GossipSub...
2025-04-07 00:00:59,864 - pubsub-demo - INFO - Pubsub and GossipSub services started.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Pubsub ready.
2025-04-07 00:00:59,865 - pubsub-demo - INFO - Subscribed to topic: pubsub-chat
2025-04-07 00:00:59,866 - pubsub-demo - INFO - Connecting to peer: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7 using protocols: MultiAddrKeys(<Multiaddr /ip4/127.0.0.1/tcp/33269/p2p/QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7>)
2025-04-07 00:00:59,866 - pubsub-demo - INFO - Run this script in another console with:
pubsub-demo -d /ip4/127.0.0.1/tcp/51977/p2p/QmYQKCm95Ut1aXsjHmWVYqdaVbno1eKTYC8KbEVjqUaKaQ

2025-04-07 00:00:59,881 - pubsub-demo - INFO - Connected to peer: QmcJnocH1d1tz3Zp4MotVDjNfNFawXHw2dpB9tMYGTXJp7
Type messages to send (press Enter to send):

You can then start typing messages in either terminal and see them relayed to the other terminal. The messages will be distributed using the GossipSub protocol to all peers subscribed to the same topic. To exit the demo, type “quit” or send a keyboard interrupt (Ctrl+C) in either terminal.

Command Line Options

  • -t, --topic: Specify the topic name to subscribe to (default: “pubsub-chat”)

  • -d, --destination: Address of peer to connect to

  • -p, --port: Port to listen on (default: random available port)

  • -v, --verbose: Enable debug logging

The full source code for this example is below:

  1import argparse
  2import logging
  3
  4import multiaddr
  5import trio
  6
  7from libp2p import (
  8    new_host,
  9)
 10from libp2p.crypto.rsa import (
 11    create_new_key_pair,
 12)
 13from libp2p.custom_types import (
 14    TProtocol,
 15)
 16from libp2p.peer.peerinfo import (
 17    info_from_p2p_addr,
 18)
 19from libp2p.pubsub.gossipsub import (
 20    GossipSub,
 21)
 22from libp2p.pubsub.pubsub import (
 23    Pubsub,
 24)
 25from libp2p.stream_muxer.mplex.mplex import (
 26    MPLEX_PROTOCOL_ID,
 27    Mplex,
 28)
 29from libp2p.tools.async_service.trio_service import (
 30    background_trio_service,
 31)
 32from libp2p.utils.address_validation import (
 33    find_free_port,
 34)
 35
 36# Configure logging
 37logging.basicConfig(
 38    level=logging.INFO,  # Set default to DEBUG for more verbose output
 39    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 40)
 41logger = logging.getLogger("pubsub-demo")
 42CHAT_TOPIC = "pubsub-chat"
 43GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
 44
 45# Generate a key pair for the node
 46key_pair = create_new_key_pair()
 47
 48
 49async def receive_loop(subscription, termination_event):
 50    logger.debug("Starting receive loop")
 51    while not termination_event.is_set():
 52        try:
 53            message = await subscription.get()
 54            from libp2p.peer.id import ID
 55
 56            logger.info(f"From peer: {ID(message.from_id).to_base58()}")
 57            print(f"Received message: {message.data.decode('utf-8')}")
 58        except Exception:
 59            logger.exception("Error in receive loop")
 60            await trio.sleep(1)
 61
 62
 63async def publish_loop(pubsub, topic, termination_event):
 64    """Continuously read input from user and publish to the topic."""
 65    logger.debug("Starting publish loop...")
 66    print("Type messages to send (press Enter to send):")
 67    while not termination_event.is_set():
 68        try:
 69            # Use trio's run_sync_in_worker_thread to avoid blocking the event loop
 70            message = await trio.to_thread.run_sync(input)
 71            if message.lower() == "quit":
 72                termination_event.set()  # Signal termination
 73                break
 74            if message:
 75                logger.debug(f"Publishing message: {message}")
 76                await pubsub.publish(topic, message.encode())
 77                print(f"Published: {message}")
 78        except Exception:
 79            logger.exception("Error in publish loop")
 80            await trio.sleep(1)  # Avoid tight loop on error
 81
 82
 83async def monitor_peer_topics(pubsub, nursery, termination_event):
 84    """
 85    Monitor for new topics that peers are subscribed to and
 86    automatically subscribe the server to those topics.
 87    """
 88    # Keep track of topics we've already subscribed to
 89    subscribed_topics = set()
 90
 91    while not termination_event.is_set():
 92        # Check for new topics in peer_topics
 93        for topic in pubsub.peer_topics.keys():
 94            if topic not in subscribed_topics:
 95                logger.info(f"Auto-subscribing to new topic: {topic}")
 96                subscription = await pubsub.subscribe(topic)
 97                subscribed_topics.add(topic)
 98                # Start a receive loop for this topic
 99                nursery.start_soon(receive_loop, subscription, termination_event)
100
101        # Check every 2 seconds for new topics
102        await trio.sleep(2)
103
104
105async def run(topic: str, destination: str | None, port: int | None) -> None:
106    from libp2p.utils.address_validation import (
107        get_available_interfaces,
108        get_optimal_binding_address,
109    )
110
111    if port is None or port == 0:
112        port = find_free_port()
113        logger.info(f"Using random available port: {port}")
114
115    listen_addrs = get_available_interfaces(port)
116
117    # Create a new libp2p host
118    host = new_host(
119        key_pair=key_pair,
120        muxer_opt={MPLEX_PROTOCOL_ID: Mplex},
121    )
122    # Log available protocols
123    logger.debug(f"Host ID: {host.get_id()}")
124    logger.debug(
125        f"Host multiselect protocols: "
126        f"{host.get_mux().get_protocols() if hasattr(host, 'get_mux') else 'N/A'}"
127    )
128    # Create and start gossipsub with optimized parameters for testing
129    gossipsub = GossipSub(
130        protocols=[GOSSIPSUB_PROTOCOL_ID],
131        degree=3,  # Number of peers to maintain in mesh
132        degree_low=2,  # Lower bound for mesh peers
133        degree_high=4,  # Upper bound for mesh peers
134        direct_peers=None,  # Direct peers
135        time_to_live=60,  # TTL for message cache in seconds
136        gossip_window=2,  # Smaller window for faster gossip
137        gossip_history=5,  # Keep more history
138        heartbeat_initial_delay=2.0,  # Start heartbeats sooner
139        heartbeat_interval=5,  # More frequent heartbeats for testing
140    )
141
142    pubsub = Pubsub(host, gossipsub)
143    termination_event = trio.Event()  # Event to signal termination
144    async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
145        # Start the peer-store cleanup task
146        nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
147
148        logger.info(f"Node started with peer ID: {host.get_id()}")
149        logger.info("Initializing PubSub and GossipSub...")
150        async with background_trio_service(pubsub):
151            async with background_trio_service(gossipsub):
152                logger.info("Pubsub and GossipSub services started.")
153                await pubsub.wait_until_ready()
154                logger.info("Pubsub ready.")
155
156                # Subscribe to the topic
157                subscription = await pubsub.subscribe(topic)
158                logger.info(f"Subscribed to topic: {topic}")
159
160                if not destination:
161                    # Server mode
162                    # Get all available addresses with peer ID
163                    all_addrs = host.get_addrs()
164
165                    logger.info("Listener ready, listening on:")
166                    for addr in all_addrs:
167                        logger.info(f"{addr}")
168
169                    # Use optimal address for the client command
170                    optimal_addr = get_optimal_binding_address(port)
171                    optimal_addr_with_peer = (
172                        f"{optimal_addr}/p2p/{host.get_id().to_string()}"
173                    )
174                    logger.info(
175                        f"\nRun this from the same folder in another console:\n\n"
176                        f"pubsub-demo -d {optimal_addr_with_peer}\n"
177                    )
178                    logger.info("Waiting for peers...")
179
180                    # Start topic monitoring to auto-subscribe to client topics
181                    nursery.start_soon(
182                        monitor_peer_topics, pubsub, nursery, termination_event
183                    )
184
185                    # Start message publish and receive loops
186                    nursery.start_soon(receive_loop, subscription, termination_event)
187                    nursery.start_soon(publish_loop, pubsub, topic, termination_event)
188                else:
189                    # Client mode
190                    maddr = multiaddr.Multiaddr(destination)
191                    protocols_in_maddr = maddr.protocols()
192                    info = info_from_p2p_addr(maddr)
193                    logger.debug(f"Multiaddr protocols: {protocols_in_maddr}")
194                    logger.info(
195                        f"Connecting to peer: {info.peer_id} "
196                        f"using protocols: {protocols_in_maddr}"
197                    )
198                    try:
199                        await host.connect(info)
200                        logger.info(f"Connected to peer: {info.peer_id}")
201                        if logger.isEnabledFor(logging.DEBUG):
202                            await trio.sleep(1)
203                            logger.debug(
204                                f"After connection, pubsub.peers: {pubsub.peers}"
205                            )
206                            peer_protocols = [
207                                gossipsub.peer_protocol.get(p)
208                                for p in pubsub.peers.keys()
209                            ]
210                            logger.debug(f"Peer protocols: {peer_protocols}")
211
212                        # Start the loops
213                        nursery.start_soon(
214                            receive_loop, subscription, termination_event
215                        )
216                        nursery.start_soon(
217                            publish_loop, pubsub, topic, termination_event
218                        )
219                    except Exception:
220                        logger.exception(f"Failed to connect to peer: {info.peer_id}")
221                        return
222
223                await termination_event.wait()  # Wait for termination signal
224
225        # Ensure all tasks are completed before exiting
226        nursery.cancel_scope.cancel()
227
228    print("Application shutdown complete")  # Print shutdown message
229
230
231def main() -> None:
232    description = """
233    This program demonstrates a pubsub p2p chat application using libp2p with
234    the gossipsub protocol as the pubsub router.
235    To use it, first run 'python pubsub.py -p <PORT> -t <TOPIC>',
236    where <PORT> is the port number,
237    and <TOPIC> is the name of the topic you want to subscribe to.
238    Then, run another instance with 'python pubsub.py -p <ANOTHER_PORT> -t <TOPIC>
239    -d <DESTINATION>', where <DESTINATION> is the multiaddress of the previous
240    listener host. Messages typed in either terminal will be received by all peers
241    subscribed to the same topic.
242    """
243
244    parser = argparse.ArgumentParser(description=description)
245    parser.add_argument(
246        "-t",
247        "--topic",
248        type=str,
249        help="topic name to subscribe",
250        default=CHAT_TOPIC,
251    )
252
253    parser.add_argument(
254        "-d",
255        "--destination",
256        type=str,
257        help="Address of peer to connect to",
258        default=None,
259    )
260
261    parser.add_argument(
262        "-p",
263        "--port",
264        type=int,
265        help="Port to listen on",
266        default=None,
267    )
268
269    parser.add_argument(
270        "-v",
271        "--verbose",
272        action="store_true",
273        help="Enable debug logging",
274    )
275
276    args = parser.parse_args()
277
278    # Set debug level if verbose flag is provided
279    if args.verbose:
280        logger.setLevel(logging.DEBUG)
281        logger.debug("Debug logging enabled")
282
283    logger.info("Running pubsub chat example...")
284    logger.info(f"Your selected topic is: {args.topic}")
285
286    try:
287        trio.run(run, *(args.topic, args.destination, args.port))
288    except KeyboardInterrupt:
289        logger.info("Application terminated by user")
290
291
292if __name__ == "__main__":
293    main()