Resilience Instrumentation

obskit provides Prometheus metrics for circuit breakers, rate limiters, and retries so failures, state transitions, rate-limit hits, and exhausted retries become visible in dashboards without modifying your business logic.


Circuit Breaker Metrics

instrument_pybreaker — pybreaker integration

New in v1.8.0. Attach obskit metrics to any pybreaker CircuitBreaker instance:

Python
import pybreaker
from obskit import instrument_pybreaker

breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)
instrument_pybreaker(breaker, name="payments-api")

That's it — no other code changes needed. The listener is registered with breaker.add_listener and starts recording metrics immediately.

instrument_circuit_breaker — generic breaker

Any object with an add_listener(listener) method works:

Python
from obskit.resilience.circuit_breaker import instrument_circuit_breaker

listener = instrument_circuit_breaker(my_breaker, name="twitter-api")
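To illustrate the kind of object this accepts, here is a minimal sketch of a hand-rolled breaker that exposes add_listener and notifies listeners through the pybreaker-style hooks (before_call, success, failure, state_change). TinyBreaker and RecordingListener are hypothetical names invented for this example. RecordingListener only stands in for the listener obskit would attach, so the snippet runs without obskit or pybreaker installed.

```python
from typing import Any, Callable


class RecordingListener:
    """Stand-in for the obskit listener: stores every event it receives."""

    def __init__(self) -> None:
        self.events: list = []

    def before_call(self, cb: Any, func: Any, *args: Any, **kwargs: Any) -> None:
        pass

    def success(self, cb: Any) -> None:
        self.events.append(("success",))

    def failure(self, cb: Any, exc: BaseException) -> None:
        self.events.append(("failure", type(exc).__name__))

    def state_change(self, cb: Any, old_state: Any, new_state: Any) -> None:
        self.events.append(("state_change", old_state, new_state))


class TinyBreaker:
    """Hypothetical breaker: opens after fail_max consecutive failures."""

    def __init__(self, fail_max: int = 2) -> None:
        self.fail_max = fail_max
        self.state = "closed"
        self._failures = 0
        self._listeners: list = []

    def add_listener(self, listener: Any) -> None:
        self._listeners.append(listener)

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise RuntimeError("circuit open")
        for listener in self._listeners:
            listener.before_call(self, func, *args, **kwargs)
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            self._failures += 1
            for listener in self._listeners:
                listener.failure(self, exc)
            if self._failures >= self.fail_max:
                self._set_state("open")
            raise
        self._failures = 0
        for listener in self._listeners:
            listener.success(self)
        return result

    def _set_state(self, new_state: str) -> None:
        old_state, self.state = self.state, new_state
        for listener in self._listeners:
            listener.state_change(self, old_state, new_state)


def flaky() -> None:
    raise ValueError("boom")


breaker = TinyBreaker(fail_max=2)
listener = RecordingListener()
breaker.add_listener(listener)

breaker.call(lambda: "ok")          # one successful call
for _ in range(2):                  # two failures trip the breaker
    try:
        breaker.call(flaky)
    except ValueError:
        pass
```

With obskit installed, passing an object like this to instrument_circuit_breaker should attach the obskit listener in the same way RecordingListener is attached here.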

Emitted metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| circuit_breaker_state{name} | Gauge | name | Current state: 0=closed, 1=open, 2=half-open |
| circuit_breaker_calls_total{name,outcome} | Counter | name, outcome | Total calls; outcome is success or failure |
| circuit_breaker_failures_total{name} | Counter | name | Failed calls |
| circuit_breaker_transitions_total{name,from_state,to_state} | Counter | name, from_state, to_state | State transitions |

Manual recording

The ObskitCircuitBreakerListener also exposes standalone helpers for custom integration:

Python
from obskit.resilience.circuit_breaker import ObskitCircuitBreakerListener

listener = ObskitCircuitBreakerListener("custom-breaker")

# Record outcomes
listener.record_success()
listener.record_failure(exc=RuntimeError("timeout"))

# Record a state change (also updates the transitions counter)
listener.record_state_change("open")
listener.record_state_change("half_open")
listener.record_state_change("closed")
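One way to use these helpers is to wrap calls to a dependency that has no breaker library around it. The sketch below assumes only the record_success() and record_failure(exc=...) methods shown above. StubListener and guarded_call are hypothetical names, and StubListener replaces the real listener so the example runs without obskit.

```python
from typing import Any, Callable, Optional


class StubListener:
    """Stand-in exposing the same standalone helpers as the obskit listener."""

    def __init__(self) -> None:
        self.calls: list = []

    def record_success(self) -> None:
        self.calls.append("success")

    def record_failure(self, exc: Optional[BaseException] = None) -> None:
        self.calls.append("failure")


def guarded_call(listener: Any, func: Callable[[], Any]) -> Any:
    """Run func and report the outcome to the listener; exceptions propagate."""
    try:
        result = func()
    except Exception as exc:
        listener.record_failure(exc=exc)
        raise
    listener.record_success()
    return result


listener = StubListener()
guarded_call(listener, lambda: 42)
try:
    guarded_call(listener, lambda: 1 / 0)
except ZeroDivisionError:
    pass
```

Swapping StubListener for ObskitCircuitBreakerListener("custom-breaker") would route the same two outcomes into the counters listed under Emitted metrics.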

Grafana alert example

PromQL
# Alert when a breaker is open or half-open now and was not closed 2 minutes ago
circuit_breaker_state > 0
  unless on(name) (circuit_breaker_state offset 2m == 0)

Rate Limiter Metrics

instrument_rate_limiter

New in v1.8.0. Wrap any object that has check() and/or record_limit() methods:

Python
from obskit import instrument_rate_limiter

instr = instrument_rate_limiter(my_rate_limiter, platform="twitter")

After instrumentation:

  • my_rate_limiter.check(...): if it raises any exception, the hits counter (rate_limit_hits_total) is incremented and the exception is re-raised. If the exception carries a retry_after or reset_after attribute, the reset gauge (rate_limit_reset_seconds) is updated.
  • my_rate_limiter.record_limit(...): the recorded counter (rate_limit_recorded_total) is incremented and the original return value is preserved.
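The check() behaviour described above can be sketched with plain Python. This is an illustration of the wrapping idea, not obskit's implementation: wrap_check, DemoLimiter, and RateLimitExceeded are hypothetical names, a Counter and a dict stand in for the Prometheus counter and gauge, and preferring retry_after over reset_after is an assumption of this sketch.

```python
from collections import Counter
from typing import Any


class RateLimitExceeded(Exception):
    """Hypothetical limiter exception carrying a retry_after hint."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"retry after {retry_after}s")
        self.retry_after = retry_after


class DemoLimiter:
    """Hypothetical limiter: allows `budget` checks, then raises."""

    def __init__(self, budget: int) -> None:
        self.budget = budget

    def check(self) -> bool:
        if self.budget <= 0:
            raise RateLimitExceeded(retry_after=30.0)
        self.budget -= 1
        return True


def wrap_check(limiter: Any, hits: Counter, reset_seconds: dict, platform: str) -> None:
    """Replace limiter.check with a version that counts raised exceptions."""
    original = limiter.check

    def instrumented(*args: Any, **kwargs: Any) -> Any:
        try:
            return original(*args, **kwargs)
        except Exception as exc:
            hits[platform] += 1
            # Assumption: retry_after is tried first, then reset_after.
            hint = getattr(exc, "retry_after", None)
            if hint is None:
                hint = getattr(exc, "reset_after", None)
            if hint is not None:
                reset_seconds[platform] = float(hint)
            raise  # the caller still sees the original exception

    limiter.check = instrumented


hits: Counter = Counter()
reset_seconds: dict = {}
limiter = DemoLimiter(budget=1)
wrap_check(limiter, hits, reset_seconds, platform="twitter")

limiter.check()            # within budget, nothing is recorded
try:
    limiter.check()        # budget exhausted: counted, then re-raised
except RateLimitExceeded:
    pass
```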

Emitted metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| rate_limit_hits_total{platform} | Counter | platform | Exceptions raised by check() (rate-limited calls) |
| rate_limit_recorded_total{platform} | Counter | platform | Calls to record_limit() |
| rate_limit_reset_seconds{platform} | Gauge | platform | Seconds until the rate limit resets (retry_after / reset_after) |

Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| limiter | required | Any object with check() and/or record_limit() methods |
| platform | "default" | Label value for the platform dimension in all metrics |

Example with tweepy

Python
import tweepy
from obskit import instrument_rate_limiter

client = tweepy.Client(bearer_token="...")
instr = instrument_rate_limiter(client, platform="twitter")

# Calls to check() / record_limit() are instrumented only if the wrapped object defines those methods

Grafana alert example

PromQL
# Alert when Twitter rate-limit hits spike
rate(rate_limit_hits_total{platform="twitter"}[5m]) > 5


Retry Metrics

instrument_tenacity — tenacity retry instrumentation

New in v1.9.0. Attach Prometheus metrics to any tenacity retry decorator. Two usage patterns are supported:

With the retry() shorthand (tenacity 9.x recommended pattern):

Python
from tenacity import retry, retry_if_exception_type, stop_after_attempt
from tenacity import wait_exponential_jitter
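from obskit import instrument_tenacity

def _log_retry(retry_state):
    # Example logging callback; tenacity calls it before each sleep
    print(f"Retry #{retry_state.attempt_number}")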

platform_retry = instrument_tenacity(
    retry(
        retry=retry_if_exception_type(IOError),
        stop=stop_after_attempt(3),
        wait=wait_exponential_jitter(initial=0.5, max=8, jitter=2),
        before_sleep=_log_retry,   # existing callback is preserved
        reraise=True,
    ),
    name="platform_http",
)

@platform_retry
async def call_api():
    ...

With a Retrying/AsyncRetrying instance (use with .wraps):

Python
import tenacity
from obskit import instrument_tenacity

platform_retry = instrument_tenacity(
    tenacity.AsyncRetrying(
        retry=tenacity.retry_if_exception_type(IOError),
        stop=tenacity.stop_after_attempt(3),
        wait=tenacity.wait_exponential_jitter(initial=0.5, max=8),
        reraise=True,
    ),
    name="platform_http",
)

@platform_retry.wraps
async def call_api():
    ...

instrument_tenacity detects which form is passed and handles both transparently.
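The source listing in the API reference distinguishes the two forms by checking for a stop attribute. A minimal sketch of that check follows, using stand-in objects so it runs without tenacity. FakeRetrying, fake_retry_factory, and classify are hypothetical names for illustration only.

```python
from typing import Any, Callable


class FakeRetrying:
    """Stand-in for a Retrying/AsyncRetrying instance: it has a stop attribute."""

    def __init__(self) -> None:
        self.stop = object()


def fake_retry_factory(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Stand-in for the decorator returned by retry(...): a plain function."""
    return fn


def classify(retry_obj: Any) -> str:
    # Same test as in instrument_tenacity: instances expose "stop", factories do not.
    return "instance" if hasattr(retry_obj, "stop") else "factory"


print(classify(FakeRetrying()))        # instance
print(classify(fake_retry_factory))    # factory
```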

Emitted metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| retry_attempts_total{name, attempt_number} | Counter | name, attempt_number | Incremented before each sleep between retries. attempt_number is the 1-based index of the attempt that just failed. |
| retry_exhausted_total{name} | Counter | name | Incremented when the stop condition is reached on a failed attempt (all retries exhausted). |

How the counters relate

For a stop_after_attempt(3) retry that always fails:

| Attempt | retry_attempts_total (attempt_number) | retry_exhausted_total |
| --- | --- | --- |
| 1 fails → retry scheduled | "1" +1 | |
| 2 fails → retry scheduled | "2" +1 | |
| 3 fails → stop condition met | | +1 |

The last failure is captured by retry_exhausted_total instead of retry_attempts_total because tenacity does not sleep before re-raising.
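The bookkeeping in the table can be modelled in a few lines. This sketch does not call tenacity or obskit. simulate_always_failing is a hypothetical helper that mirrors which counter moves on each failed attempt, with a Counter keyed by the attempt_number label.

```python
from collections import Counter


def simulate_always_failing(max_attempts: int):
    """Model which counters move when every attempt fails."""
    retry_attempts: Counter = Counter()  # keyed by attempt_number label
    retry_exhausted = 0
    for attempt_number in range(1, max_attempts + 1):
        if attempt_number < max_attempts:
            # A retry is scheduled, so the before-sleep hook would fire.
            retry_attempts[str(attempt_number)] += 1
        else:
            # Stop condition met on a failed attempt: no sleep, retries exhausted.
            retry_exhausted += 1
    return retry_attempts, retry_exhausted


attempts, exhausted = simulate_always_failing(3)
print(dict(attempts), exhausted)  # {'1': 1, '2': 1} 1
```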

Parameters

| Parameter | Description |
| --- | --- |
| retry_obj | Either a tenacity.Retrying / tenacity.AsyncRetrying instance or the decorator factory returned by tenacity.retry(...) (a plain callable in tenacity 9.x). Both forms are supported. |
| name | Label value used in all metric series. Use a human-readable name such as "twitter_api" or "payments_http". |

Preserving existing hooks

Any before_sleep or after hook already on the retry object is called before the metrics hook fires — existing logging callbacks are not replaced:

Python
import tenacity
from obskit import instrument_tenacity

def log_retry(retry_state):
    print(f"Retry #{retry_state.attempt_number}")

retry_obj = tenacity.AsyncRetrying(
    stop=tenacity.stop_after_attempt(5),
    before_sleep=log_retry,   # ← preserved
)
instrument_tenacity(retry_obj, name="my_service")
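Hook preservation of this kind is usually done by composing callables. The sketch below shows one way to chain an existing hook with a metrics hook in the order described above. chain_hooks is a hypothetical helper and not obskit's _patch_hooks, and it handles only synchronous callables.

```python
from typing import Any, Callable, Optional

Hook = Callable[[Any], None]


def chain_hooks(existing: Optional[Hook], metrics_hook: Hook) -> Hook:
    """Return a hook that calls the existing hook (if any), then the metrics hook."""

    def combined(retry_state: Any) -> None:
        if existing is not None:
            existing(retry_state)
        metrics_hook(retry_state)

    return combined


calls: list = []
combined = chain_hooks(
    lambda state: calls.append("log"),
    lambda state: calls.append("metrics"),
)
combined(object())   # calls is now ["log", "metrics"]

only_metrics = chain_hooks(None, lambda state: calls.append("metrics-only"))
only_metrics(object())
```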

Grafana alert example

PromQL
# Alert when any service is exhausting retries
rate(retry_exhausted_total[5m]) > 0

API Reference

obskit.integrations.resilience.pybreaker.instrument_pybreaker

Python
instrument_pybreaker(
    cb: Any, name: str
) -> "ObskitCircuitBreakerListener"

Attach obskit Prometheus metrics to a pybreaker CircuitBreaker.

Parameters

cb : pybreaker.CircuitBreaker
    The circuit breaker to instrument.
name : str
    Metric label, typically the resource protected by the breaker, e.g. "twitter", "redis", "payments-api".

Returns

ObskitCircuitBreakerListener
    The attached listener (useful for testing or manual removal).

Raises

TypeError
    If cb does not expose an add_listener method.

Source code in src/obskit/integrations/resilience/pybreaker.py
Python
def instrument_pybreaker(cb: Any, name: str) -> "ObskitCircuitBreakerListener":
    """Attach obskit Prometheus metrics to a pybreaker ``CircuitBreaker``.

    Parameters
    ----------
    cb : pybreaker.CircuitBreaker
        The circuit breaker to instrument.
    name : str
        Metric label — typically the resource protected by the breaker,
        e.g. ``"twitter"``, ``"redis"``, ``"payments-api"``.

    Returns
    -------
    ObskitCircuitBreakerListener
        The attached listener (useful for testing or manual removal).

    Raises
    ------
    TypeError
        If *cb* does not expose an ``add_listener`` method.
    """
    from obskit.resilience.circuit_breaker import instrument_circuit_breaker  # noqa: PLC0415

    return instrument_circuit_breaker(cb, name=name)

obskit.integrations.resilience.tenacity.instrument_tenacity

Python
instrument_tenacity(retry_obj: Any, name: str) -> Any

Attach Prometheus metrics to a tenacity retry decorator.

Accepts either a tenacity.Retrying / tenacity.AsyncRetrying instance or the decorator factory returned by tenacity.retry (the @retry(...) shorthand in tenacity 9.x).

  • Instance path: hooks are patched in-place; use with .wraps:

Python
retry_obj = instrument_tenacity(
    tenacity.AsyncRetrying(stop=tenacity.stop_after_attempt(3), reraise=True),
    name="my_service",
)

@retry_obj.wraps
async def call_remote():
    ...

  • Factory path: a new decorator factory is returned that patches hooks each time it is applied to a function:

Python
platform_retry = instrument_tenacity(
    retry(stop=stop_after_attempt(3), reraise=True),
    name="platform_http",
)

@platform_retry
async def call_api():
    ...

Parameters

retry_obj :
    Either a tenacity Retrying / AsyncRetrying instance (has a stop attribute) or the decorator factory returned by tenacity.retry(...) (a plain callable without a stop attribute).
name : str
    Label value for all metric series emitted by this retry context. Use a human-readable name such as "twitter_api" or "payments_http".

Returns

Any
  • If retry_obj is an instance: the same object with hooks patched.
  • If retry_obj is a factory: a new decorator factory that patches hooks on the Retrying/AsyncRetrying object created at decoration time.

Notes

  • retry_attempts_total is incremented in before_sleep — it fires for every attempt that fails and has a retry scheduled, so a 3-attempt exhaustion increments attempt_number "1" and "2" (the last failure is captured by retry_exhausted_total instead).

  • retry_exhausted_total is incremented in after when the stop condition is met on a failed attempt.

  • Any pre-existing before_sleep or after hook on the retry object is preserved and called first / after the metrics hooks.

Source code in src/obskit/integrations/resilience/tenacity.py
Python
def instrument_tenacity(retry_obj: Any, name: str) -> Any:
    """Attach Prometheus metrics to a tenacity retry decorator.

    Accepts either a :class:`tenacity.Retrying` / :class:`tenacity.AsyncRetrying`
    instance **or** the decorator factory returned by :func:`tenacity.retry` (the
    ``@retry(...)`` shorthand in tenacity 9.x).

    * **Instance path** — hooks are patched in-place; use with ``.wraps``:

      .. code-block:: python

          retry_obj = instrument_tenacity(
              tenacity.AsyncRetrying(stop=tenacity.stop_after_attempt(3), reraise=True),
              name="my_service",
          )

          @retry_obj.wraps
          async def call_remote():
              ...

    * **Factory path** — a new decorator factory is returned that patches hooks
      each time it is applied to a function:

      .. code-block:: python

          platform_retry = instrument_tenacity(
              retry(stop=stop_after_attempt(3), reraise=True),
              name="platform_http",
          )

          @platform_retry
          async def call_api():
              ...

    Parameters
    ----------
    retry_obj :
        Either a tenacity ``Retrying`` / ``AsyncRetrying`` instance (has a
        ``stop`` attribute) **or** the decorator factory returned by
        ``tenacity.retry(...)`` (a plain callable without a ``stop`` attribute).
    name : str
        Label value for all metric series emitted by this retry context.
        Use a human-readable name such as ``"twitter_api"`` or
        ``"payments_http"``.

    Returns
    -------
    Any
        * If *retry_obj* is an instance: the same object with hooks patched.
        * If *retry_obj* is a factory: a new decorator factory that patches
          hooks on the ``Retrying``/``AsyncRetrying`` object created at
          decoration time.

    Notes
    -----
    * ``retry_attempts_total`` is incremented in ``before_sleep`` — it fires
      for every attempt that fails **and** has a retry scheduled, so a
      3-attempt exhaustion increments attempt_number ``"1"`` and ``"2"`` (the
      last failure is captured by ``retry_exhausted_total`` instead).

    * ``retry_exhausted_total`` is incremented in ``after`` when the stop
      condition is met on a failed attempt.

    * Any pre-existing ``before_sleep`` or ``after`` hook on the retry object
      is preserved and called first / after the metrics hooks.
    """
    # Discriminate between Retrying/AsyncRetrying instances (have a "stop"
    # attribute) and the plain decorator factory returned by tenacity.retry()
    # in tenacity 9.x (a plain function with no "stop" attribute).
    if not hasattr(retry_obj, "stop"):
        # Factory path — wrap the factory so hooks are patched at decoration time.
        _factory = retry_obj

        def _instrumented_factory(fn: Any) -> Any:
            wrapped = _factory(fn)
            retry_instance = getattr(wrapped, "retry", None)
            if retry_instance is not None:  # pragma: no branch
                _patch_hooks(retry_instance, name)
            return wrapped

        return _instrumented_factory

    # Instance path — patch hooks directly on the Retrying/AsyncRetrying object.
    _patch_hooks(retry_obj, name)
    return retry_obj

obskit.integrations.resilience.rate_limiter.instrument_rate_limiter

Python
instrument_rate_limiter(
    limiter: Any, platform: str = "default"
) -> RateLimiterInstrumentor

Instrument a rate limiter with Prometheus metrics.

Parameters

limiter:
    Any object with check() and record_limit() methods.
platform:
    Label value used in all metrics. Default: "default".

Returns

RateLimiterInstrumentor
    The instrumentor wrapping limiter.

Source code in src/obskit/integrations/resilience/rate_limiter.py
Python
def instrument_rate_limiter(
    limiter: Any, platform: str = "default"
) -> RateLimiterInstrumentor:
    """Instrument a rate limiter with Prometheus metrics.

    Parameters
    ----------
    limiter:
        Any object with ``check()`` and ``record_limit()`` methods.
    platform:
        Label value used in all metrics.  Default: ``"default"``.

    Returns
    -------
    RateLimiterInstrumentor
        The instrumentor wrapping *limiter*.
    """
    return RateLimiterInstrumentor(limiter, platform)

obskit.resilience.circuit_breaker.ObskitCircuitBreakerListener

pybreaker-compatible listener that records Prometheus metrics.

Can also be used standalone (without pybreaker) by calling record_success, record_failure, and record_state_change directly.

Parameters

name : str
    Label value for the name dimension in all metrics.

Source code in src/obskit/resilience/circuit_breaker.py
Python
class ObskitCircuitBreakerListener:
    """
    pybreaker-compatible listener that records Prometheus metrics.

    Can also be used standalone (without pybreaker) by calling
    :meth:`record_success`, :meth:`record_failure`, and
    :meth:`record_state_change` directly.

    Parameters
    ----------
    name : str
        Label value for the ``name`` dimension in all metrics.
    """

    def __init__(self, name: str) -> None:
        self.name = name
        self._current_state: str = "closed"
        # Initialise gauge as CLOSED so the metric exists from the start.
        _STATE_GAUGE.labels(name=name).set(CircuitState.CLOSED)

    # ------------------------------------------------------------------
    # pybreaker listener interface
    # ------------------------------------------------------------------

    def state_change(self, cb: Any, old_state: Any, new_state: Any) -> None:
        """Called by pybreaker when the circuit state transitions."""
        # pybreaker may pass state as a string, an object with .name, or
        # an object whose str() representation is the state name.
        raw_new = getattr(new_state, "name", str(new_state)).lower()
        raw_old = getattr(old_state, "name", str(old_state)).lower()
        state_val = _STATE_MAP.get(raw_new, CircuitState.CLOSED)
        _STATE_GAUGE.labels(name=self.name).set(state_val)
        _TRANSITIONS_TOTAL.labels(
            name=self.name, from_state=raw_old, to_state=raw_new
        ).inc()

    def failure(self, cb: Any, exc: BaseException) -> None:
        """Called by pybreaker after a function raises an exception."""
        _FAILURES_TOTAL.labels(name=self.name).inc()
        _CALLS_TOTAL.labels(name=self.name, outcome="failure").inc()

    def success(self, cb: Any) -> None:
        """Called by pybreaker after a function returns successfully."""
        _CALLS_TOTAL.labels(name=self.name, outcome="success").inc()

    def before_call(self, cb: Any, func: Any, *args: Any, **kwargs: Any) -> None:
        """Called by pybreaker before the wrapped function is invoked."""

    # ------------------------------------------------------------------
    # Standalone helpers (no pybreaker required)
    # ------------------------------------------------------------------

    def record_success(self) -> None:
        """Record a successful call (standalone, without pybreaker)."""
        _CALLS_TOTAL.labels(name=self.name, outcome="success").inc()

    def record_failure(self, exc: BaseException | None = None) -> None:
        """Record a failed call (standalone, without pybreaker)."""
        _FAILURES_TOTAL.labels(name=self.name).inc()
        _CALLS_TOTAL.labels(name=self.name, outcome="failure").inc()

    def record_state_change(self, new_state: str) -> None:
        """Record a state transition (standalone, without pybreaker).

        Parameters
        ----------
        new_state : str
            One of ``"closed"``, ``"open"``, or ``"half_open"`` / ``"half-open"``.
        """
        old_state = self._current_state
        self._current_state = new_state.lower()
        state_val = _STATE_MAP.get(self._current_state, CircuitState.CLOSED)
        _STATE_GAUGE.labels(name=self.name).set(state_val)
        _TRANSITIONS_TOTAL.labels(
            name=self.name, from_state=old_state, to_state=self._current_state
        ).inc()

state_change

Python
state_change(
    cb: Any, old_state: Any, new_state: Any
) -> None

Called by pybreaker when the circuit state transitions.

Source code in src/obskit/resilience/circuit_breaker.py
Python
def state_change(self, cb: Any, old_state: Any, new_state: Any) -> None:
    """Called by pybreaker when the circuit state transitions."""
    # pybreaker may pass state as a string, an object with .name, or
    # an object whose str() representation is the state name.
    raw_new = getattr(new_state, "name", str(new_state)).lower()
    raw_old = getattr(old_state, "name", str(old_state)).lower()
    state_val = _STATE_MAP.get(raw_new, CircuitState.CLOSED)
    _STATE_GAUGE.labels(name=self.name).set(state_val)
    _TRANSITIONS_TOTAL.labels(
        name=self.name, from_state=raw_old, to_state=raw_new
    ).inc()
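The normalisation step in state_change can be tried in isolation. In this sketch the normalise function reuses the same getattr(..., "name", str(...)).lower() expression. DemoState and DEMO_STATE_MAP are assumptions that follow the 0/1/2 encoding from the emitted-metrics table, because the real CircuitState and _STATE_MAP definitions are not shown on this page.

```python
from enum import IntEnum
from typing import Any


class DemoState(IntEnum):
    """Assumed encoding, taken from the emitted-metrics table."""
    CLOSED = 0
    OPEN = 1
    HALF_OPEN = 2


# Assumed contents; the real _STATE_MAP is not shown here.
DEMO_STATE_MAP = {
    "closed": DemoState.CLOSED,
    "open": DemoState.OPEN,
    "half_open": DemoState.HALF_OPEN,
    "half-open": DemoState.HALF_OPEN,
}


class NamedState:
    """Stand-in for a state object that exposes a .name attribute."""

    def __init__(self, name: str) -> None:
        self.name = name


def normalise(state: Any) -> str:
    """Prefer state.name, fall back to str(state), then lower-case."""
    return getattr(state, "name", str(state)).lower()


def gauge_value(state: Any) -> int:
    """Map a state to its gauge value; unknown names fall back to CLOSED."""
    return int(DEMO_STATE_MAP.get(normalise(state), DemoState.CLOSED))


print(gauge_value("OPEN"))                    # 1
print(gauge_value(NamedState("half-open")))   # 2
print(gauge_value("something-else"))          # 0
```

Note that the fallback means an unrecognised state name is reported as closed on the gauge, while the transitions counter still records the raw name.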

failure

Python
failure(cb: Any, exc: BaseException) -> None

Called by pybreaker after a function raises an exception.

Source code in src/obskit/resilience/circuit_breaker.py
Python
def failure(self, cb: Any, exc: BaseException) -> None:
    """Called by pybreaker after a function raises an exception."""
    _FAILURES_TOTAL.labels(name=self.name).inc()
    _CALLS_TOTAL.labels(name=self.name, outcome="failure").inc()

success

Python
success(cb: Any) -> None

Called by pybreaker after a function returns successfully.

Source code in src/obskit/resilience/circuit_breaker.py
Python
def success(self, cb: Any) -> None:
    """Called by pybreaker after a function returns successfully."""
    _CALLS_TOTAL.labels(name=self.name, outcome="success").inc()

before_call

Python
before_call(
    cb: Any, func: Any, *args: Any, **kwargs: Any
) -> None

Called by pybreaker before the wrapped function is invoked.

Source code in src/obskit/resilience/circuit_breaker.py
Python
def before_call(self, cb: Any, func: Any, *args: Any, **kwargs: Any) -> None:
    """Called by pybreaker before the wrapped function is invoked."""

record_success

Python
record_success() -> None

Record a successful call (standalone, without pybreaker).

Source code in src/obskit/resilience/circuit_breaker.py
Python
def record_success(self) -> None:
    """Record a successful call (standalone, without pybreaker)."""
    _CALLS_TOTAL.labels(name=self.name, outcome="success").inc()

record_failure

Python
record_failure(exc: BaseException | None = None) -> None

Record a failed call (standalone, without pybreaker).

Source code in src/obskit/resilience/circuit_breaker.py
Python
def record_failure(self, exc: BaseException | None = None) -> None:
    """Record a failed call (standalone, without pybreaker)."""
    _FAILURES_TOTAL.labels(name=self.name).inc()
    _CALLS_TOTAL.labels(name=self.name, outcome="failure").inc()

record_state_change

Python
record_state_change(new_state: str) -> None

Record a state transition (standalone, without pybreaker).

Parameters

new_state : str
    One of "closed", "open", or "half_open" / "half-open".

Source code in src/obskit/resilience/circuit_breaker.py
Python
def record_state_change(self, new_state: str) -> None:
    """Record a state transition (standalone, without pybreaker).

    Parameters
    ----------
    new_state : str
        One of ``"closed"``, ``"open"``, or ``"half_open"`` / ``"half-open"``.
    """
    old_state = self._current_state
    self._current_state = new_state.lower()
    state_val = _STATE_MAP.get(self._current_state, CircuitState.CLOSED)
    _STATE_GAUGE.labels(name=self.name).set(state_val)
    _TRANSITIONS_TOTAL.labels(
        name=self.name, from_state=old_state, to_state=self._current_state
    ).inc()

obskit.resilience.circuit_breaker.instrument_circuit_breaker

Python
instrument_circuit_breaker(
    cb: Any, *, name: str
) -> ObskitCircuitBreakerListener

Attach obskit metrics to a pybreaker CircuitBreaker instance.

Parameters

cb : pybreaker.CircuitBreaker
    The circuit breaker to instrument.
name : str
    Metric label, typically the resource protected by the breaker, e.g. "redis_commands" or "upstream_http".

Returns

ObskitCircuitBreakerListener
    The attached listener (useful for testing or manual removal).

Raises

TypeError
    If cb does not expose an add_listener method.

Example

Python
import pybreaker
from obskit.resilience.circuit_breaker import instrument_circuit_breaker

cb = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)
instrument_circuit_breaker(cb, name="redis_commands")

Source code in src/obskit/resilience/circuit_breaker.py
Python
def instrument_circuit_breaker(cb: Any, *, name: str) -> ObskitCircuitBreakerListener:
    """Attach obskit metrics to a pybreaker ``CircuitBreaker`` instance.

    Parameters
    ----------
    cb : pybreaker.CircuitBreaker
        The circuit breaker to instrument.
    name : str
        Metric label — typically the resource protected by the breaker,
        e.g. ``"redis_commands"`` or ``"upstream_http"``.

    Returns
    -------
    ObskitCircuitBreakerListener
        The attached listener (useful for testing or manual removal).

    Raises
    ------
    TypeError
        If *cb* does not expose an ``add_listener`` method.

    Example
    -------
    .. code-block:: python

        import pybreaker
        from obskit.resilience.circuit_breaker import instrument_circuit_breaker

        cb = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)
        instrument_circuit_breaker(cb, name="redis_commands")
    """
    listener = ObskitCircuitBreakerListener(name=name)
    if not hasattr(cb, "add_listener"):
        raise TypeError(
            "cb must be a pybreaker CircuitBreaker (or implement add_listener). "
            "Install pybreaker: pip install pybreaker"
        )
    cb.add_listener(listener)
    return listener