from __future__ import annotations
from typing import Any, NamedTuple
import datetime
import json
import time
import os
from aiohttp.web import Request, Response, HTTPNotFound, HTTPForbidden, HTTPNotImplemented
import aiohttp_jinja2
from response_objects.run_single import RunResponse
from response_objects.profiles import ProfilesResponse
from src.cache.run_stats import update_all_run_stats
from src.cache.cache_helpers import RunLinkedListNode
from src.cache.mastered import update_mastery_stats
from src.cache.streaks import update_streak_collections
from src.sts_profile import get_profile
from src.gamedata import FileParser, KeysObtained, _enemies
from src.webpage import router
from src.logger import logger
from src.events import add_listener
from src.utils import convert_class_to_obj, get_req_data
from src.activemods import ActiveMods, ActiveMod, ACTIVEMODS_KEY
__all__ = ["get_latest_run", "RunParser", "StreakInfo"]
_cache: dict[str, RunParser] = {}
_ts_cache: dict[int, RunParser] = {}
def get_latest_run(character: str | None, victory: bool | None) -> RunParser:
_update_cache()
if not _ts_cache:
return None
latest = _ts_cache[max(_ts_cache)]
is_character_specific = False
if character is not None:
is_character_specific = True
while latest.character != character:
latest = latest.matched.prev
if victory is not None:
if victory:
while not latest.won:
latest = latest.matched.get_run(is_prev=True, is_character_specific=is_character_specific)
else:
while latest.won:
latest = latest.matched.get_run(is_prev=True, is_character_specific=is_character_specific)
return latest
[docs]
class RunParser(FileParser):
done = True
def __init__(self, filename: str, profile: int, data: dict[str, Any]):
if filename in _cache:
raise RuntimeError(f"Created duplicate run parser with name {filename}")
super().__init__(data)
self.filename = filename
self.name, _, ext = filename.partition(".")
self.matched = RunLinkedListNode()
self._character = data["character_chosen"]
self._profile = profile
self._character_streak = None
self._rotating_streak = None
self._activemods = None
def __repr__(self):
return f"Run<{self.display_name}>"
@property
def display_name(self) -> str:
return f"({self.character} {self.verb}) {self.timestamp}"
@property
def profile(self):
return get_profile(self._profile)
@property
def timestamp(self) -> datetime.datetime:
"""Time when the run finished, as UTC."""
return datetime.datetime.fromtimestamp(self._data["timestamp"], datetime.UTC)
@property
def timedelta(self) -> datetime.timedelta:
"""Difference between now and the run."""
return datetime.datetime.now(datetime.UTC) - self.timestamp
@property
def keys(self) -> KeysObtained:
keys = KeysObtained()
for choice in self._data["campfire_choices"]:
if choice["key"] == "RECALL":
keys.ruby_key_obtained = True
keys.ruby_key_floor = int(choice["floor"])
if "green_key_taken_log" in self._data:
keys.emerald_key_obtained = True
keys.emerald_key_floor = int(self._data["green_key_taken_log"])
if "blue_key_relic_skipped_log" in self._data:
keys.sapphire_key_obtained = True
keys.sapphire_key_floor = int(self._data["blue_key_relic_skipped_log"]["floor"])
return keys
@property
def _neow_data(self) -> tuple[dict[str, list[str] | int], list[str], list[str]]:
data = dict(self._data.get("neow_bonus_log", {}))
bonuses = list(self._data.get("neow_bonuses_skipped_log", ()))
costs = list(self._data.get("neow_costs_skipped_log", ()))
return (data, bonuses, costs)
@property
def _master_deck(self) -> list[str]:
return list(self._data["master_deck"])
@property
def won(self) -> bool:
return self._data["victory"]
@property
def verb(self) -> str:
return "victory" if self.won else "loss"
@property
def killed_by(self) -> str | None:
killer = self._data.get("killed_by")
return _enemies.get(killer, killer)
@property
def floor_reached(self) -> int:
return int(self._data["floor_reached"])
floor = floor_reached
@property
def acts_beaten(self) -> int:
"""Return how many acts were beaten."""
return self._data["path_per_floor"].count(None) # None is a boss chest, act 4 transition, AND final screen
@property
def final_health(self) -> tuple[int, int]:
return self._data["current_hp_per_floor"][-1], self._data["max_hp_per_floor"][-1]
@property
def score(self) -> int:
return int(self._data["score"])
@property
def score_breakdown(self) -> list[str]:
return self._data.get("score_breakdown", [])
@property
def run_length(self) -> str:
seconds = self._data["playtime"]
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}:{minutes:>02}:{seconds:>02}"
return f"{minutes:>02}:{seconds:>02}"
@property
def character_streak(self) -> StreakInfo:
streak = self._character_streak
if streak is None:
streak = self._get_streak(is_character_streak=True)
if not streak.is_ongoing:
self._character_streak = streak
return streak
@property
def rotating_streak(self) -> StreakInfo:
streak = self._rotating_streak
if streak is None:
streak = self._get_streak(is_character_streak=False)
if not streak.is_ongoing:
self._rotating_streak = streak
return streak
def _get_streak(self, *, is_character_streak: bool) -> StreakInfo:
if self.won:
streak_total = 1
position_in_streak = 1
is_ongoing = True
def loop_cached_runs(*, is_prev: bool):
nonlocal streak_total, position_in_streak, is_ongoing
current_run: RunParser = self.matched.get_run(is_prev=is_prev, is_character_specific=is_character_streak)
if current_run is not None:
last_char = self.character
while current_run.won:
# If rotating, only iterate if not same char
if is_character_streak or current_run.character != last_char:
streak_total += 1
# only iterate the position if the Win came before the current run
if is_prev:
position_in_streak += 1
if (run := current_run.matched.get_run(is_prev=is_prev, is_character_specific=is_character_streak)) is not None:
last_char = current_run.character
current_run = run
else:
break
does_upcoming_loss_exist = not is_prev and not current_run.won
if does_upcoming_loss_exist:
is_ongoing = False
loop_cached_runs(is_prev=True)
loop_cached_runs(is_prev=False)
return StreakInfo(streak_total, position_in_streak, is_ongoing)
return StreakInfo(0, 0, False)
@property
def has_activemods(self) -> bool:
return ACTIVEMODS_KEY in self._data
@property
def activemods(self) -> ActiveMods:
if self._activemods is None:
self._activemods = ActiveMods(self._data)
return self._activemods
[docs]
def find_mod(self, mod_name: str) -> ActiveMod | None:
return self.activemods.find_mod(mod_name)
@property
def mods(self) -> list[ActiveMod]:
return self.activemods.all_mods
[docs]
class StreakInfo(NamedTuple):
"""Contain run streak information."""
streak: int
position: int
is_ongoing: bool
@add_listener("setup_init")
async def _setup_cache():
for i in range(3):
os.makedirs(os.path.join("data", "runs", str(i)), exist_ok=True)
_update_cache()
def _update_cache():
start = time.time()
for path, folders, files in os.walk(os.path.join("data", "runs")):
for folder in folders:
profile = int(folder)
_cur_cache: dict[int, RunParser] = {}
for p1, d1, f1 in os.walk(os.path.join(path, folder)):
for file in f1:
if file in _cache:
parser = _cache[file]
else:
with open(os.path.join(p1, file)) as f:
_cache[file] = parser = RunParser(file, profile, json.load(f))
_ts_cache[parser._data["timestamp"]] = parser
_cur_cache[parser._data["timestamp"]] = parser
prev = None
prev_char: dict[str, RunParser | None] = {}
prev_win = None
prev_loss = None
for t in sorted(_cur_cache):
cur = _cur_cache[t]
if prev is not None:
if cur.matched.prev is None:
prev.matched.next = cur
cur.matched.prev = prev
if cur.character not in prev_char:
prev_char[cur.character] = None
if cur.matched.prev_char is None and (c := prev_char[cur.character]) is not None:
c.matched.next_char = cur
cur.matched.prev_char = c
prev_char[cur.character] = cur
if cur.won:
if cur.matched.prev_win is None and prev_win is not None:
prev_win.matched.next_win = cur
cur.matched.prev_win = prev_win
prev_win = cur
else:
if cur.matched.prev_loss is None and prev_loss is not None:
prev_loss.matched.next_loss = cur
cur.matched.prev_loss = prev_loss
prev_loss = cur
prev = cur
update_all_run_stats()
update_mastery_stats()
update_streak_collections()
# I don't actually know how long this cache updating is going to take...
# I think it's as optimized as I could make it while still being safe,
# but it's possible it still takes some time. I'm not going to focus on
# that for now, but logging the update time everytime, in case it turns
# out to be a bottleneck. We only want to actually update new runs.
logger.info(f"Updated run parser cache in {time.time() - start}s")
@router.get("/runs")
@aiohttp_jinja2.template("runs_profile.jinja2")
async def pick_profile(req: Request):
profiles = []
for i in range(3):
profile = get_profile(i)
if profile is not None:
profiles.append(profile)
if not profiles:
raise HTTPNotImplemented(reason="No run files were found")
return convert_class_to_obj(ProfilesResponse(profiles))
def get_parser(name) -> RunParser | None:
parser = _cache.get(f"{name}.run") # most common case
if parser is None:
_update_cache()
parser = _cache.get(f"{name}.run") # try again, just in case
if parser is None: # okay, iterate through everything
for run_parser in _cache.values():
if run_parser.name == name:
parser = run_parser
break
return parser
def _truthy(x: str | None) -> bool:
if x and x.lower() in ("1", "true", "yes"):
return True
return False
def _falsey(x: str | None) -> bool:
if x and x.lower() in ("0", "false", "no"):
return False
return True
@router.get("/runs/{name}")
@aiohttp_jinja2.template("run_single.jinja2")
async def run_single(req: Request):
parser = get_parser(req.match_info["name"])
if parser is None:
raise HTTPNotFound()
redirect = _truthy(req.query.get("redirect"))
response = RunResponse(parser, parser.matched, autorefresh=False, redirect=redirect)
return convert_class_to_obj(response)
@router.get("/runs/{name}/raw")
async def run_raw_json(req: Request) -> Response:
parser = get_parser(req.match_info["name"])
if parser is None:
raise HTTPNotFound()
return Response(text=json.dumps(parser._data, indent=4), content_type="application/json")
@router.get("/runs/{name}/{type}")
async def run_chart(req: Request) -> Response:
parser = get_parser(req.match_info["name"])
if parser is None:
raise HTTPNotFound()
return parser.graph(req)
#@router.get("/compare/view")
@aiohttp_jinja2.template("compare_single.jinja2")
async def compare_runs(req: Request):
context = {}
try:
start = int(req.query.get("start", 0))
end = int(req.query.get("end", time.time()))
score = int(req.query.get("score", 0))
except ValueError:
raise HTTPForbidden(reason="'start', 'end', 'score' params must be integers if present")
chars = req.query.getall("character", [])
victory = _truthy(req.query.get("victory"))
loss = _falsey(req.query.get("loss"))
relics = req.query.getall("relic", [])
cards = req.query.getall("card", [])
return context
@router.post("/sync/run")
async def receive_run(req: Request) -> Response:
content, name, profile = await get_req_data(req, "run", "name", "profile")
with open(os.path.join("data", "runs", profile, name), "w") as f:
f.write(content)
data = json.loads(content)
if name not in _cache:
_cache[name] = parser = RunParser(name, int(profile), data)
_ts_cache[parser._data["timestamp"]] = parser
_update_cache()
logger.debug(f"Received run history file. Updated data. Transaction time: {time.time() - float(req.query['start'])}s")
return Response()