Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"Query", "Cursor",
"SerializerAdapter",

"abc", "typing", "util"
"datastore_abc", "datastore_typing", "util"
)


Expand All @@ -41,6 +41,6 @@


### Exposed submodules ###
from . import abc
from . import typing
from . import datastore_abc
from . import datastore_typing
from . import util
43 changes: 35 additions & 8 deletions datastore/adapter/_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
#
# * https://github.com/python/mypy/issues/7790 (Associated types)
# * https://github.com/python/mypy/issues/7791 (Types of generic classes)
DS = typing.TypeVar("DS", datastore.abc.BinaryDatastore,
datastore.abc.ObjectDatastore[T_co]) # type: ignore[valid-type] # noqa: F821
DA = typing.TypeVar("DA", datastore.abc.BinaryAdapter,
datastore.abc.ObjectAdapter[T_co, T_co]) # type: ignore[valid-type] # noqa: F821, E501
DS = typing.TypeVar("DS", datastore.datastore_abc.BinaryDatastore,
datastore.datastore_abc.ObjectDatastore[T_co]) # type: ignore[valid-type]
DA = typing.TypeVar("DA", datastore.datastore_abc.BinaryAdapter,
datastore.datastore_abc.ObjectAdapter[T_co, T_co]) # type: ignore[valid-type]
MD = typing.TypeVar("MD", datastore.util.StreamMetadata,
datastore.util.ChannelMetadata) # type: ignore[valid-type] # noqa: F821
RT = typing.TypeVar("RT", datastore.abc.ReceiveStream,
datastore.abc.ReceiveChannel[T_co]) # type: ignore[valid-type] # noqa: F821
RV = typing.TypeVar("RV", bytes, typing.List[T_co]) # type: ignore[valid-type] # noqa: F821
datastore.util.ChannelMetadata)
RT = typing.TypeVar("RT", datastore.datastore_abc.ReceiveStream,
datastore.datastore_abc.ReceiveChannel[T_co]) # type: ignore[valid-type]
RV = typing.TypeVar("RV", bytes, typing.List[T_co]) # type: ignore[valid-type]


# Workaround for https://github.com/python/mypy/issues/708
Expand Down Expand Up @@ -63,6 +63,33 @@ def insert_datastore(self, index: int, store: DS) -> None:
"""Inserts datastore `store` into this collection at `index`."""
self._stores.insert(index, store)

def datastore_stats(self, selector: datastore.Key = None, *, _seen: typing.Set[int] = None) \
-> datastore.util.DatastoreMetadata:
"""Returns the metadata sum of all children

Arguments
---------
selector
Passed down to queried child datastores but otherwise ignored, as this
datastore always queries all children

Raises
------
RuntimeError
An internal error occurred in one of the child datastores
"""
_seen = _seen if _seen is not None else set()

metadata = datastore.util.DatastoreMetadata.IGNORE
for store in self._stores:
if id(store) in _seen:
continue

_seen.add(id(store))
metadata += store.datastore_stats(selector, _seen=_seen)
return metadata


async def _stores_cleanup(self) -> None:
"""Closes and removes all added datastores"""
errors: typing.List[Exception] = []
Expand Down
78 changes: 69 additions & 9 deletions datastore/adapter/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ class ObjectDirectorySupport:

@typing.no_type_check
async def directory(self, dir_key: datastore.Key, exist_ok: bool = False) -> bool:
"""Initializes directory at dir_key.
"""Initializes directory at dir_key

Returns a boolean of whether a new directory was actually created or
not."""
not.
"""
try:
await (await super().get(dir_key)).aclose()
except KeyError:
await super()._put(dir_key, datastore.util.receive_channel_from([]))
await super().put(dir_key, [])
return True
else:
if not exist_ok:
Expand Down Expand Up @@ -84,7 +85,7 @@ async def directory_add(self, dir_key: datastore.Key, key: datastore.Key,

if key_str not in dir_items:
dir_items.append(key_str)
await super()._put(dir_key, datastore.util.receive_channel_from(dir_items))
await super().put(dir_key, dir_items)


@typing.no_type_check
Expand All @@ -110,13 +111,13 @@ async def directory_remove(self, dir_key: datastore.Key, key: datastore.Key,
if not missing_ok:
raise KeyError(f"{key} in {dir_key}") from None
else:
await super()._put(dir_key, datastore.util.receive_channel_from(dir_items))
await super().put(dir_key, dir_items)



class ObjectDatastore(
ObjectDirectorySupport,
datastore.abc.ObjectAdapter[T_co, typing.Union[T_co, str]],
datastore.datastore_abc.ObjectAdapter[T_co, typing.Union[T_co, str]],
typing.Generic[T_co]
):
"""Datastore that tracks directory entries, like in a filesystem.
Expand Down Expand Up @@ -156,14 +157,17 @@ class ObjectDatastore(

FORWARD_CONTAINS = True
FORWARD_GET_ALL = True
FORWARD_PUT_NEW = True
FORWARD_RENAME = True
FORWARD_STAT = True


async def _put(self, key: datastore.Key, value: datastore.abc.ReceiveChannel[T_co]) -> None:
async def _put(self, key: datastore.Key, value: datastore.datastore_abc.ReceiveChannel[T_co],
**kwargs: typing.Any) -> None:
"""Stores the object `value` named by `key`.
DirectoryTreeDatastore stores a directory entry.
"""
await super()._put(key, value)
await super()._put(key, value, **kwargs)

# ignore root
if key.is_top_level():
Expand All @@ -174,6 +178,46 @@ async def _put(self, key: datastore.Key, value: datastore.abc.ReceiveChannel[T_c
await super().directory_add(dir_key, key, create=True)


async def _put_new(self,
prefix: datastore.Key,
value: datastore.datastore_abc.ReceiveChannel[T_co],
**kwargs: typing.Any) -> datastore.Key:
"""Stores the object `value` named by `key`.
DirectoryTreeDatastore stores a directory entry.
"""
key = await super()._put_new(prefix, value, **kwargs)

# ignore root
if key.is_top_level():
return key

# Add entry to directory
dir_key = key.parent.instance('directory')
await super().directory_add(dir_key, key, create=True)

return key


async def _put_new_indirect(self, prefix: datastore.Key, **kwargs: typing.Any) \
-> datastore.datastore_abc.ObjectAdapter._PUT_NEW_INDIRECT_RT[T_co]:
"""Stores the object `value` named by `key`.
DirectoryTreeDatastore stores a directory entry.
"""
key, callback = await super()._put_new_indirect(prefix, **kwargs)

# ignore root
if key.is_top_level():
return key, callback

async def callback_wrapper(value: datastore.datastore_abc.ReceiveChannel[T_co]) -> None:
# Add entry to directory
dir_key = key.parent.instance('directory')
await super(ObjectDatastore, self).directory_add(dir_key, key, create=True)

await callback(value)
return key, callback_wrapper


async def delete(self, key: datastore.Key) -> None:
"""Removes the object named by `key`.
DirectoryTreeDatastore removes the directory entry.
Expand All @@ -184,8 +228,24 @@ async def delete(self, key: datastore.Key) -> None:
await super().directory_remove(dir_key, key, missing_ok=True)


async def rename(self, key1: datastore.Key, key2: datastore.Key, *,
replace: bool = True) -> None:
"""Renames item *key1* to *key2*

DirectoryTreeDatastore removes the previous directory entry and add a new one.
"""
await super().rename(key1, key2, replace=replace)

if key1 != key2:
dir_key1 = key1.parent.instance('directory')
await super().directory_remove(dir_key1, key1, missing_ok=True)

dir_key2 = key2.parent.instance('directory')
await super().directory_add(dir_key2, key2, create=True)


async def query(self, query: datastore.Query) -> datastore.Cursor:
"""Returns objects matching criteria expressed in `query`.
DirectoryTreeDatastore uses directory entries.
"""
return query(super().directory_read(query.key))
return query(super().directory_read(query.key)) # type: ignore[no-any-return]
Loading