
RabbitMQ Distributed Tracing

obskit propagates W3C trace context across RabbitMQ message boundaries so publisher and consumer spans appear as a single end-to-end trace in Tempo/Jaeger.
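
As a rough illustration of what "a single end-to-end trace" means at the header level, the sketch below derives a consumer-side traceparent from a publisher-side one: the trace-id is kept and only the span-id changes. The helper names new_traceparent and child_traceparent are hypothetical and are not part of obskit; real ids are generated by the OpenTelemetry SDK.

```python
import secrets


def new_traceparent() -> str:
    """Build a version-00 traceparent for a fresh, sampled root span."""
    trace_id = secrets.token_hex(16)  # 32 hex characters
    span_id = secrets.token_hex(8)    # 16 hex characters
    return f"00-{trace_id}-{span_id}-01"


def child_traceparent(parent: str) -> str:
    """Keep the version, trace-id and flags; swap in a new span-id."""
    version, trace_id, _parent_span_id, flags = parent.split("-")
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"


publisher = new_traceparent()
consumer = child_traceparent(publisher)
print(publisher)
print(consumer)
```

Because both values carry the same trace-id, a backend such as Tempo or Jaeger can group the two spans under one trace.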

Publisher — inject trace context

Call inject_trace_context_to_headers before publishing. The current span's traceparent is written into the headers dict you pass to BasicProperties.

Python
import pika
from obskit import inject_trace_context_to_headers

headers: dict = {}
inject_trace_context_to_headers(headers)   # writes "traceparent" key

props = pika.BasicProperties(
    content_type="application/json",
    headers=headers,
)
channel.basic_publish(
    exchange="",
    routing_key="orders",
    body=payload,
    properties=props,
)
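
To check what ended up in the headers dict, it can help to parse the traceparent value. The following is a minimal sketch that assumes the W3C version-00 layout (version, trace-id, parent-id, flags); parse_traceparent is a hypothetical helper, not an obskit function, and the sample value is the example from the W3C Trace Context specification.

```python
from __future__ import annotations

import re

# Version "00": 32-hex trace-id, 16-hex parent-id, 2-hex trace-flags.
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(value: str) -> dict[str, object] | None:
    """Return the header's fields, or None if the value is malformed."""
    match = _TRACEPARENT.match(value)
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    # All-zero ids are invalid in the W3C format.
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    return {
        "trace_id": trace_id,
        "span_id": span_id,
        "sampled": bool(int(flags, 16) & 0x01),
    }


example = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
parsed = parse_traceparent(example)
```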

Consumer — extract trace context automatically

instrument_rabbitmq wraps basic_consume so every delivered message is:

  1. Processed inside an OTel span (rabbitmq.consume.<queue_name>).
  2. Parented to the publisher's span via the traceparent header, so both spans share one trace.
Python
from obskit.integrations.queue.rabbitmq import instrument_rabbitmq

instrument_rabbitmq(channel, queue_name="orders")

def handle_order(ch, method, properties, body):
    # Active span is a child of the publisher span
    process_order(body)

channel.basic_consume(queue="orders", on_message_callback=handle_order)
channel.start_consuming()
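
The wrapping technique itself can be sketched without a broker. The example below assumes a hypothetical FakeChannel that only stores the callback, and a hypothetical wrap_consume helper that records start and end events instead of opening a span; it illustrates the pattern and is not obskit's implementation.

```python
from __future__ import annotations

from typing import Any, Callable


class FakeChannel:
    """Hypothetical stand-in for a pika channel; it only stores the callback."""

    def __init__(self) -> None:
        self.callback: Callable[..., None] | None = None

    def basic_consume(self, queue: str, on_message_callback: Callable[..., None]) -> str:
        self.callback = on_message_callback
        return "ctag-1"


def wrap_consume(channel: Any, events: list[str]) -> None:
    """Replace channel.basic_consume with a version that wraps the callback."""
    original = channel.basic_consume

    def instrumented(queue: str, on_message_callback: Callable[..., None], **kwargs: Any) -> Any:
        def tracked(ch: Any, method: Any, properties: Any, body: bytes) -> None:
            events.append("start")  # a real wrapper would open a span here
            try:
                on_message_callback(ch, method, properties, body)
            finally:
                events.append("end")  # runs even if the callback raises

        return original(queue, tracked, **kwargs)

    channel.basic_consume = instrumented


events: list[str] = []
channel = FakeChannel()
wrap_consume(channel, events)
channel.basic_consume(
    queue="orders",
    on_message_callback=lambda ch, method, props, body: events.append(body.decode()),
)
channel.callback(None, None, None, b"order-1")  # simulate one delivery
```

The user's callback keeps its original signature; only the function registered with the channel changes.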

Manual context extraction with use_span_context

New in v1.8.0. If you process messages outside instrument_rabbitmq (e.g. in a custom async consumer), use extract_trace_context_from_headers and use_span_context to re-parent your spans under the publisher's trace manually:

Python
from obskit import extract_trace_context_from_headers, use_span_context
from obskit.tracing import async_trace_span

async def on_message(message):
    headers = message.properties.headers or {}
    ctx = extract_trace_context_from_headers(headers)

    with use_span_context(ctx):
        async with async_trace_span("orders.process"):
            # This span is a child of the publisher's span
            await process_order(message.body)

use_span_context is a sync context manager, so enter it with a plain with statement; the body may contain async with blocks or regular with blocks. When ctx is None (no traceparent header found), it is a no-op, and spans started inside the block become fresh root spans.

Span attributes

The consumer span carries these attributes:

Attribute               Value
messaging.system        "rabbitmq"
messaging.destination   <queue_name>
messaging.message_id    Value of properties.message_id (empty string when unset)
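
As a sketch of how these values could be derived from a delivery, the hypothetical helper below builds the attribute dict from a queue name and a properties-like object. SimpleNamespace stands in for pika's BasicProperties here.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def consumer_span_attributes(queue_name: str, properties: Any) -> dict[str, str]:
    """Build the consumer span attributes; a missing message_id becomes ""."""
    msg_id = getattr(properties, "message_id", None)
    return {
        "messaging.system": "rabbitmq",
        "messaging.destination": queue_name,
        "messaging.message_id": msg_id or "",
    }


attrs = consumer_span_attributes("orders", SimpleNamespace(message_id="m-42"))
attrs_no_id = consumer_span_attributes("orders", SimpleNamespace())
```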

Without OTel installed

Both helpers degrade gracefully:

  • inject_trace_context_to_headers — no-op; headers dict is left unchanged.
  • Consumer callback — runs normally without a span; QueueTracker metrics still collected.
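
The optional-dependency pattern behind this behaviour can be sketched as follows. safe_inject is a hypothetical variant that also reports whether OpenTelemetry was importable, which may be handy in tests; it assumes only that opentelemetry.propagate.inject is the call used when the package is present.

```python
from __future__ import annotations

from typing import Any


def safe_inject(headers: dict[str, Any]) -> bool:
    """Inject trace context when OTel is importable; return whether it was."""
    try:
        from opentelemetry import propagate
    except ImportError:
        return False  # OTel missing: leave headers untouched
    propagate.inject(headers)
    return True


headers: dict[str, Any] = {"x-tenant": "acme"}
used_otel = safe_inject(headers)
```

Either way, application headers that were already in the dict are preserved.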

API Reference

obskit.integrations.queue.rabbitmq.inject_trace_context_to_headers

Python
inject_trace_context_to_headers(
    headers: dict[str, Any],
) -> None

Inject the current W3C traceparent into headers in-place.

Call this before publishing to RabbitMQ so the consumer can join the same distributed trace.

Parameters

headers : dict
    Mutable dict passed as BasicProperties(headers=...). Modified in-place with traceparent (and tracestate) keys.

Example

Python
headers: dict = {}
inject_trace_context_to_headers(headers)
props = pika.BasicProperties(headers=headers)
channel.basic_publish(exchange="", routing_key="q", body=b"...", properties=props)

Source code in src/obskit/integrations/queue/rabbitmq.py
Python
def inject_trace_context_to_headers(headers: dict[str, Any]) -> None:
    """Inject the current W3C ``traceparent`` into *headers* in-place.

    Call this before publishing to RabbitMQ so the consumer can join the
    same distributed trace.

    Parameters
    ----------
    headers : dict
        Mutable dict passed as ``BasicProperties(headers=...)``.
        Modified in-place with ``traceparent`` (and ``tracestate``) keys.

    Example
    -------
    >>> headers: dict = {}
    >>> inject_trace_context_to_headers(headers)
    >>> props = pika.BasicProperties(headers=headers)
    >>> channel.basic_publish(exchange="", routing_key="q", body=b"...", properties=props)
    """
    try:
        from opentelemetry import propagate as _propagate  # noqa: PLC0415

        _propagate.inject(headers)
    except ImportError:  # pragma: no cover
        pass  # OTel not installed — no-op  # NOSONAR

obskit.integrations.queue.rabbitmq.extract_trace_context_from_headers

Python
extract_trace_context_from_headers(
    headers: dict[str, Any] | None,
) -> Any | None

Extract the W3C trace context from AMQP message headers.

Call this in your consumer before processing a message so that child spans are attached to the publisher's trace (end-to-end visibility in Jaeger / Tempo).

Parameters

headers : dict | None
    properties.headers from the AMQP message. None or empty dicts are handled safely: the function returns None (no context).

Returns

opentelemetry.context.Context | None
    Extracted OTel context, or None when no trace headers are present or opentelemetry-api is not installed.

Example

Python
from obskit.integrations.queue.rabbitmq import extract_trace_context_from_headers
from obskit.tracing.tracer import use_span_context

def _on_message(ch, method, properties, body):
    ctx = extract_trace_context_from_headers(properties.headers or {})
    with use_span_context(ctx):
        route_event(body)

Source code in src/obskit/integrations/queue/rabbitmq.py
Python
def extract_trace_context_from_headers(headers: dict[str, Any] | None) -> Any | None:
    """Extract the W3C trace context from AMQP message headers.

    Call this in your consumer *before* processing a message so that child
    spans are attached to the publisher's trace (end-to-end visibility in
    Jaeger / Tempo).

    Parameters
    ----------
    headers : dict | None
        ``properties.headers`` from the AMQP message.  ``None`` or empty dicts
        are safely handled — the function returns ``None`` (no context).

    Returns
    -------
    opentelemetry.context.Context | None
        Extracted OTel context, or ``None`` when no trace headers are present
        or opentelemetry-api is not installed.

    Example
    -------
    ::

        from obskit.integrations.queue.rabbitmq import extract_trace_context_from_headers
        from obskit.tracing.tracer import use_span_context

        def _on_message(ch, method, properties, body):
            ctx = extract_trace_context_from_headers(properties.headers or {})
            with use_span_context(ctx):
                route_event(body)
    """
    return _extract_trace_context(headers)
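
The listing above delegates to _extract_trace_context, whose body is not shown on this page. One plausible shape, given the documented return values, is sketched below; extract_context_sketch is a hypothetical name, and the only OpenTelemetry call assumed is opentelemetry.propagate.extract.

```python
from __future__ import annotations

from typing import Any


def extract_context_sketch(headers: dict[str, Any] | None) -> Any | None:
    """Return an OTel context for headers carrying a traceparent, else None."""
    # None, empty dicts and dicts without trace headers yield no context.
    if not headers or "traceparent" not in headers:
        return None
    try:
        from opentelemetry import propagate
    except ImportError:
        return None  # opentelemetry-api is not installed
    return propagate.extract(headers)


no_headers = extract_context_sketch(None)
empty_headers = extract_context_sketch({})
unrelated_headers = extract_context_sketch({"content-type": "application/json"})
```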

obskit.integrations.queue.rabbitmq.instrument_rabbitmq

Python
instrument_rabbitmq(
    channel: Any,
    queue_name: str,
    consumer_tag: str | None = None,
) -> None

Instrument a RabbitMQ channel for automatic message tracking and tracing.

Wraps channel.basic_consume so every incoming message is:

  1. Tracked via obskit.integrations.queue.tracker.QueueTracker (processing duration + error counts).
  2. Processed inside an OTel span whose parent is extracted from the traceparent / tracestate headers in the AMQP message properties, enabling end-to-end distributed traces when the publisher uses inject_trace_context_to_headers.

Parameters

channel : pika.channel.Channel
    RabbitMQ channel to instrument.
queue_name : str
    Name of the queue being consumed.
consumer_tag : str, optional
    Consumer tag for identification.

Example

Python
import pika
from obskit.integrations.queue.rabbitmq import instrument_rabbitmq

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

instrument_rabbitmq(channel, queue_name="orders")
channel.basic_consume(queue='orders', on_message_callback=callback)

Source code in src/obskit/integrations/queue/rabbitmq.py
Python
def instrument_rabbitmq(
    channel: Any,
    queue_name: str,
    consumer_tag: str | None = None,
) -> None:
    """Instrument a RabbitMQ channel for automatic message tracking and tracing.

    Wraps ``channel.basic_consume`` so every incoming message is:

    1. Tracked via :class:`~obskit.integrations.queue.tracker.QueueTracker`
       (processing duration + error counts).
    2. Processed inside an OTel span whose parent is extracted from the
       ``traceparent`` / ``tracestate`` headers in the AMQP message
       properties — enabling end-to-end distributed traces when the
       publisher uses :func:`inject_trace_context_to_headers`.

    Parameters
    ----------
    channel : pika.channel.Channel
        RabbitMQ channel to instrument.
    queue_name : str
        Name of the queue being consumed.
    consumer_tag : str, optional
        Consumer tag for identification.

    Example
    -------
    >>> import pika
    >>> from obskit.integrations.queue.rabbitmq import instrument_rabbitmq
    >>>
    >>> connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    >>> channel = connection.channel()
    >>>
    >>> instrument_rabbitmq(channel, queue_name="orders")
    >>> channel.basic_consume(queue='orders', on_message_callback=callback)
    """
    try:
        from obskit.integrations.queue.tracker import QueueTracker  # noqa: PLC0415

        tracker = QueueTracker(queue_name)
        original_consume = channel.basic_consume

        def instrumented_consume(*args: Any, **kwargs: Any) -> Any:
            """Wrap consume to track messages."""
            callback = kwargs.get("on_message_callback") or (args[0] if args else None)

            if callback:

                def tracked_callback(
                    ch: Any,
                    method: Any,
                    properties: Any,
                    body: Any,
                ) -> None:
                    """Tracked message callback with W3C trace-context propagation."""
                    msg_headers: dict[str, Any] | None = (
                        properties.headers if hasattr(properties, "headers") else None
                    )
                    parent_ctx = _extract_trace_context(msg_headers)

                    msg_id: str | None = (
                        properties.message_id
                        if hasattr(properties, "message_id")
                        else None
                    )

                    if parent_ctx is not None:
                        try:
                            from opentelemetry import trace as _trace  # noqa: PLC0415
                            from opentelemetry.context import attach, detach  # noqa: PLC0415

                            token = attach(parent_ctx)
                            try:
                                tracer = _trace.get_tracer("obskit.rabbitmq")
                                with tracer.start_as_current_span(
                                    f"rabbitmq.consume.{queue_name}",
                                    attributes={
                                        "messaging.system": "rabbitmq",
                                        "messaging.destination": queue_name,
                                        "messaging.message_id": msg_id or "",
                                    },
                                ):
                                    with tracker.track_message_processing(
                                        operation="process_message",
                                        message_id=msg_id,
                                    ):
                                        callback(ch, method, properties, body)
                            finally:
                                detach(token)
                            return
                        except ImportError:  # pragma: no cover
                            pass  # NOSONAR

                    # Fallback — track without OTel span.
                    with tracker.track_message_processing(
                        operation="process_message",
                        message_id=msg_id,
                    ):
                        callback(ch, method, properties, body)

                kwargs["on_message_callback"] = tracked_callback

            return original_consume(*args, **kwargs)

        channel.basic_consume = instrumented_consume

        logger.info(
            "rabbitmq_instrumented",
            queue=queue_name,
            consumer_tag=consumer_tag,
        )

    except ImportError:  # pragma: no cover
        logger.warning(  # pragma: no cover
            "pika_not_available",
            message="pika (RabbitMQ client) not installed. Install with: pip install pika",
        )