Source code for src.utils

import calendar
from datetime import datetime, UTC
from typing import Any, Iterable
from aiohttp.web import Request, HTTPNotImplemented, HTTPForbidden, HTTPUnauthorized, FileField
from twitchio import models, client, http as _http

import os
import json

from src.configuration import config

__all__ = [
    "get_req_data",
    "post_prediction",
    "getfile",
    "update_db",
    "convert_class_to_obj",
    "format_for_slaytabase",
    "complete_match",
    "parse_date_range",
]

[docs] async def get_req_data(req: Request, *keys: str) -> list[str]: pw = req.query.get("key") if pw is None: raise HTTPUnauthorized(reason="No API key provided") if not config.server.secret: raise HTTPNotImplemented(reason="No API key present in config") if pw != config.server.secret: raise HTTPForbidden(reason="Invalid API key provided") post = await req.post() res = [] for key in keys: value = post.get(key) if isinstance(value, FileField): value = value.file.read() if isinstance(value, bytes): value = value.decode("utf-8", "xmlcharrefreplace") res.append(value) return res
# TODO: Merge this upstream to TwitchIO eventually # DEPCHECK
[docs] async def post_prediction(app: client.Client, broadcaster_id: int, title: str, outcomes: list[str], prediction_window: int): body = { "broadcaster_id": broadcaster_id, "title": title, "prediction_window": prediction_window, "outcomes": [{"title": x} for x in outcomes], } val = await app._http.request( _http.Route("POST", "predictions", body=body, token=app._http.token), paginate=False, ) return models.Prediction(app._http, val[0])
[docs] def getfile(x: str, mode: str): return open(os.path.join("data", x), mode)
[docs] def update_db(): from src.server import _cmds with getfile("data.json", "w") as f: json.dump(_cmds, f, indent=config.server.json_indent)
[docs] def convert_class_to_obj(obj: Any) -> dict[str, Any]: return vars(obj)
[docs] def format_for_slaytabase(val: str) -> str: return val.replace(":", "-").replace("'", "").replace(" ", "").lower()
[docs] def complete_match(string: str, matches: Iterable[str]) -> list: possible_matches = set() for possible in matches: if string == possible: return [string] if possible.startswith(string) or string in possible: possible_matches.add(possible) return sorted(possible_matches)
[docs] def parse_date_range(date_string: str) -> tuple[datetime]: """valid date strings: YYYY/MM/DD-YYYY/MM/DD (MM and DD optional), YYYY/MM/DD+ (no end date), YYYY/MM/DD- (no start date)""" start_date: datetime | None = None end_date: datetime | None = None if date_string[-1] == "+": start_date = _parse_dates_with_optional_month_day(date_string[:-1]) elif date_string[-1] == "-": end_date = _parse_dates_with_optional_month_day(date_string[:-1]) else: date_parts = date_string.split("-") if len(date_string) == 4 and len(date_parts) == 1: # only a year was passed, default to Jan 1 - Dec 31 of the specified year return (datetime(int(date_parts[0]), 1, 1, tzinfo=UTC), datetime(int(date_parts[0]), 12, 31, 23, 59, 59, tzinfo=UTC)) if len(date_parts) > 2: raise ValueError("Range format is invalid") start_date = _parse_dates_with_optional_month_day(date_parts[0]) end_date = _parse_dates_with_optional_month_day(date_parts[-1], True) if end_date is not None and end_date < start_date: raise TypeError("Start date was after End Date") return (start_date, end_date)
def _parse_dates_with_optional_month_day(val: str, isEndDate: bool = False) -> datetime: """Base val should be in YYYY/MM/DD where MM and DD are optional, defaulted to start or end of year""" date_parts = val.split("/") year = int(date_parts[0]) month = 12 if isEndDate else 1 if (len(date_parts) > 1): month = int(date_parts[1]) daysInMonth = calendar.monthrange(year, month)[1] day = daysInMonth if isEndDate else 1 if (len(date_parts) > 2): day = int(date_parts[2]) if isEndDate: return datetime(year, month, day, hour=23, minute=59, second=59, tzinfo=UTC) else: return datetime(year, month, day, tzinfo=UTC) def edit_distance(left, right): m = len(left) + 1 n = len(right) + 1 d = [[0 for _ in range(0, n)] for _ in range(0, m)] for i in range(1, m): d[i][0] = i for i in range(1, n): d[0][i] = i for i in range(1, m): for j in range(1, n): if left[i-1] == right[j-1]: subCost = 0 else: subCost = 1 d[i][j] = min( d[i-1][j] + 1, # deletion d[i][j-1] + 1, # insertion d[i-1][j-1] + subCost # substitution ) return d[len(left)][len(right)]