diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index a0b48035629abea6040be29e2207e73f05f4eadb..8d469760fda82cae382ee2c2d8c3bc5ae5090599 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -50,6 +50,7 @@ from .sql import SyncStatus
 from .sql import Priority
 from .sql import SQLiteBlobBackend
 from .sync import BlobsSynchronizer
+from .upstream_producer import BlobsUpstreamProducer
 from .errors import (
     BlobAlreadyExistsError, MaximumRetriesError,
     RetriableTransferError, BlobNotFoundError, InvalidFlagsError)
@@ -498,25 +499,17 @@ class BlobManager(BlobsSynchronizer):
 
     @defer.inlineCallbacks
     def _upstream(self, blobs_id_list, namespace=''):
+        local, secret = self.local, self.secret
         uri = urljoin(self.remote_stream, self.user)
         params = {'namespace': namespace} if namespace else None
         sizes = yield self.local.get_size_list(blobs_id_list, namespace)
         convert = get_unarmored_ciphertext_size
-        sizes = map(lambda (x, y): (x, convert(y)), sizes)
-        data = BytesIO()  # TODO: stream from db
-        data.write(json.dumps(sizes) + '\n')
-        for blob_id in blobs_id_list:
-            blob_fd = yield self.local.get(blob_id, namespace=namespace)
-            doc_info = DocInfo(blob_id, FIXED_REV)
-            crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
-                                    armor=False)
-            fd = yield crypter.encrypt()
-            data.write(fd.read())
-        data.seek(0)
+        sizes = map(lambda (blob_id, size): (blob_id, convert(size)), sizes)
+        producer = BlobsUpstreamProducer(local, sizes, namespace, secret)
         params = {'namespace': namespace} if namespace else {}
         params['direction'] = 'upload'
-        response = yield self._client.post(uri, data=data, params=params)
-        check_http_status(response.code, blob_id)
+        response = yield self._client.post(uri, data=producer, params=params)
+        check_http_status(response.code, 'stream')
         logger.info("Finished stream up: %s" % (blobs_id_list,))
 
     @defer.inlineCallbacks
diff --git a/src/leap/soledad/client/_db/blobs/upstream_producer.py b/src/leap/soledad/client/_db/blobs/upstream_producer.py
new file mode 100644
index 0000000000000000000000000000000000000000..5c8f0530cb8588adf53dd6b1f402c50226018817
--- /dev/null
+++ b/src/leap/soledad/client/_db/blobs/upstream_producer.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+# upstream_producer.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from zope.interface import implementer
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+from leap.soledad.client._crypto import DocInfo
+from leap.soledad.client._crypto import BlobEncryptor
+
+
+FIXED_REV = 'ImmutableRevision'  # Blob content is immutable
+
+
+@implementer(IBodyProducer)
+class BlobsUpstreamProducer(object):
+    """
+    Blob producer for upload streams.
+    """
+
+    def __init__(self, database, blobs_lengths, namespace, secret):
+        """
+        Initialize the upload streamer.
+
+        :param database: Local blobs SQLCipher backend instance
+        :type database: .sql.SQLiteBlobBackend
+        :param blobs_lengths: List of blobs with ids and sizes
+        :type blobs_lengths: [(blob_id:str, size:int)]
+        :param namespace: Namespace which this stream belongs
+        :type namespace: str
+        :param secret: The secret used to encrypt blobs.
+        :type secret: str
+        """
+        self.blobs_lengths = blobs_lengths
+        self.db = database
+        self.length = UNKNOWN_LENGTH
+        self.pause = False
+        self.stop = False
+        self.namespace = namespace
+        self.secret = secret
+
+    @defer.inlineCallbacks
+    def startProducing(self, consumer):
+        """
+        Write blobs to the consumer.
+
+        :param consumer: Any IConsumer provider.
+        :type consumer: twisted.internet.interfaces.IConsumer
+
+        :return: A Deferred that fires when production ends.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        consumer.write(json.dumps(self.blobs_lengths) + '\n')
+        for blob_id, _ in self.blobs_lengths:
+            blob_fd = yield self.db.get(blob_id, namespace=self.namespace)
+            doc_info = DocInfo(blob_id, FIXED_REV)
+            crypter = BlobEncryptor(doc_info, blob_fd, secret=self.secret,
+                                    armor=False)
+            fd = yield crypter.encrypt()
+            consumer.write(fd.read())
+
+    def sleep(self, secs):
+        d = defer.Deferred()
+        reactor.callLater(secs, d.callback, None)
+        return d
+
+    def pauseProducing(self):
+        self.pause = True
+
+    def stopProducing(self):
+        self.stop = True
+
+    def resumeProducing(self):
+        self.pause = False