#!/usr/bin/env python3
#
# rate_limiter.py
"""
Rate limiters for making calls to external APIs in a polite manner.
"""
#
# Copyright (c) 2020 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
# Based on CacheControl
# Copyright 2015 Eric Larson
# https://github.com/ionrock/cachecontrol
# Apache-2.0 Licensed
#
# stdlib
import datetime
import logging
import shutil
import time
import warnings
import zlib
from functools import wraps
from typing import Any, Callable, Dict, Optional
# 3rd party
import appdirs # type: ignore
# import codetiming
import requests
from cachecontrol import CacheControl, CacheControlAdapter # type: ignore
from cachecontrol.caches.file_cache import FileCache # type: ignore
from cachecontrol.heuristics import ExpiresAfter # type: ignore
from domdf_python_tools.paths import PathPlus
# Names exported by ``from <this module> import *``; i.e. the module's public API.
__all__ = [
"rate_limit",
"RateLimitAdapter",
"HTTPCache",
]
# 3rd party
from requests import PreparedRequest
def rate_limit(min_time: float = 0.2, logger: Optional[logging.Logger] = None) -> Callable[[Callable], Any]:
    """
    Decorator to force a function to run no less than ``min_time`` seconds after it last ran.
    Used for rate limiting.

    The interval is measured from the moment the decorated function was last
    *actually started* (i.e. after any enforced wait), so consecutive calls are
    always spaced at least ``min_time`` seconds apart.

    The returned wrapper exposes a ``last_run_time`` attribute
    (a :class:`datetime.datetime`) holding the time the function was last started.
    The state lives on the wrapper itself, so it is shared by every caller of
    the decorated function.

    :param min_time: The minimum interval between subsequent runs of the decorated function.
    :default min_time: ``0.2``, which gives a maximum rate of 5 calls per second.
    :param logger: Optional logger to log information about requests to. Defaults to the root logger.
    :no default logger:

    :returns: A decorator which wraps a function with the rate limit.
    """

    if logger is None:
        # Log to root logger
        logger_ = logging.getLogger()
    else:  # pragma: no cover
        logger_ = logger

    def decorator(func: Callable) -> Callable:
        function_name = func.__name__
        last_ran_message = f"{function_name}: Last ran %d seconds ago."
        waiting_message = f"{function_name}: Waiting %d seconds."

        @wraps(func)
        def rate_limit_wrapper(*args, **kwargs):
            now = datetime.datetime.now()
            time_since_last_run = (now - rate_limit_wrapper.last_run_time).total_seconds()  # type: ignore
            # Lazy %-formatting: the message is only rendered if DEBUG is enabled.
            logger_.debug(last_ran_message, time_since_last_run)

            if time_since_last_run < min_time:
                wait_time = min_time - time_since_last_run
                logger_.debug(waiting_message, wait_time)
                time.sleep(wait_time)

                # The timestamp taken above is stale after sleeping. Recording it
                # would make the next call believe more time had passed than really
                # has, letting it run too soon. Re-read the clock instead.
                now = datetime.datetime.now()

            rate_limit_wrapper.last_run_time = now  # type: ignore

            return func(*args, **kwargs)

        # Start at the epoch so that the very first call never has to wait.
        rate_limit_wrapper.last_run_time = datetime.datetime.fromtimestamp(0)  # type: ignore

        return rate_limit_wrapper

    return decorator
class RateLimitAdapter(CacheControlAdapter):
    """
    Custom :class:`CacheControl.CacheControlAdapter` to limit the rate of
    requests to 5 per second.

    Responses which can be served from the cache are returned immediately;
    only requests which are actually sent go through the rate limiter.

    :param cache:
    :param cache_etags:
    :param controller_class:
    :param serializer:
    :param heuristic:
    :param cacheable_methods:
    """

    def send(self, request: PreparedRequest, cacheable_methods=None, **kwargs) -> requests.Response:
        """
        Send a request, answering from the cache where possible.

        For a cacheable method the cache is consulted first. On a miss the
        conditional (etag) headers are added to the request before it is
        passed to :meth:`~.rate_limited_send`.

        :param request: The :class:`requests.models.PreparedRequest` being sent.
        :param cacheable_methods: Overrides the adapter's own ``cacheable_methods`` when truthy.
        :param kwargs: Additional arguments taken by :meth:`requests.adapters.HTTPAdapter.send`.
        """

        methods = cacheable_methods or self.cacheable_methods

        # Methods which are never cached go straight out (rate limited).
        if request.method not in methods:
            return self.rate_limited_send(request, **kwargs)

        try:
            cache_hit = self.controller.cached_request(request)
        except zlib.error:  # pragma: no cover
            # The cached entry could not be decompressed; treat it as a miss.
            cache_hit = None

        if cache_hit:
            return self.build_response(request, cache_hit, from_cache=True)

        # Cache miss: check for etags and add headers if appropriate
        conditional = self.controller.conditional_headers(request)
        request.headers.update(conditional)

        return self.rate_limited_send(request, **kwargs)

    @rate_limit(0.2)
    def rate_limited_send(self, *args, **kwargs) -> requests.Response:
        """
        Send the request, waiting first if the previous send was less than 0.2 seconds ago.

        ``super(CacheControlAdapter, self)`` is used deliberately: it skips
        :meth:`CacheControl.CacheControlAdapter.send` and calls the ``send`` of the
        next class in the MRO, as the cache has already been handled by :meth:`send`.

        The decorator is applied once, in the class body, so the limiter's state
        is shared between all instances of this adapter.
        """

        transport_send = super(CacheControlAdapter, self).send
        return transport_send(*args, **kwargs)
class HTTPCache:
    """
    Cache HTTP requests for up to 28 days and limit the rate of requests to no more than 5/second.

    The cache lives in the per-user cache directory for ``app_name``
    (as given by :func:`appdirs.user_cache_dir`), which is created if missing.
    Requests should be made through :attr:`session`.

    :param app_name: The name of the app. This dictates the name of the cache directory.
    :param expires_after: The maximum time to cache responses for. Defaults to 28 days.
    """

    app_name: str  #: The name of the app. This dictates the name of the cache directory.
    cache_dir: PathPlus  #: The location of the cache directory on disk.
    caches: Dict[str, Dict[str, Any]]  #: Mapping of function names to their caches.

    def __init__(self, app_name: str, expires_after: datetime.timedelta = datetime.timedelta(days=28)):
        self.app_name: str = str(app_name)
        self.cache_dir = PathPlus(appdirs.user_cache_dir(self.app_name))
        self.cache_dir.maybe_make(parents=True)

        # A requests session whose adapter caches responses on disk
        # and rate limits the requests which are actually sent.
        self.session: requests.Session = CacheControl(
            sess=requests.Session(),
            cache=FileCache(self.cache_dir),
            heuristic=ExpiresAfter(
                days=expires_after.days,
                seconds=expires_after.seconds,
                microseconds=expires_after.microseconds,
            ),
            adapter_class=RateLimitAdapter
        )

    def clear(self) -> bool:
        """
        Clear the cache by removing the cache directory and everything in it.

        Clearing a cache whose directory does not exist (for instance because
        it has already been cleared) counts as success.
        Any other failure is reported as a warning rather than an exception.

        :returns: True to indicate success. False otherwise.
        """

        try:
            shutil.rmtree(self.cache_dir)
        except Exception as e:  # pragma: no cover
            # A missing directory means there is nothing left to remove,
            # so the cache is already clear.
            if isinstance(e, FileNotFoundError) and not self.cache_dir.exists():
                return True

            warnings.warn(f"Could not remove cache. The error was: {e}")
            return False

        return True
#
# class ForceMinTime:
# """
# Decorator to force a function to take an amount of time to run.
# Used for rate limiting to external APIs.
#
# :param min_time: The minimum run time in seconds
# :type min_time: float
# """
#
# def __init__(self, min_time: float):
# """
# If there are decorator arguments, the function
# to be decorated is not passed to the constructor!
# """
#
# self.min_time = min_time
#
# def __call__(self, func):
# """
# If there are decorator arguments, __call__() is only called
# once, as part of the decoration process! You can only give
# it a single argument, which is the function object.
# """
#
# def wrapper(*args, **kwargs):
# with codetiming.Timer(logger=None) as t:
# r = func(*args, **kwargs)
#
# sleep_time = self.min_time - t.last
# print(t.last)
#
# if sleep_time > 0:
# time.sleep(sleep_time)
#
# return r
#
# return wrapper