Skip to content
Snippets Groups Projects
Commit 38df679a authored by Julien (jvoisin) Voisin's avatar Julien (jvoisin) Voisin
Browse files

Optimize the handling of problematic files

parent 44f267a5
No related branches found
Tags 0.12.1
1 merge request!26Optimize handling of problematic files
Pipeline #19891 passed
import abc
import os
import re
from typing import Set, Dict, Union
assert Set # make pyflakes happy
......@@ -17,6 +18,11 @@ class AbstractParser(abc.ABC):
"""
:raises ValueError: Raised upon an invalid file
"""
if re.search('^[a-z0-9./]', filename) is None:
# Some parsers are calling external binaries,
# this prevents shell command injections
filename = os.path.join('.', filename)
self.filename = filename
fname, extension = os.path.splitext(filename)
self.output_filename = fname + '.cleaned' + extension
......
import json
import os
import re
import shutil
import subprocess
import tempfile
from typing import Dict, Union, Set, Callable, Any
from typing import Dict, Union, Set
from . import abstract
......@@ -20,23 +16,8 @@ class ExiftoolParser(abstract.AbstractParser):
"""
meta_whitelist = set() # type: Set[str]
def _handle_problematic_filename(self, callback: Callable[[str], Any]) -> bytes:
""" This method takes a filename with a potentially problematic name,
and safely applies a `callback` to it.
"""
if re.search('^[a-z0-9/]', self.filename) is not None:
return callback(self.filename)
tmpdirname = tempfile.mkdtemp()
fname = os.path.join(tmpdirname, "temp_file")
shutil.copy(self.filename, fname)
out = callback(fname)
shutil.rmtree(tmpdirname)
return out
def get_meta(self) -> Dict[str, Union[str, dict]]:
fun = lambda f: subprocess.check_output([_get_exiftool_path(), '-json', f])
out = self._handle_problematic_filename(fun)
out = subprocess.check_output([_get_exiftool_path(), '-json', self.filename])
meta = json.loads(out.decode('utf-8'))[0]
for key in self.meta_whitelist:
meta.pop(key, None)
......
......@@ -24,10 +24,9 @@ class AVIParser(exiftool.ExiftoolParser):
'SampleRate', 'AvgBytesPerSec', 'BitsPerSample',
'Duration', 'ImageSize', 'Megapixels'}
def __remove_all_internal(self, filename: str):
def remove_all(self):
cmd = [_get_ffmpeg_path(),
'-i', filename, # input file
'-i', self.filename, # input file
'-y', # overwrite existing output file
'-loglevel', 'panic', # Don't show log
'-hide_banner', # hide the banner
......@@ -38,11 +37,8 @@ class AVIParser(exiftool.ExiftoolParser):
'-flags:v', '+bitexact', # don't add any metadata
'-flags:a', '+bitexact', # don't add any metadata
self.output_filename]
subprocess.check_call(cmd)
def remove_all(self) -> bool:
try:
self._handle_problematic_filename(self.__remove_all_internal)
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
......
......@@ -204,3 +204,14 @@ class TestCorruptedFiles(unittest.TestCase):
p = video.AVIParser('./tests/data/clean.avi')
self.assertFalse(p.remove_all())
os.remove('./tests/data/clean.avi')
def test_avi_injection(self):
try:
video._get_ffmpeg_path()
except RuntimeError:
raise unittest.SkipTest
shutil.copy('./tests/data/dirty.torrent', './tests/data/--output.avi')
p = video.AVIParser('./tests/data/--output.avi')
self.assertFalse(p.remove_all())
os.remove('./tests/data/--output.avi')
......@@ -46,10 +46,23 @@ class TestParameterInjection(unittest.TestCase):
shutil.copy('./tests/data/dirty.avi', './--output')
p = video.AVIParser('--output')
meta = p.get_meta()
print(meta)
self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1')
os.remove('--output')
def test_ffmpeg_injection_complete_path(self):
try:
video._get_ffmpeg_path()
except RuntimeError:
raise unittest.SkipTest
shutil.copy('./tests/data/dirty.avi', './tests/data/ --output.avi')
p = video.AVIParser('./tests/data/ --output.avi')
meta = p.get_meta()
self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1')
self.assertTrue(p.remove_all())
os.remove('./tests/data/ --output.avi')
os.remove('./tests/data/ --output.cleaned.avi')
class TestUnsupportedEmbeddedFiles(unittest.TestCase):
def test_odt_with_svg(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment