Source code for src.wrapper

from typing import Callable, Coroutine, Optional

import inspect

from twitchio.ext.commands import Context

from src.logger import logger

__all__ = ["wrapper"]

[docs] def wrapper(func: Callable, force_argcount: bool, wrapper_func: Optional[Coroutine], name: Optional[str]): if name is None: name = func.__name__ async def caller(ctx: Context, *args: str): wrapped_args = None if wrapper_func is not None: try: wrapped_args: list = await wrapper_func(ctx) except ValueError: return new_args = [] if wrapped_args is not None: new_args.extend(wrapped_args) delta = len(new_args) + 1 multiple = (co.co_flags & 0x04) # whether *args is supported annotations = inspect.get_annotations(func, eval_str=True) for i, arg in enumerate(args, delta): if i < co.co_argcount: var = co.co_varnames[i] elif multiple: # all from here on will match the same type -- typically str, but could be something else var = co.co_varnames[co.co_argcount] else: # no *args and reached max argcount; no point in continuing break if var in annotations: expected = annotations[var] if isinstance(expected, type(Optional[int])): if arg is None: continue expected = expected.__args__[0] # using the non-None arg elif expected == int: try: arg = int(arg) except ValueError: await ctx.reply(f"Error: Argument #{i-delta} ({var!r}) must be an integer.") return elif expected == float: try: arg = float(arg) except ValueError: await ctx.reply(f"Error: Argument #{i-delta} ({var!r}) must be a floating point number.") return elif expected == bool: if arg.lower() in ("yes", "y", "on", "true", "1"): arg = True elif arg.lower() in ("no", "n", "off", "false", "0"): arg = False else: await ctx.reply(f"Error: Argument #{i-delta} ({var!r}) must be parsable as a boolean value.") return elif expected != str: await ctx.reply(f"Warning: Unhandled type {expected!r} for argument #{i} ({var!r}) - please ping @FaeLyka") new_args.append(arg) if req > len(new_args): names = co.co_varnames[len(new_args)+1:req] if len(names) == 1: await ctx.reply(f"Error: Missing required argument {names[0]!r}") else: await ctx.reply(f"Error: Missing required arguments {names!r}") return if multiple: # function supports *args, don't check further await func(ctx, *new_args) return if len(new_args) != len(args) and force_argcount: # too many args and we enforce it await ctx.reply(f"Error: too many arguments (maximum {co.co_argcount - 1})") return await func(ctx, *new_args) co = func.__code__ req = co.co_argcount - 1 if func.__defaults__: req -= len(func.__defaults__) caller.__required__ = req caller.__name__ = name caller.__doc__ = func.__doc__ logger.debug(f"Creating wrapped command {name}") return caller