Verified Commit ad1d5384 authored by jfriedli's avatar jfriedli
Browse files

merged refactoring from master

parents 38cd722d e1bac8b6
Pipeline #38071 passed with stages
in 1 minute and 34 seconds
import os
import hmac
import mimetypes as mtype
from uuid import uuid4
import jinja2
import base64
import io
import binascii
import zipfile
from cerberus import Validator
import utils
import file_removal_scheduler
from libmat2 import parser_factory
from flask import Flask, flash, request, redirect, url_for, render_template, send_from_directory, after_this_request
from flask_restful import Resource, Api, reqparse, abort
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
from matweb import utils, rest_api, frontend
from flask import Flask
from flask_restful import Api
from flask_cors import CORS
from urllib.parse import urljoin
def create_app(test_config=None):
......@@ -32,240 +19,33 @@ def create_app(test_config=None):
if test_config is not None:
app.config.update(test_config)
# Non JS Frontend
app.jinja_loader = jinja2.ChoiceLoader([ # type: ignore
jinja2.FileSystemLoader(app.config['CUSTOM_TEMPLATES_DIR']),
app.jinja_loader,
])
app.register_blueprint(frontend.routes)
# Restful API hookup
api = Api(app)
CORS(app, resources={r"/api/*": {"origins": utils.get_allow_origin_header_value()}})
@app.route('/download/<string:key>/<string:filename>')
def download_file(key: str, filename: str):
if filename != secure_filename(filename):
return redirect(url_for('upload_file'))
complete_path, filepath = get_file_paths(filename)
file_removal_scheduler.run_file_removal_job(app.config['UPLOAD_FOLDER'])
if not os.path.exists(complete_path):
return redirect(url_for('upload_file'))
if hmac.compare_digest(utils.hash_file(complete_path), key) is False:
return redirect(url_for('upload_file'))
@after_this_request
def remove_file(response):
if os.path.exists(complete_path):
os.remove(complete_path)
return response
return send_from_directory(app.config['UPLOAD_FOLDER'], filepath, as_attachment=True)
@app.route('/', methods=['GET', 'POST'])
def upload_file():
utils.check_upload_folder(app.config['UPLOAD_FOLDER'])
mimetypes = get_supported_extensions()
if request.method == 'POST':
if 'file' not in request.files: # check if the post request has the file part
flash('No file part')
return redirect(request.url)
uploaded_file = request.files['file']
if not uploaded_file.filename:
flash('No selected file')
return redirect(request.url)
filename, filepath = save_file(uploaded_file)
parser, mime = get_file_parser(filepath)
if parser is None:
flash('The type %s is not supported' % mime)
return redirect(url_for('upload_file'))
try:
meta = parser.get_meta()
if parser.remove_all() is not True:
return flash_cleanup_error_response(mime)
except ValueError:
return flash_cleanup_error_response(mime)
key, meta_after, output_filename = cleanup(parser, filepath)
return render_template(
'download.html', mimetypes=mimetypes, meta=meta, filename=output_filename, meta_after=meta_after, key=key
)
max_file_size = int(app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024)
return render_template('index.html', max_file_size=max_file_size, mimetypes=mimetypes)
def get_supported_extensions():
extensions = set()
for parser in parser_factory._get_parsers():
for m in parser.mimetypes:
extensions |= set(mtype.guess_all_extensions(m, strict=False))
# since `guess_extension` might return `None`, we need to filter it out
return sorted(filter(None, extensions))
def save_file(file):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(os.path.join(filepath))
return filename, filepath
def get_file_parser(filepath: str):
parser, mime = parser_factory.get_parser(filepath)
return parser, mime
def flash_cleanup_error_response(mime):
flash('Unable to clean %s' % mime)
return redirect(url_for('upload_file'))
def api_cleanup_error(mime):
abort(500, message='Unable to clean %s' % mime)
def cleanup(parser, filepath):
output_filename = os.path.basename(parser.output_filename)
parser, _ = parser_factory.get_parser(parser.output_filename)
meta_after = parser.get_meta()
os.remove(filepath)
key = utils.hash_file(os.path.join(app.config['UPLOAD_FOLDER'], output_filename))
return key, meta_after, output_filename
def get_file_paths(filename):
filepath = secure_filename(filename)
complete_path = os.path.join(app.config['UPLOAD_FOLDER'], filepath)
return complete_path, filepath
def is_valid_api_download_file(filename, key):
if filename != secure_filename(filename):
abort(400, message='Insecure filename')
complete_path, filepath = get_file_paths(filename)
if not os.path.exists(complete_path):
abort(404, message='File not found')
if hmac.compare_digest(utils.hash_file(complete_path), key) is False:
abort(400, message='The file hash does not match')
return complete_path, filepath
class APIUpload(Resource):
def post(self):
utils.check_upload_folder(app.config['UPLOAD_FOLDER'])
req_parser = reqparse.RequestParser()
req_parser.add_argument('file_name', type=str, required=True, help='Post parameter is not specified: file_name')
req_parser.add_argument('file', type=str, required=True, help='Post parameter is not specified: file')
args = req_parser.parse_args()
try:
file_data = base64.b64decode(args['file'])
except binascii.Error as err:
abort(400, message='Failed decoding file: ' + str(err))
file = FileStorage(stream=io.BytesIO(file_data), filename=args['file_name'])
filename, filepath = save_file(file)
parser, mime = get_file_parser(filepath)
if parser is None:
abort(415, message='The type %s is not supported' % mime)
try:
meta = parser.get_meta()
if not parser.remove_all():
api_cleanup_error(mime)
except ValueError:
api_cleanup_error(mime)
key, meta_after, output_filename = cleanup(parser, filepath)
return utils.return_file_created_response(
output_filename,
mime,
key,
meta,
meta_after,
urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
)
class APIDownload(Resource):
def get(self, key: str, filename: str):
complete_path, filepath = is_valid_api_download_file(filename, key)
# Make sure the file is NOT deleted on HEAD requests
if request.method == 'GET':
file_removal_scheduler.run_file_removal_job(app.config['UPLOAD_FOLDER'])
@after_this_request
def remove_file(response):
if os.path.exists(complete_path):
os.remove(complete_path)
return response
return send_from_directory(app.config['UPLOAD_FOLDER'], filepath, as_attachment=True)
class APIBulkDownloadCreator(Resource):
schema = {
'download_list': {
'type': 'list',
'minlength': 2,
'maxlength': int(os.environ.get('MAT2_MAX_FILES_BULK_DOWNLOAD', 10)),
'schema': {
'type': 'dict',
'schema': {
'key': {'type': 'string', 'required': True},
'file_name': {'type': 'string', 'required': True}
}
}
}
}
v = Validator(schema)
def post(self):
utils.check_upload_folder(app.config['UPLOAD_FOLDER'])
data = request.json
if not self.v.validate(data):
abort(400, message=self.v.errors)
# prevent the zip file from being overwritten
zip_filename = 'files.' + str(uuid4()) + '.zip'
zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename)
cleaned_files_zip = zipfile.ZipFile(zip_path, 'w')
with cleaned_files_zip:
for file_candidate in data['download_list']:
complete_path, file_path = is_valid_api_download_file(
file_candidate['file_name'],
file_candidate['key']
)
try:
cleaned_files_zip.write(complete_path)
os.remove(complete_path)
except ValueError:
abort(400, message='Creating the archive failed')
try:
cleaned_files_zip.testzip()
except ValueError as e:
abort(400, message=str(e))
parser, mime = get_file_parser(zip_path)
if not parser.remove_all():
abort(500, message='Unable to clean %s' % mime)
key, meta_after, output_filename = cleanup(parser, zip_path)
return {
'output_filename': output_filename,
'mime': mime,
'key': key,
'meta_after': meta_after,
'download_link': urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
}, 201
class APISupportedExtensions(Resource):
def get(self):
return get_supported_extensions()
api.add_resource(APIUpload, '/api/upload')
api.add_resource(APIDownload, '/api/download/<string:key>/<string:filename>')
api.add_resource(APIBulkDownloadCreator, '/api/download/bulk')
api.add_resource(APISupportedExtensions, '/api/extension')
api.add_resource(
rest_api.APIUpload,
'/api/upload',
resource_class_kwargs={'upload_folder': app.config['UPLOAD_FOLDER']}
)
api.add_resource(
rest_api.APIDownload,
'/api/download/<string:key>/<string:filename>',
resource_class_kwargs={'upload_folder': app.config['UPLOAD_FOLDER']}
)
api.add_resource(
rest_api.APIBulkDownloadCreator,
'/api/download/bulk',
resource_class_kwargs={'upload_folder': app.config['UPLOAD_FOLDER']}
)
api.add_resource(rest_api.APISupportedExtensions, '/api/extension')
return app
......
import hmac
import os
from flask import Blueprint, render_template, url_for, current_app, after_this_request, send_from_directory, request, \
flash
from werkzeug.utils import secure_filename, redirect
from matweb import file_removal_scheduler, utils
routes = Blueprint('routes', __name__)
@routes.route('/info')
def info():
utils.get_supported_extensions()
return render_template(
'info.html', extensions=utils.get_supported_extensions()
)
@routes.route('/download/<string:key>/<string:filename>')
def download_file(key: str, filename: str):
if filename != secure_filename(filename):
return redirect(url_for('routes.upload_file'))
complete_path, filepath = utils.get_file_paths(filename, current_app.config['UPLOAD_FOLDER'])
file_removal_scheduler.run_file_removal_job(current_app.config['UPLOAD_FOLDER'])
if not os.path.exists(complete_path):
return redirect(url_for('routes.upload_file'))
if hmac.compare_digest(utils.hash_file(complete_path), key) is False:
return redirect(url_for('routes.upload_file'))
@after_this_request
def remove_file(response):
if os.path.exists(complete_path):
os.remove(complete_path)
return response
return send_from_directory(current_app.config['UPLOAD_FOLDER'], filepath, as_attachment=True)
@routes.route('/', methods=['GET', 'POST'])
def upload_file():
utils.check_upload_folder(current_app.config['UPLOAD_FOLDER'])
mime_types = utils.get_supported_extensions()
if request.method == 'POST':
if 'file' not in request.files: # check if the post request has the file part
flash('No file part')
return redirect(request.url)
uploaded_file = request.files['file']
if not uploaded_file.filename:
flash('No selected file')
return redirect(request.url)
filename, filepath = utils.save_file(uploaded_file, current_app.config['UPLOAD_FOLDER'])
parser, mime = utils.get_file_parser(filepath)
if parser is None:
flash('The type %s is not supported' % mime)
return redirect(url_for('routes.upload_file'))
meta = parser.get_meta()
if parser.remove_all() is not True:
flash('Unable to clean %s' % mime)
return redirect(url_for('routes.upload_file'))
key, meta_after, output_filename = utils.cleanup(parser, filepath, current_app.config['UPLOAD_FOLDER'])
return render_template(
'download.html', mimetypes=mime_types, meta=meta, filename=output_filename, meta_after=meta_after, key=key
)
max_file_size = int(current_app.config['MAX_CONTENT_LENGTH'] / 1024 / 1024)
return render_template('index.html', max_file_size=max_file_size, mimetypes=mime_types)
\ No newline at end of file
import os
import base64
import io
import binascii
import zipfile
from uuid import uuid4
from flask import after_this_request, send_from_directory
from flask_restful import Resource, reqparse, abort, request
from cerberus import Validator
from werkzeug.datastructures import FileStorage
from urllib.parse import urljoin
from matweb import file_removal_scheduler, utils
class APIUpload(Resource):
def __init__(self, **kwargs):
self.upload_folder = kwargs['upload_folder']
def post(self):
utils.check_upload_folder(self.upload_folder)
req_parser = reqparse.RequestParser()
req_parser.add_argument('file_name', type=str, required=True, help='Post parameter is not specified: file_name')
req_parser.add_argument('file', type=str, required=True, help='Post parameter is not specified: file')
args = req_parser.parse_args()
try:
file_data = base64.b64decode(args['file'])
except binascii.Error as err:
abort(400, message='Failed decoding file: ' + str(err))
file = FileStorage(stream=io.BytesIO(file_data), filename=args['file_name'])
filename, filepath = utils.save_file(file, self.upload_folder)
parser, mime = utils.get_file_parser(filepath)
if parser is None:
abort(415, message='The type %s is not supported' % mime)
meta = parser.get_meta()
if not parser.remove_all():
abort(500, message='Unable to clean %s' % mime)
key, meta_after, output_filename = utils.cleanup(parser, filepath, self.upload_folder)
return utils.return_file_created_response(
output_filename,
mime,
key,
meta,
meta_after,
urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
)
class APIDownload(Resource):
def __init__(self, **kwargs):
self.upload_folder = kwargs['upload_folder']
def get(self, key: str, filename: str):
complete_path, filepath = utils.is_valid_api_download_file(filename, key, self.upload_folder)
# Make sure the file is NOT deleted on HEAD requests
if request.method == 'GET':
file_removal_scheduler.run_file_removal_job(self.upload_folder)
@after_this_request
def remove_file(response):
if os.path.exists(complete_path):
os.remove(complete_path)
return response
return send_from_directory(self.upload_folder, filepath, as_attachment=True)
class APIBulkDownloadCreator(Resource):
def __init__(self, **kwargs):
self.upload_folder = kwargs['upload_folder']
schema = {
'download_list': {
'type': 'list',
'minlength': 2,
'maxlength': int(os.environ.get('MAT2_MAX_FILES_BULK_DOWNLOAD', 10)),
'schema': {
'type': 'dict',
'schema': {
'key': {'type': 'string', 'required': True},
'file_name': {'type': 'string', 'required': True}
}
}
}
}
v = Validator(schema)
def post(self):
utils.check_upload_folder(self.upload_folder)
data = request.json
if not self.v.validate(data):
abort(400, message=self.v.errors)
# prevent the zip file from being overwritten
zip_filename = 'files.' + str(uuid4()) + '.zip'
zip_path = os.path.join(self.upload_folder, zip_filename)
cleaned_files_zip = zipfile.ZipFile(zip_path, 'w')
with cleaned_files_zip:
for file_candidate in data['download_list']:
complete_path, file_path = utils.is_valid_api_download_file(
file_candidate['file_name'],
file_candidate['key'],
self.upload_folder
)
try:
cleaned_files_zip.write(complete_path)
os.remove(complete_path)
except ValueError:
abort(400, message='Creating the archive failed')
try:
cleaned_files_zip.testzip()
except ValueError as e:
abort(400, message=str(e))
parser, mime = utils.get_file_parser(zip_path)
if not parser.remove_all():
abort(500, message='Unable to clean %s' % mime)
key, meta_after, output_filename = utils.cleanup(parser, zip_path, self.upload_folder)
return {
'output_filename': output_filename,
'mime': mime,
'key': key,
'meta_after': meta_after,
'download_link': urljoin(request.host_url, '%s/%s/%s/%s' % ('api', 'download', key, output_filename))
}, 201
class APISupportedExtensions(Resource):
def get(self):
return utils.get_supported_extensions()
import hmac
import os
import hashlib
import mimetypes as mtype
from flask_restful import abort
from libmat2 import parser_factory
from werkzeug.utils import secure_filename
def get_allow_origin_header_value():
......@@ -31,3 +37,55 @@ def return_file_created_response(output_filename, mime, key, meta, meta_after, d
'meta_after': meta_after,
'download_link': download_link
}
def get_supported_extensions():
extensions = set()
for parser in parser_factory._get_parsers():
for m in parser.mimetypes:
extensions |= set(mtype.guess_all_extensions(m, strict=False))
# since `guess_extension` might return `None`, we need to filter it out
return sorted(filter(None, extensions))
def save_file(file, upload_folder):
filename = secure_filename(file.filename)
filepath = os.path.join(upload_folder, filename)
file.save(os.path.join(filepath))
return filename, filepath
def get_file_parser(filepath: str):
parser, mime = parser_factory.get_parser(filepath)
return parser, mime
def cleanup(parser, filepath, upload_folder):
output_filename = os.path.basename(parser.output_filename)
parser, _ = parser_factory.get_parser(parser.output_filename)
meta_after = parser.get_meta()
os.remove(filepath)
key = hash_file(os.path.join(upload_folder, output_filename))
return key, meta_after, output_filename
def get_file_paths(filename, upload_folder):
filepath = secure_filename(filename)
complete_path = os.path.join(upload_folder, filepath)
return complete_path, filepath
def is_valid_api_download_file(filename, key, upload_folder):
if filename != secure_filename(filename):
abort(400, message='Insecure filename')
complete_path, filepath = get_file_paths(filename, upload_folder)
if not os.path.exists(complete_path):
abort(404, message='File not found')
if hmac.compare_digest(hash_file(complete_path), key) is False:
abort(400, message='The file hash does not match')
return complete_path, filepath
......@@ -14,11 +14,6 @@
unicode-range: U+0000-00FF, U+0131, U+0152-0153, U+02C6, U+02DA, U+02DC, U+2000-206F, U+2074, U+20AC, U+2212, U+2215, U+E0FF, U+EFFD, U+F000;
}
.container {
margin-top: 18rem;
}
.flashes {
text-align: center;
border: 1px solid #1EAEDB;
......@@ -26,8 +21,6 @@
footer {
width: 100%;
position: absolute;
bottom: 0;
height: 1%;
text-align: center;
}
......@@ -61,3 +54,111 @@ details[open] > summary:before {
.hover {
background-color: #f1f1f1;
}
.main-header {
background: #f4f7fb;
width: 100%;
height: 5rem;
margin-bottom: 5rem;
}
.logo {
margin-top: 2rem;
height: 7rem;