Skip to content

Dead Letter Queue Tracking

DLQTracker records Prometheus metrics for messages that exhaust retries and land in a Dead Letter Queue (DLQ). Use it in any background worker that routes failed messages to a DLQ.

Quick Start

Python
from obskit import DLQTracker, DLQReason

# One tracker per DLQ; its name becomes the `dlq_name` label on every metric.
dlq = DLQTracker("orders_dlq")

# In your retry loop — when retries are exhausted:
# `reason` is a plain string label, so pass the enum member's `.value`.
dlq.track_message_sent(
    original_queue="orders",
    reason=DLQReason.MAX_RETRIES.value,
    message_id="msg-abc123",  # omit to have the tracker generate an ID
    message_age_seconds=300,  # how old the message was when it was dead-lettered
    retry_count=5,
)

Metrics emitted

Metric Type Labels
dlq_messages_total Counter dlq_name, original_queue, reason
dlq_message_age_seconds Histogram dlq_name, original_queue
dlq_size Gauge dlq_name
dlq_oldest_message_age_seconds Gauge dlq_name
dlq_processing_total Counter dlq_name, status
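dlq_processing_latency_seconds Histogram dlq_name

track_processing also increments a reprocessing counter (the DLQ_REPROCESSED_TOTAL metric object in dlq.py), labelled by dlq_name and success ("true" or "false"). Note that dlq_size and dlq_oldest_message_age_seconds reflect only the messages tracked by this process. Use set_dlq_size / set_oldest_message_age to publish values from the broker instead; the next tracker call that updates those gauges will overwrite them again.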

DLQ Reasons

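DLQReason is an enum of common reason codes. track_message_sent takes reason as a plain string, so pass the member's .value (as in the Quick Start). That string becomes the reason label on dlq_messages_total.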

Member Value Meaning
MAX_RETRIES max_retries_exceeded Message exhausted retry budget
PARSE_ERROR parse_error Payload could not be decoded
VALIDATION_ERROR validation_error Payload failed schema validation
HANDLER_ERROR handler_error Business-logic exception
TIMEOUT timeout Handler exceeded time limit
REJECTED rejected Explicitly rejected by handler
EXPIRED expired TTL expired before processing
UNKNOWN unknown Catch-all for unexpected errors

Tracking reprocessing

Python
# `reprocess` and `message` stand in for your own handler and payload.
with dlq.track_processing("msg-abc123"):
    reprocess(message)
# A normal exit records dlq_processing_total{status="success"} and processing
# latency, and drops msg-abc123 from the tracker's locally tracked messages.
# If the block raises an Exception, status="failure" is recorded instead and
# the exception propagates to the caller.

Global registry

Python
from obskit import get_dlq_tracker

# Extra keyword arguments (e.g. alert_threshold=...) are only used on the first
# call for a given name; later calls return the cached tracker and ignore them.
dlq = get_dlq_tracker("orders_dlq")       # creates if first call
same_dlq = get_dlq_tracker("orders_dlq")  # returns cached instance

Threshold alerts

Python
# Called synchronously as on_threshold(dlq_name, current_size) from
# track_message_sent, on EVERY send while the locally tracked size is
# >= alert_threshold (not just once). Exceptions raised here propagate to
# the caller of track_message_sent, so keep it cheap and safe or debounce it.
def on_threshold(name, size):
    alert_pagerduty(f"DLQ {name} has {size} messages!")

dlq = DLQTracker("critical_dlq", alert_threshold=50, on_threshold_exceeded=on_threshold)

API Reference

obskit.integrations.queue.dlq.DLQTracker

Track Dead Letter Queue metrics.

Parameters

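dlq_name : str
    Name of the DLQ; used as the dlq_name label on every metric.
alert_threshold : int
    Locally tracked DLQ size at or above which the alert fires (default: 100).
on_threshold_exceeded : callable, optional
    Called as callback(dlq_name, current_size) on each send while the size is at or above alert_threshold.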

Source code in src/obskit/integrations/queue/dlq.py
Python
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
class DLQTracker:
    """
    Track Dead Letter Queue metrics.

    Parameters
    ----------
    dlq_name : str
        Name of the DLQ
    alert_threshold : int
        Number of messages that triggers alert
    on_threshold_exceeded : callable, optional
        Callback when threshold exceeded
    """

    def __init__(
        self,
        dlq_name: str,
        alert_threshold: int = 100,
        on_threshold_exceeded: Callable[[str, int], None] | None = None,
    ):
        self.dlq_name = dlq_name
        self.alert_threshold = alert_threshold
        self.on_threshold_exceeded = on_threshold_exceeded

        self._messages: dict[str, DLQMessage] = {}
        self._total_messages = 0
        self._processing_times: list[float] = []
        self._processing_success = 0
        self._processing_failure = 0
        self._lock = threading.Lock()

    def track_message_sent(
        self,
        original_queue: str,
        reason: str,
        message_id: str | None = None,
        message_age_seconds: float = 0.0,
        retry_count: int = 0,
        error_message: str | None = None,
        **metadata: Any,
    ) -> None:
        """
        Track a message sent to DLQ.

        Parameters
        ----------
        original_queue : str
            Original queue the message came from
        reason : str
            Reason for DLQ (use DLQReason values)
        message_id : str, optional
            Message ID
        message_age_seconds : float
            How old the message was when sent to DLQ
        retry_count : int
            Number of retries before DLQ
        error_message : str, optional
            Error message
        **metadata
            Additional metadata
        """
        msg_id = message_id or f"dlq-{time.time()}"

        message = DLQMessage(
            message_id=msg_id,
            original_queue=original_queue,
            reason=reason,
            age_seconds=message_age_seconds,
            retry_count=retry_count,
            error_message=error_message,
            metadata=metadata,
        )

        with self._lock:
            self._messages[msg_id] = message
            self._total_messages += 1
            current_size = len(self._messages)

        # Update metrics
        DLQ_MESSAGES_TOTAL.labels(
            dlq_name=self.dlq_name, original_queue=original_queue, reason=reason
        ).inc()

        DLQ_MESSAGE_AGE.labels(dlq_name=self.dlq_name, original_queue=original_queue).observe(
            message_age_seconds
        )

        DLQ_SIZE.labels(dlq_name=self.dlq_name).set(current_size)

        # Update oldest message age
        self._update_oldest_age()

        # Log
        logger.warning(
            "message_sent_to_dlq",
            dlq_name=self.dlq_name,
            original_queue=original_queue,
            reason=reason,
            message_id=msg_id,
            message_age_seconds=message_age_seconds,
            retry_count=retry_count,
        )

        # Check threshold
        if current_size >= self.alert_threshold:
            logger.error(
                "dlq_threshold_exceeded",
                dlq_name=self.dlq_name,
                current_size=current_size,
                threshold=self.alert_threshold,
            )
            if self.on_threshold_exceeded:
                self.on_threshold_exceeded(self.dlq_name, current_size)

    @contextmanager
    def track_processing(
        self,
        message_id: str,
    ) -> Generator[None, None, None]:
        """
        Track processing of a DLQ message.

        Parameters
        ----------
        message_id : str
            ID of message being processed
        """
        start_time = time.perf_counter()
        success = True

        try:
            yield
        except Exception:
            success = False
            raise
        finally:
            duration = time.perf_counter() - start_time

            with self._lock:
                self._processing_times.append(duration)
                if len(self._processing_times) > 1000:
                    self._processing_times = self._processing_times[-1000:]

                if success:
                    self._processing_success += 1
                    # Remove from local tracking
                    if message_id in self._messages:
                        del self._messages[message_id]
                else:
                    self._processing_failure += 1

            status = "success" if success else "failure"

            DLQ_PROCESSING_TOTAL.labels(dlq_name=self.dlq_name, status=status).inc()

            DLQ_PROCESSING_LATENCY.labels(dlq_name=self.dlq_name).observe(duration)

            DLQ_REPROCESSED_TOTAL.labels(dlq_name=self.dlq_name, success=str(success).lower()).inc()

            # Update size
            DLQ_SIZE.labels(dlq_name=self.dlq_name).set(len(self._messages))

            logger.info(
                "dlq_message_processed",
                dlq_name=self.dlq_name,
                message_id=message_id,
                success=success,
                duration_seconds=duration,
            )

    def track_message_removed(self, message_id: str, reason: str = "processed") -> None:
        """Track a message removed from DLQ."""
        with self._lock:
            if message_id in self._messages:
                del self._messages[message_id]

        DLQ_SIZE.labels(dlq_name=self.dlq_name).set(len(self._messages))
        self._update_oldest_age()

    def set_dlq_size(self, size: int) -> None:
        """Manually set DLQ size (from external source)."""
        DLQ_SIZE.labels(dlq_name=self.dlq_name).set(size)

    def set_oldest_message_age(self, age_seconds: float) -> None:
        """Manually set oldest message age."""
        DLQ_OLDEST_MESSAGE_AGE.labels(dlq_name=self.dlq_name).set(age_seconds)

    def _update_oldest_age(self) -> None:
        """Update oldest message age metric."""
        with self._lock:
            if not self._messages:
                DLQ_OLDEST_MESSAGE_AGE.labels(dlq_name=self.dlq_name).set(0)
                return

            oldest = min(self._messages.values(), key=lambda m: m.timestamp)
            age = (datetime.now(UTC) - oldest.timestamp).total_seconds()
            DLQ_OLDEST_MESSAGE_AGE.labels(dlq_name=self.dlq_name).set(age)

    def get_stats(self) -> DLQStats:
        """Get current DLQ statistics."""
        with self._lock:
            messages_by_reason: dict[str, int] = {}
            messages_by_queue: dict[str, int] = {}

            for msg in self._messages.values():
                messages_by_reason[msg.reason] = messages_by_reason.get(msg.reason, 0) + 1
                messages_by_queue[msg.original_queue] = (
                    messages_by_queue.get(msg.original_queue, 0) + 1
                )

            oldest_age = 0.0
            if self._messages:
                oldest = min(self._messages.values(), key=lambda m: m.timestamp)
                oldest_age = (datetime.now(UTC) - oldest.timestamp).total_seconds()

            total_processed = self._processing_success + self._processing_failure
            success_rate = 1.0
            if total_processed > 0:
                success_rate = self._processing_success / total_processed

            avg_processing = 0.0
            if self._processing_times:
                avg_processing = sum(self._processing_times) / len(self._processing_times)

            return DLQStats(
                dlq_name=self.dlq_name,
                total_messages=self._total_messages,
                current_size=len(self._messages),
                oldest_message_age_seconds=oldest_age,
                messages_by_reason=messages_by_reason,
                messages_by_queue=messages_by_queue,
                processing_success_rate=success_rate,
                avg_processing_time_seconds=avg_processing,
            )

    def get_messages(self, limit: int = 100) -> list[DLQMessage]:
        """Get messages currently in DLQ."""
        with self._lock:
            messages = list(self._messages.values())
            messages.sort(key=lambda m: m.timestamp)
            return messages[:limit]

track_message_sent

Python
track_message_sent(
    original_queue: str,
    reason: str,
    message_id: str | None = None,
    message_age_seconds: float = 0.0,
    retry_count: int = 0,
    error_message: str | None = None,
    **metadata: Any,
) -> None

Track a message sent to DLQ.

Parameters

original_queue : str
    Original queue the message came from
reason : str
    Reason for DLQ (use DLQReason values, e.g. DLQReason.MAX_RETRIES.value)
message_id : str, optional
    Message ID; generated automatically when omitted
message_age_seconds : float
    How old the message was when sent to DLQ
retry_count : int
    Number of retries before DLQ
error_message : str, optional
    Error message
**metadata
    Additional metadata

Source code in src/obskit/integrations/queue/dlq.py
Python
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def track_message_sent(
    self,
    original_queue: str,
    reason: str,
    message_id: str | None = None,
    message_age_seconds: float = 0.0,
    retry_count: int = 0,
    error_message: str | None = None,
    **metadata: Any,
) -> None:
    """
    Track a message sent to DLQ.

    Records the message locally, updates the sent counter, age histogram,
    size and oldest-age gauges, and logs a warning. If the resulting local
    size is at or above ``alert_threshold``, an error is logged and
    ``on_threshold_exceeded`` (if set) is invoked.

    Parameters
    ----------
    original_queue : str
        Original queue the message came from
    reason : str
        Reason for DLQ (use DLQReason values, e.g. ``DLQReason.TIMEOUT.value``)
    message_id : str, optional
        Message ID. When omitted (or empty), a unique
        ``dlq-<timestamp>-<random>`` ID is generated. Re-sending an ID that is
        already tracked replaces the stored entry.
    message_age_seconds : float
        How old the message was when sent to DLQ
    retry_count : int
        Number of retries before DLQ
    error_message : str, optional
        Error message
    **metadata
        Additional metadata stored on the tracked message
    """
    import uuid

    # ``time.time()`` alone is not unique: two sends within the clock's
    # resolution got the same ID and the second overwrote the first in
    # ``_messages``. The random suffix keeps generated IDs distinct.
    msg_id = message_id or f"dlq-{time.time()}-{uuid.uuid4().hex[:12]}"

    message = DLQMessage(
        message_id=msg_id,
        original_queue=original_queue,
        reason=reason,
        age_seconds=message_age_seconds,
        retry_count=retry_count,
        error_message=error_message,
        metadata=metadata,
    )

    with self._lock:
        self._messages[msg_id] = message
        self._total_messages += 1
        current_size = len(self._messages)

    # Update metrics
    DLQ_MESSAGES_TOTAL.labels(
        dlq_name=self.dlq_name, original_queue=original_queue, reason=reason
    ).inc()

    DLQ_MESSAGE_AGE.labels(dlq_name=self.dlq_name, original_queue=original_queue).observe(
        message_age_seconds
    )

    DLQ_SIZE.labels(dlq_name=self.dlq_name).set(current_size)

    # Update oldest message age (takes the lock itself, so call it outside).
    self._update_oldest_age()

    # Log
    logger.warning(
        "message_sent_to_dlq",
        dlq_name=self.dlq_name,
        original_queue=original_queue,
        reason=reason,
        message_id=msg_id,
        message_age_seconds=message_age_seconds,
        retry_count=retry_count,
    )

    # Check threshold (fires on every send while at/above the threshold).
    if current_size >= self.alert_threshold:
        logger.error(
            "dlq_threshold_exceeded",
            dlq_name=self.dlq_name,
            current_size=current_size,
            threshold=self.alert_threshold,
        )
        if self.on_threshold_exceeded:
            self.on_threshold_exceeded(self.dlq_name, current_size)

track_processing

Python
track_processing(
    message_id: str,
) -> Generator[None, None, None]

Track processing of a DLQ message.

Parameters

message_id : str ID of message being processed

Source code in src/obskit/integrations/queue/dlq.py
Python
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
@contextmanager
def track_processing(
    self,
    message_id: str,
) -> Generator[None, None, None]:
    """
    Track processing of a DLQ message.

    Use as a context manager around the reprocessing code. If the block
    exits normally, the attempt counts as a success and the message is
    removed from local tracking. If the block raises anything, including
    ``BaseException`` subclasses such as ``KeyboardInterrupt``, the attempt
    counts as a failure and the exception propagates. The duration is
    recorded either way.

    Parameters
    ----------
    message_id : str
        ID of message being processed
    """
    start_time = time.perf_counter()
    # Pessimistic default: only a normal exit from the block flips this.
    # Setting it in an ``except Exception`` handler would let BaseException
    # subclasses fall through as successes.
    success = False

    try:
        yield
        success = True
    finally:
        duration = time.perf_counter() - start_time

        removed = False
        with self._lock:
            self._processing_times.append(duration)
            if len(self._processing_times) > 1000:
                self._processing_times = self._processing_times[-1000:]

            if success:
                self._processing_success += 1
                # Remove from local tracking
                removed = self._messages.pop(message_id, None) is not None
            else:
                self._processing_failure += 1

            current_size = len(self._messages)

        status = "success" if success else "failure"

        DLQ_PROCESSING_TOTAL.labels(dlq_name=self.dlq_name, status=status).inc()

        DLQ_PROCESSING_LATENCY.labels(dlq_name=self.dlq_name).observe(duration)

        DLQ_REPROCESSED_TOTAL.labels(dlq_name=self.dlq_name, success=str(success).lower()).inc()

        # Update size
        DLQ_SIZE.labels(dlq_name=self.dlq_name).set(current_size)
        if removed:
            # Keep the oldest-age gauge in step with the removal, as
            # track_message_removed does.
            self._update_oldest_age()

        logger.info(
            "dlq_message_processed",
            dlq_name=self.dlq_name,
            message_id=message_id,
            success=success,
            duration_seconds=duration,
        )

track_message_removed

Python
track_message_removed(
    message_id: str, reason: str = "processed"
) -> None

Track a message removed from DLQ.

Source code in src/obskit/integrations/queue/dlq.py
Python
328
329
330
331
332
333
334
335
def track_message_removed(self, message_id: str, reason: str = "processed") -> None:
    """
    Drop ``message_id`` from local tracking and refresh the size/age gauges.

    IDs that are not tracked are ignored; ``reason`` is currently unused.
    """
    with self._lock:
        self._messages.pop(message_id, None)

    size_gauge = DLQ_SIZE.labels(dlq_name=self.dlq_name)
    size_gauge.set(len(self._messages))
    self._update_oldest_age()

set_dlq_size

Python
set_dlq_size(size: int) -> None

Manually set DLQ size (from external source).

Source code in src/obskit/integrations/queue/dlq.py
Python
337
338
339
def set_dlq_size(self, size: int) -> None:
    """
    Publish an externally measured DLQ size on the size gauge.

    Any later call that refreshes the gauge from local tracking overwrites it.
    """
    gauge = DLQ_SIZE.labels(dlq_name=self.dlq_name)
    gauge.set(size)

set_oldest_message_age

Python
set_oldest_message_age(age_seconds: float) -> None

Manually set oldest message age.

Source code in src/obskit/integrations/queue/dlq.py
Python
341
342
343
def set_oldest_message_age(self, age_seconds: float) -> None:
    """
    Publish an externally measured oldest-message age, in seconds.

    ``_update_oldest_age`` overwrites this value the next time it runs.
    """
    gauge = DLQ_OLDEST_MESSAGE_AGE.labels(dlq_name=self.dlq_name)
    gauge.set(age_seconds)

get_stats

Python
get_stats() -> DLQStats

Get current DLQ statistics.

Source code in src/obskit/integrations/queue/dlq.py
Python
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def get_stats(self) -> DLQStats:
    """
    Build a ``DLQStats`` snapshot of the tracker under its lock.

    Per-reason and per-queue counts cover the messages currently tracked.
    The success rate defaults to 1.0 before anything has been processed, and
    the average processing time defaults to 0.0 when no samples exist.
    """
    with self._lock:
        tracked = list(self._messages.values())

        reason_counts: dict[str, int] = {}
        queue_counts: dict[str, int] = {}
        for entry in tracked:
            reason_counts[entry.reason] = reason_counts.get(entry.reason, 0) + 1
            queue_counts[entry.original_queue] = queue_counts.get(entry.original_queue, 0) + 1

        if tracked:
            earliest = min(tracked, key=lambda entry: entry.timestamp)
            oldest_age = (datetime.now(UTC) - earliest.timestamp).total_seconds()
        else:
            oldest_age = 0.0

        processed = self._processing_success + self._processing_failure
        success_rate = self._processing_success / processed if processed > 0 else 1.0

        durations = self._processing_times
        avg_processing = sum(durations) / len(durations) if durations else 0.0

        return DLQStats(
            dlq_name=self.dlq_name,
            total_messages=self._total_messages,
            current_size=len(tracked),
            oldest_message_age_seconds=oldest_age,
            messages_by_reason=reason_counts,
            messages_by_queue=queue_counts,
            processing_success_rate=success_rate,
            avg_processing_time_seconds=avg_processing,
        )

get_messages

Python
get_messages(limit: int = 100) -> list[DLQMessage]

Get messages currently in DLQ.

Source code in src/obskit/integrations/queue/dlq.py
Python
393
394
395
396
397
398
def get_messages(self, limit: int = 100) -> list[DLQMessage]:
    """
    Return up to ``limit`` tracked DLQ messages, oldest first.

    Messages are ordered by their ``timestamp``; the snapshot is taken under
    the tracker's lock.
    """
    with self._lock:
        snapshot = sorted(self._messages.values(), key=lambda msg: msg.timestamp)
    return snapshot[:limit]

obskit.integrations.queue.dlq.DLQReason

Bases: Enum

Reasons for sending to DLQ.

Source code in src/obskit/integrations/queue/dlq.py
Python
88
89
90
91
92
93
94
95
96
97
98
class DLQReason(Enum):
    """
    Reasons for sending to DLQ.

    ``DLQTracker.track_message_sent`` takes ``reason`` as a string, so pass a
    member's ``.value`` (e.g. ``DLQReason.MAX_RETRIES.value``). That string is
    what appears in the ``reason`` metric label.
    """

    MAX_RETRIES = "max_retries_exceeded"  # message exhausted its retry budget
    PARSE_ERROR = "parse_error"  # payload could not be decoded
    VALIDATION_ERROR = "validation_error"  # payload failed schema validation
    HANDLER_ERROR = "handler_error"  # business-logic exception
    TIMEOUT = "timeout"  # handler exceeded its time limit
    REJECTED = "rejected"  # explicitly rejected by the handler
    EXPIRED = "expired"  # TTL expired before processing
    UNKNOWN = "unknown"  # catch-all for unexpected errors

obskit.integrations.queue.dlq.get_dlq_tracker

Python
get_dlq_tracker(dlq_name: str, **kwargs: Any) -> DLQTracker

Get or create a DLQ tracker.

Source code in src/obskit/integrations/queue/dlq.py
Python
409
410
411
412
413
414
415
416
417
418
419
def get_dlq_tracker(
    dlq_name: str,
    **kwargs: Any,
) -> DLQTracker:
    """
    Return the shared tracker for ``dlq_name``, creating it on first use.

    ``kwargs`` are forwarded to ``DLQTracker`` only when the tracker is
    created; calls for an already-registered name ignore them. Creation is
    serialized by ``_dlq_lock`` so concurrent first calls build one tracker.
    """
    if dlq_name in _dlq_trackers:
        return _dlq_trackers[dlq_name]

    with _dlq_lock:
        # Re-check: another thread may have registered it while we waited.
        if dlq_name not in _dlq_trackers:
            _dlq_trackers[dlq_name] = DLQTracker(dlq_name, **kwargs)

    return _dlq_trackers[dlq_name]