Commit b1b1d3a

Make serializer interface streaming
1 parent e136c9a commit b1b1d3a

6 files changed: +297 −119 lines

datastore/__init__.py

Lines changed: 5 additions & 1 deletion
@@ -15,7 +15,8 @@
 	"BinaryNullDatastore", "BinaryDictDatastore",
 	"ObjectNullDatastore", "ObjectDictDatastore",
 	"Query", "Cursor",
-	"SerializerAdapter",
+	"SerializerAdapter", "SerializingError",
+	"SerializeError", "ParseError",
 
 	"abc", "typing", "util"
 )
@@ -38,6 +39,9 @@
 
 # import core.serialize
 from .core.serialize import SerializerAdapter
+from .core.serialize import SerializingError
+from .core.serialize import SerializeError
+from .core.serialize import ParseError
 
 
 ### Exposed submodules ###
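
With this hunk the error types become importable straight from the package root. A quick sketch; nothing here goes beyond what the re-exports above establish:

import datastore

# Same objects, two import paths:
assert datastore.ParseError is datastore.core.serialize.ParseError
# Both concrete errors share the re-exported base class:
assert issubclass(datastore.SerializeError, datastore.SerializingError)
assert issubclass(datastore.ParseError, datastore.SerializingError)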

datastore/core/serialize.py

Lines changed: 41 additions & 8 deletions
@@ -14,7 +14,35 @@ class util: # noqa
 T_co = typing.TypeVar("T_co", covariant=True)
 
 
-#FIXME: This stuff should support streaming data to the maximum extent possible
+class SerializingError(ValueError):
+	"""Base class for all encoding and decoding related errors."""
+	
+	def __init__(self, message, encoder_name):
+		self.encoder_name = encoder_name
+		
+		super().__init__(message)
+
+
+class SerializeError(SerializingError):
+	"""Raised when encoding a Python object into a byte string has failed
+	due to some problem with the input data."""
+	
+	def __init__(self, encoder_name, original):
+		self.original = original
+		
+		msg = "Object serialization error: {}".format(original)
+		super().__init__(msg, encoder_name)
+
+
+class ParseError(SerializingError):
+	"""Raised when decoding a byte string to a Python object has failed due to
+	some problem with the input data."""
+	
+	def __init__(self, encoder_name, original):
+		self.original = original
+		
+		msg = "Object parsing error: {}".format(original)
+		super().__init__(msg, encoder_name)
 
 
 class Serializer(typing.Generic[T_co], metaclass=abc.ABCMeta):
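
A minimal sketch of how the hierarchy above behaves; the raised error is fabricated for illustration. Both concrete errors record the underlying exception in `.original`, and because `SerializingError` derives from `ValueError`, pre-existing `except ValueError` handlers keep working:

try:
	raise ParseError("json", ValueError("unexpected end of input"))
except SerializingError as error:         # catches ParseError and SerializeError
	assert isinstance(error, ValueError)  # still a ValueError subclass
	assert error.encoder_name == "json"
	assert isinstance(error.original, ValueError)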
@@ -24,13 +52,15 @@ class Serializer(typing.Generic[T_co], metaclass=abc.ABCMeta):
 	
 	
 	@abc.abstractmethod
-	def loads(self, value: bytes) -> typing.List[T_co]:
+	def parse(self, value: util.stream.ReceiveStream) \
+	    -> util.stream.ArbitraryReceiveChannel[T_co]:
 		"""returns deserialized `value`."""
 		pass
 	
 	
 	@abc.abstractmethod
-	def dumps(self, value: typing.List[T_co]) -> bytes:
+	def serialize(self, value: util.stream.ReceiveChannel[T_co]) \
+	    -> util.stream.ArbitraryReceiveStream:
 		"""returns serialized `value`."""
 		pass
 	
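
To illustrate the new contract, here is a hypothetical `LineSerializer` written against these signatures. It assumes, as `datastore/serializer/json.py` below does, that an async generator is acceptable where `ArbitraryReceiveChannel`/`ArbitraryReceiveStream` is expected, and it relies on the chunk-wise `ReceiveStream` iteration added in `stream.py` below. Splitting on `b"\n"` before decoding is UTF-8-safe, since multi-byte sequences never contain the newline byte:

class LineSerializer(Serializer[str]):
	"""Hypothetical example: streams UTF-8 text line by line."""
	
	async def parse(self, value):
		buffer = b""
		async for chunk in value:             # chunk-wise ReceiveStream iteration
			buffer += chunk
			*lines, buffer = buffer.split(b"\n")
			for line in lines:
				yield line.decode("utf-8")    # one item per completed line
		if buffer:
			yield buffer.decode("utf-8")      # trailing line without newline
	
	async def serialize(self, value):
		async for item in value:              # items from the ReceiveChannel
			yield (item + "\n").encode("utf-8")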
@@ -87,8 +117,8 @@ async def get(self, key: key_.Key) -> util.stream.ReceiveChannel[T_co]:
 		key
 			Key naming the object to retrieve
 		"""
-		value = await (await self.child_datastore.get(key)).collect()  #FIXME
-		return util.stream.receive_channel_from(self.serializer.loads(value))
+		value = await self.child_datastore.get(key)
+		return util.stream.receive_channel_from(self.serializer.parse(value))
 	
 	
 	async def _put(self, key: key_.Key, value: util.stream.ReceiveChannel[T_co]) -> None:
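
On the consumer side, `get()` now hands back a channel that yields objects as they are parsed, instead of one that required a full `collect()` first. A sketch, with `adapter` and `key` assumed:

async def print_all(adapter, key):
	channel = await adapter.get(key)   # ReceiveChannel[T_co]
	async for obj in channel:          # objects arrive as parse() yields them
		print(obj)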
@@ -104,8 +134,8 @@ async def _put(self, key: key_.Key, value: util.stream.ReceiveChannel[T_co]) ->
 		value
 			The object to store.
 		"""
-		value_items = await value.collect()  #FIXME
-		value_bytes = self.serializer.dumps(value_items)
+		value_items = util.stream.receive_channel_from(value)
+		value_bytes = self.serializer.serialize(value_items)
 		await self.child_datastore.put(key, value_bytes)
 	
 	
@@ -130,7 +160,10 @@ async def query(self, query: query_.Query) -> query_.Cursor:
 		result = cursor._iterable
 		cursor._iterable = []
 		for field in result:
-			cursor._iterable.extend(self.serializer.loads(field))
+			field_stream = util.stream.receive_stream_from(field)
+			parse_items = util.stream.receive_channel_from(self.serializer.parse(field_stream))
+			async for item in parse_items:
+				cursor._iterable.append(item)
 		
 		return cursor
 
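
Putting the adapter together, a hedged round-trip sketch. It assumes a public `put()` delegating to the `_put()` above, and that `receive_channel_from()` (reachable as `datastore.util.stream.receive_channel_from`) accepts a plain iterable, as its former use with `loads()` lists suggests:

async def roundtrip(adapter, key, objects):
	# Stream `objects` (any iterable) through the serializing adapter …
	await adapter.put(key, datastore.util.stream.receive_channel_from(objects))
	# … then read the items back as they are parsed
	return [obj async for obj in await adapter.get(key)]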
datastore/core/util/stream.py

Lines changed: 9 additions & 0 deletions
@@ -555,6 +555,15 @@ async def collect(self) -> bytes:
 				break
 			value += chunk
 		return bytes(value)
+	
+	def __aiter__(self):
+		return self
+	
+	async def __anext__(self):
+		value = await self.receive_some()
+		if len(value) < 1:
+			raise StopAsyncIteration
+		return value
 
 
 class TeeingReceiveStream(ReceiveStream):
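
With the new `__aiter__`/`__anext__` any `ReceiveStream` can be consumed with plain `async for`, terminating once `receive_some()` returns an empty chunk. A small sketch:

import hashlib

async def sha256_of_stream(stream):
	"""Hash a ReceiveStream chunk by chunk, without buffering it whole."""
	digest = hashlib.sha256()
	async for chunk in stream:   # stops when receive_some() yields b""
		digest.update(chunk)
	return digest.hexdigest()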

datastore/serializer/json.py

Lines changed: 162 additions & 25 deletions
@@ -1,54 +1,191 @@
+import codecs
 import json
 import typing
 
+import datastore
 import datastore.abc
 
 __all__ = ("Serializer", "PrettySerializer")
 
 T_co = typing.TypeVar("T_co", covariant=True)
 
 
-#FIXME: This stuff should support streaming data to the maximum extent possible
+#XXX: Switch to [`ijson`](https://pypi.org/project/ijson/) once it supports a
+#     non-blocking parsing mode.
+#     Upstream issue: https://github.com/ICRAR/ijson/issues/22 (“working on this”)
 
 
 class Serializer(datastore.abc.Serializer[T_co], typing.Generic[T_co]):
-	"""json wrapper serializer that gives the most compact representation possible.
-	"""
-	
-	__slots__ = ("encoding", "indent", "separators")
+	"""JSON parser that handles concatenated JSON messages and encodes to the
+	most compact representation possible by default."""
 	
+	__slots__ = ("encoding", "indent", "separators",
+	             "_buffer", "_decoder1", "_decoder2", "_lasterror")
+	
 	encoding: str
 	indent: typing.Optional[typing.Union[int, str]]
 	separators: typing.Tuple[str, str]
 	
-	
+	_buffer: typing.List[str]
+	_decoder1: codecs.IncrementalDecoder
+	_decoder2: json.JSONDecoder
+	_lasterror: typing.Optional[Exception]
+	
+	
 	def __init__(self, *, encoding: str = "utf-8",
 	             indent: typing.Optional[typing.Union[int, str]] = None,
 	             separators: typing.Tuple[str, str] = (',', ':')):
 		self.encoding = encoding
 		self.indent = indent
 		self.separators = separators
+		
+		self._buffer = []
+		self._decoder1 = codecs.getincrementaldecoder(encoding)()
+		self._decoder2 = json.JSONDecoder()
+		self._lasterror = None
+	
+	def parse_partial(self, data: bytes) -> typing.Iterator[T_co]:
+		"""Incrementally decodes JSON data sets into Python objects.
+		
+		Raises
+		------
+		~datastore.ParseError
+		"""
+		try:
+			# Python 3 requires all JSON data to be a text string
+			lines = self._decoder1.decode(data, False).split("\n")
+			
+			# Add first input line to last buffer line, if applicable, to
+			# handle cases where the JSON string has been chopped in half
+			# at the network level due to streaming
+			if len(self._buffer) > 0 and self._buffer[-1] is not None:
+				self._buffer[-1] += lines[0]
+				self._buffer.extend(lines[1:])
+			else:
+				self._buffer.extend(lines)
+		except UnicodeDecodeError as error:
+			raise datastore.ParseError("json", error) from error
+		
+		# Process data buffer
+		index = 0
+		try:
+			# Process each line as a separate buffer
+			#PERF: This way the `.lstrip()` call becomes almost always a NOP;
+			#      even if it does return a different string, it will only
+			#      have to allocate a new buffer for the currently processed
+			#      line.
+			while index < len(self._buffer):
+				while self._buffer[index]:
+					# Make sure the buffer does not start with whitespace
+					#PERF: `.lstrip()` does not reallocate if the string does
+					#      not actually start with whitespace.
+					self._buffer[index] = self._buffer[index].lstrip()
+					
+					# Handle case where the remainder of the line contained
+					# only whitespace
+					if not self._buffer[index]:
+						self._buffer[index] = None
+						continue
+					
+					# Try decoding the partial data buffer and return results
+					# from this
+					data = self._buffer[index]
+					for index2 in range(index, len(self._buffer)):
+						# If decoding doesn't succeed with the currently
+						# selected buffer (very unlikely with our current
+						# class of input data) then retry with appending
+						# any other pending pieces of input data.
+						# This will happen with JSON data that contains
+						# arbitrary new-lines: "{1:\n2,\n3:4}"
+						if index2 > index:
+							data += "\n" + self._buffer[index2]
+						
+						try:
+							(obj, offset) = self._decoder2.raw_decode(data)
+						except ValueError:
+							# Treat error as fatal if we have already added
+							# the final buffer to the input
+							if (index2 + 1) == len(self._buffer):
+								raise
+						else:
+							index = index2
+							break
+					
+					# Decoding succeeded – yield result and shorten buffer
+					yield obj
+					
+					if (len(data) - offset) > 0:
+						buffer_offset = len(self._buffer[index]) - len(data) + offset
+						self._buffer[index] = self._buffer[index][buffer_offset:]
+					else:
+						self._buffer[index] = None
+				index += 1
+		except ValueError as error:
+			# It is unfortunately not possible to reliably detect whether
+			# parsing ended because of an error *within* the JSON string, or
+			# an unexpected *end* of the JSON string.
+			# We therefore have to assume that any error that occurs here
+			# *might* be related to the JSON parser hitting EOF and therefore
+			# have to postpone error reporting until `parse_finalize` is
+			# called.
+			self._lasterror = error
+		finally:
+			# Remove all processed buffers
+			del self._buffer[0:index]
+	
+	def parse_finalize(self) -> typing.Iterable[T_co]:
+		"""Raises errors for incomplete buffered data that could not be parsed
+		because the end of the input data has been reached.
+		
+		Raises
+		------
+		~datastore.ParseError
+		
+		Returns
+		-------
+		tuple : Always empty
+		"""
+		try:
+			try:
+				# Raise exception for remaining bytes in bytes decoder
+				self._decoder1.decode(b"", True)
+			except UnicodeDecodeError as error:
+				raise datastore.ParseError("json", error) from error
+			
+			# Late-raise errors that looked like they could have been fixed if
+			# the caller had provided more data
+			if self._buffer:
+				raise datastore.ParseError("json", self._lasterror) from self._lasterror
+		finally:
+			# Reset state
+			self._buffer = []
+			self._lasterror = None
+			self._decoder1.reset()
+		
+		return ()
 	
-	
-	def loads(self, value: bytes) -> typing.List[T_co]:
-		"""returns json deserialized `value`."""
-		return [json.loads(value)]  #FIXME: Broken if more than one object
-	
-	
-	def dumps(self, value: typing.Iterable[T_co]) -> bytes:
-		"""returns json serialized `value` (pretty-printed)."""
-		result = bytearray()
-		for item in value:
-			# We force `ensure_ascii=False` here to get the most compact
-			# encoding, by having non-ASCII characters encoded in the target
-			# encoding rather than as JSON "\uXXXX" escape sequences.
-			# We also force `sort_keys=True` to ensure the output is
-			# reproducible between interpreter runs.
-			result += json.dumps(item, ensure_ascii=False, sort_keys=True,
-			                     indent=self.indent, separators=self.separators
-			).encode(self.encoding)
-		return bytes(result)
+	async def parse(self, source: datastore.abc.ReceiveStream) -> typing.AsyncIterator[T_co]:
+		async for chunk in source:
+			for obj in self.parse_partial(chunk):
+				yield obj
+		for obj in self.parse_finalize():
+			yield obj
 	
+	async def serialize(self, value: datastore.abc.ReceiveChannel[T_co]) \
+	      -> typing.AsyncIterator[bytes]:
+		"""returns json serialized `value` (pretty-printed)."""
+		try:
+			async for obj in value:
+				# We force `ensure_ascii=False` here to get the most compact
+				# encoding, by having non-ASCII characters encoded in the target
+				# encoding rather than as JSON "\uXXXX" escape sequences.
+				# We also force `sort_keys=True` to ensure the output is
+				# reproducible between interpreter runs.
+				yield json.dumps(obj, ensure_ascii=False, sort_keys=True,
+				                 indent=self.indent, separators=self.separators
+				).encode(self.encoding)
+		except (UnicodeEncodeError, TypeError) as error:
+			raise datastore.SerializeError("json", error) from error
 
 
 class PrettySerializer(Serializer[T_co], typing.Generic[T_co]):
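
A worked sketch of the incremental parser above, fed concatenated JSON that is chopped mid-object: each document is yielded as soon as it completes, and `parse_finalize()` raises `datastore.ParseError` only if undecodable data remains at end of input:

from datastore.serializer.json import Serializer

s = Serializer()
objs  = list(s.parse_partial(b'{"a": 1}{"b"'))  # first object completes here
objs += list(s.parse_partial(b': 2}\n[3, 4]'))  # the chopped one finishes now
objs += list(s.parse_finalize())                # empty on success
print(objs)  # [{'a': 1}, {'b': 2}, [3, 4]]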
