Verified Commit 1bce0175 authored by Victor

[feature] add a producer protocol for upstream

-- Resolves: #8809
parent 56a4ffd9
1 merge request: !172 Upstreaming blobs
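The "producer protocol" in question is Twisted's IBodyProducer: the HTTP client calls startProducing(consumer) with the connection's consumer, drives backpressure through the IPushProducer methods, and reads the length attribute, which may be UNKNOWN_LENGTH for a chunked upload. A rough sketch of that contract, which the new class below fills in (names come from twisted.web.iweb; the example class itself is illustrative only, not part of this commit):

# Illustrative sketch of the IBodyProducer contract (twisted.web.iweb).
from zope.interface import implementer
from twisted.web.iweb import IBodyProducer
from twisted.web.iweb import UNKNOWN_LENGTH


@implementer(IBodyProducer)
class ExampleBodyProducer(object):
    length = UNKNOWN_LENGTH  # body size not known up front: chunked upload

    def startProducing(self, consumer):
        """Write the body to consumer; return a Deferred firing when done."""

    def pauseProducing(self):
        """Called by the transport when its buffers are full."""

    def resumeProducing(self):
        """Called when the transport can accept more data."""

    def stopProducing(self):
        """Called if the request is cancelled before the body is finished."""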
@@ -50,6 +50,7 @@ from .sql import SyncStatus
 from .sql import Priority
 from .sql import SQLiteBlobBackend
 from .sync import BlobsSynchronizer
+from .upstream_producer import BlobsUpstreamProducer
 from .errors import (
     BlobAlreadyExistsError, MaximumRetriesError,
     RetriableTransferError, BlobNotFoundError, InvalidFlagsError)
@@ -498,25 +499,17 @@ class BlobManager(BlobsSynchronizer):
     @defer.inlineCallbacks
     def _upstream(self, blobs_id_list, namespace=''):
+        local, secret = self.local, self.secret
         uri = urljoin(self.remote_stream, self.user)
         params = {'namespace': namespace} if namespace else None
         sizes = yield self.local.get_size_list(blobs_id_list, namespace)
         convert = get_unarmored_ciphertext_size
-        sizes = map(lambda (x, y): (x, convert(y)), sizes)
-        data = BytesIO()  # TODO: stream from db
-        data.write(json.dumps(sizes) + '\n')
-        for blob_id in blobs_id_list:
-            blob_fd = yield self.local.get(blob_id, namespace=namespace)
-            doc_info = DocInfo(blob_id, FIXED_REV)
-            crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
-                                    armor=False)
-            fd = yield crypter.encrypt()
-            data.write(fd.read())
-        data.seek(0)
+        sizes = map(lambda (blob_id, size): (blob_id, convert(size)), sizes)
+        producer = BlobsUpstreamProducer(local, sizes, namespace, secret)
         params = {'namespace': namespace} if namespace else {}
         params['direction'] = 'upload'
-        response = yield self._client.post(uri, data=data, params=params)
-        check_http_status(response.code, blob_id)
+        response = yield self._client.post(uri, data=producer, params=params)
+        check_http_status(response.code, 'stream')
         logger.info("Finished stream up: %s" % (blobs_id_list,))

     @defer.inlineCallbacks
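The request body, whether built in memory as before or generated incrementally by the new producer, is a single upload stream: a JSON header line listing the (blob_id, unarmored ciphertext size) pairs, followed by each blob's encrypted payload in the same order. A small illustration of what the first bytes look like (the blob ids and sizes here are made up):

# Illustration only: first bytes of the upload body for two hypothetical blobs.
import json

sizes = [('blob-1', 1024), ('blob-2', 2048)]   # already converted sizes
header = json.dumps(sizes) + '\n'              # written first by the producer
# header == '[["blob-1", 1024], ["blob-2", 2048]]\n'
# ...followed by blob-1's ciphertext, then blob-2's, back to back.

The new producer module, upstream_producer.py, added by this commit: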
# -*- coding: utf-8 -*-
# upstream_producer.py
# Copyright (C) 2017 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json

from zope.interface import implementer

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.iweb import IBodyProducer
from twisted.web.iweb import UNKNOWN_LENGTH

from leap.soledad.client._crypto import DocInfo
from leap.soledad.client._crypto import BlobEncryptor


FIXED_REV = 'ImmutableRevision'  # Blob content is immutable


@implementer(IBodyProducer)
class BlobsUpstreamProducer(object):
    """
    Blob producer for upload streams.
    """

    def __init__(self, database, blobs_lengths, namespace, secret):
        """
        Initialize the upload streamer.

        :param database: Local blobs SQLCipher backend instance.
        :type database: .sql.SQLiteBlobBackend
        :param blobs_lengths: List of blobs with ids and sizes.
        :type blobs_lengths: [(blob_id:str, size:int)]
        :param namespace: Namespace to which this stream belongs.
        :type namespace: str
        :param secret: The secret used to encrypt blobs.
        :type secret: str
        """
        self.blobs_lengths = blobs_lengths
        self.db = database
        self.length = UNKNOWN_LENGTH
        self.pause = False
        self.stop = False
        self.namespace = namespace
        self.secret = secret

    @defer.inlineCallbacks
    def startProducing(self, consumer):
        """
        Write blobs to the consumer.

        :param consumer: Any IConsumer provider.
        :type consumer: twisted.internet.interfaces.IConsumer

        :return: A Deferred that fires when production ends.
        :rtype: twisted.internet.defer.Deferred
        """
        # The first line of the stream is a JSON list of (blob_id, size)
        # pairs; the encrypted blobs follow in the same order.
        consumer.write(json.dumps(self.blobs_lengths) + '\n')
        for blob_id, _ in self.blobs_lengths:
            blob_fd = yield self.db.get(blob_id, namespace=self.namespace)
            doc_info = DocInfo(blob_id, FIXED_REV)
            crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
                                    armor=False)
            fd = yield crypter.encrypt()
            consumer.write(fd.read())

    def sleep(self, secs):
        """Return a Deferred that fires after secs seconds."""
        d = defer.Deferred()
        reactor.callLater(secs, d.callback, None)
        return d

    def pauseProducing(self):
        self.pause = True

    def stopProducing(self):
        self.stop = True

    def resumeProducing(self):
        self.pause = False
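A hedged usage sketch of the producer outside the HTTP stack: in _upstream above it is handed to self._client.post(..., data=producer, ...), but the same Deferred-based contract can be exercised directly against a toy consumer, for instance in a test. The _ListConsumer helper and dump_stream function are illustrative only, not part of the commit.

# Sketch only: drive the producer against an in-memory consumer.
from twisted.internet import defer


class _ListConsumer(object):
    """Minimal stand-in for the IConsumer the HTTP client would supply."""

    def __init__(self):
        self.chunks = []

    def write(self, data):
        self.chunks.append(data)


@defer.inlineCallbacks
def dump_stream(db, sizes, namespace, secret):
    producer = BlobsUpstreamProducer(db, sizes, namespace, secret)
    consumer = _ListConsumer()
    # Fires once the JSON header and every encrypted blob have been written.
    yield producer.startProducing(consumer)
    defer.returnValue(''.join(consumer.chunks))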