Identify Protocol Demo

This example demonstrates how to use the libp2p identify protocol.

$ python -m pip install libp2p
Collecting libp2p
...
Successfully installed libp2p-x.x.x
$ identify-demo
First host listening. Run this from another console:

identify-demo -p 8889 -d /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM

Waiting for incoming identify request...

Copy the line that starts with identify-demo -p 8889 ..., open a new terminal in the same folder, and paste it in:

$ identify-demo -p 8889 -d /ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
dialer (host_b) listening on /ip4/127.0.0.1/tcp/8889
Second host connecting to peer: QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM
Starting identify protocol...
Identify response:
Public Key (Base64): CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDC6c/oNPP9X13NDQ3Xrlp3zOj+ErXIWb/A4JGwWchiDBwMhMslEX3ct8CqI0BqUYKuwdFjowqqopOJ3cS2MlqtGaiP6Dg9bvGqSDoD37BpNaRVNcebRxtB0nam9SQy3PYLbHAmz0vR4ToSiL9OLRORnGOxCtHBuR8ZZ5vS0JEni8eQMpNa7IuXwyStnuty/QjugOZudBNgYSr8+9gH722KTjput5IRL7BrpIdd4HNXGVRm4b9BjNowvHu404x3a/ifeNblpy/FbYyFJEW0looygKF7hpRHhRbRKIDZt2BqOfT1sFkbqsHE85oY859+VMzP61YELgvGwai2r7KcjkW/AgMBAAE=
Listen Addresses: ['/ip4/127.0.0.1/tcp/8888/p2p/QmUiN4R3fNrCoQugGgmmb3v35neMEjKFNrsbNGVDsRHWpM']
Protocols: ['/ipfs/id/1.0.0', '/ipfs/ping/1.0.0']
Observed Address: ['/ip4/127.0.0.1/tcp/38082']
Protocol Version: ipfs/0.1.0
Agent Version: py-libp2p/0.2.0

The full source code for this example is below:

  1import argparse
  2import base64
  3import logging
  4import sys
  5
  6import multiaddr
  7import trio
  8
  9from libp2p import (
 10    new_host,
 11)
 12from libp2p.identity.identify.identify import (
 13    ID as IDENTIFY_PROTOCOL_ID,
 14    identify_handler_for,
 15    parse_identify_response,
 16)
 17from libp2p.identity.identify.pb.identify_pb2 import Identify
 18from libp2p.peer.envelope import debug_dump_envelope, unmarshal_envelope
 19from libp2p.peer.peerinfo import (
 20    info_from_p2p_addr,
 21)
 22
 23# Logging: libp2p configures the ``libp2p`` hierarchy at import from ``LIBP2P_DEBUG``
 24# / ``LIBP2P_DEBUG_FILE``. Do not cap ``libp2p`` to WARNING here or module DEBUG
 25# (e.g. ``host.observed_addr_manager:DEBUG``) will never appear.
 26# For a quiet default without env vars, uncomment:
 27# logging.basicConfig(level=logging.WARNING)
 28# logging.getLogger("multiaddr").setLevel(logging.WARNING)
 29# logging.getLogger("libp2p").setLevel(logging.WARNING)
 30
 31logger = logging.getLogger("libp2p.identity.identify-example")
 32
 33
 34def decode_multiaddrs(raw_addrs):
 35    """Convert raw listen addresses into human-readable multiaddresses."""
 36    decoded_addrs = []
 37    for addr in raw_addrs:
 38        try:
 39            decoded_addrs.append(str(multiaddr.Multiaddr(addr)))
 40        except Exception as e:
 41            decoded_addrs.append(f"Invalid Multiaddr ({addr}): {e}")
 42    return decoded_addrs
 43
 44
 45def print_identify_response(identify_response: Identify):
 46    """Pretty-print Identify response."""
 47    public_key_b64 = base64.b64encode(identify_response.public_key).decode("utf-8")
 48    listen_addrs = decode_multiaddrs(identify_response.listen_addrs)
 49    signed_peer_record = unmarshal_envelope(identify_response.signedPeerRecord)
 50    try:
 51        observed_addr_decoded = decode_multiaddrs([identify_response.observed_addr])
 52    except Exception:
 53        observed_addr_decoded = identify_response.observed_addr
 54    print(
 55        f"Identify response:\n"
 56        f"  Public Key (Base64): {public_key_b64}\n"
 57        f"  Listen Addresses: {listen_addrs}\n"
 58        f"  Protocols: {list(identify_response.protocols)}\n"
 59        f"  Observed Address: "
 60        f"{observed_addr_decoded if identify_response.observed_addr else 'None'}\n"
 61        f"  Protocol Version: {identify_response.protocol_version}\n"
 62        f"  Agent Version: {identify_response.agent_version}"
 63    )
 64
 65    debug_dump_envelope(signed_peer_record)
 66
 67
 68async def run(port: int, destination: str, use_varint_format: bool = True) -> None:
 69    from libp2p.utils.address_validation import (
 70        get_available_interfaces,
 71        get_optimal_binding_address,
 72    )
 73
 74    if not destination:
 75        # Create first host (listener)
 76        if port <= 0:
 77            from libp2p.utils.address_validation import find_free_port
 78
 79            port = find_free_port()
 80
 81        listen_addrs = get_available_interfaces(port)
 82        host_a = new_host()
 83
 84        # Set up identify handler with specified format
 85        # Set use_varint_format = False, if want to checkout the Signed-PeerRecord
 86        identify_handler = identify_handler_for(
 87            host_a, use_varint_format=use_varint_format
 88        )
 89        host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, identify_handler)
 90
 91        async with (
 92            host_a.run(listen_addrs=listen_addrs),
 93            trio.open_nursery() as nursery,
 94        ):
 95            # Start the peer-store cleanup task
 96            nursery.start_soon(host_a.get_peerstore().start_cleanup_task, 60)
 97
 98            # Get all available addresses with peer ID
 99            all_addrs = host_a.get_addrs()
100
101            if use_varint_format:
102                format_name = "length-prefixed"
103                print(f"First host listening (using {format_name} format).")
104                print("Listener ready, listening on:\n")
105                for addr in all_addrs:
106                    print(f"{addr}")
107
108                # Use optimal address for the client command
109                optimal_addr = get_optimal_binding_address(port)
110                optimal_addr_with_peer = (
111                    f"{optimal_addr}/p2p/{host_a.get_id().to_string()}"
112                )
113                print(
114                    f"\nRun this from the same folder in another console:\n\n"
115                    f"identify-demo -d {optimal_addr_with_peer}\n"
116                )
117                print("Waiting for incoming identify request...")
118            else:
119                format_name = "raw protobuf"
120                print(f"First host listening (using {format_name} format).")
121                print("Listener ready, listening on:\n")
122                for addr in all_addrs:
123                    print(f"{addr}")
124
125                # Use optimal address for the client command
126                optimal_addr = get_optimal_binding_address(port)
127                optimal_addr_with_peer = (
128                    f"{optimal_addr}/p2p/{host_a.get_id().to_string()}"
129                )
130                print(
131                    f"\nRun this from the same folder in another console:\n\n"
132                    f"identify-demo -d {optimal_addr_with_peer}\n"
133                )
134                print("Waiting for incoming identify request...")
135
136            # Add a custom handler to show connection events
137            async def custom_identify_handler(stream):
138                peer_id = stream.muxed_conn.peer_id
139                print(f"\n🔗 Received identify request from peer: {peer_id}")
140
141                # Show remote address in multiaddr format
142                try:
143                    from libp2p.identity.identify.identify import (
144                        _remote_address_to_multiaddr,
145                    )
146
147                    remote_address = stream.get_remote_address()
148                    if remote_address:
149                        observed_multiaddr = _remote_address_to_multiaddr(
150                            remote_address
151                        )
152                        # Add the peer ID to create a complete multiaddr
153                        complete_multiaddr = f"{observed_multiaddr}/p2p/{peer_id}"
154                        print(f"   Remote address: {complete_multiaddr}")
155                    else:
156                        print(f"   Remote address: {remote_address}")
157                except Exception:
158                    print(f"   Remote address: {stream.get_remote_address()}")
159
160                # Call the original handler
161                await identify_handler(stream)
162
163                print(f"✅ Successfully processed identify request from {peer_id}")
164
165            # Replace the handler with our custom one
166            host_a.set_stream_handler(IDENTIFY_PROTOCOL_ID, custom_identify_handler)
167
168            try:
169                await trio.sleep_forever()
170            except KeyboardInterrupt:
171                print("\n🛑 Shutting down listener...")
172                logger.info("Listener interrupted by user")
173                return
174
175    else:
176        # Create second host (dialer)
177        from libp2p.utils.address_validation import (
178            find_free_port,
179            get_available_interfaces,
180            get_optimal_binding_address,
181        )
182
183        if port <= 0:
184            port = find_free_port()
185
186        listen_addrs = get_available_interfaces(port)
187        host_b = new_host()
188
189        async with (
190            host_b.run(listen_addrs=listen_addrs),
191            trio.open_nursery() as nursery,
192        ):
193            # Start the peer-store cleanup task
194            nursery.start_soon(host_b.get_peerstore().start_cleanup_task, 60)
195
196            # Connect to the first host
197            print(f"dialer (host_b) listening on {host_b.get_addrs()[0]}")
198            maddr = multiaddr.Multiaddr(destination)
199            info = info_from_p2p_addr(maddr)
200            print(f"Second host connecting to peer: {info.peer_id}")
201
202            try:
203                await host_b.connect(info)
204            except Exception as e:
205                error_msg = str(e)
206                if "unable to connect" in error_msg or "SwarmException" in error_msg:
207                    print(f"\n❌ Cannot connect to peer: {info.peer_id}")
208                    print(f"   Address: {destination}")
209                    print(f"   Error: {error_msg}")
210                    print(
211                        "\n💡 Make sure the peer is running and the address is correct."
212                    )
213                    return
214                else:
215                    # Re-raise other exceptions
216                    raise
217
218            stream = await host_b.new_stream(info.peer_id, (IDENTIFY_PROTOCOL_ID,))
219
220            try:
221                print("Starting identify protocol...")
222
223                # Read the response using the utility function
224                from libp2p.utils.varint import read_length_prefixed_protobuf
225
226                response = await read_length_prefixed_protobuf(
227                    stream, use_varint_format
228                )
229                full_response = response
230
231                await stream.close()
232
233                # Parse the response using the robust protocol-level function
234                # This handles both old and new formats automatically
235                identify_msg = parse_identify_response(full_response)
236                print_identify_response(identify_msg)
237
238            except Exception as e:
239                error_msg = str(e)
240                print(f"Identify protocol error: {error_msg}")
241
242                # Check for specific format mismatch errors
243                if "Error parsing message" in error_msg or "DecodeError" in error_msg:
244                    print("\n" + "=" * 60)
245                    print("FORMAT MISMATCH DETECTED!")
246                    print("=" * 60)
247                    if use_varint_format:
248                        print(
249                            "You are using length-prefixed format (default) but the "
250                            "listener"
251                        )
252                        print("is using raw protobuf format.")
253                        print(
254                            "\nTo fix this, run the dialer with the --raw-format flag:"
255                        )
256                        print(f"identify-demo --raw-format -d {destination}")
257                    else:
258                        print("You are using raw protobuf format but the listener")
259                        print("is using length-prefixed format (default).")
260                        print(
261                            "\nTo fix this, run the dialer without the --raw-format "
262                            "flag:"
263                        )
264                        print(f"identify-demo -d {destination}")
265                    print("=" * 60)
266                else:
267                    import traceback
268
269                    traceback.print_exc()
270
271            return
272
273
def main() -> None:
    """
    Command-line entry point for the identify demo.

    Parses ``--port``, ``--destination`` and ``--raw-format``, then runs ``run``
    under Trio. ``run`` picks listener or dialer mode from whether a
    destination was given. Ctrl-C exits quietly; any other exception is
    printed and logged, and the process exits with status 1.
    """
    description = """
    This program demonstrates the libp2p identify protocol.
    First run 'identify-demo -p <PORT> [--raw-format]' to start a listener.
    Then run 'identify-demo -p <ANOTHER_PORT> -d <DESTINATION>'
    where <DESTINATION> is the multiaddress shown by the listener.

    Use --raw-format to send raw protobuf messages (old format) instead of
    length-prefixed protobuf messages (new format, default).
    """

    example_maddr = (
        "/ip4/[HOST_IP]/tcp/8888/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
    )

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("-p", "--port", default=0, type=int, help="source port number")
    parser.add_argument(
        "-d",
        "--destination",
        type=str,
        help=f"destination multiaddr string, e.g. {example_maddr}",
    )
    parser.add_argument(
        "--raw-format",
        action="store_true",
        help=(
            "use raw protobuf format (old format) instead of "
            "length-prefixed (new format)"
        ),
    )

    args = parser.parse_args()

    # Determine format: length-prefixed (varint) is the default; --raw-format
    # switches to raw protobuf messages (old format).
    use_varint_format = not args.raw_format

    try:
        # Listener and dialer share one entry point; run() chooses the mode
        # based on whether a destination was supplied.
        trio.run(run, args.port, args.destination, use_varint_format)
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
        logger.info("Application interrupted by user")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        logger.error("Error: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()