Skip to content

Commit

Permalink
progress on store interface (memory and local stores only)
Browse files Browse the repository at this point in the history
fixes after rebas
e

make all tests pass
  • Loading branch information
jhamman committed Feb 7, 2024
1 parent 0b01bfb commit 661c0d6
Show file tree
Hide file tree
Showing 18 changed files with 3,505 additions and 3,884 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,6 @@ src/zarr/_version.py
#doesnotexist
#test_sync*
data/*
src/fixture/

.DS_Store
5 changes: 0 additions & 5 deletions src/zarr/v3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
from zarr.v3.group import Group # noqa: F401
from zarr.v3.metadata import RuntimeConfiguration, runtime_configuration # noqa: F401
from zarr.v3.store import ( # noqa: F401
LocalStore,
RemoteStore,
Store,
StoreLike,
StorePath,
make_store_path,
)
from zarr.v3.sync import sync as _sync
Expand All @@ -27,7 +23,6 @@ async def open_auto_async(
return await Array.open(store_path, runtime_configuration=runtime_configuration_)
except KeyError:
return await Group.open(store_path, runtime_configuration=runtime_configuration_)



def open_auto(
Expand Down
2 changes: 1 addition & 1 deletion src/zarr/v3/abc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import numpy as np

from zarr.v3.common import BytesLike, SliceSelection
from zarr.v3.stores import StorePath
from zarr.v3.store import StorePath


if TYPE_CHECKING:
Expand Down
48 changes: 26 additions & 22 deletions src/zarr/v3/abc/store.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from abc import abstractmethod, ABC

from typing import List, Tuple
from typing import List, Tuple, Optional


class Store(ABC):
pass


class ReadStore(Store):
@abstractmethod
async def get(self, key: str) -> bytes:
async def get(
self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
) -> Optional[bytes]:
"""Retrieve the value associated with a given key.
Parameters
----------
key : str
byte_range : tuple[int, Optional[int]], optional
Returns
-------
Expand All @@ -23,7 +22,9 @@ async def get(self, key: str) -> bytes:
...

@abstractmethod
async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]]) -> List[bytes]:
async def get_partial_values(
self, key_ranges: List[Tuple[str, Tuple[int, int]]]
) -> List[bytes]:
"""Retrieve possibly partial values from given key_ranges.
Parameters
Expand All @@ -38,6 +39,7 @@ async def get_partial_values(self, key_ranges: List[Tuple[str, Tuple[int, int]]]
"""
...

@abstractmethod
async def exists(self, key: str) -> bool:
"""Check if a key exists in the store.
Expand All @@ -51,8 +53,12 @@ async def exists(self, key: str) -> bool:
"""
...

@property
@abstractmethod
def supports_writes(self) -> bool:
"""Does the store support writes?"""
...

class WriteStore(ReadStore):
@abstractmethod
async def set(self, key: str, value: bytes) -> None:
"""Store a (key, value) pair.
Expand All @@ -64,7 +70,8 @@ async def set(self, key: str, value: bytes) -> None:
"""
...

async def delete(self, key: str) -> None
@abstractmethod
async def delete(self, key: str) -> None:
"""Remove a key from the store
Parameters
Expand All @@ -73,10 +80,11 @@ async def delete(self, key: str) -> None
"""
...


class PartialWriteStore(WriteStore):
# TODO, instead of using this, should we just check if the store is a PartialWriteStore?
supports_partial_writes = True
@property
@abstractmethod
def supports_partial_writes(self) -> bool:
"""Does the store support partial writes?"""
...

@abstractmethod
async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None:
Expand All @@ -91,8 +99,12 @@ async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]
"""
...

@property
@abstractmethod
def supports_listing(self) -> bool:
"""Does the store support listing?"""
...

class ListMixin:
@abstractmethod
async def list(self) -> List[str]:
"""Retrieve all keys in the store.
Expand Down Expand Up @@ -132,11 +144,3 @@ async def list_dir(self, prefix: str) -> List[str]:
list[str]
"""
...


class ReadListStore(ReadStore, ListMixin):
pass


class WriteListStore(WriteStore, ListMixin):
pass
14 changes: 7 additions & 7 deletions src/zarr/v3/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
CodecMetadata,
ShardingCodecIndexLocation,
)
from zarr.v3.stores import StorePath
from zarr.v3.store import StorePath

MAX_UINT_64 = 2**64 - 1

Expand Down Expand Up @@ -354,7 +354,7 @@ async def decode_partial(
for chunk_coords in all_chunk_coords:
chunk_byte_slice = shard_index.get_chunk_slice(chunk_coords)
if chunk_byte_slice:
chunk_bytes = await store_path.get_async(chunk_byte_slice)
chunk_bytes = await store_path.get(chunk_byte_slice)
if chunk_bytes:
shard_dict[chunk_coords] = chunk_bytes

Expand Down Expand Up @@ -533,9 +533,9 @@ async def _write_chunk(
)

if shard_builder.index.is_all_empty():
await store_path.delete_async()
await store_path.delete()
else:
await store_path.set_async(
await store_path.set(
await shard_builder.finalize(
self.configuration.index_location,
self._encode_shard_index,
Expand All @@ -561,9 +561,9 @@ def _shard_index_size(self) -> int:
async def _load_shard_index_maybe(self, store_path: StorePath) -> Optional[_ShardIndex]:
shard_index_size = self._shard_index_size()
if self.configuration.index_location == ShardingCodecIndexLocation.start:
index_bytes = await store_path.get_async((0, shard_index_size))
index_bytes = await store_path.get((0, shard_index_size))
else:
index_bytes = await store_path.get_async((-shard_index_size, None))
index_bytes = await store_path.get((-shard_index_size, None))
if index_bytes is not None:
return await self._decode_shard_index(index_bytes)
return None
Expand All @@ -574,7 +574,7 @@ async def _load_shard_index(self, store_path: StorePath) -> _ShardIndex:
)

async def _load_full_shard_maybe(self, store_path: StorePath) -> Optional[_ShardProxy]:
shard_bytes = await store_path.get_async()
shard_bytes = await store_path.get()

return await _ShardProxy.from_bytes(shard_bytes, self) if shard_bytes else None

Expand Down
8 changes: 4 additions & 4 deletions src/zarr/v3/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def delitem(self, key: str) -> None:

async def _save_metadata(self) -> None:
to_save = self.metadata.to_bytes()
awaitables = [(self.store_path / key).set_async(value) for key, value in to_save.items()]
awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()]
await asyncio.gather(*awaitables)

@property
Expand Down Expand Up @@ -227,9 +227,9 @@ async def update_attributes(self, new_attributes: Dict[str, Any]):
to_save = self.metadata.to_bytes()
if self.metadata.zarr_format == 2:
# only save the .zattrs object
await (self.store_path / ZATTRS_JSON).set_async(to_save[ZATTRS_JSON])
await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON])
else:
await (self.store_path / ZARR_JSON).set_async(to_save[ZARR_JSON])
await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON])

self.metadata.attributes.clear()
self.metadata.attributes.update(new_attributes)
Expand Down Expand Up @@ -333,7 +333,7 @@ def __getitem__(self, path: str) -> Union[Array, Group]:
return Group(obj)

def __delitem__(self, key) -> None:
self._sync(self._async_group.delitem(path))
self._sync(self._async_group.delitem(key))

def __iter__(self):
raise NotImplementedError
Expand Down
Loading

0 comments on commit 661c0d6

Please sign in to comment.