Skip to content
Snippets Groups Projects
Verified Commit a3d267e0 authored by Victor's avatar Victor
Browse files

[feature] consume received stream in a thread

As it's blocking and Twisted already stored everything on a file.

-- Related: #8809
parent 1bce0175
No related branches found
No related tags found
1 merge request!172Upstreaming blobs
...@@ -25,7 +25,7 @@ import base64 ...@@ -25,7 +25,7 @@ import base64
from zope.interface import implementer from zope.interface import implementer
from twisted.internet.interfaces import IPushProducer from twisted.internet.interfaces import IPushProducer
from twisted.internet import task, defer from twisted.internet import task, defer, threads
from twisted.web.server import NOT_DONE_YET from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource from twisted.web.resource import Resource
...@@ -75,11 +75,19 @@ class StreamingResource(Resource): ...@@ -75,11 +75,19 @@ class StreamingResource(Resource):
# TODO: at this point, Twisted wrote the request to a temporary file, # TODO: at this point, Twisted wrote the request to a temporary file,
# so it's a disk->disk operation. This has to be improved if benchmark # so it's a disk->disk operation. This has to be improved if benchmark
# shows its worth. # shows its worth.
args = (user, namespace, request)
d = threads.deferToThread(self._consume_stream, *args)
d.addCallback(lambda _: request.finish())
return NOT_DONE_YET
def _consume_stream(self, user, namespace, request):
chunk_size = 2**14
content = request.content content = request.content
incoming_list = json.loads(content.readline()) incoming_list = json.loads(content.readline())
# TODO: NEEDS SANITIZING
for (blob_id, size) in incoming_list: for (blob_id, size) in incoming_list:
db = self._handler db = self._handler
# TODO: NEEDS SANITIZING
path = db._get_path(user, blob_id, namespace) path = db._get_path(user, blob_id, namespace)
try: try:
mkdir_p(os.path.split(path)[0]) mkdir_p(os.path.split(path)[0])
...@@ -88,11 +96,10 @@ class StreamingResource(Resource): ...@@ -88,11 +96,10 @@ class StreamingResource(Resource):
with open(path, 'wb') as blob_fd: with open(path, 'wb') as blob_fd:
consumed = 0 consumed = 0
while consumed < size: while consumed < size:
read_size = min(size - consumed, 2**16) read_size = min(size - consumed, chunk_size)
data = content.read(read_size) data = content.read(read_size)
consumed += read_size consumed += read_size
blob_fd.write(data) blob_fd.write(data)
return ''
def _startDownstream(self, user, namespace, request): def _startDownstream(self, user, namespace, request):
raw_content = request.content.read() raw_content = request.content.read()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment