
Rate Limiting: Algorithms and Implementation

From token buckets to sliding windows—understand rate limiting algorithms and implement them correctly to protect your API without frustrating users.

Every API needs rate limiting. Without it, a single misbehaving client can bring down your entire service. With naive rate limiting, legitimate users get blocked while attackers bypass your limits. The difference is in the algorithm.

Most developers implement rate limiting as “100 requests per minute” without thinking about edge cases. Then they discover users can make 100 requests in the first second and get blocked for 59 seconds, or that distributed systems can’t agree on counts, or that bursts of legitimate traffic trigger false positives.

I’ve implemented rate limiting for APIs handling millions of requests per day. Here’s what actually works, which algorithms to use when, and how to implement them without shooting yourself in the foot.

The Four Main Algorithms

Fixed Window: Count requests in fixed time intervals (e.g., 12:00-12:01, 12:01-12:02)

Sliding Window: Count requests in the last N seconds, recalculated for each request

Token Bucket: Requests consume tokens from a bucket that refills at a constant rate

Leaky Bucket: Requests queue up and process at a constant rate

Each has different trade-offs for accuracy, burstiness, memory usage, and implementation complexity.

Fixed Window: Simple but Flawed

Fixed window counting is the simplest approach:

import time
from collections import defaultdict

class FixedWindowRateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.windows = defaultdict(int)  # Note: old window keys are never evicted; expire them in production

    def allow_request(self, client_id):
        current_window = int(time.time() / self.window_seconds)
        key = f"{client_id}:{current_window}"

        if self.windows[key] >= self.max_requests:
            return False

        self.windows[key] += 1
        return True

# Usage: 100 requests per 60 seconds
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=60)

if limiter.allow_request(client_id="user123"):
    process_request()
else:
    return_rate_limit_error()

This works, but has a critical flaw: boundary bursts.

A user can make 100 requests at 12:00:59, then 100 more at 12:01:00—200 requests in 2 seconds without triggering the limit. This happens because the windows reset at fixed intervals.
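
To see the boundary problem concretely, here's a small sketch that simulates the clock with unittest.mock.patch, using the FixedWindowRateLimiter defined above (illustrative only, not production code):

from unittest.mock import patch

limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=60)

# 100 requests in the last second of one window...
with patch("time.time", return_value=59.0):
    first_window = sum(limiter.allow_request("user123") for _ in range(100))

# ...and 100 more one second later, in the next window
with patch("time.time", return_value=60.0):
    second_window = sum(limiter.allow_request("user123") for _ in range(100))

print(first_window + second_window)  # 200: double the intended limit, in about a second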

When to use fixed window:

  • Internal rate limiting where exact limits don’t matter
  • Logging/metrics collection (not critical if slightly inaccurate)
  • Extremely high-throughput systems where overhead must be minimal

When NOT to use it:

  • Public APIs (vulnerable to boundary exploits)
  • Security-critical rate limiting (authentication endpoints)
  • Anywhere bursts are a problem

Sliding Window: More Accurate

Sliding window fixes the boundary problem by counting requests in the last N seconds from the current moment:

import time
from collections import defaultdict

class SlidingWindowRateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def allow_request(self, client_id):
        now = time.time()
        cutoff = now - self.window_seconds

        # Remove old requests
        self.requests[client_id] = [
            timestamp for timestamp in self.requests[client_id]
            if timestamp > cutoff
        ]

        # Check if under limit
        if len(self.requests[client_id]) >= self.max_requests:
            return False

        # Record new request
        self.requests[client_id].append(now)
        return True

This solves the boundary problem. No matter when the user makes requests, they can’t exceed the limit in any rolling N-second window.

The downside: memory usage. You’re storing a timestamp for every request. At 100 requests/minute per user × 10,000 users, that’s 1 million timestamps in memory.

Sliding Window with Redis

For distributed systems, store request timestamps in Redis:

import redis
import time

class RedisSlidingWindowRateLimiter:
    def __init__(self, redis_client, max_requests, window_seconds):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def allow_request(self, client_id):
        key = f"rate_limit:{client_id}"
        now = time.time()
        cutoff = now - self.window_seconds

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(key, 0, cutoff)

        # Count remaining entries
        pipe.zcard(key)

        # Add new request. The member must be unique; if two requests can
        # share the same timestamp, append a random suffix to the member.
        pipe.zadd(key, {str(now): now})

        # Set expiration so idle keys clean themselves up
        pipe.expire(key, self.window_seconds)

        results = pipe.execute()
        request_count = results[1]

        # The count excludes the entry we just added, and denied requests are
        # still recorded, so clients that keep retrying stay rate-limited.
        return request_count < self.max_requests

This uses Redis sorted sets where the score is the timestamp. Old requests are automatically removed, and counting is fast.
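
A minimal usage sketch, assuming a Redis server on localhost:6379 and the same placeholder helpers (process_request, return_rate_limit_error) as earlier:

import redis

redis_client = redis.Redis(host="localhost", port=6379)
limiter = RedisSlidingWindowRateLimiter(redis_client, max_requests=100, window_seconds=60)

if limiter.allow_request(client_id="user123"):
    process_request()
else:
    return_rate_limit_error()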

When to use sliding window:

  • Public APIs where accuracy matters
  • User-facing features (download limits, API quotas)
  • Moderate request volumes (up to ~10,000 requests/second)

When NOT to use it:

  • Extremely high throughput (memory overhead becomes significant)
  • Distributed systems without Redis/similar (hard to implement)

Token Bucket: Allowing Controlled Bursts

Token bucket is the most flexible algorithm. Users get a bucket of tokens that refills at a constant rate. Each request consumes one token. If the bucket is empty, the request is denied.

import time

class TokenBucketRateLimiter:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity  # Max tokens
        self.refill_rate = refill_rate  # Tokens per second
        self.buckets = {}

    def allow_request(self, client_id):
        now = time.time()

        if client_id not in self.buckets:
            self.buckets[client_id] = {
                'tokens': self.capacity,
                'last_refill': now
            }

        bucket = self.buckets[client_id]

        # Calculate tokens to add since last refill
        time_passed = now - bucket['last_refill']
        tokens_to_add = time_passed * self.refill_rate

        # Refill bucket (capped at capacity)
        bucket['tokens'] = min(
            self.capacity,
            bucket['tokens'] + tokens_to_add
        )
        bucket['last_refill'] = now

        # Check if request can be allowed
        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True

        return False

# Usage: 10 token capacity, refills at 1 token/second
# Allows bursts of 10 requests, then 1 request/second
limiter = TokenBucketRateLimiter(capacity=10, refill_rate=1.0)

Token bucket allows controlled bursts. If a user hasn’t made requests for 10 seconds, they have 10 tokens available and can burst 10 requests immediately. Then they’re limited to the refill rate (1 request/second in this example).

This is perfect for APIs where occasional bursts are acceptable but sustained high rates are not.
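
A quick sketch of that burst behavior with the in-memory limiter above:

import time

limiter = TokenBucketRateLimiter(capacity=10, refill_rate=1.0)

# A fresh client can burst through its full bucket immediately...
burst = [limiter.allow_request("user123") for _ in range(12)]
print(burst.count(True))  # 10 allowed, the last 2 denied

# ...then is held to roughly the refill rate
time.sleep(2)
print(limiter.allow_request("user123"))  # True (about 2 tokens have refilled)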

Token Bucket with Redis

import redis
import time

class RedisTokenBucketRateLimiter:
    def __init__(self, redis_client, capacity, refill_rate):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate

    def allow_request(self, client_id):
        key = f"rate_limit:{client_id}"
        now = time.time()

        # Lua script ensures atomicity
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        local time_passed = now - last_refill
        local tokens_to_add = time_passed * refill_rate
        tokens = math.min(capacity, tokens + tokens_to_add)

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            return 0
        end
        """

        result = self.redis.eval(
            lua_script,
            1,
            key,
            self.capacity,
            self.refill_rate,
            now
        )

        return result == 1

The Lua script ensures all operations are atomic—no race conditions even with multiple application servers.

When to use token bucket:

  • APIs where bursts are acceptable (file uploads, batch operations)
  • Systems with variable load patterns
  • Applications that benefit from smoothing traffic spikes

When NOT to use it:

  • Strict rate limits with no burst tolerance
  • Simple use cases where fixed/sliding window is sufficient

Leaky Bucket: Constant Rate Processing

Leaky bucket enforces a constant processing rate. Requests queue up and process at a fixed rate, like water leaking from a bucket with a hole in the bottom.

import time
from collections import deque

class LeakyBucketRateLimiter:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity  # Max queue size
        self.leak_rate = leak_rate  # Requests per second
        self.queues = {}

    def allow_request(self, client_id):
        now = time.time()

        if client_id not in self.queues:
            self.queues[client_id] = {
                'queue': deque(),
                'last_leak': now
            }

        bucket = self.queues[client_id]

        # Leak (process) requests since last check
        time_passed = now - bucket['last_leak']
        requests_to_leak = int(time_passed * self.leak_rate)

        if requests_to_leak > 0:
            for _ in range(min(requests_to_leak, len(bucket['queue']))):
                bucket['queue'].popleft()

            # Advance last_leak only by the whole intervals consumed; resetting
            # it to 'now' would discard fractional time and can stall leaking
            # entirely when requests arrive faster than the leak rate
            bucket['last_leak'] += requests_to_leak / self.leak_rate

        # Check if queue has space
        if len(bucket['queue']) >= self.capacity:
            return False

        # Add request to queue
        bucket['queue'].append(now)
        return True

Leaky bucket is stricter than token bucket—it enforces a constant rate regardless of past behavior. This prevents bursts entirely.
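
A small sketch of the queue behavior with the in-memory limiter above (here the limiter acts as a meter: a request is admitted only if there's room in the queue):

import time

limiter = LeakyBucketRateLimiter(capacity=5, leak_rate=1.0)

# An immediate burst of 10 requests: only 5 fit in the queue
burst = [limiter.allow_request("user123") for _ in range(10)]
print(burst.count(True))  # 5

# After about 2 seconds, roughly 2 slots have leaked and are free again
time.sleep(2)
print(limiter.allow_request("user123"))  # True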

When to use leaky bucket:

  • Rate limiting third-party API calls (you want constant rate)
  • Queuing systems where smooth processing is important
  • Preventing thundering herds

When NOT to use it:

  • User-facing APIs (users expect burst capability)
  • Low-latency requirements (queuing adds latency)

Distributed Rate Limiting Challenges

Rate limiting gets complicated in distributed systems:

Problem 1: Shared state

If you have 5 application servers, each needs to know the count across all servers. Using local memory means each server has its own limit—effectively 5× the intended limit.

Solution: Centralized storage (Redis, Memcached)

Problem 2: Race conditions

Multiple servers incrementing the same counter simultaneously can cause incorrect counts.

Solution: Atomic operations (Redis INCR, Lua scripts, optimistic locking)
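
For simple counters, Redis's atomic INCR plus EXPIRE is enough. A minimal sketch with redis-py (fixed-window semantics, so the boundary caveat from earlier still applies):

import time
import redis

def allow_request_fixed_window(redis_client, client_id, max_requests, window_seconds):
    window = int(time.time() / window_seconds)
    key = f"rate_limit:{client_id}:{window}"

    # INCR is atomic, so concurrent application servers cannot double-count
    count = redis_client.incr(key)
    if count == 1:
        # First request in this window: expire the key when the window ends
        redis_client.expire(key, window_seconds)

    return count <= max_requests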

Problem 3: Network latency

Checking rate limits in Redis adds latency to every request.

Solution: Local caching with eventual consistency

import time

class CachedRateLimiter:
    def __init__(self, redis_limiter, cache_ttl=1):
        self.redis_limiter = redis_limiter
        self.cache_ttl = cache_ttl
        self.local_cache = {}

    def allow_request(self, client_id):
        now = time.time()

        # Check local cache first
        if client_id in self.local_cache:
            cached_decision, cached_time = self.local_cache[client_id]
            if now - cached_time < self.cache_ttl:
                return cached_decision

        # Check Redis
        decision = self.redis_limiter.allow_request(client_id)

        # Cache decision
        self.local_cache[client_id] = (decision, now)

        return decision

This reduces Redis calls by 90%+ while accepting slight inaccuracy (users might get 101-105 requests instead of exactly 100).

Response Headers and User Experience

Good rate limiting communicates limits to clients:

HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1643673600
Retry-After: 60

{
  "error": "Rate limit exceeded",
  "message": "You have exceeded 100 requests per minute. Try again in 60 seconds."
}

These headers let clients:

  • Know their limits (X-RateLimit-Limit)
  • See how many requests they have left (X-RateLimit-Remaining)
  • Know when the limit resets (X-RateLimit-Reset)
  • Automatically retry after the right delay (Retry-After)

This prevents clients from hammering your API while rate-limited.
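
A framework-agnostic sketch of building such a response; rate_limit_response is a hypothetical helper that returns a status code, headers, and JSON body for whatever web framework you use:

import time

def rate_limit_response(limit, remaining, reset_epoch):
    # Hypothetical helper: build a 429 response carrying the standard headers
    retry_after = max(0, int(reset_epoch - time.time()))
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(remaining),
        "X-RateLimit-Reset": str(int(reset_epoch)),
        "Retry-After": str(retry_after),
    }
    body = {
        "error": "Rate limit exceeded",
        "message": (
            f"You have exceeded {limit} requests per minute. "
            f"Try again in {retry_after} seconds."
        ),
    }
    return 429, headers, body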

Multi-Tier Rate Limiting

Real systems often need multiple rate limits:

# Per-user limits
user_limiter = TokenBucketRateLimiter(capacity=100, refill_rate=10)

# Per-IP limits (prevent single IP from creating many users)
ip_limiter = SlidingWindowRateLimiter(max_requests=1000, window_seconds=60)

# Global limits (protect infrastructure)
global_limiter = FixedWindowRateLimiter(max_requests=100000, window_seconds=60)

def handle_request(request):
    # Check all limits
    if not global_limiter.allow_request("global"):
        return rate_limit_error("System overloaded", retry_after=60)

    if not ip_limiter.allow_request(request.ip):
        return rate_limit_error("Too many requests from your IP", retry_after=60)

    if not user_limiter.allow_request(request.user_id):
        return rate_limit_error("You've exceeded your rate limit", retry_after=10)

    return process_request(request)

This protects against:

  • Individual abusive users
  • Distributed attacks from many IPs
  • Overwhelming the entire system

Rate Limiting by Cost, Not Just Count

Some requests are more expensive than others. A database query that scans millions of rows shouldn’t count the same as a cache hit.

Cost-based rate limiting:

import time

class CostBasedRateLimiter:
    def __init__(self, max_cost, refill_rate):
        self.capacity = max_cost        # Max cost units in the bucket
        self.refill_rate = refill_rate  # Cost units restored per second
        self.buckets = {}

    def allow_request(self, client_id, cost):
        now = time.time()

        bucket = self.buckets.setdefault(
            client_id, {'tokens': self.capacity, 'last_refill': now}
        )

        # Refill based on elapsed time, capped at capacity
        time_passed = now - bucket['last_refill']
        bucket['tokens'] = min(self.capacity, bucket['tokens'] + time_passed * self.refill_rate)
        bucket['last_refill'] = now

        # Deduct 'cost' tokens instead of 1
        if bucket['tokens'] >= cost:
            bucket['tokens'] -= cost
            return True

        return False

# Usage
COST_SIMPLE_QUERY = 1
COST_COMPLEX_QUERY = 10
COST_EXPORT = 50

limiter = CostBasedRateLimiter(max_cost=100, refill_rate=10)

# Expensive request consumes more tokens
if limiter.allow_request(user_id, COST_COMPLEX_QUERY):
    run_complex_query()

This prevents users from exhausting your resources with expensive operations.

What We Actually Use

For most projects, we implement:

  • Token bucket for API endpoints (allows reasonable bursts, smooth average rate)
  • Sliding window for authentication (strict limits, no burst tolerance for security)
  • Redis-backed distributed rate limiting (consistent limits across servers)
  • Multi-tier limits (per-user, per-IP, global)
  • Cost-based limiting for expensive operations (database queries, exports, AI API calls)

We avoid:

  • Fixed window (boundary exploit risk)
  • Leaky bucket (too strict for most use cases)
  • In-memory rate limiting for distributed systems (causes 5-10× actual limits)

The algorithm matters less than the implementation details—atomic operations, proper headers, and clear error messages make the difference between rate limiting that protects your API and rate limiting that frustrates your users.


Need help implementing rate limiting for your API or debugging existing limits? We can help.
