From e8cf7f5bfbf5cfe83c82e06ab1ad028642bdf44a Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Mon, 27 Nov 2017 08:47:12 -0300
Subject: [PATCH] [feature] add a streaming resource

-- Related: #8809
---
 src/leap/soledad/server/_resource.py          |  4 +-
 .../soledad/server/_streaming_resource.py     | 69 +++++++++++++++++++
 src/leap/soledad/server/auth.py               |  6 ++
 src/leap/soledad/server/url_mapper.py         |  3 +
 tests/server/test__resource.py                |  9 ++-
 5 files changed, 89 insertions(+), 2 deletions(-)
 create mode 100644 src/leap/soledad/server/_streaming_resource.py

diff --git a/src/leap/soledad/server/_resource.py b/src/leap/soledad/server/_resource.py
index e3693c80..a6a76f4c 100644
--- a/src/leap/soledad/server/_resource.py
+++ b/src/leap/soledad/server/_resource.py
@@ -66,7 +66,8 @@ class PublicResource(Resource):
     for the Soledad Server.
     """
 
-    def __init__(self, blobs_resource=None, sync_pool=None):
+    def __init__(self, blobs_resource=None, streaming_resource=None,
+                 sync_pool=None):
         """
         Initialize the Soledad resource.
 
@@ -85,6 +86,7 @@ class PublicResource(Resource):
         # requests to /blobs will serve blobs if enabled
         if blobs_resource:
             self.putChild('blobs', blobs_resource)
+            self.putChild('stream', streaming_resource)
 
         # other requests are routed to legacy sync resource
         self._sync_resource = get_sync_resource(sync_pool)
diff --git a/src/leap/soledad/server/_streaming_resource.py b/src/leap/soledad/server/_streaming_resource.py
new file mode 100644
index 00000000..9805bb7e
--- /dev/null
+++ b/src/leap/soledad/server/_streaming_resource.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# _streaming_resource.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+A twisted resource that serves download as a single stream of multiple blobs.
+-> POST .../uuid/namespace/ DATA: [blob_id, blob_id2, ..., blob_idn]
+<- [(size(blob_id), content(blob_id)) for blob_id in DATA] (as a binary stream)
+"""
+import json
+import struct
+
+from twisted.web.server import NOT_DONE_YET
+from twisted.web.resource import Resource
+
+from leap.soledad.common.log import getLogger
+from . import interfaces
+from ._blobs import FilesystemBlobsBackend
+from ._blobs import ImproperlyConfiguredException
+
+
+__all__ = ['StreamingResource']
+
+
+logger = getLogger(__name__)
+SIZE_PACKER = struct.Struct("I")
+
+
+class StreamingResource(Resource):
+    isLeaf = True
+
+    # Allowed backend classes are defined here
+    handlers = {"filesystem": FilesystemBlobsBackend}
+
+    def __init__(self, backend, blobs_path, **backend_kwargs):
+        Resource.__init__(self)
+        self._blobs_path = blobs_path
+        backend_kwargs.update({'blobs_path': blobs_path})
+        if backend not in self.handlers:
+            raise ImproperlyConfiguredException("No such backend: %s", backend)
+        self._handler = self.handlers[backend](**backend_kwargs)
+        assert interfaces.IBlobsBackend.providedBy(self._handler)
+
+    def render_POST(self, request):
+        user, namespace = request.postpath
+        db = self.factory.open_database(user)
+        raw_content = request.content.read()
+        blob_ids = json.loads(raw_content)
+        for blob_id in blob_ids:
+            path = db._get_path(user, blob_id, namespace)
+            size = db.get_blob_size(user, blob_id, namespace)
+            request.write(SIZE_PACKER.pack(size))
+            with open(path, 'rb') as blob_fd:
+                request.content.write(blob_fd.read())
+
+        request.finish()
+        return NOT_DONE_YET
diff --git a/src/leap/soledad/server/auth.py b/src/leap/soledad/server/auth.py
index 637666da..a0fea7e0 100644
--- a/src/leap/soledad/server/auth.py
+++ b/src/leap/soledad/server/auth.py
@@ -42,6 +42,7 @@ from leap.soledad.common.log import getLogger
 from ._resource import PublicResource, AnonymousResource
 from ._resource import LocalResource
 from ._blobs import BlobsResource
+from ._streaming_resource import StreamingResource
 from ._config import get_config
 
 
@@ -65,10 +66,15 @@ class SoledadRealm(object):
             "filesystem",
             conf['blobs_path'],
             concurrent_writes=concurrent_writes) if blobs else None
+        streaming_resource = StreamingResource(
+            "filesystem",
+            conf['blobs_path'],
+            concurrent_writes=concurrent_writes) if blobs else None
         self.anon_resource = AnonymousResource(
             enable_blobs=blobs)
         self.auth_resource = PublicResource(
             blobs_resource=blobs_resource,
+            streaming_resource=streaming_resource,
             sync_pool=sync_pool)
 
     def requestAvatar(self, avatarId, mind, *interfaces):
diff --git a/src/leap/soledad/server/url_mapper.py b/src/leap/soledad/server/url_mapper.py
index 9578911a..a658b712 100644
--- a/src/leap/soledad/server/url_mapper.py
+++ b/src/leap/soledad/server/url_mapper.py
@@ -60,6 +60,7 @@ class URLMapper(object):
             /blobs/{uuid}/{blob_id}         | GET, PUT, DELETE, POST
             /blobs/{uuid}                   | GET
             /incoming/                      | PUT
+            /stream/{uuid}/{namespace}      | POST
         """
         # global resource
         self._connect('/', ['GET'])
@@ -77,5 +78,7 @@ class URLMapper(object):
         self._connect('/blobs/{uuid}/{blob_id}',
                       ['GET', 'PUT', 'DELETE', 'POST'])
         self._connect('/blobs/{uuid}/', ['GET'])
+        # streaming resource
+        self._connect('/stream/{uuid}/{namespace}', ['POST'])
         # incoming resource
         self._connect('/incoming/{target_user_uuid}/{incoming_id}', ['PUT'])
diff --git a/tests/server/test__resource.py b/tests/server/test__resource.py
index a43ac19f..f3788de1 100644
--- a/tests/server/test__resource.py
+++ b/tests/server/test__resource.py
@@ -28,6 +28,7 @@ from leap.soledad.server._resource import LocalResource
 from leap.soledad.server._server_info import ServerInfo
 from leap.soledad.server._blobs import BlobsResource
 from leap.soledad.server._incoming import IncomingResource
+from leap.soledad.server._streaming_resource import StreamingResource
 from leap.soledad.server.gzip_middleware import GzipMiddleware
 
 
@@ -46,11 +47,17 @@ class PublicResourceTestCase(unittest.TestCase):
 
     def test_get_blobs_enabled(self):
         blobs_resource = BlobsResource("filesystem", '/tmp')
+        streaming_resource = StreamingResource("filesystem", '/tmp')
         resource = PublicResource(
-            blobs_resource=blobs_resource, sync_pool=_pool)
+            blobs_resource=blobs_resource,
+            streaming_resource=streaming_resource,
+            sync_pool=_pool)
         request = DummyRequest(['blobs'])
         child = getChildForRequest(resource, request)
         self.assertIsInstance(child, BlobsResource)
+        request = DummyRequest(['stream'])
+        child = getChildForRequest(resource, request)
+        self.assertIsInstance(child, StreamingResource)
 
     def test_get_blobs_disabled(self):
         blobs_resource = None
-- 
GitLab