Verified Commit ce01ed16 authored by drebs

[bug] isolate operations in blobs file system backend

Closes: #9025
parent 804d47c9
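The change serializes filesystem operations on any given blob behind a per-path lock. The sketch below shows that pattern in isolation, using Twisted's DeferredFilesystemLock as the diff does; the helper names (fslock_for, with_blob_lock) are illustrative and not part of the backend.

import errno
import os

from twisted.internet import defer


def mkdir_p(path):
    # create intermediate directories, ignoring "already exists" errors
    if not path:
        return
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def fslock_for(path):
    # one lock file per blob path: operations on the same blob are
    # serialized, while operations on different blobs run in parallel
    mkdir_p(os.path.dirname(path))
    return defer.DeferredFilesystemLock(path + '.lock')


@defer.inlineCallbacks
def with_blob_lock(path, operation):
    # acquire the per-path lock, run the operation, always release the lock
    lock = fslock_for(path)
    yield lock.deferUntilLocked()
    try:
        result = yield defer.maybeDeferred(operation)
    finally:
        lock.unlock()
    defer.returnValue(result)

In the diff this takes the form of the new _fslock() helper plus a try/finally around the body of each operation.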
@@ -100,75 +100,103 @@ class FilesystemBlobsBackend(object):
def __touch(self, path):
open(path, 'a')
def _fslock(self, path):
dirname, _ = os.path.split(path)
mkdir_p(dirname)
name = path + '.lock'
# TODO: evaluate the need to replace this with a readers-writer lock.
return defer.DeferredFilesystemLock(name)
@defer.inlineCallbacks
def read_blob(self, user, blob_id, consumer, namespace='', range=None):
logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
logger.debug('blob path: %s' % path)
with open(path) as fd:
if range is None:
producer = NoRangeProducer(consumer, fd)
else:
start, end = range
offset = start
size = end - start
args = (consumer, fd, offset, size)
producer = SingleRangeProducer(*args)
yield producer.start()
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
logger.info('reading blob: %s - %s@%s'
% (user, blob_id, namespace))
logger.debug('blob path: %s' % path)
with open(path) as fd:
if range is None:
producer = NoRangeProducer(consumer, fd)
else:
start, end = range
offset = start
size = end - start
args = (consumer, fd, offset, size)
producer = SingleRangeProducer(*args)
yield producer.start()
finally:
fslock.unlock()
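The range branch above treats range as an end-exclusive (start, end) pair: offset = start and size = end - start, so range=(100, 200) reads 100 bytes starting at offset 100. A tiny sketch of just that arithmetic (the producer classes themselves are defined elsewhere in this module):

def range_to_offset_size(range_):
    # mirrors read_blob: (start, end) -> (offset, size)
    start, end = range_
    return start, end - start


assert range_to_offset_size((100, 200)) == (100, 100)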
@defer.inlineCallbacks
def get_flags(self, user, blob_id, namespace=''):
try:
path = self._get_path(user, blob_id, namespace)
except Exception as e:
return defer.fail(e)
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
return defer.fail(BlobNotFound((user, blob_id)))
raise BlobNotFound((user, blob_id))
if not os.path.isfile(path + '.flags'):
return defer.succeed([])
with open(path + '.flags', 'r') as flags_file:
flags = json.loads(flags_file.read())
return defer.succeed(flags)
defer.returnValue([])
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
with open(path + '.flags', 'r') as flags_file:
flags = json.loads(flags_file.read())
defer.returnValue(flags)
finally:
fslock.unlock()
@defer.inlineCallbacks
def set_flags(self, user, blob_id, flags, namespace=''):
try:
path = self._get_path(user, blob_id, namespace)
except Exception as e:
return defer.fail(e)
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
return defer.fail(BlobNotFound((user, blob_id)))
for flag in flags:
if flag not in ACCEPTED_FLAGS:
return defer.fail(InvalidFlag(flag))
with open(path + '.flags', 'w') as flags_file:
raw_flags = json.dumps(flags)
flags_file.write(raw_flags)
return defer.succeed(None)
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
for flag in flags:
if flag not in ACCEPTED_FLAGS:
raise InvalidFlag(flag)
with open(path + '.flags', 'w') as flags_file:
raw_flags = json.dumps(flags)
flags_file.write(raw_flags)
finally:
fslock.unlock()
@defer.inlineCallbacks
def write_blob(self, user, blob_id, producer, namespace=''):
# limit the number of concurrent writes to disk
yield self.semaphore.acquire()
path = self._get_path(user, blob_id, namespace)
if os.path.isfile(path):
raise BlobExists
fslock = self._fslock(path)
try:
path = self._get_path(user, blob_id, namespace)
yield fslock.deferUntilLocked()
try:
mkdir_p(os.path.split(path)[0])
except OSError as e:
logger.warn("Got exception trying to create directory: %r" % e)
if os.path.isfile(path):
raise BlobExists
used = yield self.get_total_storage(user)
length = producer.length / 1024.0 # original length is in bytes
if used + length > self.quota:
raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
with open(path, 'wb') as blobfile:
yield producer.startProducing(blobfile)
used += length
yield self._update_usage(user, used)
# limit the number of concurrent writes to disk
yield self.semaphore.acquire()
try:
mkdir_p(os.path.split(path)[0])
except OSError as e:
logger.warn(
"Got exception trying to create directory: %r" % e)
used = yield self.get_total_storage(user)
length = producer.length / 1024.0
if used + length > self.quota:
raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
with open(path, 'wb') as blobfile:
yield producer.startProducing(blobfile)
used += length
yield self._update_usage(user, used)
finally:
self.semaphore.release()
finally:
self.semaphore.release()
fslock.unlock()
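write_blob above combines two mechanisms: the per-path lock isolates operations on one blob, and self.semaphore caps how many blob writes hit the disk at once (the lock is taken first, the semaphore inside it, and both are released in finally blocks). A minimal sketch of the semaphore half, assuming self.semaphore is a twisted.internet.defer.DeferredSemaphore, which matches the acquire/release calls in the diff; the token count here is arbitrary.

from twisted.internet import defer

# allow at most 16 concurrent writes to disk (the count is illustrative)
write_semaphore = defer.DeferredSemaphore(16)


@defer.inlineCallbacks
def limited_write(path, data):
    yield write_semaphore.acquire()
    try:
        with open(path, 'wb') as blobfile:
            blobfile.write(data)
    finally:
        write_semaphore.release()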
@defer.inlineCallbacks
def _update_usage(self, user, used):
@@ -180,30 +208,35 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
@defer.inlineCallbacks
def delete_blob(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
blob_path = self._get_path(user, blob_id, namespace)
except Exception as e:
return defer.fail(e)
if not os.path.isfile(blob_path):
return defer.fail(BlobNotFound((user, blob_id)))
self.__touch(blob_path + '.deleted')
os.unlink(blob_path)
try:
os.unlink(blob_path + '.flags')
except Exception:
pass
return defer.succeed(None)
yield fslock.deferUntilLocked()
self.__touch(path + '.deleted')
os.unlink(path)
try:
os.unlink(path + '.flags')
except Exception:
pass
finally:
fslock.unlock()
@defer.inlineCallbacks
def get_blob_size(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
blob_path = self._get_path(user, blob_id, namespace)
except Exception as e:
return defer.fail(e)
if not os.path.isfile(blob_path):
return defer.fail(BlobNotFound((user, blob_id)))
size = os.stat(blob_path).st_size
return defer.succeed(size)
yield fslock.deferUntilLocked()
size = os.stat(path).st_size
defer.returnValue(size)
finally:
fslock.unlock()
def count(self, user, namespace=''):
try:
@@ -268,17 +301,20 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
@defer.inlineCallbacks
def get_tag(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
blob_path = self._get_path(user, blob_id, namespace)
except Exception as e:
return defer.fail(e)
if not os.path.isfile(blob_path):
return defer.fail(BlobNotFound((user, blob_id)))
with open(blob_path) as doc_file:
doc_file.seek(-16, 2)
tag = base64.urlsafe_b64encode(doc_file.read())
return defer.succeed(tag)
yield fslock.deferUntilLocked()
with open(path) as doc_file:
doc_file.seek(-16, 2)
tag = base64.urlsafe_b64encode(doc_file.read())
defer.returnValue(tag)
finally:
fslock.unlock()
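In get_tag above, seek(-16, 2) positions the file 16 bytes before its end (whence value 2 is os.SEEK_END), so the tag is the last 16 bytes of the stored blob, returned urlsafe-base64 encoded. The same read as a standalone sketch (the function name is illustrative):

import base64
import os


def read_trailing_tag(path):
    # return the last 16 bytes of the file, urlsafe-base64 encoded
    with open(path, 'rb') as blobfile:
        blobfile.seek(-16, os.SEEK_END)
        return base64.urlsafe_b64encode(blobfile.read())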
@defer.inlineCallbacks
def _get_disk_usage(self, start_path):
@@ -302,12 +338,15 @@ class FilesystemBlobsBackend(object):
raise Exception(err)
return desired_path
@defer.inlineCallbacks
def exists(self, user, blob_id, namespace):
path = self._get_path(user, blob_id, namespace)
fslock = self._fslock(path)
try:
path = self._get_path(user, blob_id=blob_id, namespace=namespace)
except Exception as e:
return defer.fail(e)
return os.path.isfile(path)
yield fslock.deferUntilLocked()
defer.returnValue(os.path.isfile(path))
finally:
fslock.unlock()
def _get_path(self, user, blob_id='', namespace=''):
parts = [user]
@@ -23,6 +23,10 @@ class IBlobsBackend(Interface):
"""
An interface for a backend that can store blobs.
There might be concurrent calls to methods that modify the same blob, so
it's the backend implementation's responsibility to ensure isolation of
such actions.
"""
def read_blob(user, blob_id, consumer, namespace='', range=None):
@@ -66,8 +70,11 @@ class IBlobsBackend(Interface):
backend storage.
:rtype: twisted.internet.defer.Deferred
:raise BlobExists: Raised when a blob with that id already exists.
:raise QuotaExceeded: Raised when the quota for that user was exceeded.
:raise BlobExists: Raised (asynchronously) when a blob with that id
already exists.
:raise QuotaExceeded: Raised (asynchronously) when the quota for that
user was exceeded.
"""
def delete_blob(user, blob_id, namespace=''):
@@ -218,5 +225,7 @@ class IBlobsBackend(Interface):
:raise BlobNotFound: Raised (asynchronously) when the blob was not
found in the backend.
:raise InvalidFlag: Raised when one of the flags passed is invalid.
:raise InvalidFlag: Raised (asynchronously) when one of the flags
passed is invalid.
"""
@@ -77,8 +77,6 @@ class FilesystemBackendTestCase(unittest.TestCase):
@pytest.mark.usefixtures("method_tmpdir")
@mock.patch.object(os.path, 'isfile')
@mock.patch.object(_blobs.FilesystemBlobsBackend, '_get_path',
Mock(return_value='path'))
@defer.inlineCallbacks
def test_cannot_overwrite(self, isfile):
isfile.return_value = True