Source code for src.sts_profile

from __future__ import annotations

from typing import Generator, TYPE_CHECKING

import zipfile
import math
import time
import json
import os
import io

from aiohttp.web import Request, Response, HTTPForbidden, HTTPNotFound
from itertools import islice
from datetime import datetime

import aiohttp_jinja2

from src.nameinternal import get
from src.webpage import router
from src.logger import logger
from src.events import add_listener
from src.utils import get_req_data

if TYPE_CHECKING: # circular imports otherwise
    from src.runs import RunParser

__all__ = ["get_profile", "get_current_profile"]

_profiles: dict[int, Profile] = {}
_slots: dict[str, str] = {}

[docs] def get_profile(x: int) -> Profile: return _profiles.get(x, None)
[docs] def get_current_profile() -> Profile: return _profiles[int(_slots["DEFAULT_SLOT"])]
def profile_from_request(req: Request) -> Profile: try: profile = get_profile(int(req.match_info["profile"])) if profile is None: raise HTTPNotFound() except ValueError: raise HTTPForbidden(reason="profile must be integer") return profile class Profile: RUNS_PER_PAGE = 50 def __init__(self, index: int, data: dict[str, str]): self.index = index self._prefix = "" if index: self._prefix = f"{index}_" self.data = data def __str__(self) -> str: return self.name @property def name(self) -> str: return _slots[f"{self._prefix}PROFILE_NAME"] @property def completion(self) -> str: return "{:.2%}".format(_slots[f"{self._prefix}COMPLETION"]) @property def playtime(self) -> str: minutes, seconds = divmod(int(_slots[f"{self._prefix}PLAYTIME"]), 60) hours, minutes = divmod(minutes, 60) return f"{hours:>2}:{minutes:>02}:{seconds:>02}" @property def hole_card(self) -> str: item = get(self.data["NOTE_CARD"]) match self.data["NOTE_UPGRADE"]: case "0": name = item.name case "1": name = f"{item.name}+" case a: name = f"{name}+{a}" return name @property def runs(self) -> Generator[RunParser, None, None]: """Return all runs from the matching profile, newest first.""" from src.runs import _ts_cache l = list(_ts_cache) l.sort() l.reverse() for ts in l: if _ts_cache[ts]._profile == self.index: yield _ts_cache[ts] def paged_runs(self, page): page -= 1 # UI serves the page number one-indexed, we want zero-indexed start = page * self.RUNS_PER_PAGE end = start + self.RUNS_PER_PAGE print(page, start, end) return islice(self.runs, start, end) @property def pages(self): return math.floor(sum(1 for _ in self.runs) / self.RUNS_PER_PAGE) + 1 @router.get("/profile/{profile}/runs") @router.get("/profile/{profile}/runs/{page}") @aiohttp_jinja2.template("runs.jinja2") async def runs_page(req: Request): profile = profile_from_request(req) from src.runs import _update_cache _update_cache() try: page = int(req.match_info.get("page", 1)) if page < 1: page = 1 except: page = 1 return { "profile": profile, "page": page, "pages": profile.pages, } @router.get("/profile/{profile}/runs/by-timestamp/{timestamp}") @aiohttp_jinja2.template("runs_timestamp.jinja2") async def runs_by_timestamp(req: Request): profile = profile_from_request(req) from src.runs import _update_cache _update_cache() try: timestamp = req.match_info.get("timestamp", "") start, _, end = timestamp.partition("..") start = int(start) if start else 0 end = int(end) if end else time.time() except ValueError: raise HTTPForbidden(reason="Timestamp must be integers if given.") has_runs = False runs = [] for run in profile.runs: if start <= run.timestamp.timestamp() <= end: runs.append(run) has_runs = True if not has_runs: raise HTTPForbidden(reason="No run file matches the given range.") return { "profile": profile, "runs": runs, "back": req.rel_url.query.get('back'), "start": datetime.fromtimestamp(start), # TODO: Map to UTC "end": datetime.fromtimestamp(end), } @router.get("/archive/{profile}/{timestamp}.zip") async def runs_as_zipfile(req: Request) -> Response: profile = profile_from_request(req) from src.runs import _update_cache _update_cache() try: timestamp = req.match_info.get("timestamp", "") if timestamp not in ("all", "runs"): start, _, end = timestamp.partition("..") start = int(start) if start else 0 end = int(end) if end else time.time() else: start = 0 end = time.time() except ValueError: raise HTTPForbidden(reason="Timestamp must be integers if given.") has_file = False with io.BytesIO() as zfile: with zipfile.ZipFile(zfile, mode="w") as archive: for run in profile.runs: if start <= run.timestamp.timestamp() <= end: archive.write(f"data/runs/{profile.index}/{run.filename}") has_file = True if not has_file: raise HTTPForbidden(reason="No run file matches the given range.") return Response(body=zfile.getvalue(), content_type="application/zip") @router.post("/sync/profile") async def sync_profiles(req: Request) -> Response: slots, *profiles = await get_req_data(req, "slots", "0", "1", "2") if slots: _slots.clear() _slots.update(json.loads(slots)) with open(os.path.join("data", "slots"), "w") as f: f.write(slots) for i in range(3): profile = profiles[i] if not profile: continue # either it doesn't exist, or it hasn't changed with open(os.path.join("data", f"profile_{i}"), "w") as f: f.write(profile) profile = json.loads(profile) if i not in _profiles: _profiles[i] = Profile(i, profile) else: _profiles[i].data = profile logger.debug(f"Received profiles. Transaction time: {time.time() - float(req.query['start'])}s") return Response() @add_listener("setup_init") async def fetch_profiles(): try: with open(os.path.join("data", "slots")) as f: _slots.update(json.load(f)) except OSError: pass for i in range(3): try: with open(os.path.join("data", f"profile_{i}")) as f: _profiles[i] = Profile(i, json.load(f)) except OSError: continue