diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/ext/horizontal_shard.py | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/ext/horizontal_shard.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/ext/horizontal_shard.py | 481 |
1 files changed, 481 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/horizontal_shard.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/horizontal_shard.py new file mode 100644 index 0000000..d8ee819 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/horizontal_shard.py @@ -0,0 +1,481 @@ +# ext/horizontal_shard.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +"""Horizontal sharding support. + +Defines a rudimental 'horizontal sharding' system which allows a Session to +distribute queries and persistence operations across multiple databases. + +For a usage example, see the :ref:`examples_sharding` example included in +the source distribution. + +.. deepalchemy:: The horizontal sharding extension is an advanced feature, + involving a complex statement -> database interaction as well as + use of semi-public APIs for non-trivial cases. Simpler approaches to + refering to multiple database "shards", most commonly using a distinct + :class:`_orm.Session` per "shard", should always be considered first + before using this more complex and less-production-tested system. + + + +""" +from __future__ import annotations + +from typing import Any +from typing import Callable +from typing import Dict +from typing import Iterable +from typing import Optional +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +from .. import event +from .. import exc +from .. import inspect +from .. import util +from ..orm import PassiveFlag +from ..orm._typing import OrmExecuteOptionsParameter +from ..orm.interfaces import ORMOption +from ..orm.mapper import Mapper +from ..orm.query import Query +from ..orm.session import _BindArguments +from ..orm.session import _PKIdentityArgument +from ..orm.session import Session +from ..util.typing import Protocol +from ..util.typing import Self + +if TYPE_CHECKING: + from ..engine.base import Connection + from ..engine.base import Engine + from ..engine.base import OptionEngine + from ..engine.result import IteratorResult + from ..engine.result import Result + from ..orm import LoaderCallableStatus + from ..orm._typing import _O + from ..orm.bulk_persistence import BulkUDCompileState + from ..orm.context import QueryContext + from ..orm.session import _EntityBindKey + from ..orm.session import _SessionBind + from ..orm.session import ORMExecuteState + from ..orm.state import InstanceState + from ..sql import Executable + from ..sql._typing import _TP + from ..sql.elements import ClauseElement + +__all__ = ["ShardedSession", "ShardedQuery"] + +_T = TypeVar("_T", bound=Any) + + +ShardIdentifier = str + + +class ShardChooser(Protocol): + def __call__( + self, + mapper: Optional[Mapper[_T]], + instance: Any, + clause: Optional[ClauseElement], + ) -> Any: ... + + +class IdentityChooser(Protocol): + def __call__( + self, + mapper: Mapper[_T], + primary_key: _PKIdentityArgument, + *, + lazy_loaded_from: Optional[InstanceState[Any]], + execution_options: OrmExecuteOptionsParameter, + bind_arguments: _BindArguments, + **kw: Any, + ) -> Any: ... + + +class ShardedQuery(Query[_T]): + """Query class used with :class:`.ShardedSession`. + + .. legacy:: The :class:`.ShardedQuery` is a subclass of the legacy + :class:`.Query` class. The :class:`.ShardedSession` now supports + 2.0 style execution via the :meth:`.ShardedSession.execute` method. + + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + assert isinstance(self.session, ShardedSession) + + self.identity_chooser = self.session.identity_chooser + self.execute_chooser = self.session.execute_chooser + self._shard_id = None + + def set_shard(self, shard_id: ShardIdentifier) -> Self: + """Return a new query, limited to a single shard ID. + + All subsequent operations with the returned query will + be against the single shard regardless of other state. + + The shard_id can be passed for a 2.0 style execution to the + bind_arguments dictionary of :meth:`.Session.execute`:: + + results = session.execute( + stmt, + bind_arguments={"shard_id": "my_shard"} + ) + + """ + return self.execution_options(_sa_shard_id=shard_id) + + +class ShardedSession(Session): + shard_chooser: ShardChooser + identity_chooser: IdentityChooser + execute_chooser: Callable[[ORMExecuteState], Iterable[Any]] + + def __init__( + self, + shard_chooser: ShardChooser, + identity_chooser: Optional[IdentityChooser] = None, + execute_chooser: Optional[ + Callable[[ORMExecuteState], Iterable[Any]] + ] = None, + shards: Optional[Dict[str, Any]] = None, + query_cls: Type[Query[_T]] = ShardedQuery, + *, + id_chooser: Optional[ + Callable[[Query[_T], Iterable[_T]], Iterable[Any]] + ] = None, + query_chooser: Optional[Callable[[Executable], Iterable[Any]]] = None, + **kwargs: Any, + ) -> None: + """Construct a ShardedSession. + + :param shard_chooser: A callable which, passed a Mapper, a mapped + instance, and possibly a SQL clause, returns a shard ID. This id + may be based off of the attributes present within the object, or on + some round-robin scheme. If the scheme is based on a selection, it + should set whatever state on the instance to mark it in the future as + participating in that shard. + + :param identity_chooser: A callable, passed a Mapper and primary key + argument, which should return a list of shard ids where this + primary key might reside. + + .. versionchanged:: 2.0 The ``identity_chooser`` parameter + supersedes the ``id_chooser`` parameter. + + :param execute_chooser: For a given :class:`.ORMExecuteState`, + returns the list of shard_ids + where the query should be issued. Results from all shards returned + will be combined together into a single listing. + + .. versionchanged:: 1.4 The ``execute_chooser`` parameter + supersedes the ``query_chooser`` parameter. + + :param shards: A dictionary of string shard names + to :class:`~sqlalchemy.engine.Engine` objects. + + """ + super().__init__(query_cls=query_cls, **kwargs) + + event.listen( + self, "do_orm_execute", execute_and_instances, retval=True + ) + self.shard_chooser = shard_chooser + + if id_chooser: + _id_chooser = id_chooser + util.warn_deprecated( + "The ``id_chooser`` parameter is deprecated; " + "please use ``identity_chooser``.", + "2.0", + ) + + def _legacy_identity_chooser( + mapper: Mapper[_T], + primary_key: _PKIdentityArgument, + *, + lazy_loaded_from: Optional[InstanceState[Any]], + execution_options: OrmExecuteOptionsParameter, + bind_arguments: _BindArguments, + **kw: Any, + ) -> Any: + q = self.query(mapper) + if lazy_loaded_from: + q = q._set_lazyload_from(lazy_loaded_from) + return _id_chooser(q, primary_key) + + self.identity_chooser = _legacy_identity_chooser + elif identity_chooser: + self.identity_chooser = identity_chooser + else: + raise exc.ArgumentError( + "identity_chooser or id_chooser is required" + ) + + if query_chooser: + _query_chooser = query_chooser + util.warn_deprecated( + "The ``query_chooser`` parameter is deprecated; " + "please use ``execute_chooser``.", + "1.4", + ) + if execute_chooser: + raise exc.ArgumentError( + "Can't pass query_chooser and execute_chooser " + "at the same time." + ) + + def _default_execute_chooser( + orm_context: ORMExecuteState, + ) -> Iterable[Any]: + return _query_chooser(orm_context.statement) + + if execute_chooser is None: + execute_chooser = _default_execute_chooser + + if execute_chooser is None: + raise exc.ArgumentError( + "execute_chooser or query_chooser is required" + ) + self.execute_chooser = execute_chooser + self.__shards: Dict[ShardIdentifier, _SessionBind] = {} + if shards is not None: + for k in shards: + self.bind_shard(k, shards[k]) + + def _identity_lookup( + self, + mapper: Mapper[_O], + primary_key_identity: Union[Any, Tuple[Any, ...]], + identity_token: Optional[Any] = None, + passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, + lazy_loaded_from: Optional[InstanceState[Any]] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Union[Optional[_O], LoaderCallableStatus]: + """override the default :meth:`.Session._identity_lookup` method so + that we search for a given non-token primary key identity across all + possible identity tokens (e.g. shard ids). + + .. versionchanged:: 1.4 Moved :meth:`.Session._identity_lookup` from + the :class:`_query.Query` object to the :class:`.Session`. + + """ + + if identity_token is not None: + obj = super()._identity_lookup( + mapper, + primary_key_identity, + identity_token=identity_token, + **kw, + ) + + return obj + else: + for shard_id in self.identity_chooser( + mapper, + primary_key_identity, + lazy_loaded_from=lazy_loaded_from, + execution_options=execution_options, + bind_arguments=dict(bind_arguments) if bind_arguments else {}, + ): + obj2 = super()._identity_lookup( + mapper, + primary_key_identity, + identity_token=shard_id, + lazy_loaded_from=lazy_loaded_from, + **kw, + ) + if obj2 is not None: + return obj2 + + return None + + def _choose_shard_and_assign( + self, + mapper: Optional[_EntityBindKey[_O]], + instance: Any, + **kw: Any, + ) -> Any: + if instance is not None: + state = inspect(instance) + if state.key: + token = state.key[2] + assert token is not None + return token + elif state.identity_token: + return state.identity_token + + assert isinstance(mapper, Mapper) + shard_id = self.shard_chooser(mapper, instance, **kw) + if instance is not None: + state.identity_token = shard_id + return shard_id + + def connection_callable( # type: ignore [override] + self, + mapper: Optional[Mapper[_T]] = None, + instance: Optional[Any] = None, + shard_id: Optional[ShardIdentifier] = None, + **kw: Any, + ) -> Connection: + """Provide a :class:`_engine.Connection` to use in the unit of work + flush process. + + """ + + if shard_id is None: + shard_id = self._choose_shard_and_assign(mapper, instance) + + if self.in_transaction(): + trans = self.get_transaction() + assert trans is not None + return trans.connection(mapper, shard_id=shard_id) + else: + bind = self.get_bind( + mapper=mapper, shard_id=shard_id, instance=instance + ) + + if isinstance(bind, Engine): + return bind.connect(**kw) + else: + assert isinstance(bind, Connection) + return bind + + def get_bind( + self, + mapper: Optional[_EntityBindKey[_O]] = None, + *, + shard_id: Optional[ShardIdentifier] = None, + instance: Optional[Any] = None, + clause: Optional[ClauseElement] = None, + **kw: Any, + ) -> _SessionBind: + if shard_id is None: + shard_id = self._choose_shard_and_assign( + mapper, instance=instance, clause=clause + ) + assert shard_id is not None + return self.__shards[shard_id] + + def bind_shard( + self, shard_id: ShardIdentifier, bind: Union[Engine, OptionEngine] + ) -> None: + self.__shards[shard_id] = bind + + +class set_shard_id(ORMOption): + """a loader option for statements to apply a specific shard id to the + primary query as well as for additional relationship and column + loaders. + + The :class:`_horizontal.set_shard_id` option may be applied using + the :meth:`_sql.Executable.options` method of any executable statement:: + + stmt = ( + select(MyObject). + where(MyObject.name == 'some name'). + options(set_shard_id("shard1")) + ) + + Above, the statement when invoked will limit to the "shard1" shard + identifier for the primary query as well as for all relationship and + column loading strategies, including eager loaders such as + :func:`_orm.selectinload`, deferred column loaders like :func:`_orm.defer`, + and the lazy relationship loader :func:`_orm.lazyload`. + + In this way, the :class:`_horizontal.set_shard_id` option has much wider + scope than using the "shard_id" argument within the + :paramref:`_orm.Session.execute.bind_arguments` dictionary. + + + .. versionadded:: 2.0.0 + + """ + + __slots__ = ("shard_id", "propagate_to_loaders") + + def __init__( + self, shard_id: ShardIdentifier, propagate_to_loaders: bool = True + ): + """Construct a :class:`_horizontal.set_shard_id` option. + + :param shard_id: shard identifier + :param propagate_to_loaders: if left at its default of ``True``, the + shard option will take place for lazy loaders such as + :func:`_orm.lazyload` and :func:`_orm.defer`; if False, the option + will not be propagated to loaded objects. Note that :func:`_orm.defer` + always limits to the shard_id of the parent row in any case, so the + parameter only has a net effect on the behavior of the + :func:`_orm.lazyload` strategy. + + """ + self.shard_id = shard_id + self.propagate_to_loaders = propagate_to_loaders + + +def execute_and_instances( + orm_context: ORMExecuteState, +) -> Union[Result[_T], IteratorResult[_TP]]: + active_options: Union[ + None, + QueryContext.default_load_options, + Type[QueryContext.default_load_options], + BulkUDCompileState.default_update_options, + Type[BulkUDCompileState.default_update_options], + ] + + if orm_context.is_select: + active_options = orm_context.load_options + + elif orm_context.is_update or orm_context.is_delete: + active_options = orm_context.update_delete_options + else: + active_options = None + + session = orm_context.session + assert isinstance(session, ShardedSession) + + def iter_for_shard( + shard_id: ShardIdentifier, + ) -> Union[Result[_T], IteratorResult[_TP]]: + bind_arguments = dict(orm_context.bind_arguments) + bind_arguments["shard_id"] = shard_id + + orm_context.update_execution_options(identity_token=shard_id) + return orm_context.invoke_statement(bind_arguments=bind_arguments) + + for orm_opt in orm_context._non_compile_orm_options: + # TODO: if we had an ORMOption that gets applied at ORM statement + # execution time, that would allow this to be more generalized. + # for now just iterate and look for our options + if isinstance(orm_opt, set_shard_id): + shard_id = orm_opt.shard_id + break + else: + if active_options and active_options._identity_token is not None: + shard_id = active_options._identity_token + elif "_sa_shard_id" in orm_context.execution_options: + shard_id = orm_context.execution_options["_sa_shard_id"] + elif "shard_id" in orm_context.bind_arguments: + shard_id = orm_context.bind_arguments["shard_id"] + else: + shard_id = None + + if shard_id is not None: + return iter_for_shard(shard_id) + else: + partial = [] + for shard_id in session.execute_chooser(orm_context): + result_ = iter_for_shard(shard_id) + partial.append(result_) + return partial[0].merge(*partial[1:]) |