from datetime import datetime
import time
import base64
import logging
from lzma import LZMAError
from functools import lru_cache
import requests
from requests import RequestException
import circleparse
import ossapi
from circleguard.replay_info import ReplayInfo
from circleguard.enums import Error
from circleguard.mod import Mod
from circleguard.exceptions import (InvalidArgumentsException, APIException, CircleguardException,
RatelimitException, InvalidKeyException, ReplayUnavailableException, UnknownAPIException,
InvalidJSONException, NoInfoAvailableException)
from circleguard.utils import TRACE
from circleguard.span import Span
[docs]def request(function):
"""
A decorator that handles :mod:`requests` and api related exceptions, as
well as resetting our ratelimits if appropriate.
Parameters
----------
function: callable
The function to wrap.
Notes
-----
Should be wrapped around any function that makes a request; to the api or
otherwise.
If it has been more than :data:`Loader.RATELIMIT_RESET` since
:data:`Loader.start_time`, sets :data:`Loader.start_time` to be
:func:`datetime.now <datetime.datetime.now>`.
"""
def wrapper(*args, **kwargs):
difference = datetime.now() - Loader.start_time
if difference.seconds > Loader.RATELIMIT_RESET:
Loader.start_time = datetime.now()
# catch them exceptions boy
ret = None
self = args[0]
try:
ret = function(*args, **kwargs)
except RatelimitException:
self._enforce_ratelimit()
# wrap function with the decorator then call decorator
ret = request(function)(*args, **kwargs)
except InvalidJSONException as e:
self.log.warning("Invalid json exception: {}. API likely having issues; sleeping for 3 seconds then retrying".format(e))
time.sleep(3)
ret = request(function)(*args, **kwargs)
except RequestException as e:
self.log.warning("Request exception: {}. Likely a network issue; sleeping for 5 seconds then retrying".format(e))
time.sleep(5)
ret = request(function)(*args, **kwargs)
except ReplayUnavailableException as e:
self.log.warning("We expected a replay from the api, but it was unable to deliver it: {}".format(e))
ret = None
return ret
return wrapper
[docs]def check_cache(function):
"""
A decorator that checks if the passed
:class:`~circleguard.replay_info.ReplayInfo` has its replay cached. If so,
returns a :class:`~circleguard.loadable.Replay` instance from the cached data.
Otherwise, calls and returns the `function` as normal.
Parameters
----------
function: callable
The function to wrap.
Notes
-----
``self`` and ``replay_info`` **MUST** be the first and second arguments to
the function, respectively.
Returns
-------
:class:`~circleguard.loadable.Replay` or Unknown:
A :class:`~circleguard.loadable.Replay` instance from the cached data
if it was cached, or the return value of the function if not.
"""
def wrapper(*args, **kwargs):
self = args[0]
replay_info = args[1]
if self.cacher is None:
return function(*args, **kwargs)
decompressed_lzma = self.cacher.check_cache(replay_info.map_id, replay_info.user_id, replay_info.mods)
if decompressed_lzma:
replay_data = circleparse.parse_replay(decompressed_lzma, pure_lzma=True, decompressed_lzma=True).play_data
return replay_data
else:
return function(*args, **kwargs)
return wrapper
[docs]class Loader():
"""
Manages interactions with the osu api, using the :mod:`ossapi` wrapper.
Parameters
----------
key: str
A valid api key. Can be retrieved from https://osu.ppy.sh/p/api/.
cacher: :class:`~circleguard.cacher.Cacher`
A :class:`~circleguard.cacher.Cacher` instance to manage
replay loading/caching. If `None`, replays will not be loaded from
the cache or cached to the database.
Notes
-----
If the api ratelimits the key, we wait until our ratelimits are refreshed
and retry the request. Because the api does not provide the time until the
next refresh (and we do not use exponential backoff or another retry
strategy), if the key is ratelimited because of an interaction not managed
by this class, the class may wait more time than necessary for the key to
refresh.
"""
# the maximum number of replay info available through the respective api
# calls. Note that osu! stores at least the top 1000 replays, but does not
# make these discoverable unless you know the exact user id, map id, and
# mods of the replay.
MAX_MAP_SPAN = Span("1-100")
MAX_USER_SPAN = Span("1-100")
# time in seconds until the api refreshes our ratelimits
RATELIMIT_RESET = 60
# when we started our requests cycle
start_time = datetime.min
def __init__(self, key, cacher=None):
self.log = logging.getLogger(__name__)
self.api = ossapi.ossapi(key)
self.cacher = cacher
@request
def replay_info(self, map_id, span=None, user_id=None, mods=None, limit=True):
"""
Retrieves replay infos from a map's leaderboard.
Parameters
----------
map_id: int
The map id to retrieve replay info for.
span: Span
A comma separated list of ranges of top replays on the map to
retrieve. ``span="1-3,6,2-4"`` -> replays in the range
``[1,2,3,4,6]``.
user_id: int
If passed, only retrieve replay info on ``map_id`` for this user.
Note that this is not necessarily limited to just the user's top
score on the map. See ``limit``.
mods: :class:`~.ModCombination`
If passed, will only retrieve replay infos for scores that were
played with the given mods.
limit: bool
Whether to limit to only one response. Only has an effect if
``user_id`` is passed. If ``limit`` is ``True``, will only return
the top scoring replay info by ``user_id``. If ``False``, will return
all scores by ``user_id``.
Returns
-------
list[:class:`~.ReplayInfo`]
The replay infos representing the map's leaderboard.
:class:`~.ReplayInfo`
If ``limit`` is ``True`` and ``user_id`` is passed.
Notes
-----
One of ``user_id`` or ``span`` must be passed.
Raises
------
NoInfoAvailableException
If there is no info available for the given parameters.
"""
# we have to define a new variable to hold locals - otherwise when we
# call it twice inside the dict comprehension, it rebinds to the comp
# scope and takes on different locals which is real bad.
# I spent many-a-hour figuring this out,
# and if anyone has a more elegant solution I'm all ears.
locals_ = locals()
self.log.log(TRACE, "Loading replay info on map %d with options %s",
map_id, {k: locals_[k] for k in locals_ if k != 'self'})
if not (span or user_id):
raise InvalidArgumentsException("One of user_id or span must be passed, but not both")
api_limit = None
if span:
api_limit = max(span)
response = self.api.get_scores({"m": "0", "b": map_id, "limit": api_limit, "u": user_id, "mods": mods if mods is None else mods.value})
Loader.check_response(response)
if span:
# filter span_set to remove indexes that would cause an indexerror
# when indexing ``response``
# span_set = {3, 6}; response = [a, b, c, d, e, f]; len(response) = 6
# we want to keep {6} since we index at [i-1], so use <= not <
_span = {x for x in span if x <= len(response)}
# filter out anything not in our span
response = [response[i-1] for i in _span]
# yes, it's necessary to cast the str response to int before bool - all strings are truthy.
# strptime format from https://github.com/ppy/osu-api/wiki#apiget_scores
infos = [ReplayInfo(datetime.strptime(x["date"], "%Y-%m-%d %H:%M:%S"), map_id, int(x["user_id"]), str(x["username"]), int(x["score_id"]),
Mod(int(x["enabled_mods"])), bool(int(x["replay_available"]))) for x in response]
return infos[0] if (limit and user_id) else infos # limit only applies if user_id was set
@request
def get_user_best(self, user_id, span, mods=None):
"""
Retrieves replay infos from a user's top plays.
Parameters
----------
user_id: int
The user id to get best plays of.
span: Span
A comma separated list of ranges of top plays to retrieve.
``span="1-3,6,2-4"`` -> replays in the range ``[1,2,3,4,6]``.
mods: :class:`~.ModCombination`
If passed, will only retrieve replay infos for scores that were
played with the given mods.
Returns
-------
list[:class:`~.ReplayInfo`]
The replay infos representing the user's top plays.
Raises
------
InvalidArgumentsException
If any elements in ``span`` are not between ``1`` and ``100``
inclusive.
"""
locals_ = locals()
self.log.log(TRACE, "Loading user best of %s with options %s",
user_id, {k: locals_[k] for k in locals_ if k != 'self'})
api_limit = max(span)
response = self.api.get_user_best({"m": "0", "u": user_id, "limit": api_limit})
Loader.check_response(response)
if mods:
_response = []
for r in response:
if Mod(int(r["enabled_mods"])) == mods:
_response.append(r)
response = _response
# remove span indices which would cause an index error because there
# weren't that many replay infos returned by the api. eg if there
# were 4 responses, remove any span above 4
response_count = len(response)
_span = [x for x in span if x <= response_count]
response = [response[i-1] for i in _span]
return [ReplayInfo(datetime.strptime(r["date"], "%Y-%m-%d %H:%M:%S"), int(r["beatmap_id"]), int(r["user_id"]),
self.username(int(r["user_id"])), int(r["score_id"]), Mod(int(r["enabled_mods"])),
bool(int(r["replay_available"]))) for r in response]
@request
def load_replay_data(self, map_id, user_id, mods=None):
"""
Retrieves replay data from the api.
Parameters
----------
map_id: int
The map the replay was played on.
user_id: int
The user that played the replay.
mods: :class:`~.ModCombination`
The mods the replay was played with, or ``None`` for the highest
scoring replay, regardless of mods.
Returns
-------
str
The lzma-encoded string, decoded from the base 64 api response,
representing the replay.
None
If no replay data was available.
Notes
-----
This is the low level implementation of :func:`~.replay_data`, handling
the actual api request.
"""
self.log.log(TRACE, "Requesting replay data by user %d on map %d with mods %s", user_id, map_id, mods)
response = self.api.get_replay({"m": "0", "b": map_id, "u": user_id, "mods": mods if mods is None else mods.value})
Loader.check_response(response)
return base64.b64decode(response["content"])
@check_cache
def replay_data(self, replay_info, cache=None):
"""
Retrieves replay data from the api, or from the cache if it is already
cached.
Parameters
----------
replay_info: :class:`~.ReplayInfo`
The replay info representing the replay to retrieve.
Returns
-------
list[:class:`circleparse.replay.ReplayEvent`]
The replay events with attributes ``x``, ``y``,
``time_since_previous_action``, and ``keys_pressed``.
None
If no replay data was available.
Raises
------
UnknownAPIException
If ``uxser_info.replay_available` was 1, but we did not receive
replay data from the api.
"""
user_id = replay_info.user_id
map_id = replay_info.map_id
mods = replay_info.mods
if not replay_info.replay_available:
self.log.debug("Replay data by user %d on map %d with mods %s not available", user_id, map_id, mods)
return None
lzma_bytes = self.load_replay_data(map_id, user_id, mods)
if lzma_bytes is None:
raise UnknownAPIException("The api guaranteed there would be a replay available, but we did not receive any data. "
"Please report this to the devs, who will open an issue on osu!api if necessary.")
try:
parsed_replay = circleparse.parse_replay(lzma_bytes, pure_lzma=True)
# see https://github.com/circleguard/circlecore/issues/61
# api sometimes returns corrupt replays
except LZMAError:
self.log.warning("lzma from %r could not be decompressed, api returned corrupt replay", replay_info)
return None
replay_data = parsed_replay.play_data
if cache and self.cacher is not None:
self.cacher.cache(lzma_bytes, replay_info)
return replay_data
@lru_cache()
@request
def map_id(self, map_hash):
"""
Retrieves a map id from a corresponding map hash through the api.
Parameters
----------
map_hash: str
The md5 hash of the map to get the id of.
Returns
-------
int
The map id that corresponds to ``map_hash``, or ``0`` if
``map_hash`` doesn't mach any map.
Notes
-----
This function is wrapped in a :func:`functools.lru_cache` to prevent
duplicate api calls.
"""
response = self.api.get_beatmaps({"h": map_hash})
try:
Loader.check_response(response)
except NoInfoAvailableException:
return 0
return int(response[0]["beatmap_id"])
@lru_cache()
@request
def user_id(self, username):
"""
Retrieves a user id from a corresponding username through the api.
Parameters
----------
username: str
The username of the user to get the user id of.
Returns
-------
int
The user id that corresponds to ``username``, or ``0`` if
``username`` doesn't match any user.
Notes
-----
The api redirects name changes to the current username. For instance,
``user_id("cookiezi")`` will return ``124493``, despite shige's current
osu! username being ``chocomint``. However, I am not sure if this
behavior is as well defined when someone else takes the previous name
of a user.
This function is case insensitive.
This function is wrapped in a :func:`functools.lru_cache` to prevent
duplicate api calls.
"""
response = self.api.get_user({"u": username, "type": "string"})
try:
Loader.check_response(response)
except NoInfoAvailableException:
return 0
return int(response[0]["user_id"])
@lru_cache()
@request
def username(self, user_id):
"""
Retrieves the username from a corresponding user id through the api.
Parameters
----------
user_id: int
The user id of the user to get the username of.
Returns
-------
str
The username that corresponds to ``user_id``, or an empty string
if ``user_id`` doesn't match any user.
Notes
-----
This function is the inverse of
:meth:`~circleguard.loader.Loader.user_id`.
This function is wrapped in a :func:`functools.lru_cache` to prevent
duplicate api calls.
"""
response = self.api.get_user({"u": user_id, "type": "id"})
try:
Loader.check_response(response)
except NoInfoAvailableException:
return ""
return response[0]["username"]
[docs] @staticmethod
def check_response(response):
"""
Checks a response from the api for an error or empty response.
Parameters
----------
response: list or dict
The response returned by the api.
Raises
------
One of :class:`~.enums.Error`
If an error exists in the response.
NoInfoAvailable
If the response is empty.
"""
if "error" in response: # dict case
for error in Error:
if response["error"] == error.value[0]:
raise error.value[1](error.value[2])
else:
raise Error.UNKNOWN.value[1](Error.UNKNOWN.value[2]) # pylint: disable=unsubscriptable-object
# pylint is dumb because Error is an enum and this is totally legal
if not response: # response is empty, list or dict case
raise NoInfoAvailableException("No info was available from the api for the given arguments.")
def _enforce_ratelimit(self):
"""
Sleeps the thread until we have refreshed our ratelimits.
"""
difference = datetime.now() - Loader.start_time
seconds_passed = difference.seconds
# sleep the remainder of the reset cycle so we guarantee it's been that long since the first request
sleep_seconds = Loader.RATELIMIT_RESET - seconds_passed
self._ratelimit(sleep_seconds)
def _ratelimit(self, length):
"""
Sleeps the thread for ``length`` time.
Parameters
----------
length: int
How long, in seconds, to sleep the thread for.
Notes
-----
This method is only called by :meth:`~._enforce_ratelimit`. It is split
into two functions to allow :class:`~.Loader` subclasses to overload
just this function and easily interact with the ``length``.
"""
self.log.info("Ratelimited, sleeping for %s seconds.", length)
time.sleep(length)