From 6979519e126eb9ab66144752cb3f31d5125fac86 Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 02:13:50 +0200 Subject: [PATCH 1/8] chore(ruff): designate docstring style --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index c4a60f9..5d1042c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,9 @@ keep-runtime-typing = true [tool.ruff.pylint] max-args = 10 +[tool.ruff.pydocstyle] +convention = "numpy" + [tool.ruff.per-file-ignores] "scripts/*" = [ # Allow printing in scripts. From e859a6d13aa269de65c4c5d026df736b0dcb5fc8 Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 02:29:05 +0200 Subject: [PATCH 2/8] chore: refactor plugins to prepare for subplugins --- src/disnake/ext/plugins/async_utils.py | 31 - src/disnake/ext/plugins/plugin.py | 806 ++++++++++--------------- src/disnake/ext/plugins/typeshed.py | 427 +++++++++++++ src/disnake/ext/plugins/utils.py | 84 +++ 4 files changed, 823 insertions(+), 525 deletions(-) delete mode 100644 src/disnake/ext/plugins/async_utils.py create mode 100644 src/disnake/ext/plugins/typeshed.py create mode 100644 src/disnake/ext/plugins/utils.py diff --git a/src/disnake/ext/plugins/async_utils.py b/src/disnake/ext/plugins/async_utils.py deleted file mode 100644 index 18db497..0000000 --- a/src/disnake/ext/plugins/async_utils.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Utilities for asyncio patterns.""" - -import asyncio -import typing - -# asyncio.Task isn't subscriptable in py 3.8, so we do this "workaround" to -# make it subscriptable and compatible with inspect.signature etc. -# This probably isn't necessary but everything for our users, eh? - -if typing.TYPE_CHECKING: - Task = asyncio.Task -else: - T = typing.TypeVar("T") - Task = typing._alias(asyncio.Task, T) # noqa: SLF001 - -__all__: typing.Sequence[str] = ("safe_task",) - - -_tasks: typing.Set["Task[typing.Any]"] = set() - - -def safe_task( - coroutine: typing.Coroutine[typing.Any, typing.Any, typing.Any], -) -> "Task[typing.Any]": - """Create an asyncio background task without risk of it being GC'd.""" - task = asyncio.create_task(coroutine) - - _tasks.add(task) - task.add_done_callback(_tasks.discard) - - return task diff --git a/src/disnake/ext/plugins/plugin.py b/src/disnake/ext/plugins/plugin.py index 8fe44ec..e10d716 100644 --- a/src/disnake/ext/plugins/plugin.py +++ b/src/disnake/ext/plugins/plugin.py @@ -5,95 +5,21 @@ import asyncio import dataclasses import logging -import sys import typing as t import warnings -import disnake from disnake.ext import commands from typing_extensions import Self -from . import async_utils +from . import typeshed, utils if t.TYPE_CHECKING: from disnake.ext import tasks -__all__ = ("Plugin", "PluginMetadata", "get_parent_plugin") +__all__ = ("Plugin", "SubPlugin", "PluginMetadata", "get_parent_plugin") LOGGER = logging.getLogger(__name__) -_INVALID: t.Final[t.Sequence[str]] = (t.__file__, __file__) - -T = t.TypeVar("T") - -if sys.version_info < (3, 10): - import typing_extensions - - P = typing_extensions.ParamSpec("P") -else: - P = t.ParamSpec("P") - - -AnyBot = t.Union[ - commands.Bot, - commands.AutoShardedBot, - commands.InteractionBot, - commands.AutoShardedInteractionBot, -] - -BotT = t.TypeVar("BotT", bound=AnyBot) - -Coro = t.Coroutine[t.Any, t.Any, T] -MaybeCoro = t.Union[Coro[T], T] -EmptyAsync = t.Callable[[], Coro[None]] -SetupFunc = t.Callable[[BotT], None] - -AnyCommand = commands.Command[t.Any, t.Any, t.Any] -AnyGroup = commands.Group[t.Any, t.Any, t.Any] - -CoroFunc = t.Callable[..., Coro[t.Any]] -CoroFuncT = t.TypeVar("CoroFuncT", bound=CoroFunc) -CoroDecorator = t.Callable[[CoroFunc], T] - -LocalizedOptional = t.Union[t.Optional[str], disnake.Localized[t.Optional[str]]] -PermissionsOptional = t.Optional[t.Union[disnake.Permissions, int]] - -LoopT = t.TypeVar("LoopT", bound="tasks.Loop[t.Any]") - -PrefixCommandCheck = t.Callable[[commands.Context[t.Any]], MaybeCoro[bool]] -AppCommandCheck = t.Callable[[disnake.CommandInteraction], MaybeCoro[bool]] - -PrefixCommandCheckT = t.TypeVar("PrefixCommandCheckT", bound=PrefixCommandCheck) -AppCommandCheckT = t.TypeVar("AppCommandCheckT", bound=AppCommandCheck) - - -class CheckAware(t.Protocol): - checks: t.List[t.Callable[..., MaybeCoro[bool]]] - - -class CommandParams(t.TypedDict, total=False): - help: str - brief: str - usage: str - enabled: bool - description: str - hidden: bool - ignore_extra: bool - cooldown_after_parsing: bool - extras: t.Dict[str, t.Any] - - -class AppCommandParams(t.TypedDict, total=False): - auto_sync: bool - dm_permission: bool - default_member_permissions: PermissionsOptional - guild_ids: t.Sequence[int] - extras: t.Dict[str, t.Any] - - -class SlashCommandParams(AppCommandParams, total=False): - description: LocalizedOptional - connectors: t.Dict[str, str] @dataclasses.dataclass @@ -129,13 +55,21 @@ class PluginMetadata: extras: t.Dict[str, t.Any] """A dict of extra metadata for a plugin.""" - command_attrs: CommandParams = dataclasses.field(default_factory=CommandParams) + command_attrs: typeshed.CommandParams = dataclasses.field( + default_factory=typeshed.CommandParams, + ) """Parameters to apply to each prefix command in this plugin.""" - slash_command_attrs: SlashCommandParams = dataclasses.field(default_factory=SlashCommandParams) + slash_command_attrs: typeshed.SlashCommandParams = dataclasses.field( + default_factory=typeshed.SlashCommandParams, + ) """Parameters to apply to each slash command in this plugin.""" - message_command_attrs: AppCommandParams = dataclasses.field(default_factory=AppCommandParams) + message_command_attrs: typeshed.AppCommandParams = dataclasses.field( + default_factory=typeshed.AppCommandParams, + ) """Parameters to apply to each message command in this plugin.""" - user_command_attrs: AppCommandParams = dataclasses.field(default_factory=AppCommandParams) + user_command_attrs: typeshed.AppCommandParams = dataclasses.field( + default_factory=typeshed.AppCommandParams, + ) """Parameters to apply to each user command in this plugin.""" @property @@ -165,11 +99,7 @@ def category(self, value: t.Optional[str]) -> None: self.extras["category"] = value -class ExtrasAware(t.Protocol): - extras: t.Dict[str, t.Any] - - -def get_parent_plugin(obj: ExtrasAware) -> Plugin[AnyBot]: +def get_parent_plugin(obj: typeshed.ExtrasAware) -> Plugin[typeshed.AnyBot]: """Get the plugin to which the provided object is registered. This only works with objects that support an ``extras`` attribute. @@ -197,141 +127,68 @@ def get_parent_plugin(obj: ExtrasAware) -> Plugin[AnyBot]: raise LookupError(msg) -def _get_source_module_name() -> str: - """Get current frame from exception traceback.""" - # We ignore ruff here because we need to raise and immediately catch an - # exception to figure out our stack level. - try: - raise Exception # noqa: TRY002, TRY301 - except Exception as exc: # noqa: BLE001 - tb = exc.__traceback__ - - if not tb: - # No traceback, therefore can't access frames and infer plugin name... - LOGGER.warning("Failed to infer file name, defaulting to 'plugin'.") - return "plugin" - - # Navigate all frames for one with a valid path. - # Note that we explicitly filter out: - # - the stdlib typing module; if the generic parameter is specified, this - # will be encountered before the target module. - # - this file; we don't want to just return "plugin" if possible. - frame = tb.tb_frame - while frame := frame.f_back: - if frame.f_code.co_filename not in _INVALID: - break - - else: - LOGGER.warning("Failed to infer file name, defaulting to 'plugin'.") - return "plugin" - - module_name = frame.f_locals["__name__"] - LOGGER.debug("Module name resolved to %r", module_name) - return module_name - - -class Plugin(t.Generic[BotT]): - """An extension manager similar to disnake's :class:`commands.Cog`. +@dataclasses.dataclass +class PluginStorage(t.Generic[typeshed.PluginT]): + commands: t.Dict[str, commands.Command[typeshed.PluginT, t.Any, t.Any]] = dataclasses.field(default_factory=dict) # type: ignore + message_commands: t.Dict[str, commands.InvokableMessageCommand] = dataclasses.field(default_factory=dict) + slash_commands: t.Dict[str, commands.InvokableSlashCommand] = dataclasses.field(default_factory=dict) + user_commands: t.Dict[str, commands.InvokableUserCommand] = dataclasses.field(default_factory=dict) - A plugin can hold commands and listeners, and supports being loaded through - `bot.load_extension()` as per usual, and can similarly be unloaded and - reloaded. + command_checks: t.List[typeshed.PrefixCommandCheck] = dataclasses.field(default_factory=list) + slash_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field(default_factory=list) + message_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field(default_factory=list) + user_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field(default_factory=list) - Plugins can be constructed via :meth:`.with_metadata` to provide extra - information to the plugin. + loops: t.List[tasks.Loop[t.Any]] = dataclasses.field(default_factory=list) - Parameters - ---------- - name: Optional[:class:`str`] - The name of the plugin. Defaults to the module the plugin is created in. - category: Optional[:class:`str`] - The category this plugin belongs to. Does not serve any actual purpose, - but may be useful in organising plugins. + listeners: t.Dict[str, t.List[typeshed.CoroFunc]] = dataclasses.field(default_factory=dict) - .. deprecated:: 0.2.4 - Use ``extras`` instead. - command_attrs: Dict[:class:`str`, Any] - A dict of parameters to apply to each prefix command in this plugin. - message_command_attrs: Dict[:class:`str`, Any] - A dict of parameters to apply to each message command in this plugin. - slash_command_attrs: Dict[:class:`str`, Any] - A dict of parameters to apply to each slash command in this plugin. - user_command_attrs: Dict[:class:`str`, Any] - A dict of parameters to apply to each user command in this plugin. - logger: Optional[Union[:class:`logging.Logger`, :class:`str`]] - The logger or its name to use when logging plugin events. - If not specified, defaults to `disnake.ext.plugins.plugin`. - **extras: Dict[:class:`str`, Any] - A dict of extra metadata for this plugin. - """ + def update(self, other: PluginStorage[typeshed.PluginT]) -> None: + """Update this PluginStorage with another, merging their container dicts and lists.""" + self.commands.update(other.commands) + self.message_commands.update(other.message_commands) + self.slash_commands.update(other.slash_commands) + self.user_commands.update(other.user_commands) + + self.command_checks.extend(other.command_checks) + self.slash_command_checks.extend(other.slash_command_checks) + self.message_command_checks.extend(other.message_command_checks) + self.user_command_checks.extend(other.user_command_checks) + self.loops.extend(other.loops) + + for event, callbacks in other.listeners.items(): + if event not in self.listeners: + self.listeners[event] = callbacks + else: + self.listeners[event].extend(callbacks) + + +# The PluginBase holds the logic to register commands etc. to the plugin. +# Since this is relevant for both actual plugins and sub plugins, this is a +# separate base class that can be inherited by both. + +class PluginBase(typeshed.PluginProtocol[typeshed.BotT]): __slots__ = ( "metadata", - "logger", - "_bot", - "_commands", - "_slash_commands", - "_message_commands", - "_user_commands", - "_command_checks", - "_slash_command_checks", - "_message_command_checks", - "_user_command_checks", - "_listeners", - "_loops", - "_pre_load_hooks", - "_post_load_hooks", - "_pre_unload_hooks", - "_post_unload_hooks", + "_storage", ) metadata: PluginMetadata """The metadata assigned to the plugin.""" - logger: logging.Logger - """The logger associated with this plugin.""" - - @t.overload - def __init__( - self: Plugin[commands.Bot], - *, - name: t.Optional[str] = None, - command_attrs: t.Optional[CommandParams] = None, - message_command_attrs: t.Optional[AppCommandParams] = None, - slash_command_attrs: t.Optional[SlashCommandParams] = None, - user_command_attrs: t.Optional[AppCommandParams] = None, - logger: t.Union[logging.Logger, str, None] = None, - **extras: t.Any, # noqa: ANN401 - ) -> None: - ... - - @t.overload def __init__( self, *, name: t.Optional[str] = None, - command_attrs: t.Optional[CommandParams] = None, - message_command_attrs: t.Optional[AppCommandParams] = None, - slash_command_attrs: t.Optional[SlashCommandParams] = None, - user_command_attrs: t.Optional[AppCommandParams] = None, - logger: t.Union[logging.Logger, str, None] = None, + command_attrs: t.Optional[typeshed.CommandParams] = None, + message_command_attrs: t.Optional[typeshed.AppCommandParams] = None, + slash_command_attrs: t.Optional[typeshed.SlashCommandParams] = None, + user_command_attrs: t.Optional[typeshed.AppCommandParams] = None, **extras: t.Any, # noqa: ANN401 - ) -> None: - ... - - def __init__( - self, - *, - name: t.Optional[str] = None, - command_attrs: t.Optional[CommandParams] = None, - message_command_attrs: t.Optional[AppCommandParams] = None, - slash_command_attrs: t.Optional[SlashCommandParams] = None, - user_command_attrs: t.Optional[AppCommandParams] = None, - logger: t.Union[logging.Logger, str, None] = None, - **extras: t.Any, ) -> None: self.metadata: PluginMetadata = PluginMetadata( - name=name or _get_source_module_name(), + name=name or utils.get_source_module_name(), command_attrs=command_attrs or {}, message_command_attrs=message_command_attrs or {}, slash_command_attrs=slash_command_attrs or {}, @@ -339,38 +196,11 @@ def __init__( extras=extras, ) - if logger is not None: - if isinstance(logger, str): - logger = logging.getLogger(logger) - - else: - logger = LOGGER - - self.logger = logger - - self._commands: t.Dict[str, commands.Command[Self, t.Any, t.Any]] = {} # type: ignore - self._message_commands: t.Dict[str, commands.InvokableMessageCommand] = {} - self._slash_commands: t.Dict[str, commands.InvokableSlashCommand] = {} - self._user_commands: t.Dict[str, commands.InvokableUserCommand] = {} - - self._command_checks: t.MutableSequence[PrefixCommandCheck] = [] - self._slash_command_checks: t.MutableSequence[AppCommandCheck] = [] - self._message_command_checks: t.MutableSequence[AppCommandCheck] = [] - self._user_command_checks: t.MutableSequence[AppCommandCheck] = [] - - self._listeners: t.Dict[str, t.MutableSequence[CoroFunc]] = {} - self._loops: t.List[tasks.Loop[t.Any]] = [] - - # These are mainly here to easily run async code at (un)load time - # while we wait for disnake's async refactor. These will probably be - # left in for lower disnake versions, though they may be removed someday. - - self._pre_load_hooks: t.MutableSequence[EmptyAsync] = [] - self._post_load_hooks: t.MutableSequence[EmptyAsync] = [] - self._pre_unload_hooks: t.MutableSequence[EmptyAsync] = [] - self._post_unload_hooks: t.MutableSequence[EmptyAsync] = [] + self._storage: PluginStorage[Self] = PluginStorage[Self]() - self._bot: t.Optional[BotT] = None + @property + def bot(self) -> typeshed.BotT: + raise NotImplementedError @classmethod def with_metadata(cls, metadata: PluginMetadata) -> Self: @@ -391,20 +221,9 @@ def with_metadata(cls, metadata: PluginMetadata) -> Self: self.metadata = metadata return self - @property - def bot(self) -> BotT: - """The bot on which this plugin is registered. - - This will only be available after calling :meth:`.load`. - """ - if not self._bot: - msg = "Cannot access the bot on a plugin that has not yet been loaded." - raise RuntimeError(msg) - return self._bot - @property def name(self) -> str: - """The name of this plugin.""" + # << docstring inherited from typeshed.PluginProtocol>> return self.metadata.name @property @@ -424,10 +243,7 @@ def category(self) -> t.Optional[str]: @property def extras(self) -> t.Dict[str, t.Any]: - """A dict of extra metadata for this plugin. - - .. versionadded:: 0.2.4 - """ + # << docstring inherited from typeshed.PluginProtocol>> return self.metadata.extras @extras.setter @@ -436,28 +252,28 @@ def extras(self, value: t.Dict[str, t.Any]) -> None: @property def commands(self) -> t.Sequence[commands.Command[Self, t.Any, t.Any]]: # type: ignore - """All prefix commands registered in this plugin.""" - return tuple(self._commands.values()) + # << docstring inherited from typeshed.PluginProtocol>> + return tuple(self._storage.commands.values()) @property def slash_commands(self) -> t.Sequence[commands.InvokableSlashCommand]: - """All slash commands registered in this plugin.""" - return tuple(self._slash_commands.values()) + # << docstring inherited from typeshed.PluginProtocol>> + return tuple(self._storage.slash_commands.values()) @property def user_commands(self) -> t.Sequence[commands.InvokableUserCommand]: - """All user commands registered in this plugin.""" - return tuple(self._user_commands.values()) + # << docstring inherited from typeshed.PluginProtocol>> + return tuple(self._storage.user_commands.values()) @property def message_commands(self) -> t.Sequence[commands.InvokableMessageCommand]: - """All message commands registered in this plugin.""" - return tuple(self._message_commands.values()) + # << docstring inherited from typeshed.PluginProtocol>> + return tuple(self._storage.message_commands.values()) @property def loops(self) -> t.Sequence[tasks.Loop[t.Any]]: - """All loops registered to this plugin.""" - return tuple(self._loops) + # << docstring inherited from typeshed.PluginProtocol>> + return tuple(self._storage.loops) def _apply_attrs( self, @@ -481,53 +297,20 @@ def command( *, cls: t.Optional[t.Type[commands.Command[t.Any, t.Any, t.Any]]] = None, **kwargs: t.Any, # noqa: ANN401 - ) -> CoroDecorator[AnyCommand]: - """Transform a function into a :class:`commands.Command`. - - By default the ``help`` attribute is received automatically from the - docstring of the function and is cleaned up with the use of - ``inspect.cleandoc``. If the docstring is ``bytes``, then it is decoded - into :class:`str` using utf-8 encoding. - - All checks added using the :func:`commands.check` & co. decorators are - added into the function. There is no way to supply your own checks - through this decorator. - - Parameters - ---------- - name: :class:`str` - The name to create the command with. By default this uses the - function name unchanged. - cls: - The class to construct with. By default this is - :class:`commands.Command`. You usually do not change this. - **kwargs: - Keyword arguments to pass into the construction of the class denoted - by ``cls``. - - Returns - ------- - Callable[..., :class:`commands.Command`] - A decorator that converts the provided method into a - :class:`commands.Command` or a derivative, and returns it. - - Raises - ------ - TypeError - The function is not a coroutine or is already a command. - """ + ) -> typeshed.CoroDecorator[typeshed.AnyCommand]: + # << docstring inherited from typeshed.PluginProtocol>> attributes = self._apply_attrs(self.metadata.command_attrs, **kwargs) if cls is None: - cls = t.cast(t.Type[AnyCommand], attributes.pop("cls", AnyCommand)) + cls = t.cast(t.Type[typeshed.AnyCommand], attributes.pop("cls", commands.Command)) - def decorator(callback: t.Callable[..., Coro[t.Any]]) -> AnyCommand: + def decorator(callback: t.Callable[..., typeshed.Coro[t.Any]]) -> typeshed.AnyCommand: if not asyncio.iscoroutinefunction(callback): msg = f"<{callback.__qualname__}> must be a coroutine function." raise TypeError(msg) command = cls(callback, name=name or callback.__name__, **attributes) - self._commands[command.qualified_name] = command + self._storage.commands[command.qualified_name] = command return command @@ -539,41 +322,14 @@ def group( *, cls: t.Optional[t.Type[commands.Group[t.Any, t.Any, t.Any]]] = None, **kwargs: t.Any, # noqa: ANN401 - ) -> CoroDecorator[AnyGroup]: - """Transform a function into a :class:`commands.Group`. - - This is similar to the :func:`commands.command` decorator but the - ``cls`` parameter is set to :class:`Group` by default. - - Parameters - ---------- - name: :class:`str` - The name to create the group with. By default this uses the - function name unchanged. - cls: - The class to construct with. By default this is - :class:`commands.Group`. You usually do not change this. - **kwargs: - Keyword arguments to pass into the construction of the class denoted - by ``cls``. - - Returns - ------- - Callable[..., :class:`commands.Group`] - A decorator that converts the provided method into a - :class:`commands.Group` or a derivative, and returns it. - - Raises - ------ - TypeError - The function is not a coroutine or is already a command. - """ + ) -> typeshed.CoroDecorator[typeshed.AnyGroup]: + # << docstring inherited from typeshed.PluginProtocol>> attributes = self._apply_attrs(self.metadata.command_attrs, **kwargs) if cls is None: - cls = t.cast(t.Type[AnyGroup], attributes.pop("cls", AnyGroup)) + cls = t.cast(t.Type[typeshed.AnyGroup], attributes.pop("cls", commands.Group)) - def decorator(callback: t.Callable[..., Coro[t.Any]]) -> AnyGroup: + def decorator(callback: t.Callable[..., typeshed.Coro[t.Any]]) -> typeshed.AnyGroup: if not asyncio.iscoroutinefunction(callback): msg = f"<{callback.__qualname__}> must be a coroutine function." raise TypeError(msg) @@ -591,51 +347,15 @@ def slash_command( self, *, auto_sync: t.Optional[bool] = None, - name: LocalizedOptional = None, - description: LocalizedOptional = None, + name: typeshed.LocalizedOptional = None, + description: typeshed.LocalizedOptional = None, dm_permission: t.Optional[bool] = None, - default_member_permissions: PermissionsOptional = None, + default_member_permissions: typeshed.PermissionsOptional = None, guild_ids: t.Optional[t.Sequence[int]] = None, connectors: t.Optional[t.Dict[str, str]] = None, extras: t.Optional[t.Dict[str, t.Any]] = None, - ) -> CoroDecorator[commands.InvokableSlashCommand]: - """Transform a function into a slash command. - - Parameters - ---------- - auto_sync: :class:`bool` - Whether to automatically register the command. Defaults to ``True``. - name: Optional[Union[:class:`str`, :class:`disnake.Localized`]] - The name of the slash command (defaults to function name). - description: Optional[Union[:class:`str`, :class:`disnake.Localized`]] - The description of the slash command. It will be visible in Discord. - dm_permission: :class:`bool` - Whether this command can be used in DMs. - Defaults to ``True``. - default_member_permissions: Optional[Union[:class:`disnake.Permissions`, :class:`int`]] - The default required permissions for this command. - See :attr:`disnake.ApplicationCommand.default_member_permissions` for details. - guild_ids: List[:class:`int`] - If specified, the client will register the command in these guilds. - Otherwise, this command will be registered globally. - connectors: Dict[:class:`str`, :class:`str`] - Binds function names to option names. If the name - of an option already matches the corresponding function param, - you don't have to specify the connectors. Connectors template: - ``{"option-name": "param_name", ...}``. - If you're using :ref:`param_syntax`, you don't need to specify this. - extras: Dict[:class:`str`, Any] - A dict of user provided extras to attach to the command. - - .. note:: - This object may be copied by the library. - - Returns - ------- - Callable[..., :class:`commands.InvokableSlashCommand`] - A decorator that converts the provided method into a - :class:`commands.InvokableSlashCommand` and returns it. - """ + ) -> typeshed.CoroDecorator[commands.InvokableSlashCommand]: + # << docstring inherited from typeshed.PluginProtocol>> attributes = self._apply_attrs( self.metadata.slash_command_attrs, description=description, @@ -647,7 +367,9 @@ def slash_command( extras=extras, ) - def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableSlashCommand: + def decorator( + callback: t.Callable[..., typeshed.Coro[t.Any]], + ) -> commands.InvokableSlashCommand: if not asyncio.iscoroutinefunction(callback): msg = f"<{callback.__qualname__}> must be a coroutine function." raise TypeError(msg) @@ -657,7 +379,7 @@ def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableSlash name=name or callback.__name__, **attributes, ) - self._slash_commands[command.qualified_name] = command + self._storage.slash_commands[command.qualified_name] = command return command @@ -666,42 +388,14 @@ def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableSlash def user_command( self, *, - name: LocalizedOptional = None, + name: typeshed.LocalizedOptional = None, dm_permission: t.Optional[bool] = None, - default_member_permissions: PermissionsOptional = None, + default_member_permissions: typeshed.PermissionsOptional = None, auto_sync: t.Optional[bool] = None, guild_ids: t.Optional[t.Sequence[int]] = None, extras: t.Optional[t.Dict[str, t.Any]] = None, - ) -> CoroDecorator[commands.InvokableUserCommand]: - """Transform a function into a user command. - - Parameters - ---------- - name: Optional[Union[:class:`str`, :class:`disnake.Localized`]] - The name of the user command (defaults to the function name). - dm_permission: :class:`bool` - Whether this command can be used in DMs. - Defaults to ``True``. - default_member_permissions: Optional[Union[:class:`disnake.Permissions`, :class:`int`]] - The default required permissions for this command. - See :attr:`disnake.ApplicationCommand.default_member_permissions` for details. - auto_sync: :class:`bool` - Whether to automatically register the command. Defaults to ``True``. - guild_ids: Sequence[:class:`int`] - If specified, the client will register the command in these guilds. - Otherwise, this command will be registered globally. - extras: Dict[:class:`str`, Any] - A dict of user provided extras to attach to the command. - - .. note:: - This object may be copied by the library. - - Returns - ------- - Callable[..., :class:`commands.InvokableUserCommand`] - A decorator that converts the provided method into a - :class:`commands.InvokableUserCommand` and returns it. - """ + ) -> typeshed.CoroDecorator[commands.InvokableUserCommand]: + # << docstring inherited from typeshed.PluginProtocol>> attributes = self._apply_attrs( self.metadata.user_command_attrs, dm_permission=dm_permission, @@ -711,7 +405,9 @@ def user_command( extras=extras, ) - def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableUserCommand: + def decorator( + callback: t.Callable[..., typeshed.Coro[t.Any]], + ) -> commands.InvokableUserCommand: if not asyncio.iscoroutinefunction(callback): msg = f"<{callback.__qualname__}> must be a coroutine function." raise TypeError(msg) @@ -721,7 +417,7 @@ def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableUserC name=name or callback.__name__, **attributes, ) - self._user_commands[command.qualified_name] = command + self._storage.user_commands[command.qualified_name] = command return command @@ -730,42 +426,14 @@ def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableUserC def message_command( self, *, - name: LocalizedOptional = None, + name: typeshed.LocalizedOptional = None, dm_permission: t.Optional[bool] = None, - default_member_permissions: PermissionsOptional = None, + default_member_permissions: typeshed.PermissionsOptional = None, auto_sync: t.Optional[bool] = None, guild_ids: t.Optional[t.Sequence[int]] = None, extras: t.Optional[t.Dict[str, t.Any]] = None, - ) -> CoroDecorator[commands.InvokableMessageCommand]: - """Transform a function into a message command. - - Parameters - ---------- - name: Optional[Union[:class:`str`, :class:`disnake.Localized`]] - The name of the message command (defaults to the function name). - dm_permission: :class:`bool` - Whether this command can be used in DMs. - Defaults to ``True``. - default_member_permissions: Optional[Union[:class:`disnake.Permissions`, :class:`int`]] - The default required permissions for this command. - See :attr:`disnake.ApplicationCommand.default_member_permissions` for details. - auto_sync: :class:`bool` - Whether to automatically register the command. Defaults to ``True``. - guild_ids: Sequence[:class:`int`] - If specified, the client will register the command in these guilds. - Otherwise, this command will be registered globally. - extras: Dict[:class:`str`, Any] - A dict of user provided extras to attach to the command. - - .. note:: - This object may be copied by the library. - - Returns - ------- - Callable[..., :class:`commands.InvokableMessageCommand`] - A decorator that converts the provided method into an - :class:`commands.InvokableMessageCommand` and then returns it. - """ + ) -> typeshed.CoroDecorator[commands.InvokableMessageCommand]: + # << docstring inherited from typeshed.PluginProtocol>> attributes = self._apply_attrs( self.metadata.user_command_attrs, dm_permission=dm_permission, @@ -775,7 +443,9 @@ def message_command( extras=extras, ) - def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableMessageCommand: + def decorator( + callback: t.Callable[..., typeshed.Coro[t.Any]], + ) -> commands.InvokableMessageCommand: if not asyncio.iscoroutinefunction(callback): msg = f"<{callback.__qualname__}> must be a coroutine function." raise TypeError(msg) @@ -785,7 +455,7 @@ def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableMessa name=name or callback.__name__, **attributes, ) - self._message_commands[command.qualified_name] = command + self._storage.message_commands[command.qualified_name] = command return command @@ -793,48 +463,186 @@ def decorator(callback: t.Callable[..., Coro[t.Any]]) -> commands.InvokableMessa # Checks - def command_check(self, predicate: PrefixCommandCheckT) -> PrefixCommandCheckT: - """Add a `commands.check` to all prefix commands on this plugin.""" - self._command_checks.append(predicate) + def command_check( + self, + predicate: typeshed.PrefixCommandCheckT, + ) -> typeshed.PrefixCommandCheckT: + # << docstring inherited from typeshed.PluginProtocol>> + self._storage.command_checks.append(predicate) return predicate - def slash_command_check(self, predicate: AppCommandCheckT) -> AppCommandCheckT: - """Add a `commands.check` to all slash commands on this plugin.""" - self._slash_command_checks.append(predicate) + def slash_command_check( + self, + predicate: typeshed.AppCommandCheckT, + ) -> typeshed.AppCommandCheckT: + # << docstring inherited from typeshed.PluginProtocol>> + self._storage.slash_command_checks.append(predicate) return predicate - def message_command_check(self, predicate: AppCommandCheckT) -> AppCommandCheckT: - """Add a `commands.check` to all message commands on this plugin.""" - self._message_command_checks.append(predicate) + def message_command_check( + self, + predicate: typeshed.AppCommandCheckT, + ) -> typeshed.AppCommandCheckT: + # << docstring inherited from typeshed.PluginProtocol>> + self._storage.message_command_checks.append(predicate) return predicate - def user_command_check(self, predicate: AppCommandCheckT) -> AppCommandCheckT: - """Add a `commands.check` to all user commands on this plugin.""" - self._user_command_checks.append(predicate) + def user_command_check( + self, + predicate: typeshed.AppCommandCheckT, + ) -> typeshed.AppCommandCheckT: + # << docstring inherited from typeshed.PluginProtocol>> + self._storage.user_command_checks.append(predicate) return predicate # Listeners - def add_listeners(self, *callbacks: CoroFunc, event: t.Optional[str] = None) -> None: - """Add multiple listeners to the plugin. - - Parameters - ---------- - *callbacks: Callable[..., Any] - The callbacks to add as listeners for this plugin. - event: :class:`str` - The name of a single event to register all callbacks under. If not provided, - the callbacks will be registered individually based on function's name. - """ + def add_listeners( + self, + *callbacks: typeshed.CoroFunc, + event: t.Optional[str] = None, + ) -> None: + # << docstring inherited from typeshed.PluginProtocol>> for callback in callbacks: key = callback.__name__ if event is None else event - self._listeners.setdefault(key, []).append(callback) + self._storage.listeners.setdefault(key, []).append(callback) - def listener(self, event: t.Optional[str] = None) -> t.Callable[[CoroFuncT], CoroFuncT]: - """Register a function as a listener on this plugin. + def listener( + self, + event: t.Optional[str] = None, + ) -> t.Callable[[typeshed.CoroFuncT], typeshed.CoroFuncT]: + # << docstring inherited from typeshed.PluginProtocol>> + def decorator(callback: typeshed.CoroFuncT) -> typeshed.CoroFuncT: + self.add_listeners(callback, event=event) + return callback - This is the plugin equivalent of :meth:`commands.Bot.listen`. + return decorator + # Tasks + + def register_loop( + self, + *, + wait_until_ready: bool = False, + ) -> t.Callable[[typeshed.LoopT], typeshed.LoopT]: + # << docstring inherited from typeshed.PluginProtocol>> + def decorator(loop: typeshed.LoopT) -> typeshed.LoopT: + if wait_until_ready: + if loop._before_loop is not None: # noqa: SLF001 + msg = "This loop already has a `before_loop` callback registered." + raise TypeError(msg) + + async def _before_loop() -> None: + await self.bot.wait_until_ready() + + loop.before_loop(_before_loop) + + self._storage.loops.append(loop) + return loop + + return decorator + + +# The actual Plugin implementation adds loading/unloading behaviour to the base. +# For the user's convenience, we also provide easy access to custom loggers. + +class Plugin(PluginBase[typeshed.BotT]): + """An extension manager similar to disnake's :class:`commands.Cog`. + + A plugin can hold commands and listeners, and supports being loaded through + `bot.load_extension()` as per usual, and can similarly be unloaded and + reloaded. + + Plugins can be constructed via :meth:`.with_metadata` to provide extra + information to the plugin. + + Parameters + ---------- + name: Optional[:class:`str`] + The name of the plugin. Defaults to the module the plugin is created in. + category: Optional[:class:`str`] + The category this plugin belongs to. Does not serve any actual purpose, + but may be useful in organising plugins. + + .. deprecated:: 0.2.4 + Use ``extras`` instead. + command_attrs: Dict[:class:`str`, Any] + A dict of parameters to apply to each prefix command in this plugin. + message_command_attrs: Dict[:class:`str`, Any] + A dict of parameters to apply to each message command in this plugin. + slash_command_attrs: Dict[:class:`str`, Any] + A dict of parameters to apply to each slash command in this plugin. + user_command_attrs: Dict[:class:`str`, Any] + A dict of parameters to apply to each user command in this plugin. + logger: Optional[Union[:class:`logging.Logger`, :class:`str`]] + The logger or its name to use when logging plugin events. + If not specified, defaults to `disnake.ext.plugins.plugin`. + **extras: Dict[:class:`str`, Any] + A dict of extra metadata for this plugin. + """ + + __slots__ = ( + "logger", + "_bot", + "_pre_load_hooks", + "_post_load_hooks", + "_pre_unload_hooks", + "_post_unload_hooks", + ) + + logger: logging.Logger + """The logger associated with this plugin.""" + + def __init__( + self, + *, + name: t.Optional[str] = None, + command_attrs: t.Optional[typeshed.CommandParams] = None, + message_command_attrs: t.Optional[typeshed.AppCommandParams] = None, + slash_command_attrs: t.Optional[typeshed.SlashCommandParams] = None, + user_command_attrs: t.Optional[typeshed.AppCommandParams] = None, + logger: t.Union[logging.Logger, str, None] = None, + **extras: t.Any, # noqa: ANN401 + ) -> None: + super().__init__( + name=name, + command_attrs=command_attrs, + message_command_attrs=message_command_attrs, + slash_command_attrs=slash_command_attrs, + user_command_attrs=user_command_attrs, + **extras, + ) + + if logger is not None: + if isinstance(logger, str): + logger = logging.getLogger(logger) + + else: + logger = LOGGER + + self.logger = logger + + # These are mainly here to easily run async code at (un)load time + # while we wait for disnake's async refactor. These will probably be + # left in for lower disnake versions, though they may be removed someday. + + self._pre_load_hooks: t.MutableSequence[typeshed.EmptyAsync] = [] + self._post_load_hooks: t.MutableSequence[typeshed.EmptyAsync] = [] + self._pre_unload_hooks: t.MutableSequence[typeshed.EmptyAsync] = [] + self._post_unload_hooks: t.MutableSequence[typeshed.EmptyAsync] = [] + + self._bot: t.Optional[typeshed.BotT] = None + + @property + def bot(self) -> typeshed.BotT: + """The bot on which this plugin is registered. + + This will only be available after calling :meth:`.load` on the plugin. + """ + if not self._bot: + msg = "Cannot access the bot on a plugin that has not yet been loaded." + raise RuntimeError(msg) + return self._bot Parameters ---------- event: :class:`str` @@ -889,8 +697,8 @@ async def _before_loop() -> None: # TODO: Maybe make this a standalone function instead of a staticmethod. @staticmethod def _prepend_plugin_checks( - checks: t.Sequence[t.Union[PrefixCommandCheck, AppCommandCheck]], - command: CheckAware, + checks: t.Sequence[t.Union[typeshed.PrefixCommandCheck, typeshed.AppCommandCheck]], + command: typeshed.CheckAware, ) -> None: """Handle updating checks with plugin-wide checks. @@ -902,7 +710,7 @@ def _prepend_plugin_checks( if checks: command.checks = [*checks, *command.checks] - async def load(self, bot: BotT) -> None: + async def load(self, bot: typeshed.BotT) -> None: """Register commands to the bot and run pre- and post-load hooks. Parameters @@ -915,27 +723,27 @@ async def load(self, bot: BotT) -> None: await asyncio.gather(*(hook() for hook in self._pre_load_hooks)) if isinstance(bot, commands.BotBase): - for command in self._commands.values(): + for command in self.commands: bot.add_command(command) # type: ignore - self._prepend_plugin_checks(self._command_checks, command) + self._prepend_plugin_checks(self._storage.command_checks, command) - for command in self._slash_commands.values(): + for command in self.slash_commands: bot.add_slash_command(command) - self._prepend_plugin_checks(self._slash_command_checks, command) + self._prepend_plugin_checks(self._storage.slash_command_checks, command) - for command in self._user_commands.values(): + for command in self.user_commands: bot.add_user_command(command) - self._prepend_plugin_checks(self._user_command_checks, command) + self._prepend_plugin_checks(self._storage.user_command_checks, command) - for command in self._message_commands.values(): + for command in self._storage.message_commands.values(): bot.add_message_command(command) - self._prepend_plugin_checks(self._message_command_checks, command) + self._prepend_plugin_checks(self._storage.message_command_checks, command) - for event, listeners in self._listeners.items(): + for event, listeners in self._storage.listeners.items(): for listener in listeners: bot.add_listener(listener, event) - for loop in self._loops: + for loop in self.loops: loop.start() await asyncio.gather(*(hook() for hook in self._post_load_hooks)) @@ -944,7 +752,7 @@ async def load(self, bot: BotT) -> None: self.logger.info("Successfully loaded plugin %r", self.metadata.name) - async def unload(self, bot: BotT) -> None: + async def unload(self, bot: typeshed.BotT) -> None: """Remove commands from the bot and run pre- and post-unload hooks. Parameters @@ -955,23 +763,23 @@ async def unload(self, bot: BotT) -> None: await asyncio.gather(*(hook() for hook in self._pre_unload_hooks)) if isinstance(bot, commands.BotBase): - for command in self._commands: + for command in self._storage.commands: bot.remove_command(command) - for command in self._slash_commands: + for command in self._storage.slash_commands: bot.remove_slash_command(command) - for command in self._user_commands: + for command in self._storage.user_commands: bot.remove_user_command(command) - for command in self._message_commands: + for command in self._storage.message_commands: bot.remove_message_command(command) - for event, listeners in self._listeners.items(): + for event, listeners in self._storage.listeners.items(): for listener in listeners: bot.remove_listener(listener, event) - for loop in self._loops: + for loop in self.loops: loop.cancel() await asyncio.gather(*(hook() for hook in self._post_unload_hooks)) @@ -980,7 +788,11 @@ async def unload(self, bot: BotT) -> None: self.logger.info("Successfully unloaded plugin %r", self.metadata.name) - def load_hook(self, *, post: bool = False) -> t.Callable[[EmptyAsync], EmptyAsync]: + def load_hook( + self, + *, + post: bool = False, + ) -> t.Callable[[typeshed.EmptyAsync], typeshed.EmptyAsync]: """Mark a function as a load hook. .. versionchanged:: 0.2.4 @@ -993,13 +805,17 @@ def load_hook(self, *, post: bool = False) -> t.Callable[[EmptyAsync], EmptyAsyn """ hooks = self._post_load_hooks if post else self._pre_load_hooks - def wrapper(callback: EmptyAsync) -> EmptyAsync: + def wrapper(callback: typeshed.EmptyAsync) -> typeshed.EmptyAsync: hooks.append(callback) return callback return wrapper - def unload_hook(self, *, post: bool = False) -> t.Callable[[EmptyAsync], EmptyAsync]: + def unload_hook( + self, + *, + post: bool = False, + ) -> t.Callable[[typeshed.EmptyAsync], typeshed.EmptyAsync]: """Mark a function as an unload hook. .. versionchanged:: 0.2.4 @@ -1012,23 +828,25 @@ def unload_hook(self, *, post: bool = False) -> t.Callable[[EmptyAsync], EmptyAs """ hooks = self._post_unload_hooks if post else self._pre_unload_hooks - def wrapper(callback: EmptyAsync) -> EmptyAsync: + def wrapper(callback: typeshed.EmptyAsync) -> typeshed.EmptyAsync: hooks.append(callback) return callback return wrapper - def create_extension_handlers(self) -> t.Tuple[SetupFunc[BotT], SetupFunc[BotT]]: + def create_extension_handlers( + self, + ) -> t.Tuple[typeshed.SetupFunc[typeshed.BotT], typeshed.SetupFunc[typeshed.BotT]]: """Create basic setup and teardown handlers for an extension. Simply put, these functions ensure :meth:`.load` and :meth:`.unload` are called when the plugin is loaded or unloaded, respectively. """ - def setup(bot: BotT) -> None: - async_utils.safe_task(self.load(bot)) + def setup(bot: typeshed.BotT) -> None: + utils.safe_task(self.load(bot)) - def teardown(bot: BotT) -> None: - async_utils.safe_task(self.unload(bot)) + def teardown(bot: typeshed.BotT) -> None: + utils.safe_task(self.unload(bot)) return setup, teardown diff --git a/src/disnake/ext/plugins/typeshed.py b/src/disnake/ext/plugins/typeshed.py new file mode 100644 index 0000000..3575f67 --- /dev/null +++ b/src/disnake/ext/plugins/typeshed.py @@ -0,0 +1,427 @@ +"""Module that houses all typing related to plugins.""" + +from __future__ import annotations + +import typing as t + +import disnake +import typing_extensions +from disnake.ext import commands + +if t.TYPE_CHECKING: + from disnake.ext import tasks + +T = t.TypeVar("T") +P = typing_extensions.ParamSpec("P") + +AnyBot = t.Union[ + commands.Bot, + commands.AutoShardedBot, + commands.InteractionBot, + commands.AutoShardedInteractionBot, +] +BotT = typing_extensions.TypeVar("BotT", bound=AnyBot, default=AnyBot) +PluginT = typing_extensions.TypeVar( + "PluginT", + bound="PluginProtocol[t.Any]", + default="PluginProtocol[AnyBot]", +) + +Coro = t.Coroutine[t.Any, t.Any, T] +MaybeCoro = t.Union[Coro[T], T] +EmptyAsync = t.Callable[[], Coro[None]] +SetupFunc = t.Callable[[BotT], None] + +AnyCommand = commands.Command[t.Any, t.Any, t.Any] +AnyGroup = commands.Group[t.Any, t.Any, t.Any] + +CoroFunc = t.Callable[..., Coro[t.Any]] +CoroFuncT = t.TypeVar("CoroFuncT", bound=CoroFunc) +CoroDecorator = t.Callable[[CoroFunc], T] + +LocalizedOptional = t.Union[t.Optional[str], disnake.Localized[t.Optional[str]]] +PermissionsOptional = t.Optional[t.Union[disnake.Permissions, int]] + +LoopT = t.TypeVar("LoopT", bound="tasks.Loop[t.Any]") + +PrefixCommandCheck = t.Callable[[commands.Context[t.Any]], MaybeCoro[bool]] +AppCommandCheck = t.Callable[[disnake.CommandInteraction], MaybeCoro[bool]] + +PrefixCommandCheckT = t.TypeVar("PrefixCommandCheckT", bound=PrefixCommandCheck) +AppCommandCheckT = t.TypeVar("AppCommandCheckT", bound=AppCommandCheck) + +TypeT_co = t.TypeVar("TypeT_co", commands.SubCommandGroup, commands.SubCommand, covariant=True) + + +class ExtrasAware(t.Protocol): + """A protocol that matches any object that implements extras.""" + + extras: t.Dict[str, t.Any] + + +class CheckAware(t.Protocol): + """A protocol that matches any object that implements checks.""" + + checks: t.List[t.Callable[..., MaybeCoro[bool]]] + + +class CommandParams(t.TypedDict, total=False): + """A :class:`TypedDict` with all the parameters to a :class:`commands.Command`.""" + + help: str + brief: str + usage: str + enabled: bool + description: str + hidden: bool + ignore_extra: bool + cooldown_after_parsing: bool + extras: t.Dict[str, t.Any] + + +class AppCommandParams(t.TypedDict, total=False): + """A :class:`TypedDict` with all the parameters to any kind of application command.""" + + auto_sync: bool + dm_permission: bool + default_member_permissions: PermissionsOptional + guild_ids: t.Sequence[int] + extras: t.Dict[str, t.Any] + + +class SlashCommandParams(AppCommandParams, total=False): + """A :class:`TypedDict` with all the parameters to a :class:`commands.InvokableSlashCommand`.""" + + description: LocalizedOptional + connectors: t.Dict[str, str] + + +class PluginProtocol(t.Protocol[BotT]): + """Protocol for Plugin-like classes.""" + + @property + def bot(self) -> BotT: + """The bot to which this plugin is registered.""" + ... + + @property + def name(self) -> str: + """The name of this sub plugin.""" + ... + + @property + def extras(self) -> t.Dict[str, t.Any]: + """A dict of extra metadata for this plugin. + + .. versionadded:: 0.2.4 + """ + ... + + @property + def commands(self) -> t.Sequence[commands.Command[PluginProtocol[BotT], t.Any, t.Any]]: # type: ignore + """All prefix commands registered in this plugin.""" + ... + + @property + def slash_commands(self) -> t.Sequence[commands.InvokableSlashCommand]: + """All slash commands registered in this plugin.""" + ... + + @property + def user_commands(self) -> t.Sequence[commands.InvokableUserCommand]: + """All user commands registered in this plugin.""" + ... + + @property + def message_commands(self) -> t.Sequence[commands.InvokableMessageCommand]: + """All message commands registered in this plugin.""" + ... + + @property + def loops(self) -> t.Sequence[tasks.Loop[t.Any]]: + """All loops registered to this plugin.""" + ... + + # Prefix commands + + def command( + self, + name: t.Optional[str] = None, + *, + cls: t.Optional[t.Type[commands.Command[t.Any, t.Any, t.Any]]] = None, + **kwargs: t.Any, # noqa: ANN401 + ) -> CoroDecorator[AnyCommand]: + """Transform a function into a :class:`commands.Command`. + + By default the ``help`` attribute is received automatically from the + docstring of the function and is cleaned up with the use of + ``inspect.cleandoc``. If the docstring is ``bytes``, then it is decoded + into :class:`str` using utf-8 encoding. + + All checks added using the :func:`commands.check` & co. decorators are + added into the function. There is no way to supply your own checks + through this decorator. + + Parameters + ---------- + name: :class:`str` + The name to create the command with. By default this uses the + function name unchanged. + cls: + The class to construct with. By default this is + :class:`commands.Command`. You usually do not change this. + **kwargs: + Keyword arguments to pass into the construction of the class denoted + by ``cls``. + + Returns + ------- + Callable[..., :class:`commands.Command`] + A decorator that converts the provided method into a + :class:`commands.Command` or a derivative, and returns it. + + Raises + ------ + TypeError + The function is not a coroutine or is already a command. + """ + ... + + def group( + self, + name: t.Optional[str] = None, + *, + cls: t.Optional[t.Type[commands.Group[t.Any, t.Any, t.Any]]] = None, + **kwargs: t.Any, # noqa: ANN401 + ) -> CoroDecorator[AnyGroup]: + """Transform a function into a :class:`commands.Group`. + + This is similar to the :func:`commands.command` decorator but the + ``cls`` parameter is set to :class:`Group` by default. + + Parameters + ---------- + name: :class:`str` + The name to create the group with. By default this uses the + function name unchanged. + cls: + The class to construct with. By default this is + :class:`commands.Group`. You usually do not change this. + **kwargs: + Keyword arguments to pass into the construction of the class denoted + by ``cls``. + + Returns + ------- + Callable[..., :class:`commands.Group`] + A decorator that converts the provided method into a + :class:`commands.Group` or a derivative, and returns it. + + Raises + ------ + TypeError + The function is not a coroutine or is already a command. + """ + ... + + # Application commands + + def slash_command( + self, + *, + auto_sync: t.Optional[bool] = None, + name: LocalizedOptional = None, + description: LocalizedOptional = None, + dm_permission: t.Optional[bool] = None, + default_member_permissions: PermissionsOptional = None, + guild_ids: t.Optional[t.Sequence[int]] = None, + connectors: t.Optional[t.Dict[str, str]] = None, + extras: t.Optional[t.Dict[str, t.Any]] = None, + ) -> CoroDecorator[commands.InvokableSlashCommand]: + """Transform a function into a slash command. + + Parameters + ---------- + auto_sync: :class:`bool` + Whether to automatically register the command. Defaults to ``True``. + name: Optional[Union[:class:`str`, :class:`disnake.Localized`]] + The name of the slash command (defaults to function name). + description: Optional[Union[:class:`str`, :class:`disnake.Localized`]] + The description of the slash command. It will be visible in Discord. + dm_permission: :class:`bool` + Whether this command can be used in DMs. + Defaults to ``True``. + default_member_permissions: Optional[Union[:class:`disnake.Permissions`, :class:`int`]] + The default required permissions for this command. + See :attr:`disnake.ApplicationCommand.default_member_permissions` for details. + guild_ids: List[:class:`int`] + If specified, the client will register the command in these guilds. + Otherwise, this command will be registered globally. + connectors: Dict[:class:`str`, :class:`str`] + Binds function names to option names. If the name + of an option already matches the corresponding function param, + you don't have to specify the connectors. Connectors template: + ``{"option-name": "param_name", ...}``. + If you're using :ref:`param_syntax`, you don't need to specify this. + extras: Dict[:class:`str`, Any] + A dict of user provided extras to attach to the command. + + .. note:: + This object may be copied by the library. + + Returns + ------- + Callable[..., :class:`commands.InvokableSlashCommand`] + A decorator that converts the provided method into a + :class:`commands.InvokableSlashCommand` and returns it. + """ + ... + + def user_command( + self, + *, + name: LocalizedOptional = None, + dm_permission: t.Optional[bool] = None, + default_member_permissions: PermissionsOptional = None, + auto_sync: t.Optional[bool] = None, + guild_ids: t.Optional[t.Sequence[int]] = None, + extras: t.Optional[t.Dict[str, t.Any]] = None, + ) -> CoroDecorator[commands.InvokableUserCommand]: + """Transform a function into a user command. + + Parameters + ---------- + name: Optional[Union[:class:`str`, :class:`disnake.Localized`]] + The name of the user command (defaults to the function name). + dm_permission: :class:`bool` + Whether this command can be used in DMs. + Defaults to ``True``. + default_member_permissions: Optional[Union[:class:`disnake.Permissions`, :class:`int`]] + The default required permissions for this command. + See :attr:`disnake.ApplicationCommand.default_member_permissions` for details. + auto_sync: :class:`bool` + Whether to automatically register the command. Defaults to ``True``. + guild_ids: Sequence[:class:`int`] + If specified, the client will register the command in these guilds. + Otherwise, this command will be registered globally. + extras: Dict[:class:`str`, Any] + A dict of user provided extras to attach to the command. + + .. note:: + This object may be copied by the library. + + Returns + ------- + Callable[..., :class:`commands.InvokableUserCommand`] + A decorator that converts the provided method into a + :class:`commands.InvokableUserCommand` and returns it. + """ + ... + + def message_command( + self, + *, + name: LocalizedOptional = None, + dm_permission: t.Optional[bool] = None, + default_member_permissions: PermissionsOptional = None, + auto_sync: t.Optional[bool] = None, + guild_ids: t.Optional[t.Sequence[int]] = None, + extras: t.Optional[t.Dict[str, t.Any]] = None, + ) -> CoroDecorator[commands.InvokableMessageCommand]: + """Transform a function into a message command. + + Parameters + ---------- + name: Optional[Union[:class:`str`, :class:`disnake.Localized`]] + The name of the message command (defaults to the function name). + dm_permission: :class:`bool` + Whether this command can be used in DMs. + Defaults to ``True``. + default_member_permissions: Optional[Union[:class:`disnake.Permissions`, :class:`int`]] + The default required permissions for this command. + See :attr:`disnake.ApplicationCommand.default_member_permissions` for details. + auto_sync: :class:`bool` + Whether to automatically register the command. Defaults to ``True``. + guild_ids: Sequence[:class:`int`] + If specified, the client will register the command in these guilds. + Otherwise, this command will be registered globally. + extras: Dict[:class:`str`, Any] + A dict of user provided extras to attach to the command. + + .. note:: + This object may be copied by the library. + + Returns + ------- + Callable[..., :class:`commands.InvokableMessageCommand`] + A decorator that converts the provided method into an + :class:`commands.InvokableMessageCommand` and then returns it. + """ + ... + + # Checks + + def command_check(self, predicate: PrefixCommandCheckT) -> PrefixCommandCheckT: + """Add a `commands.check` to all prefix commands on this plugin.""" + ... + + def slash_command_check(self, predicate: AppCommandCheckT) -> AppCommandCheckT: + """Add a `commands.check` to all slash commands on this plugin.""" + ... + + def message_command_check(self, predicate: AppCommandCheckT) -> AppCommandCheckT: + """Add a `commands.check` to all message commands on this plugin.""" + ... + + def user_command_check(self, predicate: AppCommandCheckT) -> AppCommandCheckT: + """Add a `commands.check` to all user commands on this plugin.""" + ... + + # Listeners + + def add_listeners(self, *callbacks: CoroFunc, event: t.Optional[str] = None) -> None: + """Add multiple listeners to the plugin. + + Parameters + ---------- + *callbacks: Callable[..., Any] + The callbacks to add as listeners for this plugin. + event: :class:`str` + The name of a single event to register all callbacks under. If not provided, + the callbacks will be registered individually based on function's name. + """ + ... + + def listener(self, event: t.Optional[str] = None) -> t.Callable[[CoroFuncT], CoroFuncT]: + """Register a function as a listener on this plugin. + + This is the plugin equivalent of :meth:`commands.Bot.listen`. + + Parameters + ---------- + event: :class:`str` + The name of the event being listened to. If not provided, it + defaults to the function's name. + """ + ... + + # Tasks + + def register_loop(self, *, wait_until_ready: bool = False) -> t.Callable[[LoopT], LoopT]: + """Register a `tasks.Loop` to this plugin. + + Loops registered in this way will automatically start and stop as the + plugin is loaded and unloaded, respectively. + + Parameters + ---------- + wait_until_ready: :class:`bool` + Whether or not to add a simple `before_loop` callback that waits + until the bot is ready. This can be handy if you load plugins before + you start the bot (which you should!) and make api requests with a + loop. + .. warn:: + This only works if the loop does not already have a `before_loop` + callback registered. + """ + ... diff --git a/src/disnake/ext/plugins/utils.py b/src/disnake/ext/plugins/utils.py new file mode 100644 index 0000000..c47295a --- /dev/null +++ b/src/disnake/ext/plugins/utils.py @@ -0,0 +1,84 @@ +"""Utilities for plguins.""" + +import asyncio +import logging +import pathlib +import typing as t + +_LOGGER = logging.getLogger(__name__) + +# asyncio.Task isn't subscriptable in py 3.8, so we do this "workaround" to +# make it subscriptable and compatible with inspect.signature etc. +# This probably isn't necessary but everything for our users, eh? + +if t.TYPE_CHECKING: + Task = asyncio.Task +else: + T = t.TypeVar("T") + Task = t._alias(asyncio.Task, T) # noqa: SLF001 + +__all__: t.Sequence[str] = ("safe_task",) + + +_tasks: t.Set["Task[t.Any]"] = set() + + +def safe_task( + coroutine: t.Coroutine[t.Any, t.Any, t.Any], +) -> "Task[t.Any]": + """Create an asyncio background task without risk of it being GC'd.""" + task = asyncio.create_task(coroutine) + + _tasks.add(task) + task.add_done_callback(_tasks.discard) + + return task + + +# We don't want to use stdlib files (typing) or disnake.ext.plugins to name the plugin. +_INVALID: t.Final[t.Sequence[pathlib.Path]] = ( + pathlib.Path(t.__file__).parent.resolve(), + pathlib.Path(__file__).parent.resolve(), +) + + +def _is_valid_path(path: str) -> bool: + to_check = pathlib.Path(path).resolve() + for invalid_path in _INVALID: + if invalid_path == to_check or invalid_path in to_check.parents: + return False + + return True + + +def get_source_module_name() -> str: + """Get current frame from exception traceback.""" + # We ignore ruff here because we need to raise and immediately catch an + # exception to figure out our stack level. + try: + raise Exception # noqa: TRY002, TRY301 + except Exception as exc: # noqa: BLE001 + tb = exc.__traceback__ + + if not tb: + # No traceback, therefore can't access frames and infer plugin name... + _LOGGER.warning("Failed to infer file name, defaulting to 'plugin'.") + return "plugin" + + # Navigate all frames for one with a valid path. + # Note that we explicitly filter out: + # - the stdlib typing module; if the generic parameter is specified, this + # will be encountered before the target module. + # - this file; we don't want to just return "plugin" if possible. + frame = tb.tb_frame + while frame := frame.f_back: + if _is_valid_path(frame.f_code.co_filename): + break + + else: + _LOGGER.warning("Failed to infer file name, defaulting to 'plugin'.") + return "plugin" + + module_name = frame.f_locals["__name__"] + _LOGGER.debug("Module name resolved to %r", module_name) + return module_name From ca51c837a3fe5e36b5afabeffcab284810eabbc6 Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 02:31:29 +0200 Subject: [PATCH 3/8] feat: implement subplugins --- src/disnake/ext/plugins/plugin.py | 106 ++++++++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 4 deletions(-) diff --git a/src/disnake/ext/plugins/plugin.py b/src/disnake/ext/plugins/plugin.py index e10d716..8718534 100644 --- a/src/disnake/ext/plugins/plugin.py +++ b/src/disnake/ext/plugins/plugin.py @@ -633,6 +633,7 @@ def __init__( self._bot: t.Optional[typeshed.BotT] = None + self._sub_plugins: t.Set[SubPlugin[t.Any]] = set() @property def bot(self) -> typeshed.BotT: """The bot on which this plugin is registered. @@ -643,18 +644,31 @@ def bot(self) -> typeshed.BotT: msg = "Cannot access the bot on a plugin that has not yet been loaded." raise RuntimeError(msg) return self._bot + + # Subplugin registration + + def register_sub_plugin(self, sub_plugin: SubPlugin[t.Any]) -> None: + """Register a :class:`SubPlugin` to this plugin. + + This registers all commands, slash commands, listeners, loops, etc. + that are registered on the sub-plugin to this Plugin. + + When this Plugin is unloaded, all sub-plugins are automatically + unloaded along with it. + Parameters ---------- - event: :class:`str` - The name of the event being listened to. If not provided, it - defaults to the function's name. + sub_plugin: + The :class:`SubPlugin` that is to be registered. """ + self._sub_plugins.add(sub_plugin) + self._storage.update(sub_plugin._storage) # noqa: SLF001 def decorator(callback: CoroFuncT) -> CoroFuncT: self.add_listeners(callback, event=event) return callback - return decorator + sub_plugin.bind(self) # Tasks @@ -850,3 +864,87 @@ def teardown(bot: typeshed.BotT) -> None: utils.safe_task(self.unload(bot)) return setup, teardown + + +class SubPlugin(PluginBase[typeshed.BotT]): + """Bean.""" + + __slots__: t.Sequence[str] = ( + "_plugin", + "_command_placeholders", + ) + + _plugin: t.Optional[Plugin[typeshed.BotT]] + _command_placeholders: t.Dict[ + str, + t.List[t.Union[placeholder.SubCommandPlaceholder, placeholder.SubCommandGroupPlaceholder]], + ] + + def __init__( + self, + *, + name: t.Optional[str] = None, + command_attrs: t.Optional[typeshed.CommandParams] = None, + message_command_attrs: t.Optional[typeshed.AppCommandParams] = None, + slash_command_attrs: t.Optional[typeshed.SlashCommandParams] = None, + user_command_attrs: t.Optional[typeshed.AppCommandParams] = None, + **extras: t.Any, # noqa: ANN401 + ) -> None: + super().__init__( + name=name, + command_attrs=command_attrs, + message_command_attrs=message_command_attrs, + slash_command_attrs=slash_command_attrs, + user_command_attrs=user_command_attrs, + **extras, + ) + self._plugin = None + self._command_placeholders = {} + + def bind(self, plugin: Plugin[typeshed.BotT]) -> None: + """Bind a main Plugin to this SubPlugin. + + This generally doesn't need to be called manually. Instead, use + :meth:`Plugin.register_sub_plugin`, which will call this automatically. + + Arguments + --------- + plugin: + The Plugin to bind to this SubPlugin. + """ + if self._plugin: + msg = f"This subplugin is already bound to a Plugin named {self._plugin.name!r}" + raise RuntimeError(msg) + + self._plugin = plugin + + @property + def plugin(self) -> Plugin[typeshed.BotT]: + """The Plugin of which this is a SubPlugin. + + This is only available after the sub plugin was bound to a :class:`Plugin`. + """ + if self._plugin: + return self._plugin + + msg = f"The SubPlugin named {self.name!r} has not yet been bound to a Plugin." + raise RuntimeError(msg) + + @property + def bot(self) -> typeshed.BotT: + """The Plugin of which this is a SubPlugin. + + This is only available after the sub plugin was bound to a :class:`Plugin`, + and that plugin was loaded via :meth:`Plugin.load`. + """ + return self.plugin.bot + + # TODO: Maybe allow setting a separate logger here? + @property + def logger(self) -> logging.Logger: + """The logger of the Plugin of which this is a SubPlugin. + + This is only available after the sub plugin was bound to a :class:`Plugin`. + """ + return self.plugin.logger + From 93ea81784263b5f0f0b44e0c5ba865e6f9ccc53a Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 02:32:07 +0200 Subject: [PATCH 4/8] feat: implement getters on plugins --- src/disnake/ext/plugins/plugin.py | 42 +++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/disnake/ext/plugins/plugin.py b/src/disnake/ext/plugins/plugin.py index 8718534..824456b 100644 --- a/src/disnake/ext/plugins/plugin.py +++ b/src/disnake/ext/plugins/plugin.py @@ -542,6 +542,48 @@ async def _before_loop() -> None: return decorator + # Getters + + def get_command(self, name: str) -> t.Optional[commands.Command[Self, t.Any, t.Any]]: # pyright: ignore + part, _, name = name.strip().partition(" ") + command = self._storage.commands.get(name) + + while name: + if not isinstance(command, commands.GroupMixin): + msg = ( + f"Got name {name!r}, indicating a Group with a sub-Command, but" + f" command {part!r} is not a Group." + ) + raise TypeError(msg) + + part, _, name = name.partition(" ") + command: t.Optional[typeshed.AnyCommand] = command.get_command(name) + + return command + + def get_slash_command(self, name: str) -> t.Union[ + commands.InvokableSlashCommand, + commands.SubCommandGroup, + commands.SubCommand, + None, + ]: + chain = name.strip().split() + length = len(chain) + + slash = self._storage.slash_commands.get(chain[0]) + if not slash or length == 1: + return slash + + if length == 2: # noqa: PLR2004 + return slash.children.get(chain[1]) + + if length == 3: # noqa: PLR2004 + group = slash.children.get(chain[1]) + if isinstance(group, commands.SubCommandGroup): + return group.children.get(chain[2]) + + return None + # The actual Plugin implementation adds loading/unloading behaviour to the base. # For the user's convenience, we also provide easy access to custom loggers. From 4efe5263a136b5ebe034e355fc538d37edae7e34 Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 02:32:26 +0200 Subject: [PATCH 5/8] feat: implement subplugins --- src/disnake/ext/plugins/placeholder.py | 403 +++++++++++++++++++++++++ src/disnake/ext/plugins/plugin.py | 246 +++++++++++++-- 2 files changed, 620 insertions(+), 29 deletions(-) create mode 100644 src/disnake/ext/plugins/placeholder.py diff --git a/src/disnake/ext/plugins/placeholder.py b/src/disnake/ext/plugins/placeholder.py new file mode 100644 index 0000000..d7c1352 --- /dev/null +++ b/src/disnake/ext/plugins/placeholder.py @@ -0,0 +1,403 @@ +"""Module containing slash command placeholder objects.""" + +import abc +import typing as t + +import disnake +from disnake.ext import commands + +from . import typeshed + +__all__: t.Sequence[str] = ("SubCommandPlaceholder", "SubCommandGroupPlaceholder") + +# Copium as we can't do super().__init__ with a Protocol. +def _shared_init( + self: "CommandPlaceholderABC[t.Any]", + func: typeshed.CoroFunc, + parent_name: str, + /, + *, + name: typeshed.LocalizedOptional, + **kwargs: t.Any, # noqa: ANN401 +) -> None: + self._func = func + self.parent_name = parent_name.strip() + + # This is how disnake does it so we follow suit. + name_loc = disnake.Localized._cast(name, required=False) # noqa: SLF001 + self._name = name_loc.string or func.__name__ + + # This will be None until `set_parent` is called. + self._command = None + + # These will be passed to the sub_command(_group) decorator. + self._kwargs = kwargs + + +class CommandPlaceholderABC(abc.ABC, t.Generic[typeshed.TypeT_co]): + """Base class for command placeholders. + + This is essentially only used internally as a shared logic container for + :class:`SubCommandGroupPlaceholder` and :class:`SubCommandPlaceholder`, + and should generally not need to be used externally. + """ + + __slots__: t.Sequence[str] = ("_command", "_func", "_kwargs", "_name", "parent_name") + + _command: t.Optional[typeshed.TypeT_co] + _func: typeshed.CoroFunc + _kwargs: t.Dict[str, t.Any] + _name: str + + @t.overload + @abc.abstractmethod + def set_parent( + self: "CommandPlaceholderABC[commands.SubCommand]", + parent: t.Union[commands.InvokableSlashCommand, commands.SubCommandGroup], + ) -> None: + ... + + @t.overload + @abc.abstractmethod + def set_parent( + self: "CommandPlaceholderABC[commands.SubCommandGroup]", + parent: commands.InvokableSlashCommand, + ) -> None: + ... + + @abc.abstractmethod + def set_parent( + self, + parent: t.Union[commands.InvokableSlashCommand, commands.SubCommandGroup], + ) -> None: + raise NotImplementedError + + @property + def command(self) -> typeshed.TypeT_co: + if self._command: + return self._command + + msg = ( + "Cannot access attributes of a placeholder SubCommand(Group)" + " without first setting its parent command." + ) + raise RuntimeError(msg) + + @property + def name(self) -> str: + return self._name + + @property + def qualified_name(self) -> str: + return f"{self.parent_name} {self._name}" + + @property + def docstring(self) -> str: + return self.docstring + + @property + def root_parent(self) -> commands.InvokableSlashCommand: + return self.command.root_parent + + +class SubCommandPlaceholder(CommandPlaceholderABC[commands.SubCommand]): + """A placeholder for a slash subcommand. + + This class allows to define a subcommand in a different file than the + parent command. + + Most of the attributes on this class cannot be used until a parent is set + using :meth:`.set_parent`. This is done automatically when the Plugin to + which this placeholder is registered is loaded into the bot. + + This class should generally not be instantiated directly. Instead, create + instances of this class through :meth:`SubPlugin.external_sub_command`. + """ + + __slots__: t.Sequence[str] = ("_deferred_autocompleters",) + + _deferred_autocompleters: t.Dict[str, t.Callable[..., t.Any]] + + def __init__( + self, + func: typeshed.CoroFunc, + parent_name: str, + /, + *, + name: typeshed.LocalizedOptional = None, + **kwargs: t.Any, # noqa: ANN401 + ) -> None: + _shared_init(self, func, parent_name, name=name, **kwargs) + self._deferred_autocompleters = {} + + def set_parent( + self, + parent: t.Union[commands.InvokableSlashCommand, commands.SubCommandGroup], + ) -> None: + """Set the parent command of this subcommand placeholder. + + This finalises the :class:`disnake.SubCommand` and makes it available + through :obj:`.command`. After doing this, all properties that proxy to + the underlying command become available. + + Parameters + ---------- + parent: + The parent command that this subcommand was a placeholder for. + """ + self._command = parent.sub_command(**self._kwargs)(self._func) # type: ignore + + if not self._deferred_autocompleters: + return + + # Populate autocompleters... + options = {option.name: option for option in self.body.options} + + for option_name, autocompleter in self._deferred_autocompleters.items(): + if option_name not in options: + msg = f"Option {option_name!r} doesn't exist in '{self.qualified_name}'." + raise ValueError(msg) + + self._command.autocompleters[option_name] = autocompleter + options[option_name].autocomplete = True + + # TODO: If people ask for this, make fiels like description available + # before setting the parent command. + @property + def description(self) -> str: + """The description of this subcommand. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.description + + @property + def connectors(self) -> t.Dict[str, str]: + """The connectors of this subcommand. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.connectors + + @property + def autocompleters(self) -> t.Dict[str, t.Any]: + """The autocompleters of this subcommand. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.autocompleters + + @property + def parent(self) -> t.Union[commands.InvokableSlashCommand, commands.SubCommandGroup]: + """The parent of this subcommand. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.parent + + @property + def parents( + self, + ) -> t.Union[ + t.Tuple[commands.InvokableSlashCommand], + t.Tuple[commands.SubCommandGroup, commands.InvokableSlashCommand], + ]: + """The parents of this subcommand. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.parents + + @property + def body(self) -> disnake.Option: + """The underlying representation of this subcommand. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.body + + # TODO: Maybe use ParamSpec here. + async def invoke( + self, + inter: disnake.CommandInteraction, + *args: t.Any, # noqa: ANN401 + **kwargs: t.Any, # noqa: ANN401 + ) -> None: + """Invoke the slash command, running its callback and any converters. + + Parameters + ---------- + inter: + The interaction with which to run the command. + *args: + The positional arguments required to run the callback. + **kwargs: + The keyword arguments required to run the callback. + """ + await self.command.invoke(inter, *args, **kwargs) # pyright: ignore + + def autocomplete( + self, + option_name: str, + ) -> t.Callable[[typeshed.CoroFuncT], typeshed.CoroFuncT]: + """Register an autocomplete function for the specified option. + + Parameters + ---------- + option_name: + The name of the option for which to add an autocomplete. This has + to match the name of an option on this subcommand. + + Returns + ------- + Callable[[Callable], Callable] + A decorator that adds the wrapped function as an autocomplete + function to this subcommand. + """ + if self._command: + return self._command.autocomplete(option_name) # pyright: ignore + + def decorator(func: typeshed.CoroFuncT) -> typeshed.CoroFuncT: + self._deferred_autocompleters[option_name] = func + return func + + return decorator + + +class SubCommandGroupPlaceholder(CommandPlaceholderABC[commands.SubCommandGroup]): + """A placeholder for a slash subcommand group. + + This class allows to define a subcommand group in a different file than the + parent command. + + Most of the attributes on this class cannot be used until a parent is set + using :meth:`.set_parent`. This is done automatically when the Plugin to + which this placeholder is registered is loaded into the bot. + + This class should generally not be instantiated directly. Instead, create + instances of this class through :meth:`SubPlugin.external_sub_command_group`. + """ + + __slots__: t.Sequence[str] = ("_subcommand_placeholders",) + + _subcommand_placeholders: t.List[SubCommandPlaceholder] + + def __init__( + self, + func: typeshed.CoroFunc, + parent_name: str, + /, + *, + name: typeshed.LocalizedOptional = None, + **kwargs: t.Any, # noqa: ANN401 + ) -> None: + _shared_init(self, func, parent_name, name=name, **kwargs) + self._subcommand_placeholders = [] + + def set_parent(self, parent: commands.InvokableSlashCommand) -> None: + """Set the parent command of this subcommand group placeholder. + + This finalises the :class:`disnake.SubCommandGroup` and makes it + available through :obj:`.command`. After doing this, all properties + that proxy to the underlying command become available. + + Parameters + ---------- + parent: + The parent command that this subcommand was a placeholder for. + """ + self._command = parent.sub_command_group(**self._kwargs)(self._func) # type: ignore + + for subcommand in self._subcommand_placeholders: + subcommand.set_parent(self._command) + + @property + def parent(self) -> commands.InvokableSlashCommand: + """The parent of this subcommand group. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.parent + + @property + def parents(self) -> t.Tuple[commands.InvokableSlashCommand]: + """The parents of this subcommand group. + + This is only available after setting a parent command using + :meth:`.set_parent`. + """ + return self.command.parents + + def sub_command( + self, + name: typeshed.LocalizedOptional = None, + description: typeshed.LocalizedOptional = None, + options: t.Optional[t.List[disnake.Option]] = None, + connectors: t.Optional[t.Dict[str, str]] = None, + extras: t.Optional[t.Dict[str, t.Any]] = None, + **kwargs: t.Any, # noqa: ANN401 + ) -> t.Callable[[typeshed.CoroFunc], SubCommandPlaceholder]: + """Wrap a callable to create a subcommand in this group. + + As the parent command may not yet exist, this decorator returns a + placeholder object. The placeholder object has properties proxying to + the parent command that become available as soon as :meth:`set_parent` + is called. This is done automatically when the plugin to which this + group is registered is loaded into the bot. + + Parameters + ---------- + parent_name: + The name of the parent :class:`InvokableSlashCommand` or + :class:`SubCommandGroup` to which this subcommand should be + registered. + name: + The name of this subcommand. If not provided, this will use the + name of the decorated function. + description: + The description of this command. If not provided, this will use the + docstring of the decorated function. + connectors: + A mapping of option names to function parameter names, mainly for + internal processes. + extras: + Any extras that are to be stored on the subcommand. + + Returns + ------- + Callable[..., :class:`SubCommandPlaceholder`] + A decorator that converts the provided method into a + :class:`SubCommandGroupPlaceholder` and returns it. + """ + + def create_placeholder(func: typeshed.CoroFunc) -> SubCommandPlaceholder: + placeholder = SubCommandPlaceholder( + func, + self.qualified_name, + name=name, + description=description, + options=options, + connectors=connectors, + extras=extras, + **kwargs, + ) + self._subcommand_placeholders.append(placeholder) + return placeholder + + if self._command: + + def create_and_bind_placeholder(func: typeshed.CoroFunc) -> SubCommandPlaceholder: + placeholder = create_placeholder(func) + placeholder.set_parent(self.command) + return placeholder + + return create_and_bind_placeholder + + return create_placeholder diff --git a/src/disnake/ext/plugins/plugin.py b/src/disnake/ext/plugins/plugin.py index 824456b..b8362e5 100644 --- a/src/disnake/ext/plugins/plugin.py +++ b/src/disnake/ext/plugins/plugin.py @@ -11,7 +11,7 @@ from disnake.ext import commands from typing_extensions import Self -from . import typeshed, utils +from . import placeholder, typeshed, utils if t.TYPE_CHECKING: from disnake.ext import tasks @@ -630,6 +630,8 @@ class Plugin(PluginBase[typeshed.BotT]): "_post_load_hooks", "_pre_unload_hooks", "_post_unload_hooks", + "_sub_plugins", + "_placeholders", ) logger: logging.Logger @@ -676,6 +678,16 @@ def __init__( self._bot: t.Optional[typeshed.BotT] = None self._sub_plugins: t.Set[SubPlugin[t.Any]] = set() + self._placeholders: t.Dict[ + str, + t.List[ + t.Union[ + placeholder.SubCommandPlaceholder, + placeholder.SubCommandGroupPlaceholder, + ] + ], + ] = {} + @property def bot(self) -> typeshed.BotT: """The bot on which this plugin is registered. @@ -706,47 +718,97 @@ def register_sub_plugin(self, sub_plugin: SubPlugin[t.Any]) -> None: self._sub_plugins.add(sub_plugin) self._storage.update(sub_plugin._storage) # noqa: SLF001 - def decorator(callback: CoroFuncT) -> CoroFuncT: - self.add_listeners(callback, event=event) - return callback + for key, new_placeholders in sub_plugin._command_placeholders.items(): # noqa: SLF001 + if key in self._placeholders: + self._placeholders[key].extend(new_placeholders) + else: + self._placeholders[key] = new_placeholders sub_plugin.bind(self) - # Tasks + def merge_placeholders(self, *, final: bool) -> None: + """Merge the placeholders registered to this Plugin with its commands. - def register_loop(self, *, wait_until_ready: bool = False) -> t.Callable[[LoopT], LoopT]: - """Register a `tasks.Loop` to this plugin. + You generally do not need to manually call this method, as it is + automatically called when the plugin is loaded. This is done after + pre-load hooks run. - Loops registered in this way will automatically start and stop as the - plugin is loaded and unloaded, respectively. + Calls to this method remove the internally stored placeholders as they + are successfully merged, so repeated calls are not a slowdown. Parameters ---------- - wait_until_ready: :class:`bool` - Whether or not to add a simple `before_loop` callback that waits - until the bot is ready. This can be handy if you load plugins before - you start the bot (which you should!) and make api requests with a - loop. - .. warn:: - This only works if the loop does not already have a `before_loop` - callback registered. + final: + If set to ``True``, ALL placeholders MUST successfully merge with a + parent command. Otherwise, any placeholders that fail to merge + are simply left as-is until a future call successfully merges them. + When :meth:`.load` automatically calls this, ``final`` is set to + ``True``. + + Raises + ------ + RuntimeError: + ``final`` was set to ``True`` and a placeholder failed to merge. + TypeError: + The provided combination of parent command and placeholder command + do not merge to a valid :class:`SubCommandGroup` or + :class:`SubCommand`. """ + # We sort to make sure we don't ever attempt to register a subcommand + # before registering its subcommand group. + # E.g. we guarantee "foo bar" is added before "foo bar baz" because + # sorted() will return the former first. + sorted_keys = sorted(self._placeholders) + + for name in sorted_keys: + placeholders = self._placeholders.pop(name) + + parent = self.get_slash_command(name) + if not parent: + if final: + msg = ( + f"Command placeholder {placeholders[0].qualified_name!r}" + f" could not be finalised as no slash command with name" + f" {name!r} is registered to plugin {self.name!r}." + ) + raise RuntimeError(msg) + + # This should happen very infrequently as we don't expect users + # to call this manually (with final=False) often, so it's + # probably more efficient to always pop and reinsert in this + # rare case. + self._placeholders[name] = placeholders + continue + + if isinstance(parent, commands.SubCommand): + msg = ( + f"Command {parent.name!r} cannot have subcommands as it is" + "itself is a subcommand." + ) + raise TypeError(msg) - def decorator(loop: LoopT) -> LoopT: - if wait_until_ready: - if loop._before_loop is not None: # noqa: SLF001 - msg = "This loop already has a `before_loop` callback registered." + is_group = isinstance(parent, commands.SubCommandGroup) + + for placeholder_ in placeholders: + # We've already checked that no SubCommands are used as parent, + # so the only possible parents are of type InvokableSlashCommand + # or SubCommandGroup. + # The only possible placeholders are of type SubCommandGroup or + # Subcommand. + # Therefore, the only thing we need to prevent is mistakenly + # trying to register a SubCommandGroup to another SubCommandGroup. + if is_group and isinstance(placeholder_, placeholder.SubCommandGroupPlaceholder): + msg = ( + f"Cannot finalise placeholder SubCommandGroup {placeholder_.name!r}" + f" by setting its parent to SubCommandGroup {parent.qualified_name!r}." + ) raise TypeError(msg) - async def _before_loop() -> None: - await self.bot.wait_until_ready() - - loop.before_loop(_before_loop) + # We've already validated everything, so this is safe to ignore. + # Ignoring here is considerably less painful than implementing + # more checks just to make this typecheck correctly. Sorry Eric. + placeholder_.set_parent(parent) # pyright: ignore - self._loops.append(loop) - return loop - - return decorator # Plugin (un)loading... @@ -778,6 +840,8 @@ async def load(self, bot: typeshed.BotT) -> None: await asyncio.gather(*(hook() for hook in self._pre_load_hooks)) + self.merge_placeholders(final=True) + if isinstance(bot, commands.BotBase): for command in self.commands: bot.add_command(command) # type: ignore @@ -990,3 +1054,127 @@ def logger(self) -> logging.Logger: """ return self.plugin.logger + # Command placeholders... + + def _add_command_placeholder( + self, + command: t.Union[placeholder.SubCommandPlaceholder, placeholder.SubCommandGroupPlaceholder], + ) -> None: + key = command.parent_name + if key in self._command_placeholders: + self._command_placeholders[key].append(command) + + else: + self._command_placeholders[key] = [command] + + + def external_sub_command_group( + self, + parent_name: str, + *, + name: typeshed.LocalizedOptional = None, + extras: t.Optional[t.Dict[str, t.Any]] = None, + ) -> t.Callable[[typeshed.CoroFunc], placeholder.SubCommandGroupPlaceholder]: + """Wrap a callable to create a subcommand group in an external command. + + As the parent command may not yet exist, this decorator returns a + placeholder object. The placeholder object has properties proxying to + the parent command that become available as soon as + :meth:`Plugin.finalise_placeholders` is called. This is done + automatically when it is loaded into the bot. + + .. warning:: + This ONLY works confined within the same :class:`Plugin`. Different + sub-plugins on the same plugin CAN have interdependent placeholders, + but you CANNOT make a placeholder dependent on an entirely + different plugin. + + Parameters + ---------- + parent_name: + The name of the parent :class:`InvokableSlashCommand` to which this + subcommand group should be registered. + name: + The name of this subcommand group. If not provided, this will use + the name of the decorated function. + extras: + Any extras that are to be stored on the command. + + Returns + ------- + Callable[..., :class:`SubCommandGroupPlaceholder`] + A decorator that converts the provided method into a + :class:`SubCommandGroupPlaceholder` and returns it. + """ + def decorator(func: typeshed.CoroFunc) -> placeholder.SubCommandGroupPlaceholder: + placeholder_cmd = placeholder.SubCommandGroupPlaceholder( + func, + parent_name, + name=name, + extras=extras, + ) + self._add_command_placeholder(placeholder_cmd) + return placeholder_cmd + + return decorator + + def external_sub_command( + self, + parent_name: str, + *, + name: typeshed.LocalizedOptional = None, + description: typeshed.LocalizedOptional = None, + connectors: t.Optional[t.Dict[str, str]] = None, + extras: t.Optional[t.Dict[str, t.Any]] = None, + ) -> t.Callable[[typeshed.CoroFunc], placeholder.SubCommandPlaceholder]: + """Wrap a callable to create a subcommand in an external command or group. + + As the parent command may not yet exist, this decorator returns a + placeholder object. The placeholder object has properties proxying to + the parent command that become available as soon as + :meth:`Plugin.finalise_placeholders` is called. This is done + automatically when it is loaded into the bot. + + .. warning:: + This ONLY works confined within the same :class:`Plugin`. Different + sub-plugins on the same plugin CAN have interdependent placeholders, + but you CANNOT make a placeholder dependent on an entirely + different plugin. + + Parameters + ---------- + parent_name: + The name of the parent :class:`InvokableSlashCommand` or + :class:`SubCommandGroup` to which this subcommand should be + registered. + name: + The name of this subcommand. If not provided, this will use the + name of the decorated function. + description: + The description of this command. If not provided, this will use the + docstring of the decorated function. + connectors: + A mapping of option names to function parameter names, mainly for + internal processes. + extras: + Any extras that are to be stored on the subcommand. + + Returns + ------- + Callable[..., :class:`SubCommandPlaceholder`] + A decorator that converts the provided method into a + :class:`SubCommandGroupPlaceholder` and returns it. + """ + def decorator(func: typeshed.CoroFunc) -> placeholder.SubCommandPlaceholder: + placeholder_cmd = placeholder.SubCommandPlaceholder( + func, + parent_name, + name=name, + description=description, + connectors=connectors, + extras=extras, + ) + self._add_command_placeholder(placeholder_cmd) + return placeholder_cmd + + return decorator From 8ab44d06e356124c37a6434943f3e207fff6f76a Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 02:39:04 +0200 Subject: [PATCH 6/8] chore: make pre-commit pass --- src/disnake/ext/plugins/plugin.py | 38 +++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/src/disnake/ext/plugins/plugin.py b/src/disnake/ext/plugins/plugin.py index b8362e5..4253bda 100644 --- a/src/disnake/ext/plugins/plugin.py +++ b/src/disnake/ext/plugins/plugin.py @@ -129,21 +129,31 @@ def get_parent_plugin(obj: typeshed.ExtrasAware) -> Plugin[typeshed.AnyBot]: @dataclasses.dataclass class PluginStorage(t.Generic[typeshed.PluginT]): - commands: t.Dict[str, commands.Command[typeshed.PluginT, t.Any, t.Any]] = dataclasses.field(default_factory=dict) # type: ignore - message_commands: t.Dict[str, commands.InvokableMessageCommand] = dataclasses.field(default_factory=dict) - slash_commands: t.Dict[str, commands.InvokableSlashCommand] = dataclasses.field(default_factory=dict) - user_commands: t.Dict[str, commands.InvokableUserCommand] = dataclasses.field(default_factory=dict) + commands: t.Dict[str, commands.Command[typeshed.PluginT, t.Any, t.Any]] = dataclasses.field( # type: ignore + default_factory=dict, + ) + message_commands: t.Dict[str, commands.InvokableMessageCommand] = dataclasses.field( + default_factory=dict, + ) + slash_commands: t.Dict[str, commands.InvokableSlashCommand] = dataclasses.field( + default_factory=dict, + ) + user_commands: t.Dict[str, commands.InvokableUserCommand] = dataclasses.field( + default_factory=dict, + ) command_checks: t.List[typeshed.PrefixCommandCheck] = dataclasses.field(default_factory=list) slash_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field(default_factory=list) - message_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field(default_factory=list) + message_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field( + default_factory=list, + ) user_command_checks: t.List[typeshed.AppCommandCheck] = dataclasses.field(default_factory=list) loops: t.List[tasks.Loop[t.Any]] = dataclasses.field(default_factory=list) listeners: t.Dict[str, t.List[typeshed.CoroFunc]] = dataclasses.field(default_factory=dict) - def update(self, other: PluginStorage[typeshed.PluginT]) -> None: + def update(self, other: PluginStorage[t.Any]) -> None: """Update this PluginStorage with another, merging their container dicts and lists.""" self.commands.update(other.commands) self.message_commands.update(other.message_commands) @@ -168,6 +178,7 @@ def update(self, other: PluginStorage[typeshed.PluginT]) -> None: # Since this is relevant for both actual plugins and sub plugins, this is a # separate base class that can be inherited by both. + class PluginBase(typeshed.PluginProtocol[typeshed.BotT]): __slots__ = ( "metadata", @@ -544,7 +555,10 @@ async def _before_loop() -> None: # Getters - def get_command(self, name: str) -> t.Optional[commands.Command[Self, t.Any, t.Any]]: # pyright: ignore + def get_command( + self, + name: str, + ) -> t.Optional[commands.Command[Self, t.Any, t.Any]]: # pyright: ignore part, _, name = name.strip().partition(" ") command = self._storage.commands.get(name) @@ -561,7 +575,10 @@ def get_command(self, name: str) -> t.Optional[commands.Command[Self, t.Any, t.A return command - def get_slash_command(self, name: str) -> t.Union[ + def get_slash_command( + self, + name: str, + ) -> t.Union[ commands.InvokableSlashCommand, commands.SubCommandGroup, commands.SubCommand, @@ -588,6 +605,7 @@ def get_slash_command(self, name: str) -> t.Union[ # The actual Plugin implementation adds loading/unloading behaviour to the base. # For the user's convenience, we also provide easy access to custom loggers. + class Plugin(PluginBase[typeshed.BotT]): """An extension manager similar to disnake's :class:`commands.Cog`. @@ -809,7 +827,6 @@ def merge_placeholders(self, *, final: bool) -> None: # more checks just to make this typecheck correctly. Sorry Eric. placeholder_.set_parent(parent) # pyright: ignore - # Plugin (un)loading... # TODO: Maybe make this a standalone function instead of a staticmethod. @@ -1067,7 +1084,6 @@ def _add_command_placeholder( else: self._command_placeholders[key] = [command] - def external_sub_command_group( self, parent_name: str, @@ -1106,6 +1122,7 @@ def external_sub_command_group( A decorator that converts the provided method into a :class:`SubCommandGroupPlaceholder` and returns it. """ + def decorator(func: typeshed.CoroFunc) -> placeholder.SubCommandGroupPlaceholder: placeholder_cmd = placeholder.SubCommandGroupPlaceholder( func, @@ -1165,6 +1182,7 @@ def external_sub_command( A decorator that converts the provided method into a :class:`SubCommandGroupPlaceholder` and returns it. """ + def decorator(func: typeshed.CoroFunc) -> placeholder.SubCommandPlaceholder: placeholder_cmd = placeholder.SubCommandPlaceholder( func, From c96eb99553daa20beccf8b077c42f68753b73e3f Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 22:39:41 +0200 Subject: [PATCH 7/8] feat: add additional validation --- src/disnake/ext/plugins/plugin.py | 36 +++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/src/disnake/ext/plugins/plugin.py b/src/disnake/ext/plugins/plugin.py index 4253bda..4caa2fd 100644 --- a/src/disnake/ext/plugins/plugin.py +++ b/src/disnake/ext/plugins/plugin.py @@ -779,7 +779,7 @@ def merge_placeholders(self, *, final: bool) -> None: sorted_keys = sorted(self._placeholders) for name in sorted_keys: - placeholders = self._placeholders.pop(name) + placeholders = self._placeholders[name] parent = self.get_slash_command(name) if not parent: @@ -791,17 +791,12 @@ def merge_placeholders(self, *, final: bool) -> None: ) raise RuntimeError(msg) - # This should happen very infrequently as we don't expect users - # to call this manually (with final=False) often, so it's - # probably more efficient to always pop and reinsert in this - # rare case. - self._placeholders[name] = placeholders continue if isinstance(parent, commands.SubCommand): msg = ( f"Command {parent.name!r} cannot have subcommands as it is" - "itself is a subcommand." + "itself a subcommand." ) raise TypeError(msg) @@ -819,6 +814,7 @@ def merge_placeholders(self, *, final: bool) -> None: msg = ( f"Cannot finalise placeholder SubCommandGroup {placeholder_.name!r}" f" by setting its parent to SubCommandGroup {parent.qualified_name!r}." + " (SubCommandGroups cannot have further SubCommandGroups as children.)" ) raise TypeError(msg) @@ -827,6 +823,10 @@ def merge_placeholders(self, *, final: bool) -> None: # more checks just to make this typecheck correctly. Sorry Eric. placeholder_.set_parent(parent) # pyright: ignore + # Merging all commands into the parent was succesful, so we can + # delete the key. + del self._placeholders[name] + # Plugin (un)loading... # TODO: Maybe make this a standalone function instead of a staticmethod. @@ -1122,6 +1122,17 @@ def external_sub_command_group( A decorator that converts the provided method into a :class:`SubCommandGroupPlaceholder` and returns it. """ + # First we validate whether this placeholder even *can* work. The + # sooner and closer to where they make a mistake we can inform a user of + # a mistake, the better. + parent_name = parent_name.strip() + if " " in parent_name: + msg = ( + "Subcommand groups can only be added to top-level slash" + f" commands. The provided parent name {parent_name!r} implies" + "the parent is a subcommand or group." + ) + raise ValueError(msg) def decorator(func: typeshed.CoroFunc) -> placeholder.SubCommandGroupPlaceholder: placeholder_cmd = placeholder.SubCommandGroupPlaceholder( @@ -1182,6 +1193,17 @@ def external_sub_command( A decorator that converts the provided method into a :class:`SubCommandGroupPlaceholder` and returns it. """ + # First we validate whether this placeholder even *can* work. The + # sooner and closer to where they make a mistake we can inform a user of + # a mistake, the better. + parent_name = parent_name.strip() + if parent_name.count(" ") > 1: + msg = ( + f"The provided parent name {parent_name!r} implies the parent" + " would itself be a subcommand, but subcommands cannot have" + " children." + ) + raise ValueError(msg) def decorator(func: typeshed.CoroFunc) -> placeholder.SubCommandPlaceholder: placeholder_cmd = placeholder.SubCommandPlaceholder( From be17e76626127021c6ac8e73b71eb6a5eabbfe62 Mon Sep 17 00:00:00 2001 From: Chromosomologist Date: Wed, 11 Oct 2023 23:14:40 +0200 Subject: [PATCH 8/8] fix: adds slots to plugin protocol --- src/disnake/ext/plugins/typeshed.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/disnake/ext/plugins/typeshed.py b/src/disnake/ext/plugins/typeshed.py index 3575f67..e7d55f4 100644 --- a/src/disnake/ext/plugins/typeshed.py +++ b/src/disnake/ext/plugins/typeshed.py @@ -99,6 +99,8 @@ class SlashCommandParams(AppCommandParams, total=False): class PluginProtocol(t.Protocol[BotT]): """Protocol for Plugin-like classes.""" + __slots__: t.Sequence[str] = () + @property def bot(self) -> BotT: """The bot to which this plugin is registered."""