Verified Commit 8a89f00a authored by drebs's avatar drebs
Browse files

[feature] turn fs lock into a decorator

parent ce01ed16
Pipeline #7688 passed with stage
in 83 minutes and 52 seconds
......@@ -82,6 +82,37 @@ class SingleRangeProducer(SingleRangeStaticProducer):
self.deferred.callback(None)
def isolated(path):
"""
A decorator that isolates execution of the decorated method using a file
system lock based on the given path. A symlink in ``{path}.lock`` will be
used to make sure only one isolated method is executed at a time for that
path.
"""
def decorator(method):
def new_method(*args, **kwargs):
dirname, _ = os.path.split(path)
mkdir_p(dirname)
name = path + '.lock'
# TODO: evaluate the need to replace this for a readers-writer lock
lock = defer.DeferredFilesystemLock(name)
def _release(result):
lock.unlock()
return result
d = lock.deferUntilLocked()
d.addCallback(lambda _: method(*args, **kwargs))
d.addCallbacks(_release, _release)
return d
return new_method
return decorator
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
......@@ -100,22 +131,14 @@ class FilesystemBlobsBackend(object):
def __touch(self, path):
open(path, 'a')
def _fslock(self, path):
dirname, _ = os.path.split(path)
mkdir_p(dirname)
name = path + '.lock'
# TODO: evaluate the need to replace this for a readers-writer lock.
return defer.DeferredFilesystemLock(name)
@defer.inlineCallbacks
def read_blob(self, user, blob_id, consumer, namespace='', range=None):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
return defer.fail(BlobNotFound((user, blob_id)))
@isolated(path)
@defer.inlineCallbacks
def _read_blob():
logger.info('reading blob: %s - %s@%s'
% (user, blob_id, namespace))
logger.debug('blob path: %s' % path)
......@@ -129,52 +152,55 @@ class FilesystemBlobsBackend(object):
args = (consumer, fd, offset, size)
producer = SingleRangeProducer(*args)
yield producer.start()
finally:
fslock.unlock()
@defer.inlineCallbacks
return _read_blob()
def get_flags(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
return defer.fail(BlobNotFound((user, blob_id)))
if not os.path.isfile(path + '.flags'):
defer.returnValue([])
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
return defer.succeed([])
with open(path + '.flags', 'r') as flags_file:
flags = json.loads(flags_file.read())
defer.returnValue(flags)
finally:
fslock.unlock()
@isolated(path)
def _get_flags():
try:
with open(path + '.flags', 'r') as flags_file:
flags = json.loads(flags_file.read())
return defer.succeed(flags)
except Exception as e:
return defer.fail(e)
return _get_flags()
@defer.inlineCallbacks
def set_flags(self, user, blob_id, flags, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
for flag in flags:
if flag not in ACCEPTED_FLAGS:
raise InvalidFlag(flag)
with open(path + '.flags', 'w') as flags_file:
raw_flags = json.dumps(flags)
flags_file.write(raw_flags)
finally:
fslock.unlock()
return defer.fail(BlobNotFound((user, blob_id)))
@isolated(path)
def _set_flags():
try:
for flag in flags:
if flag not in ACCEPTED_FLAGS:
raise InvalidFlag(flag)
with open(path + '.flags', 'w') as flags_file:
raw_flags = json.dumps(flags)
flags_file.write(raw_flags)
return defer.succeed(None)
except Exception as e:
return defer.fail(e)
return _set_flags()
@defer.inlineCallbacks
def write_blob(self, user, blob_id, producer, namespace=''):
path = self._get_path(user, blob_id, namespace)
if os.path.isfile(path):
raise BlobExists
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
return defer.fail(BlobExists((user, blob_id)))
@isolated(path)
@defer.inlineCallbacks
def _write_blob():
try:
# limit the number of concurrent writes to disk
yield self.semaphore.acquire()
......@@ -195,8 +221,8 @@ class FilesystemBlobsBackend(object):
yield self._update_usage(user, used)
finally:
self.semaphore.release()
finally:
fslock.unlock()
return _write_blob()
@defer.inlineCallbacks
def _update_usage(self, user, used):
......@@ -208,35 +234,37 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
@defer.inlineCallbacks
def delete_blob(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
return defer.fail(BlobNotFound((user, blob_id)))
@isolated(path)
def _delete_blob():
self.__touch(path + '.deleted')
os.unlink(path)
try:
os.unlink(path + '.flags')
except Exception:
pass
finally:
fslock.unlock()
return defer.succeed(None)
return _delete_blob()
@defer.inlineCallbacks
def get_blob_size(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
return defer.fail(BlobNotFound((user, blob_id)))
@isolated(path)
def _get_blob_size():
size = os.stat(path).st_size
defer.returnValue(size)
finally:
fslock.unlock()
try:
return defer.succeed(size)
except Exception as e:
return defer.fail(e)
return _get_blob_size()
def count(self, user, namespace=''):
try:
......@@ -301,20 +329,22 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
@defer.inlineCallbacks
def get_tag(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
with open(path) as doc_file:
doc_file.seek(-16, 2)
tag = base64.urlsafe_b64encode(doc_file.read())
defer.returnValue(tag)
finally:
fslock.unlock()
return defer.fail(BlobNotFound((user, blob_id)))
@isolated(path)
def _get_tag():
try:
with open(path) as doc_file:
doc_file.seek(-16, 2)
tag = base64.urlsafe_b64encode(doc_file.read())
return defer.succeed(tag)
except Exception as e:
return defer.fail(e)
return _get_tag()
@defer.inlineCallbacks
def _get_disk_usage(self, start_path):
......@@ -341,12 +371,13 @@ class FilesystemBlobsBackend(object):
@defer.inlineCallbacks
def exists(self, user, blob_id, namespace):
path = self._get_path(user, blob_id, namespace)
fslock = self._fslock(path)
try:
yield fslock.deferUntilLocked()
@isolated(path)
@defer.inlineCallbacks
def _exists():
defer.returnValue(os.path.isfile(path))
finally:
fslock.unlock()
return _exists()
def _get_path(self, user, blob_id='', namespace=''):
parts = [user]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment