Skip to content
Snippets Groups Projects
Verified Commit 481afd8f authored by drebs's avatar drebs
Browse files

[feature] cache user quota on blobs filesystem backend

Closes: 9016
parent 383a19aa
Branches
No related tags found
1 merge request!173#9016 - Cache user quota in blobs filesystem backend
......@@ -28,12 +28,14 @@ import os
import base64
import json
import re
import time
from twisted.web import resource
from twisted.web.client import FileBodyProducer
from twisted.web.server import NOT_DONE_YET
from twisted.internet import utils, defer
from collections import defaultdict
from zope.interface import implementer
from leap.common.files import mkdir_p
......@@ -79,6 +81,8 @@ class QuotaExceeded(Exception):
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
USAGE_TIMEOUT = 30
def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024,
concurrent_writes=50):
self.quota = quota
......@@ -86,6 +90,8 @@ class FilesystemBlobsBackend(object):
if not os.path.isdir(blobs_path):
os.makedirs(blobs_path)
self.path = blobs_path
self.usage = defaultdict(lambda: (None, None))
self.usage_locks = defaultdict(defer.DeferredLock)
def __touch(self, path):
open(path, 'a')
......@@ -129,21 +135,37 @@ class FilesystemBlobsBackend(object):
@defer.inlineCallbacks
def write_blob(self, user, blob_id, producer, namespace=''):
# limit the number of concurrent writes to disk
yield self.semaphore.acquire()
path = self._get_path(user, blob_id, namespace)
try:
mkdir_p(os.path.split(path)[0])
except OSError as e:
logger.warn("Got exception trying to create directory: %r" % e)
if os.path.isfile(path):
raise BlobExists
used = yield self.get_total_storage(user)
if used > self.quota:
raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
with open(path, 'wb') as blobfile:
yield producer.startProducing(blobfile)
yield self.semaphore.release()
path = self._get_path(user, blob_id, namespace)
try:
mkdir_p(os.path.split(path)[0])
except OSError as e:
logger.warn("Got exception trying to create directory: %r" % e)
if os.path.isfile(path):
raise BlobExists
used = yield self.get_total_storage(user)
length = producer.length / 1024.0 # original length is in bytes
if used + length > self.quota:
raise QuotaExceeded
logger.info('writing blob: %s - %s' % (user, blob_id))
with open(path, 'wb') as blobfile:
yield producer.startProducing(blobfile)
used += length
yield self._update_usage(user, used)
finally:
self.semaphore.release()
@defer.inlineCallbacks
def _update_usage(self, user, used):
lock = self.usage_locks[user]
yield lock.acquire()
try:
_, timestamp = self.usage[user]
self.usage[user] = (used, timestamp)
finally:
lock.release()
def delete_blob(self, user, blob_id, namespace=''):
try:
......@@ -217,12 +239,19 @@ class FilesystemBlobsBackend(object):
if flag in blob_flags:
yield blob_path
@defer.inlineCallbacks
def get_total_storage(self, user):
lock = self.usage_locks[user]
yield lock.acquire()
try:
path = self._get_path(user)
except Exception as e:
return defer.fail(e)
return self._get_disk_usage(path)
used, timestamp = self.usage[user]
if used is None or time.time() > timestamp + self.USAGE_TIMEOUT:
path = self._get_path(user)
used = yield self._get_disk_usage(path)
self.usage[user] = (used, time.time())
defer.returnValue(used)
finally:
lock.release()
def get_tag(self, user, blob_id, namespace=''):
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment