Every API needs rate limiting. Without it, a single misbehaving client can bring down your entire service. With naive rate limiting, legitimate users get blocked while attackers bypass your limits. The difference is in the algorithm.
Most developers implement rate limiting as “100 requests per minute” without thinking about edge cases. Then they discover users can make 100 requests in the first second and get blocked for 59 seconds, or that distributed systems can’t agree on counts, or that bursts of legitimate traffic trigger false positives.
I’ve implemented rate limiting for APIs handling millions of requests per day. Here’s what actually works, which algorithms to use when, and how to implement them without shooting yourself in the foot.
The Four Main Algorithms
Fixed Window: Count requests in fixed time intervals (e.g., 12:00-12:01, 12:01-12:02)
Sliding Window: Count requests in the last N seconds, recalculated for each request
Token Bucket: Requests consume tokens from a bucket that refills at a constant rate
Leaky Bucket: Requests queue up and process at a constant rate
Each has different trade-offs for accuracy, burstiness, memory usage, and implementation complexity.
Fixed Window: Simple but Flawed
Fixed window counting is the simplest approach:
import time
from collections import defaultdict

class FixedWindowRateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Note: old window keys are never purged in this simple version
        self.windows = defaultdict(int)

    def allow_request(self, client_id):
        current_window = int(time.time() / self.window_seconds)
        key = f"{client_id}:{current_window}"
        if self.windows[key] >= self.max_requests:
            return False
        self.windows[key] += 1
        return True

# Usage: 100 requests per 60 seconds
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=60)

if limiter.allow_request(client_id="user123"):
    process_request()
else:
    return_rate_limit_error()
This works, but has a critical flaw: boundary bursts.
A user can make 100 requests at 12:00:59, then 100 more at 12:01:00—200 requests in 2 seconds without triggering the limit. This happens because the windows reset at fixed intervals.
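A quick sketch makes the hole concrete. It reuses the FixedWindowRateLimiter above with limits scaled down purely for illustration:

import time

# Illustrative limits: 5 requests per 2-second window
limiter = FixedWindowRateLimiter(max_requests=5, window_seconds=2)

# Wait until just before a window boundary
delay_to_boundary = 2 - (time.time() % 2)
time.sleep(max(0.0, delay_to_boundary - 0.1))

# Full quota at the very end of one window...
first_burst = sum(limiter.allow_request("user123") for _ in range(5))
time.sleep(0.2)  # cross into the next window

# ...and a full quota again immediately after the boundary
second_burst = sum(limiter.allow_request("user123") for _ in range(5))
print(first_burst + second_burst)  # 10 requests allowed in well under a second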
When to use fixed window:
- Internal rate limiting where exact limits don’t matter
- Logging/metrics collection (not critical if slightly inaccurate)
- Extremely high-throughput systems where overhead must be minimal
When NOT to use it:
- Public APIs (vulnerable to boundary exploits)
- Security-critical rate limiting (authentication endpoints)
- Anywhere bursts are a problem
Sliding Window: More Accurate
Sliding window fixes the boundary problem by counting requests in the last N seconds from the current moment:
import time
from collections import defaultdict

class SlidingWindowRateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def allow_request(self, client_id):
        now = time.time()
        cutoff = now - self.window_seconds

        # Remove old requests
        self.requests[client_id] = [
            timestamp for timestamp in self.requests[client_id]
            if timestamp > cutoff
        ]

        # Check if under limit
        if len(self.requests[client_id]) >= self.max_requests:
            return False

        # Record new request
        self.requests[client_id].append(now)
        return True
This solves the boundary problem. No matter when the user makes requests, they can’t exceed the limit in any rolling N-second window.
The downside: memory usage. You’re storing a timestamp for every request. At 100 requests/minute per user × 10,000 users, that’s 1 million timestamps in memory.
Sliding Window with Redis
For distributed systems, store request timestamps in Redis:
import redis
import time

class RedisSlidingWindowRateLimiter:
    def __init__(self, redis_client, max_requests, window_seconds):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def allow_request(self, client_id):
        key = f"rate_limit:{client_id}"
        now = time.time()
        cutoff = now - self.window_seconds

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, cutoff)
        # Count remaining entries
        pipe.zcard(key)
        # Add new request (recorded even if this request ends up denied,
        # which further penalizes clients that keep retrying while limited)
        pipe.zadd(key, {str(now): now})
        # Set expiration
        pipe.expire(key, self.window_seconds)
        results = pipe.execute()

        request_count = results[1]
        return request_count < self.max_requests
This uses Redis sorted sets where the score is the timestamp. Old requests are automatically removed, and counting is fast.
When to use sliding window:
- Public APIs where accuracy matters
- User-facing features (download limits, API quotas)
- Moderate request volumes (up to ~10,000 requests/second)
When NOT to use it:
- Extremely high throughput (memory overhead becomes significant)
- Distributed systems without Redis/similar (hard to implement)
Token Bucket: Allowing Controlled Bursts
Token bucket is the most flexible algorithm. Users get a bucket of tokens that refills at a constant rate. Each request consumes one token. If the bucket is empty, the request is denied.
import time

class TokenBucketRateLimiter:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity        # Max tokens
        self.refill_rate = refill_rate  # Tokens per second
        self.buckets = {}

    def allow_request(self, client_id):
        now = time.time()
        if client_id not in self.buckets:
            self.buckets[client_id] = {
                'tokens': self.capacity,
                'last_refill': now
            }
        bucket = self.buckets[client_id]

        # Calculate tokens to add since last refill
        time_passed = now - bucket['last_refill']
        tokens_to_add = time_passed * self.refill_rate

        # Refill bucket (capped at capacity)
        bucket['tokens'] = min(
            self.capacity,
            bucket['tokens'] + tokens_to_add
        )
        bucket['last_refill'] = now

        # Check if request can be allowed
        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True
        return False

# Usage: 10 token capacity, refills at 1 token/second
# Allows bursts of 10 requests, then 1 request/second
limiter = TokenBucketRateLimiter(capacity=10, refill_rate=1.0)
Token bucket allows controlled bursts. If a user hasn’t made requests for 10 seconds, they have 10 tokens available and can burst 10 requests immediately. Then they’re limited to the refill rate (1 request/second in this example).
This is perfect for APIs where occasional bursts are acceptable but sustained high rates are not.
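To see the burst-then-throttle behavior, here is a small usage sketch with the limiter above (the numbers are illustrative):

import time

limiter = TokenBucketRateLimiter(capacity=10, refill_rate=1.0)

# A full bucket lets the first 10 of 15 rapid requests through
burst = sum(limiter.allow_request("user123") for _ in range(15))
print(burst)  # 10

# After the burst, throughput follows the refill rate (~1 token/second)
time.sleep(3)
trickle = sum(limiter.allow_request("user123") for _ in range(15))
print(trickle)  # roughly 3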
Token Bucket with Redis
import redis
import time

class RedisTokenBucketRateLimiter:
    def __init__(self, redis_client, capacity, refill_rate):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate

    def allow_request(self, client_id):
        key = f"rate_limit:{client_id}"
        now = time.time()

        # Lua script ensures atomicity
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        local time_passed = now - last_refill
        local tokens_to_add = time_passed * refill_rate
        tokens = math.min(capacity, tokens + tokens_to_add)

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            return 0
        end
        """

        result = self.redis.eval(
            lua_script,
            1,
            key,
            self.capacity,
            self.refill_rate,
            now
        )
        return result == 1
The Lua script ensures all operations are atomic—no race conditions even with multiple application servers.
When to use token bucket:
- APIs where bursts are acceptable (file uploads, batch operations)
- Systems with variable load patterns
- Applications that benefit from smoothing traffic spikes
When NOT to use it:
- Strict rate limits with no burst tolerance
- Simple use cases where fixed/sliding window is sufficient
Leaky Bucket: Constant Rate Processing
Leaky bucket enforces a constant processing rate. Requests queue up and process at a fixed rate, like water leaking from a bucket with a hole in the bottom.
import time
from collections import deque

class LeakyBucketRateLimiter:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity    # Max queue size
        self.leak_rate = leak_rate  # Requests per second
        self.queues = {}

    def allow_request(self, client_id):
        now = time.time()
        if client_id not in self.queues:
            self.queues[client_id] = {
                'queue': deque(),
                'last_leak': now
            }
        bucket = self.queues[client_id]

        # Leak (process) requests since last check
        time_passed = now - bucket['last_leak']
        requests_to_leak = int(time_passed * self.leak_rate)
        if requests_to_leak > 0:
            for _ in range(min(requests_to_leak, len(bucket['queue']))):
                bucket['queue'].popleft()
            if bucket['queue']:
                # Advance only by the whole leak intervals consumed, so
                # fractional time isn't lost between frequent calls
                bucket['last_leak'] += requests_to_leak / self.leak_rate
            else:
                # Queue drained; start the next leak interval from now
                bucket['last_leak'] = now

        # Check if queue has space
        if len(bucket['queue']) >= self.capacity:
            return False

        # Add request to queue
        bucket['queue'].append(now)
        return True
Leaky bucket is stricter than token bucket—it enforces a constant rate regardless of past behavior. This prevents bursts entirely.
When to use leaky bucket:
- Rate limiting third-party API calls (you want constant rate)
- Queuing systems where smooth processing is important
- Preventing thundering herds
When NOT to use it:
- User-facing APIs (users expect burst capability)
- Low-latency requirements (queuing adds latency)
Distributed Rate Limiting Challenges
Rate limiting gets complicated in distributed systems:
Problem 1: Shared state
If you have 5 application servers, each needs to know the count across all servers. Using local memory means each server has its own limit—effectively 5× the intended limit.
Solution: Centralized storage (Redis, Memcached)
Problem 2: Race conditions
Multiple servers incrementing the same counter simultaneously can cause incorrect counts.
Solution: Atomic operations (Redis INCR, Lua scripts, optimistic locking)
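As a sketch of the atomic-operation approach, a distributed fixed-window counter can rely on a single INCR per request, so concurrent servers never lose updates (the key layout and helper name are illustrative):

import time
import redis

def allow_request_fixed_window(redis_client, client_id, max_requests, window_seconds):
    window = int(time.time() / window_seconds)
    key = f"rate_limit:{client_id}:{window}"
    pipe = redis_client.pipeline()
    pipe.incr(key)                        # atomic across all application servers
    pipe.expire(key, window_seconds * 2)  # old windows clean themselves up
    count, _ = pipe.execute()
    return count <= max_requests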
Problem 3: Network latency
Checking rate limits in Redis adds latency to every request.
Solution: Local caching with eventual consistency
import time

class CachedRateLimiter:
    def __init__(self, redis_limiter, cache_ttl=1):
        self.redis_limiter = redis_limiter
        self.cache_ttl = cache_ttl
        self.local_cache = {}

    def allow_request(self, client_id):
        now = time.time()

        # Check local cache first
        if client_id in self.local_cache:
            cached_decision, cached_time = self.local_cache[client_id]
            if now - cached_time < self.cache_ttl:
                return cached_decision

        # Check Redis
        decision = self.redis_limiter.allow_request(client_id)

        # Cache decision
        self.local_cache[client_id] = (decision, now)
        return decision
This can cut Redis round-trips dramatically, at the cost of some accuracy: requests answered from the local cache are never counted in Redis, so a client may exceed the configured limit by however many requests it sends within one cache interval (and a cached denial can linger briefly after the limit has reset).
Response Headers and User Experience
Good rate limiting communicates limits to clients:
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1643673600
Retry-After: 60

{
  "error": "Rate limit exceeded",
  "message": "You have exceeded 100 requests per minute. Try again in 60 seconds."
}
These headers let clients:
- Know their limits (X-RateLimit-Limit)
- See how many requests they have left (X-RateLimit-Remaining)
- Know when the limit resets (X-RateLimit-Reset)
- Automatically retry after the right delay (Retry-After)
This prevents clients from hammering your API while rate-limited.
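Generating that response is straightforward. Here is a minimal, framework-agnostic sketch; the helper name is illustrative, and the limit/reset inputs would come from whichever limiter you use:

import json
import time

def rate_limit_response(limit, reset_epoch):
    # Returns (status, headers, body) for whatever web framework you use
    retry_after = max(0, int(reset_epoch - time.time()))
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": "0",
        "X-RateLimit-Reset": str(int(reset_epoch)),
        "Retry-After": str(retry_after),
    }
    body = json.dumps({
        "error": "Rate limit exceeded",
        "message": f"You have exceeded {limit} requests per minute. "
                   f"Try again in {retry_after} seconds.",
    })
    return 429, headers, body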
Multi-Tier Rate Limiting
Real systems often need multiple rate limits:
# Per-user limits
user_limiter = TokenBucketRateLimiter(capacity=100, refill_rate=10)

# Per-IP limits (prevent single IP from creating many users)
ip_limiter = SlidingWindowRateLimiter(max_requests=1000, window_seconds=60)

# Global limits (protect infrastructure)
global_limiter = FixedWindowRateLimiter(max_requests=100000, window_seconds=60)

def handle_request(request):
    # Check all limits
    if not global_limiter.allow_request("global"):
        return rate_limit_error("System overloaded", retry_after=60)
    if not ip_limiter.allow_request(request.ip):
        return rate_limit_error("Too many requests from your IP", retry_after=60)
    if not user_limiter.allow_request(request.user_id):
        return rate_limit_error("You've exceeded your rate limit", retry_after=10)
    return process_request(request)
This protects against:
- Individual abusive users
- Distributed attacks from many IPs
- Overwhelming the entire system
Rate Limiting by Cost, Not Just Count
Some requests are more expensive than others. A database query that scans millions of rows shouldn’t count the same as a cache hit.
Cost-based rate limiting:
# Reuse the token bucket above, but deduct 'cost' tokens per request instead of 1
class CostBasedRateLimiter(TokenBucketRateLimiter):
    def __init__(self, max_cost, refill_rate):
        super().__init__(capacity=max_cost, refill_rate=refill_rate)

    def allow_request(self, client_id, cost=1):
        now = time.time()
        bucket = self.buckets.setdefault(
            client_id, {'tokens': self.capacity, 'last_refill': now}
        )
        # Refill based on elapsed time, then deduct the request's cost
        time_passed = now - bucket['last_refill']
        bucket['tokens'] = min(self.capacity, bucket['tokens'] + time_passed * self.refill_rate)
        bucket['last_refill'] = now
        if bucket['tokens'] >= cost:
            bucket['tokens'] -= cost
            return True
        return False

# Usage
COST_SIMPLE_QUERY = 1
COST_COMPLEX_QUERY = 10
COST_EXPORT = 50

limiter = CostBasedRateLimiter(max_cost=100, refill_rate=10)

# Expensive request consumes more tokens
if limiter.allow_request(user_id, COST_COMPLEX_QUERY):
    run_complex_query()
This prevents users from exhausting your resources with expensive operations.
What We Actually Use
For most projects, we implement:
- Token bucket for API endpoints (allows reasonable bursts, smooth average rate)
- Sliding window for authentication (strict limits, no burst tolerance for security)
- Redis-backed distributed rate limiting (consistent limits across servers)
- Multi-tier limits (per-user, per-IP, global)
- Cost-based limiting for expensive operations (database queries, exports, AI API calls)
We avoid:
- Fixed window (boundary exploit risk)
- Leaky bucket (too strict for most use cases)
- In-memory rate limiting for distributed systems (each server enforces its own count, so effective limits end up 5-10× higher than intended)
The algorithm matters less than the implementation details—atomic operations, proper headers, and clear error messages make the difference between rate limiting that protects your API and rate limiting that frustrates your users.
Need help implementing rate limiting for your API or debugging existing limits? We can help.