Skip to content

Redis Cache Instrumentation

obskit wraps async Redis clients so every command is automatically timed and counted with Prometheus metrics — no manual instrumentation or pipeline changes required.

Quick Start

Python
import redis.asyncio as aioredis
from obskit import instrument_redis_client

redis_client = aioredis.from_url("redis://localhost:6379", decode_responses=True)
redis_client = instrument_redis_client(redis_client, name="engagement-cache")

# All commands are now instrumented:
await redis_client.get("my-key")
await redis_client.set("my-key", "value", ex=60)
await redis_client.hgetall("hash-key")

The wrapper is transparent — every attribute on the underlying client is accessible through it. Only async command methods are wrapped with instrumentation; synchronous helpers (like .connection_pool) pass through unchanged.

instrument_redis_client

New in v1.9.0.

Python
from obskit import instrument_redis_client

redis_client = instrument_redis_client(redis_client, name="rate-limiter-store")

Parameters

Parameter Default Description
client required Any async Redis client (e.g. redis.asyncio.Redis, aioredis.Redis)
name "default" Label value for the name dimension in all metrics. Use a logical name such as "session-cache" or "rate-limiter-store".

Emitted metrics

Metric Type Labels Description
redis_commands_total{name, command, status} Counter name, command, status Total commands executed. status is "success" or "error".
redis_command_duration_seconds{name, command} Histogram name, command Round-trip latency per command.
redis_command_errors_total{name, command} Counter name, command Commands that raised an exception (dedicated error counter).

command label values

The command label is the Redis method name as called on the client — "get", "set", "hgetall", "zadd", "expire", etc.

Connection Pool Monitoring

Call update_pool_stats() periodically to refresh the pool gauge:

Python
redis_client = instrument_redis_client(
    aioredis.from_url("redis://localhost:6379"),
    name="engagement-cache",
)

# In a background task or health check:
redis_client.update_pool_stats()
Metric Type Labels Description
redis_pool_connections{name, state} Gauge name, state Pool connections by state: "available" or "in_use".

The gauge is not updated on every command because pool introspection differs across redis-py versions.

Multiple Redis Clients

Use a distinct name per logical cache to keep metrics separate:

Python
from obskit import instrument_redis_client

session_cache = instrument_redis_client(
    aioredis.from_url("redis://sessions:6379"),
    name="session-cache",
)

rate_limiter_store = instrument_redis_client(
    aioredis.from_url("redis://ratelimit:6379"),
    name="rate-limiter-store",
)

event_retry_store = instrument_redis_client(
    aioredis.from_url("redis://events:6379"),
    name="event-retry-store",
)

Legacy instrument_redis

instrument_redis (the original function name) is still available and equivalent:

Python
from obskit.integrations.cache import instrument_redis

redis_client = instrument_redis(redis_client, name="engagement-cache")

Prefer instrument_redis_client in new code — it is available at the top-level obskit namespace and has a more descriptive name.

Grafana alert examples

PromQL
# Alert when Redis error rate rises
rate(redis_command_errors_total[5m]) > 0.1

# P99 GET latency exceeds 50 ms
histogram_quantile(0.99,
  rate(redis_command_duration_seconds_bucket{command="get"}[5m])
) > 0.05

# Connection pool exhaustion
redis_pool_connections{state="available"} == 0

API Reference

obskit.integrations.cache.instrument_redis_client

Python
instrument_redis_client(
    client: Any, name: str = "default"
) -> InstrumentedRedis

Wrap an async Redis client with Prometheus instrumentation.

Emits per-command latency histogram and error counter, making Redis performance directly visible in Grafana dashboards.

Parameters

client : An async Redis client (redis.asyncio.Redis or compatible). name : str Human-readable label identifying this client's role, e.g. "engagement-cache", "rate-limit-store". Default: "default".

Returns

InstrumentedRedis A transparent proxy that records the following metrics on every async command:

Text Only
* ``redis_command_duration_seconds{name, command}`` — latency histogram
* ``redis_command_errors_total{name, command}`` — dedicated error counter
* ``redis_commands_total{name, command, status}`` — full outcome counter

Example

import redis.asyncio as aioredis from obskit.integrations.cache import instrument_redis_client

r = aioredis.from_url("redis://redis:6379") r = instrument_redis_client(r, name="engagement-cache") await r.get("key") # metrics emitted automatically

Source code in src/obskit/integrations/cache.py
Python
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def instrument_redis_client(client: Any, name: str = "default") -> InstrumentedRedis:
    """Wrap an async Redis client with Prometheus instrumentation.

    Emits per-command latency histogram and error counter, making Redis
    performance directly visible in Grafana dashboards.

    Parameters
    ----------
    client :
        An async Redis client (``redis.asyncio.Redis`` or compatible).
    name : str
        Human-readable label identifying this client's role, e.g.
        ``"engagement-cache"``, ``"rate-limit-store"``.
        Default: ``"default"``.

    Returns
    -------
    InstrumentedRedis
        A transparent proxy that records the following metrics on every
        async command:

        * ``redis_command_duration_seconds{name, command}`` — latency histogram
        * ``redis_command_errors_total{name, command}`` — dedicated error counter
        * ``redis_commands_total{name, command, status}`` — full outcome counter

    Example
    -------
    >>> import redis.asyncio as aioredis
    >>> from obskit.integrations.cache import instrument_redis_client
    >>>
    >>> r = aioredis.from_url("redis://redis:6379")
    >>> r = instrument_redis_client(r, name="engagement-cache")
    >>> await r.get("key")   # metrics emitted automatically
    """
    return InstrumentedRedis(client, name)

obskit.integrations.cache.InstrumentedRedis

Transparent async Redis proxy that records Prometheus metrics.

Do not instantiate directly — use :func:instrument_redis.

Parameters

client : The underlying async Redis client (redis.asyncio.Redis or any compatible client with async command methods). name : str Label value used in all metric series. Choose a human-readable name that identifies the role of this client (e.g. "session-cache", "rate-limit-store").

Source code in src/obskit/integrations/cache.py
Python
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class InstrumentedRedis:
    """Transparent async Redis proxy that records Prometheus metrics.

    Do not instantiate directly — use :func:`instrument_redis`.

    Parameters
    ----------
    client :
        The underlying async Redis client (``redis.asyncio.Redis`` or
        any compatible client with async command methods).
    name : str
        Label value used in all metric series.  Choose a human-readable
        name that identifies the role of this client (e.g.
        ``"session-cache"``, ``"rate-limit-store"``).
    """

    __slots__ = ("_client", "_name")

    def __init__(self, client: Any, name: str) -> None:
        self._client = client
        self._name = name

    # ------------------------------------------------------------------
    # Transparent attribute proxy
    # ------------------------------------------------------------------

    def __getattr__(self, attr: str) -> Any:
        original = getattr(self._client, attr)

        # Only wrap async callables — non-async attrs (properties, sync
        # helpers, the connection pool itself) pass through unchanged.
        if not asyncio.iscoroutinefunction(original):
            return original

        name = self._name  # local ref for closure

        @functools.wraps(original)
        async def _instrumented(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            status = "success"
            try:
                return await original(*args, **kwargs)
            except Exception:
                status = "error"
                REDIS_COMMAND_ERRORS_TOTAL.labels(name=name, command=attr).inc()
                raise
            finally:
                elapsed = time.perf_counter() - start
                REDIS_COMMANDS_TOTAL.labels(
                    name=name, command=attr, status=status
                ).inc()
                REDIS_COMMAND_DURATION_SECONDS.labels(
                    name=name, command=attr
                ).observe(elapsed)

        return _instrumented

    # ------------------------------------------------------------------
    # Pool gauge helper
    # ------------------------------------------------------------------

    def update_pool_stats(self) -> None:
        """Refresh the ``redis_pool_connections`` gauge from the pool state.

        Call periodically (e.g. from a background task or after large
        bursts) to keep the gauge current.  Silently no-ops if the
        underlying client does not expose a ``connection_pool`` attribute
        or if the pool API is unavailable.
        """
        try:
            pool = self._client.connection_pool
            available = getattr(pool, "_available_connections", None)
            in_use = getattr(pool, "_in_use_connections", None)

            if available is not None:
                REDIS_POOL_CONNECTIONS.labels(
                    name=self._name, state="available"
                ).set(len(available))
            if in_use is not None:
                REDIS_POOL_CONNECTIONS.labels(
                    name=self._name, state="in_use"
                ).set(len(in_use))
        except Exception:  # pragma: no cover
            pass

update_pool_stats

Python
update_pool_stats() -> None

Refresh the redis_pool_connections gauge from the pool state.

Call periodically (e.g. from a background task or after large bursts) to keep the gauge current. Silently no-ops if the underlying client does not expose a connection_pool attribute or if the pool API is unavailable.

Source code in src/obskit/integrations/cache.py
Python
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def update_pool_stats(self) -> None:
    """Refresh the ``redis_pool_connections`` gauge from the pool state.

    Call periodically (e.g. from a background task or after large
    bursts) to keep the gauge current.  Silently no-ops if the
    underlying client does not expose a ``connection_pool`` attribute
    or if the pool API is unavailable.
    """
    try:
        pool = self._client.connection_pool
        available = getattr(pool, "_available_connections", None)
        in_use = getattr(pool, "_in_use_connections", None)

        if available is not None:
            REDIS_POOL_CONNECTIONS.labels(
                name=self._name, state="available"
            ).set(len(available))
        if in_use is not None:
            REDIS_POOL_CONNECTIONS.labels(
                name=self._name, state="in_use"
            ).set(len(in_use))
    except Exception:  # pragma: no cover
        pass