Verified Commit d7894edd authored by drebs

[bug] transfer maximum allowed number of blobs

-- Closes: #9004
parent 7f2eec3d
1 merge request: !168 [bug] transfer maximum allowed number of blobs
@@ -309,17 +309,15 @@ class BlobManager(BlobsSynchronizer):
         yield self.local.update_sync_status(
             doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace,
             priority=priority)
-        yield self._send(doc.blob_id, namespace, 1, 1)
+        yield self._send(doc.blob_id, namespace)

-    def _send(self, blob_id, namespace, i, total):
+    def _send(self, blob_id, namespace):
         lock = self.locks[blob_id]
-        d = lock.run(self.__send, blob_id, namespace, i, total)
+        d = lock.run(self.__send, blob_id, namespace)
         return d

     @defer.inlineCallbacks
-    def __send(self, blob_id, namespace, i, total):
-        logger.info("Sending blob to server (%d/%d): %s"
-                    % (i, total, blob_id))
+    def __send(self, blob_id, namespace):
         # In fact, some kind of pipe is needed here, where each write on db
         # handle gets forwarded into a write on the connection handle
         fd = yield self.local.get(blob_id, namespace=namespace)
@@ -106,29 +106,33 @@ class BlobsSynchronizer(object):
     @defer.inlineCallbacks
     def _send_missing(self, namespace):
-        # the list of priorities must be refreshed every time a new blob will
-        # be transferred. To do that, we use a semaphore and get a new ordered
-        # list only when there are free slots for new transfers.
+        # the list of blobs should be refreshed often, so we run as many
+        # concurrent transfers as we can and then refresh the list
         max_transfers = self.concurrent_transfers_limit
         semaphore = defer.DeferredSemaphore(max_transfers)
+        scheduled = set()
         while True:
-            d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace)
-            missing = yield d
-            if not missing:
+            d = semaphore.run(self._send_next, namespace, scheduled)
+            success = yield d
+            if not success:
                 break
-            total = len(missing)
-            now = min(total, max_transfers)
-            logger.info("There are %d pending blob uploads." % total)
-            logger.info("Will send %d blobs to server now." % now)
-            missing = missing[:now]
-            deferreds = []
-            for i in xrange(now):
-                blob_id = missing.pop(0)
-                d = semaphore.run(
-                    with_retry, self._send, blob_id, namespace, i, total)
-                deferreds.append(d)
-            yield defer.gatherResults(deferreds, consumeErrors=True)
+
+    @defer.inlineCallbacks
+    def _send_next(self, namespace, scheduled):
+        status = SyncStatus.PENDING_UPLOAD
+        pending = yield self.local_list_status(status, namespace)
+        pending = [x for x in pending if x not in scheduled]
+        logger.info("There are %d pending blob uploads." % len(pending))
+        if not pending:
+            # we are finished, indicate that to our caller
+            defer.returnValue(False)
+        blob_id = pending[0]
+        logger.info("Sending blob: %s" % (blob_id,))
+        yield with_retry(self._send, blob_id, namespace)
+        defer.returnValue(True)

     def fetch_missing(self, namespace=''):
         """
@@ -149,30 +153,33 @@ class BlobsSynchronizer(object):
     @defer.inlineCallbacks
     def _fetch_missing(self, namespace=''):
-        # the list of priorities must be refreshed every time a new blob will
-        # be transferred. To do that, we use a semaphore and get a new ordered
-        # list only when there are free slots for new transfers.
+        # in order to make sure that transfer priorities will be met, the list
+        # of blobs to transfer should be refreshed often. What we do is run as
+        # many concurrent transfers as we can and then refresh the list
         max_transfers = self.concurrent_transfers_limit
         semaphore = defer.DeferredSemaphore(max_transfers)
+        scheduled = set()
         while True:
-            d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
-            docs_we_want = yield d
-            if not docs_we_want:
+            d = semaphore.run(self._fetch_next, namespace, scheduled)
+            success = yield d
+            if not success:
                 break
-            total = len(docs_we_want)
-            now = min(total, max_transfers)
-            logger.info("There are %d pending blob downloads." % total)
-            logger.info("Will fetch %d blobs from server now." % now)
-            docs_we_want = docs_we_want[:now]
-            deferreds = []
-            for i in xrange(now):
-                blob_id = docs_we_want.pop(0)
-                logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id))
-                d = semaphore.run(with_retry, self._fetch, blob_id, namespace)
-                deferreds.append(d)
-            yield defer.gatherResults(deferreds, consumeErrors=True)
+
+    @defer.inlineCallbacks
+    def _fetch_next(self, namespace, scheduled):
+        status = SyncStatus.PENDING_DOWNLOAD
+        pending = yield self.local_list_status(status, namespace)
+        pending = [x for x in pending if x not in scheduled]
+        logger.info("There are %d pending blob downloads." % len(pending))
+        if not pending:
+            # we are finished, indicate that to our caller
+            defer.returnValue(False)
+        blob_id = pending[0]
+        logger.info("Fetching blob: %s" % (blob_id,))
+        yield with_retry(self._fetch, blob_id, namespace)
+        defer.returnValue(True)

     @defer.inlineCallbacks
     def sync(self, namespace=''):
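Both _send_next and _fetch_next wrap the actual transfer call in with_retry, a helper defined elsewhere in the codebase and not shown in this diff. Purely as a hypothetical sketch of what such a Deferred-based retry wrapper can look like (the project's real implementation may differ in both signature and policy):

# Hypothetical sketch only: the project's real with_retry is defined elsewhere
# and may behave differently. This version retries a Deferred-returning
# function a fixed number of times, pausing between attempts.
from twisted.internet import defer, reactor, task


@defer.inlineCallbacks
def with_retry(func, *args, **kwargs):
    retries = 3    # assumed policy, not taken from this diff
    delay = 1.0    # seconds between attempts, also assumed
    for attempt in range(retries):
        try:
            result = yield func(*args, **kwargs)
        except Exception:
            if attempt == retries - 1:
                raise
            # wait a bit before trying again
            yield task.deferLater(reactor, delay, lambda: None)
        else:
            defer.returnValue(result)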