Skip to content
Snippets Groups Projects
Verified Commit 56a4ffd9 authored by Victor's avatar Victor
Browse files

[feature] add a resource for streaming

-- Related: #8809
parent 5ca40e18
No related branches found
No related tags found
1 merge request!172Upstreaming blobs
......@@ -19,6 +19,7 @@ A twisted resource that serves download as a single stream of multiple blobs.
-> POST .../uuid/ DATA: [blob_id, blob_id2, ..., blob_idn]
<- [(size(blob_id), content(blob_id)) for blob_id in DATA] (as a binary stream)
"""
import os
import json
import base64
......@@ -28,6 +29,7 @@ from twisted.internet import task, defer
from twisted.web.server import NOT_DONE_YET
from twisted.web.resource import Resource
from leap.common.files import mkdir_p
from leap.soledad.common.log import getLogger
from . import interfaces
from ._blobs import FilesystemBlobsBackend
......@@ -58,10 +60,45 @@ class StreamingResource(Resource):
def render_POST(self, request):
user = request.postpath[0]
namespace = request.args.get('namespace', ['default'])[0]
db = self._handler
direction = request.args.get('direction', ['download'])[0]
if direction == 'download':
return self._startDownstream(user, namespace, request)
elif direction == 'upload':
return self._startUpstream(user, namespace, request)
logger.error("Invalid direction value: %s - %s" % (user, direction))
request.setResponseCode(500)
request.write('error, supported direction values are download/upload')
request.finish()
return ''
def _startUpstream(self, user, namespace, request):
# TODO: at this point, Twisted wrote the request to a temporary file,
# so it's a disk->disk operation. This has to be improved if benchmark
# shows its worth.
content = request.content
incoming_list = json.loads(content.readline())
# TODO: NEEDS SANITIZING
for (blob_id, size) in incoming_list:
db = self._handler
path = db._get_path(user, blob_id, namespace)
try:
mkdir_p(os.path.split(path)[0])
except OSError as e:
logger.warn("Got exception trying to create directory: %r" % e)
with open(path, 'wb') as blob_fd:
consumed = 0
while consumed < size:
read_size = min(size - consumed, 2**16)
data = content.read(read_size)
consumed += read_size
blob_fd.write(data)
return ''
def _startDownstream(self, user, namespace, request):
raw_content = request.content.read()
blob_ids = json.loads(raw_content)
deferreds = []
db = self._handler
for blob_id in blob_ids:
def _get_blob_info(blob_id, path):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment