210 changes: 207 additions & 3 deletions datastore/adapter/keytransform.py
@@ -1,3 +1,4 @@
import functools
import typing

import datastore
@@ -17,11 +18,14 @@

"BinaryNestedPathAdapter",
"ObjectNestedPathAdapter",

"BinaryFlatFSAdapter",
"ObjectFlatFSAdapter"
)



KEY_TRANSFORM_T = typing.Callable[[datastore.Key], datastore.Key]
T = typing.TypeVar('T', bound="_Adapter")
Collaborator:
The bound here should be on "_FlatFSAdapter", not just "_Adapter". That may also fix many of the mypy complaints, if I'm not mistaken…
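For orientation, a minimal sketch of that suggestion, keeping the forward-reference string form the diff already uses:

# Sketch of the suggested change: bind T to the FlatFS adapter so that
# attribute accesses such as cls.prefix type-check without ignores.
T = typing.TypeVar('T', bound="_FlatFSAdapter")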



class _Adapter(typing.Generic[DS, RT, RV]):
@@ -164,9 +168,9 @@ class ObjectLowercaseKeyAdapter(
__slots__ = ("key_transform_fn",)



class _NamespaceAdapter(_Adapter[DS, RT, RV], typing.Generic[DS, RT, RV]):
"""Represents a simple DatastoreAdapter that namespaces all incoming keys.

For example:

>>> import datastore.core
@@ -220,7 +224,6 @@ class ObjectNamespaceAdapter(
__slots__ = ("key_transform_fn", "namespace",)



class _NestedPathAdapter(_Adapter[DS, RT, RV], typing.Generic[DS, RT, RV]):
"""Represents a simple DatastoreAdapter that shards/namespaces incoming keys.

@@ -338,3 +341,204 @@ class ObjectNestedPathAdapter(
datastore.abc.ObjectAdapter[T_co, T_co]
):
__slots__ = ("key_transform_fn", "nest_depth", "nest_length", "nest_keyfn")


class _FlatFSAdapter(_Adapter[DS, RT, RV], typing.Generic[DS, RT, RV]):
"""Represents a simple DatastoreAdapter that shards/namespaces incoming keys.

Incoming keys are sharded into namespaces according to the specification
in a `SHARDING` file, which makes the `prefix`, `suffix`, and `next-to-last`
sharding functions available. For example, the default specification is
"/repo/flatfs/shard/v1/prefix/2".

Implementation Notes:
* The `default_prefix` is "/repo/flatfs/shard/"
* The only version supported is "v1"
* function_name must be `prefix`, `suffix`, or `next-to-last`

"""

prefix: str = "/repo/flatfs/shard/"
_default_sharding_key: datastore.Key = datastore.Key("SHARDING")
_default_sharding_func: str = "v1/prefix/2"
Collaborator:
Use "v1/next-to-last/2" here as that is the default go-ipfs uses.


@classmethod
async def create(
cls: typing.Type[T],
child_datastore: DS,
*args,
sharding_key: datastore.Key = None,
default_sharding_func: str = None,
**kwargs
) -> T:
"""Initializes _FlatFSAdapter with sharding function.

Arguments
---------
cls
Class variable passed in due to `classmethod`
child_datastore
Underlying datastore always used to initialize an `Adapter`
*args
Variable length list argument to be passed to cls constructor
sharding_key
Key used to access the sharding function in the underlying Datastore
default_sharding_func
Default sharding func to use if one is not found in the datastore
"""
# Get the variables needed to parse the sharding function
sharding_key = (
sharding_key
if sharding_key is not None
else cls._default_sharding_key # type: ignore[attr-defined] # noqa: F821
)
default_sharding_func = (
default_sharding_func
if default_sharding_func is not None
else f"{cls.prefix}{cls._default_sharding_func}" # type: ignore[attr-defined] # noqa: F821
)
sharding_fn: KEY_TRANSFORM_T = await \
cls._parse_sharding_function( # type: ignore[attr-defined] # noqa: F821
child_datastore, sharding_key, default_sharding_func
)

# Return an instantiated version of the class
return cls(
child_datastore, *args, key_transform=sharding_fn, **kwargs
) # type: ignore[attr-defined] # noqa: F821

@staticmethod
def _prefix(key: datastore.Key, length: int) -> datastore.Key:
return datastore.Key(str(key).ljust(length, "_")[:length])

@staticmethod
def _suffix(key: datastore.Key, length: int) -> datastore.Key:
return datastore.Key(str(key).rjust(length, "_")[:length])

@staticmethod
def _next_to_last(key: datastore.Key, length: int) -> datastore.Key:
return datastore.Key(str(key).rjust(length + 1, "_")[:length])

@classmethod
async def _parse_sharding_function(
cls: typing.Type[T],
Collaborator:
Don't use cls: typing.Type[T] here; dropping it will let you drop a bunch of the mypy attr-defined comments in this method.
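For illustration, a sketch of the signature without the annotation (mypy then infers the type of cls from the enclosing class):

@classmethod
async def _parse_sharding_function(
    cls,  # unannotated: mypy infers the class type, no attr-defined ignores needed
    child_datastore: DS,
    sharding_key: datastore.Key,
    default_sharding_func: str
) -> KEY_TRANSFORM_T:
    ...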

child_datastore: DS,
sharding_key: datastore.Key,
default_sharding_func: str
) -> KEY_TRANSFORM_T:
"""Determine the sharding function from the sharding_key.

Arguments
---------
cls
Class variable passed in due to `classmethod`
child_datastore
Underlying datastore always used to access the `sharding_key`
sharding_key
Key used to access the sharding function in the `child_datastore`
default_sharding_func
Default sharding func to use if one is not found in `child_datastore`

Raises
------
Exception
Empty shard identifier file.
Exception
Prefix was not present in sharding file
Exception
Sharding file was in incorrect format
Exception
Expect `v1` format
Exception
`key_length` should be an integer
Exception
`function_name` was not `prefix`, `suffix`, or `next-to-last`
Collaborator (on lines +443 to +454):
Doing raise Exception("<some text>") is an anti-pattern in Python (unlike JavaScript where the exception type is mostly useless anyways). Please add an exception hierarchy like the following for this:

class InitializationError(RuntimeError):
	"""Raised when there was an error with opening this datastore adapter."""
	datastore: T
	
	def __init__(self, datastore: T, *args):
		self.datastore = datastore
		super().__init__(*args)

# Use for other errors
class ShardingStateError(InitializationError):
	"""Raised when the existing sharding entry could not be read."""
	key: datastore.Key
	
	def __init__(self, datastore: T, key: datastore.Key, *args):
		self.key = key
		super().__init__(datastore, *args)

# Use for: Empty shard identifier file.
class ShardingStateEmptyError(ShardingStateError):
	"""Raised when the existing sharding entry is empty."""

# Use for: Expect `v1` format & `function_name` was not `prefix`, `suffix`, or `next-to-last`
class ShardingStateUnsupportedError(ShardingStateError):
	"""Raised when the existing sharding entry uses an unsupported version number or sharding function."""

(The comment lines are just for your orientation.) Add this to the file top, add the relevant entries to __all__ and update all raise statements to raise <ErrorType>(self, sharding_key, "<some text>"). This way consumers of this API can more easily distinguish between the kinds of errors raised (using try: …; except <ErrorType> as exc: …). Also some extra error data is included to help callers understand in which context the error occurred.
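As a rough illustration of the caller side this enables (the adapter and exception names are taken from the suggestion above; child stands in for whatever child datastore is being wrapped):

# Hypothetical consumer of the hierarchy sketched above.
try:
    adapter = await BinaryFlatFSAdapter.create(child)
except ShardingStateEmptyError as exc:
    print(f"sharding entry at {exc.key} is empty")
except ShardingStateError as exc:
    print(f"could not read sharding entry at {exc.key}: {exc}")
except InitializationError as exc:
    print(f"failed to open datastore adapter for {exc.datastore}: {exc}")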

RuntimeError
An internal error occurred
"""

prefix: str = cls.prefix # type: ignore[attr-defined] # noqa: F821

sharding_func: str
need_to_store_sharding_func: bool = False
try:
# Try reading existing sharding func used
sharding_func = next( # type: ignore[assignment] # noqa: F821
iter(await child_datastore.get_all(sharding_key))
)
Collaborator (on lines +465 to +467):
This looks like it will return a binary string / bytes object when self is BinaryAdapter, but you need a Unicode string, so you'll need to call decode somewhere. I propose the following to support both object and binary datastores:

if isinstance(self, datastore.abc.BinaryAdapter):
	# Decode using latin-1 to get a 1:1 binary to Unicode mapping that cannot fail
	sharding_func = (await child_datastore.get_all(sharding_key)).decode("latin-1")
else:
	# Expect result from object datastore be a Unicode string
	sharding_func = typing.cast(str, await child_datastore.get_all(sharding_key))
	assert isinstance(sharding_func, str)

except KeyError:
# Use the given default sharding func
sharding_func = default_sharding_func
need_to_store_sharding_func = True

# Ensure not empty
if not sharding_func:
raise Exception("Empty shard identifier file.")

sharding_func = sharding_func.strip()
Collaborator:
Place this line above the emptiness check above, as bool(" ") is True (not False).
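A sketch of the reordering being asked for, using the same names as the diff:

# Strip first, then check, so a whitespace-only sharding entry is also rejected.
sharding_func = sharding_func.strip()
if not sharding_func:
    raise Exception("Empty shard identifier file.")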


# Returns a 3-tuple containing the part before the separator,
# the separator itself, and the part after the separator
_, _, sharding_fn = sharding_func.partition(prefix)

# Ensure proper format
if not sharding_fn:
raise Exception(f"Prefix ({prefix}) was not present in {sharding_func}")

parts: list = sharding_fn.split("/")
Collaborator:
Use the type typing.List[str] here to prevent type erasure.


if len(parts) != 3:
raise Exception(
f"invalid shard identifier: {sharding_fn}.\n"
f"Expecting form: {prefix}/version/function_name/key_length"
)

version, function_name, length = parts

if version != "v1":
raise Exception(f"Expected 'v1' for version string got: {version}\n")

try:
length = int(length)
except ValueError:
raise Exception(f"Invalid parameter: {length}. Should be integer representing `key_length`.")

funcs: dict = {
"prefix": cls._prefix, # type: ignore[attr-defined] # noqa: F821
"suffix": cls._suffix, # type: ignore[attr-defined] # noqa: F821
"next-to-last": cls._next_to_last # type: ignore[attr-defined] # noqa: F821
}
Collaborator (on lines +505 to +509):
Make this a class variable and change the type to typing.Dict[str, typing.Callable[[datastore.Key, int], datastore.Key]] for a proper typing signature.
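One way to do that while staying compatible with Python versions where staticmethod objects are not directly callable is to assign the mapping right after the class body, where the descriptors resolve to plain functions (the attribute name _sharding_funcs is only illustrative):

class _FlatFSAdapter(_Adapter[DS, RT, RV], typing.Generic[DS, RT, RV]):
    # Declared here, assigned below once the staticmethods exist.
    _sharding_funcs: typing.ClassVar[typing.Dict[
        str, typing.Callable[[datastore.Key, int], datastore.Key]
    ]]
    ...

_FlatFSAdapter._sharding_funcs = {
    "prefix": _FlatFSAdapter._prefix,
    "suffix": _FlatFSAdapter._suffix,
    "next-to-last": _FlatFSAdapter._next_to_last,
}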


try:
func = funcs[function_name]
except Exception:
Collaborator:
Just like above, except Exception: is an antipattern in Python unless you really mean “catch all non-critical errors”. In this case you only want to catch KeyError (I presume), so write that instead.

Additionally, if you write something like except …: raise <ErrorType>(…) in Python, the interpreter will assume that your code crashed while trying to handle the previous exception and will show this in the backtrace. If you are instead raising an exception because you caught another one, write except … as exc: raise <ErrorType>(…) from exc instead, so that the interpreter can show a proper explanation of what happened in stack traces.
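Put together, the lookup would then read roughly as follows (keeping the current message text; the exception type would come from the hierarchy suggested further up):

try:
    func = funcs[function_name]
except KeyError as exc:
    # Chain the cause so the traceback shows the failed lookup explicitly.
    raise Exception(
        f"Expected 'prefix', 'suffix', or 'next-to-last' got: {function_name}"
    ) from exc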

raise Exception(f"Expected 'prefix', 'suffix', or 'next-to-last' got: {function_name}")

if need_to_store_sharding_func:
# All checks have been passed, put in datastore, remembering it for subsequent use of this repo
await child_datastore.put(sharding_key, [sharding_func]) # type: ignore[list-item] # noqa: F821

return functools.partial(func, length=length)

async def query(self, query: datastore.Query) -> datastore.Cursor:
# Requires supporting * operator on queries.
raise NotImplementedError()



class BinaryFlatFSAdapter(
_FlatFSAdapter[datastore.abc.BinaryDatastore, datastore.abc.ReceiveStream, bytes],
datastore.abc.BinaryAdapter
):
__slots__ = ("key_transform_fn",)


class ObjectFlatFSAdapter(
typing.Generic[T_co],
_FlatFSAdapter[
datastore.abc.ObjectDatastore[T_co],
datastore.abc.ReceiveChannel[T_co],
typing.List[T_co]
],
datastore.abc.ObjectAdapter[T_co, T_co]
):
__slots__ = ("key_transform_fn",)