import functools
import typing

import datastore

KEY_TRANSFORM_T = typing.Callable[[datastore.Key], datastore.Key]
T = typing.TypeVar('T', bound="_Adapter")


class _FlatFSAdapter(_Adapter[DS, RT, RV], typing.Generic[DS, RT, RV]):
    """Represents a simple DatastoreAdapter that shards/namespaces incoming keys.

    Incoming keys are sharded into namespaces according to specifications
    in a `SHARDING` file, with `prefix`, `suffix`, and `next-to-last`
    sharding functions available.

    For example, for a key whose string form is ``abcdef`` and a
    `key_length` of 2:

    * `prefix` selects ``ab`` (the first 2 characters)
    * `suffix` selects ``ef`` (the last 2 characters)
    * `next-to-last` selects ``de`` (the 2 characters before the last one)

    String forms that are too short are padded with ``_``.

    Implementation Notes:
      * The `default_prefix` is "/repo/flatfs/shard/"
      * The only version supported is "v1"
      * function_name must be `prefix`, `suffix`, or `next-to-last`

    NOTE(review): the sharding helpers operate on ``str(key)`` as-is. If the
    string form of a key carries a leading "/", that character takes part in
    the `prefix` shard -- confirm against `datastore.Key`.

    NOTE(review): the helpers return only the shard name, not the shard
    combined with the original key -- confirm how `_Adapter` applies the
    `key_transform` callable.
    """

    # Every shard identifier stored in the `SHARDING` entry starts with this.
    prefix: str = "/repo/flatfs/shard/"
    # Key under which the shard identifier is persisted in the child datastore.
    _default_sharding_key: datastore.Key = datastore.Key("SHARDING")
    # Identifier (without `prefix`) used when the child datastore has none.
    _default_sharding_func: str = "v1/prefix/2"

    @classmethod
    async def create(
        cls: typing.Type[T],
        child_datastore: DS,
        *args,
        sharding_key: typing.Optional[datastore.Key] = None,
        default_sharding_func: typing.Optional[str] = None,
        **kwargs
    ) -> T:
        """Initializes _FlatFSAdapter with sharding function.

        Arguments
        ---------
        cls
            Class variable passed in due to `classmethod`
        child_datastore
            Underlying datastore always used to initialize an `Adapter`
        *args
            Variable length list argument to be passed to cls constructor
        sharding_key
            Key used to access the sharding function in the underlying
            Datastore; defaults to `cls._default_sharding_key`
        default_sharding_func
            Default sharding func (including the prefix) to use if one is
            not found in the datastore; defaults to
            `cls.prefix + cls._default_sharding_func`
        **kwargs
            Keyword arguments to be passed to cls constructor

        Returns
        -------
        An instance of `cls` constructed with `key_transform` set to the
        parsed sharding function.

        Raises
        ------
        Exception
            Propagated from `_parse_sharding_function` when the shard
            identifier is missing pieces or malformed.
        """
        # Get the variables needed to parse the sharding function
        if sharding_key is None:
            sharding_key = cls._default_sharding_key  # type: ignore[attr-defined]
        if default_sharding_func is None:
            default_sharding_func = (
                f"{cls.prefix}{cls._default_sharding_func}"  # type: ignore[attr-defined]
            )

        sharding_fn: KEY_TRANSFORM_T = await cls._parse_sharding_function(  # type: ignore[attr-defined]
            child_datastore, sharding_key, default_sharding_func
        )

        # Return an instantiated version of the class
        return cls(  # type: ignore[call-arg]
            child_datastore, *args, key_transform=sharding_fn, **kwargs
        )

    @staticmethod
    def _prefix(key: datastore.Key, length: int) -> datastore.Key:
        """Return the first `length` characters of `str(key)`, right-padded with "_"."""
        return datastore.Key(str(key).ljust(length, "_")[:length])

    @staticmethod
    def _suffix(key: datastore.Key, length: int) -> datastore.Key:
        """Return the last `length` characters of `str(key)`, left-padded with "_"."""
        padded = str(key).rjust(length, "_")
        # Slice from an explicit offset: `padded[-length:]` would return the
        # whole string when `length` is 0.
        return datastore.Key(padded[len(padded) - length:])

    @staticmethod
    def _next_to_last(key: datastore.Key, length: int) -> datastore.Key:
        """Return the `length` characters preceding the final character of `str(key)`.

        The string is left-padded with "_" to at least `length + 1` characters
        so that the window always exists.
        """
        padded = str(key).rjust(length + 1, "_")
        start = len(padded) - length - 1
        return datastore.Key(padded[start:start + length])

    @classmethod
    async def _parse_sharding_function(
        cls: typing.Type[T],
        child_datastore: DS,
        sharding_key: datastore.Key,
        default_sharding_func: str
    ) -> KEY_TRANSFORM_T:
        """Determine the sharding function from the sharding_key.

        The shard identifier is read from `child_datastore`; when the lookup
        raises `KeyError`, `default_sharding_func` is used instead and, once
        it has passed every check, stored under `sharding_key`.

        Arguments
        ---------
        cls
            Class variable passed in due to `classmethod`
        child_datastore
            Underlying datastore always used to access the `sharding_key`
        sharding_key
            Key used to access the sharding function in the `child_datastore`
        default_sharding_func
            Default sharding func to use if one is not found in `child_datastore`

        Returns
        -------
        A callable taking a key and returning its shard, with the parsed
        `key_length` already bound.

        Raises
        ------
        Exception
            Empty shard identifier file.
        Exception
            Prefix was not present in sharding file
        Exception
            Sharding file was in incorrect format
        Exception
            Expect `v1` format
        Exception
            `key_length` should be a non-negative integer
        Exception
            `function_name` was not `prefix`, `suffix`, or `next-to-last`
        """
        prefix: str = cls.prefix  # type: ignore[attr-defined]

        sharding_func: typing.Optional[str]
        need_to_store_sharding_func: bool = False
        try:
            # Try reading existing sharding func used
            stored = await child_datastore.get_all(sharding_key)
        except KeyError:
            # Use the given default sharding func
            sharding_func = default_sharding_func
            need_to_store_sharding_func = True
        else:
            # A bare `next()` on an empty result would raise StopIteration,
            # which a coroutine turns into an opaque RuntimeError; default to
            # None so the emptiness check below reports it properly.
            # NOTE(review): if `get_all` returns `bytes` for a binary
            # datastore this yields an int, not text -- confirm and decode.
            sharding_func = next(iter(stored), None)  # type: ignore[assignment]

        # Ensure not empty (before and after trimming whitespace)
        if not sharding_func:
            raise Exception("Empty shard identifier file.")
        sharding_func = sharding_func.strip()
        if not sharding_func:
            raise Exception("Empty shard identifier file.")

        # The identifier must *start* with the prefix; a prefix buried in the
        # middle of the string is not a valid identifier.
        if not sharding_func.startswith(prefix):
            raise Exception(f"Prefix ({prefix}) was not present in {sharding_func}")
        sharding_fn = sharding_func[len(prefix):]

        parts: typing.List[str] = sharding_fn.split("/")
        if len(parts) != 3:
            raise Exception(
                f"invalid shard identifier: {sharding_fn}.\n"
                f"Expecting form: {prefix}version/function_name/key_length"
            )

        version, function_name, raw_length = parts

        if version != "v1":
            raise Exception(f"Expected 'v1' for version string got: {version}\n")

        try:
            length = int(raw_length)
        except ValueError as err:
            raise Exception(
                f"Invalid parameter: {raw_length}. Should be integer representing `key_length`."
            ) from err
        if length < 0:
            # A negative length would silently produce nonsensical slices.
            raise Exception(
                f"Invalid parameter: {raw_length}. Should be integer representing `key_length`."
            )

        funcs: typing.Dict[str, typing.Callable[..., datastore.Key]] = {
            "prefix": cls._prefix,  # type: ignore[attr-defined]
            "suffix": cls._suffix,  # type: ignore[attr-defined]
            "next-to-last": cls._next_to_last,  # type: ignore[attr-defined]
        }

        try:
            func = funcs[function_name]
        except KeyError:
            raise Exception(
                f"Expected 'prefix', 'suffix', or 'next-to-last' got: {function_name}"
            ) from None

        if need_to_store_sharding_func:
            # All checks have been passed, put in datastore, remembering it
            # for subsequent use of this repo
            await child_datastore.put(sharding_key, [sharding_func])  # type: ignore[list-item]

        return functools.partial(func, length=length)

    async def query(self, query: datastore.Query) -> datastore.Cursor:
        """Not supported: always raises `NotImplementedError`."""
        # Requires supporting * operator on queries.
        raise NotImplementedError()


class BinaryFlatFSAdapter(
    _FlatFSAdapter[datastore.abc.BinaryDatastore, datastore.abc.ReceiveStream, bytes],
    datastore.abc.BinaryAdapter
):
    """Flat-FS sharding adapter specialised for binary datastores."""

    __slots__ = ("key_transform_fn",)


class ObjectFlatFSAdapter(
    typing.Generic[T_co],
    _FlatFSAdapter[
        datastore.abc.ObjectDatastore[T_co],
        datastore.abc.ReceiveChannel[T_co],
        typing.List[T_co]
    ],
    datastore.abc.ObjectAdapter[T_co, T_co]
):
    """Flat-FS sharding adapter specialised for object datastores."""

    __slots__ = ("key_transform_fn",)