From 56a4ffd94a529722d50367ada38c1bcff64446fe Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Thu, 14 Dec 2017 14:33:21 -0300
Subject: [PATCH] [feature] add a resource for streaming

-- Related: #8809
---
 .../soledad/server/_streaming_resource.py     | 39 ++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)

diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
index 59bb383e..bb5677ca 100644
--- a/src/leap/soledad/server/_streaming_resource.py
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -19,6 +19,7 @@ A twisted resource that serves download as a single stream of multiple blobs.
 -> POST .../uuid/ DATA: [blob_id, blob_id2, ..., blob_idn]
 <- [(size(blob_id), content(blob_id)) for blob_id in DATA] (as a binary stream)
 """
+import os
 import json
 import base64
 
@@ -28,6 +29,7 @@ from twisted.internet import task, defer
 from twisted.web.server import NOT_DONE_YET
 from twisted.web.resource import Resource
 
+from leap.common.files import mkdir_p
 from leap.soledad.common.log import getLogger
 from . import interfaces
 from ._blobs import FilesystemBlobsBackend
@@ -58,10 +60,45 @@ class StreamingResource(Resource):
     def render_POST(self, request):
         user = request.postpath[0]
         namespace = request.args.get('namespace', ['default'])[0]
-        db = self._handler
+        direction = request.args.get('direction', ['download'])[0]
+        if direction == 'download':
+            return self._startDownstream(user, namespace, request)
+        elif direction == 'upload':
+            return self._startUpstream(user, namespace, request)
+        logger.error("Invalid direction value: %s - %s" % (user, direction))
+        request.setResponseCode(500)
+        request.write('error, supported direction values are download/upload')
+        request.finish()
+        return ''
+
+    def _startUpstream(self, user, namespace, request):
+        # TODO: at this point, Twisted wrote the request to a temporary file,
+        # so it's a disk->disk operation. This has to be improved if benchmark
+        # shows its worth.
+        content = request.content
+        incoming_list = json.loads(content.readline())
+        # TODO: NEEDS SANITIZING
+        for (blob_id, size) in incoming_list:
+            db = self._handler
+            path = db._get_path(user, blob_id, namespace)
+            try:
+                mkdir_p(os.path.split(path)[0])
+            except OSError as e:
+                logger.warn("Got exception trying to create directory: %r" % e)
+            with open(path, 'wb') as blob_fd:
+                consumed = 0
+                while consumed < size:
+                    read_size = min(size - consumed, 2**16)
+                    data = content.read(read_size)
+                    consumed += read_size
+                    blob_fd.write(data)
+        return ''
+
+    def _startDownstream(self, user, namespace, request):
         raw_content = request.content.read()
         blob_ids = json.loads(raw_content)
         deferreds = []
+        db = self._handler
         for blob_id in blob_ids:
 
             def _get_blob_info(blob_id, path):
-- 
GitLab