Source code for request_limiter.strategy
import abc
import time
from typing import Callable, Optional
[docs]class LimitStrategy(metaclass=abc.ABCMeta):
"""
A request limit strategy abstract class
"""
[docs] @abc.abstractmethod
def allow(self, key: Optional[str] = None) -> bool:
"""
Checks if it can allocate one request
:return: True if it can allocate else False
"""
pass
[docs] @abc.abstractmethod
def get_remaining(self, key: Optional[str] = None) -> float:
"""
Returns the remaining seconds for the next request window
:return: time in seconds
"""
pass
[docs] @abc.abstractmethod
def clean(self):
"""
Clean expired keys from storage
"""
pass
[docs]class LimitedIntervalStrategy(LimitStrategy):
DEFAULT_KEY = 'default'
def __init__(self, requests: Optional[int] = 100, interval: Optional[int] = 24 * 60 * 60,
now: Optional[Callable[[], float]] = time.monotonic, storage: Optional[dict] = None):
self.requests = requests
self.interval = interval
self.now = now
self.storage = storage if storage is not None else {}
[docs] def allow(self, key: Optional[str] = None) -> bool:
"""
Allocates one request in current window for the key
:param key: Storage key
:return: True if allocated else False
"""
key = key or self.DEFAULT_KEY
remaining_time = self.get_remaining(key)
if remaining_time <= 0:
# new interval window, reset strategy
self._reset(key)
# increment request count and check if it pass the limit
self.storage[key]['request_count'] += 1
if self.storage[key]['request_count'] > self.requests:
return False
return True
[docs] def get_remaining(self, key: Optional[str] = None) -> float:
"""
Returns the remaining seconds for the next request window
:param key: Storage key
:return: Remaining seconds
"""
key = key or self.DEFAULT_KEY
self._init(key)
store = self.storage[key]
spent_time = self.now() - store['reset_time']
return self.interval - spent_time
[docs] def clean(self):
"""
Remove expired keys from the storage
"""
while True:
expired_keys = []
for key, store in self.storage.items():
if self.now() - store['reset_time'] > self.interval and key != self.DEFAULT_KEY:
expired_keys.append(key)
[self.storage.pop(key) for key in expired_keys] # delete keys
time.sleep(self.interval) # Check every interval period
def _init(self, key: str):
"""
Initialize storage for a key, if it is missing
"""
if key not in self.storage:
self.storage[key] = {}
self._reset(key)
def _reset(self, key: str):
"""
Reset the storage for a key
"""
self.storage[key]['request_count'] = 0
self.storage[key]['reset_time'] = self.now()