diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py index bb5677ca62bf2e3cacf4a5180da5cf33b9fdce8a..3cf798e3316fbaaf837b61a0d947d072f811fdd1 100644 --- a/src/leap/soledad/server/_streaming_resource.py +++ b/src/leap/soledad/server/_streaming_resource.py @@ -25,7 +25,7 @@ import base64 from zope.interface import implementer from twisted.internet.interfaces import IPushProducer -from twisted.internet import task, defer +from twisted.internet import task, defer, threads from twisted.web.server import NOT_DONE_YET from twisted.web.resource import Resource @@ -75,11 +75,19 @@ class StreamingResource(Resource): # TODO: at this point, Twisted wrote the request to a temporary file, # so it's a disk->disk operation. This has to be improved if benchmark # shows its worth. + args = (user, namespace, request) + d = threads.deferToThread(self._consume_stream, *args) + d.addCallback(lambda _: request.finish()) + + return NOT_DONE_YET + + def _consume_stream(self, user, namespace, request): + chunk_size = 2**14 content = request.content incoming_list = json.loads(content.readline()) - # TODO: NEEDS SANITIZING for (blob_id, size) in incoming_list: db = self._handler + # TODO: NEEDS SANITIZING path = db._get_path(user, blob_id, namespace) try: mkdir_p(os.path.split(path)[0]) @@ -88,11 +96,10 @@ class StreamingResource(Resource): with open(path, 'wb') as blob_fd: consumed = 0 while consumed < size: - read_size = min(size - consumed, 2**16) + read_size = min(size - consumed, chunk_size) data = content.read(read_size) consumed += read_size blob_fd.write(data) - return '' def _startDownstream(self, user, namespace, request): raw_content = request.content.read()