import sqlite3
import logging
import os
import wtc
from circleguard.loader import Loader
from circleguard.exceptions import CircleguardException
from circleguard.utils import TRACE
class Cacher:
    """
    Handles compressing and caching replay data to a database.

    Parameters
    ----------
    cache: bool
        Whether or not replays should be cached.
    path: str or :class:`os.PathLike`
        The absolute path to the database. If the path does not exist,
        a fresh database will be created there.

    Notes
    -----
    Each Cacher instance maintains its own database connection.
    The database file is created (if missing) even when ``cache`` is
    ``False``, because reads through :meth:`check_cache` still need it.
    """

    def __init__(self, cache, path):
        self.log = logging.getLogger(__name__)
        self.should_cache = cache
        path = str(path)
        if not os.path.isfile(path):
            self._create_cache(path)
        self.conn = sqlite3.connect(path)
        self.cursor = self.conn.cursor()

    def cache(self, lzma_bytes, replay_info):
        """
        Caches a replay in the form of a (compressed) lzma stream to the
        database, linking it to the replay info.

        Parameters
        ----------
        lzma_bytes: str
            The lzma stream to compress and insert into the db.
        replay_info: :class:`~circleguard.replay_info.ReplayInfo`
            The ReplayInfo object representing this replay. Its ``map_id``,
            ``user_id``, ``mods`` and ``replay_id`` attributes are stored.

        Notes
        -----
        If an entry with the same map id, user id and mods already exists,
        it is overwritten by the passed lzma.

        The lzma string is compressed with wtc compression. See
        :func:`~Cacher._compress` and :func:`wtc.compress` for more.

        A call to this method has no effect if the Cacher's ``should_cache``
        is ``False``.
        """
        self.log.debug("Caching lzma bytes")
        if self.should_cache is False:
            self.log.debug("Cacher should_cache is False, not caching")
            return

        compressed_bytes = self._compress(lzma_bytes)
        map_id = replay_info.map_id
        user_id = replay_info.user_id
        mods = replay_info.mods.value
        replay_id = replay_info.replay_id

        result = self.cursor.execute(
            "SELECT COUNT(1) FROM replays WHERE map_id=? AND user_id=? AND mods=?",
            [map_id, user_id, mods],
        ).fetchone()[0]
        self.log.log(TRACE, "Writing compressed lzma to db")
        if result:
            # already exists so we overwrite (this happens when we call
            # Cacher.revalidate)
            self._write(
                "UPDATE replays SET replay_data=?, replay_id=? WHERE map_id=? AND user_id=? AND mods=?",
                [compressed_bytes, replay_id, map_id, user_id, mods],
            )
        else:
            # column order matches the table definition in _create_cache
            self._write(
                "INSERT INTO replays VALUES(?, ?, ?, ?, ?)",
                [map_id, user_id, compressed_bytes, replay_id, mods],
            )

    def revalidate(self, loader, replay_info):
        """
        Checks entries in ``replay_info`` against their entries in the database
        (if any) to look for score id mismatches, indicating an outdated replay.
        If there are mismatches, the replay is redownloaded and cached from the
        replay info.

        Parameters
        ----------
        loader: :class:`~circleguard.loader.Loader`
            The Loader from the circleguard instance to redownload replays with
            if they are outdated.
        replay_info: list[:class:`~circleguard.replay_info.ReplayInfo`]
            A list of ReplayInfo objects containing the up-to-date information
            of user's replays.

        Raises
        ------
        CircleguardException
            Raised when the redownloaded replay id is lower than the cached
            replay id. This should never happen and is indicative of either a
            fault on our end or the api's end.
            Also raised if the replay data is not available from the api when
            redownloaded.

        Notes
        -----
        If the replay is found to be outdated, it will be overwritten
        by the newer replay in the database.
        """
        self.log.info("Revalidating cache with %d replay_infos", len(replay_info))
        for info in replay_info:
            map_id = info.map_id
            user_id = info.user_id
            # same attribute that ``cache`` reads from this very object
            mods = info.mods.value
            self.log.log(TRACE, "Revalidating entry with map id %s, user %d, mods %s", map_id, user_id, mods)
            result = self.cursor.execute(
                "SELECT replay_id FROM replays WHERE map_id=? AND user_id=? AND mods=?",
                [map_id, user_id, mods],
            ).fetchall()
            if not result:
                self.log.log(TRACE, "Nothing cached with map id %s, user %d, mods %s", map_id, user_id, mods)
                continue  # nothing cached

            db_replay_id = result[0][0]  # fetchall returns a list of row tuples
            new_replay_id = info.replay_id
            if db_replay_id == new_replay_id:
                continue  # cached entry is up to date

            if db_replay_id > new_replay_id:
                # argument order follows the placeholders: map id, then user id
                raise CircleguardException(
                    "The cached replay id of {} is higher than the new replay id of {}. Map id: {}, User id: {}, mods: {}"
                    .format(db_replay_id, new_replay_id, map_id, user_id, mods)
                )

            self.log.info("Cached replay on map %d by user %d with mods %d is outdated, redownloading", map_id, user_id, mods)
            lzma_data = loader.replay_data(info)
            if lzma_data is None:
                raise CircleguardException(
                    "We could not load lzma data for map {}, user {}, mods {}, replay available {} while revalidating."
                    .format(map_id, user_id, mods, info.replay_available)
                )
            self.cache(lzma_data, info)

    def check_cache(self, map_id, user_id, mods):
        """
        Checks the cache for a replay described by the parameters, and returns
        its data if the cache contains the replay.

        Parameters
        ----------
        map_id: int
            The id of the map the replay was played on.
        user_id: int
            The id of the user that played the replay.
        mods: :class:`~circleguard.mod.ModCombination`
            The mods this replay was played with.

        Returns
        -------
        str or None
            The replay data in decompressed lzma form if the cache contains the
            replay, or None if not.
        """
        mods = mods.value
        self.log.log(TRACE, "Checking cache for a replay on map %d by user %d with mods %s", map_id, user_id, mods)
        result = self.cursor.execute(
            "SELECT replay_data FROM replays WHERE map_id=? AND user_id=? AND mods=?",
            [map_id, user_id, mods],
        ).fetchone()
        if result:
            self.log.debug("Loading replay on map %d by user %d with mods %s from cache", map_id, user_id, mods)
            return wtc.decompress(result[0], decompressed_lzma=True)
        self.log.log(TRACE, "No replay found in cache")
        return None

    def _write(self, statement, args):
        """
        A helper method that writes an sql statement with
        the given args, and commits the connection.

        Parameters
        ----------
        statement: str
            The sql statement to execute.
        args: list
            The values to insert into the statement.
            Must be of length equal to the number of missing values
            (question marks) in the statement.
        """
        self.cursor.execute(statement, args)
        self.conn.commit()

    def _compress(self, lzma_bytes):
        """
        Compresses an lzma string with wtc compression
        (see :func:`wtc.compress`).

        Parameters
        ----------
        lzma_bytes: str
            The lzma string representing a replay, to compress.

        Returns
        -------
        str
            The lzma_bytes string, compressed with wtc compression
            (:func:`wtc.compress`).

        Notes
        -----
        wtc compression is not lossless, in order to save space. Please see
        :func:`wtc.compress` for more details.
        """
        self.log.log(TRACE, "Compressing lzma bytes")
        return wtc.compress(lzma_bytes)

    def _create_cache(self, path):
        """
        Creates a database with the necessary tables at the given path.

        Parameters
        ----------
        path: str
            The absolute path to where the database should be created.

        Notes
        -----
        This function will create directories specified in the path if they
        don't already exist. A path with no directory component (a bare
        filename) is created relative to the current working directory.
        """
        self.log.info("Cache not found at path %s, creating cache", path)
        directory = os.path.dirname(path)
        # a bare filename has an empty dirname, and os.makedirs("") raises
        if directory:
            os.makedirs(directory, exist_ok=True)

        conn = sqlite3.connect(str(path))
        try:
            conn.execute("""CREATE TABLE "REPLAYS"(
                "MAP_ID" INTEGER NOT NULL,
                "USER_ID" INTEGER NOT NULL,
                "REPLAY_DATA" MEDIUMTEXT NOT NULL,
                "REPLAY_ID" INTEGER NOT NULL,
                "MODS" INTEGER NOT NULL,
                PRIMARY KEY("REPLAY_ID")
            )""")
            conn.commit()
        finally:
            # release the file handle even if table creation fails
            conn.close()