Skip to content
Snippets Groups Projects
Commit 40669186 authored by Julien (jvoisin) Voisin's avatar Julien (jvoisin) Voisin
Browse files

Add support for inplace cleaning

parent d76a6cbb
No related branches found
No related tags found
No related merge requests found
......@@ -53,6 +53,8 @@ def create_arg_parser() -> argparse.ArgumentParser:
help='how to handle unknown members of archive-style '
'files (policy should be one of: %s) [Default: abort]' %
', '.join(p.value for p in UnknownMemberPolicy))
parser.add_argument('--inplace', action='store_true',
help='clean in place, without backup')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('files', nargs='*', help='the files to process',
......@@ -114,8 +116,10 @@ def __print_meta(filename: str, metadata: dict, depth: int = 1):
print(padding + " %s: harmful content" % k)
def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy) -> bool:
if not __check_file(filename, os.R_OK):
def clean_meta(filename: str, is_lightweight: bool, inplace: bool,
policy: UnknownMemberPolicy) -> bool:
mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
if not __check_file(filename, mode):
return False
p, mtype = parser_factory.get_parser(filename) # type: ignore
......@@ -127,7 +131,10 @@ def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy)
try:
logging.debug('Cleaning %s…', filename)
return p.remove_all()
ret = p.remove_all()
if inplace is True:
os.rename(p.output_filename, filename)
return ret
except RuntimeError as e:
print("[-] %s can't be cleaned: %s" % (filename, e))
return False
......@@ -190,6 +197,7 @@ def main() -> int:
return 0
else:
inplace = args.inplace
policy = UnknownMemberPolicy(args.unknown_members)
if policy == UnknownMemberPolicy.KEEP:
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
......@@ -201,7 +209,8 @@ def main() -> int:
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = list()
for f in files:
future = executor.submit(clean_meta, f, args.lightweight, policy)
future = executor.submit(clean_meta, f, args.lightweight,
inplace, policy)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
......
......@@ -20,7 +20,7 @@ class TestHelp(unittest.TestCase):
def test_help(self):
proc = subprocess.Popen(mat2_binary + ['--help'], stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [-v] [-l]',
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]',
stdout)
self.assertIn(b'[--check-dependencies] [-L | -s]', stdout)
self.assertIn(b'[files [files ...]]', stdout)
......@@ -28,7 +28,7 @@ class TestHelp(unittest.TestCase):
def test_no_arg(self):
proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [-v] [-l]',
self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]',
stdout)
self.assertIn(b'[--check-dependencies] [-L | -s]', stdout)
self.assertIn(b'[files [files ...]]', stdout)
......@@ -241,3 +241,34 @@ class TestCommandLineParallel(unittest.TestCase):
os.remove('./tests/data/dirty_%d.cleaned.jpg' % i)
os.remove(path)
os.remove('./tests/data/dirty_%d.docx' % i)
class TestInplaceCleaning(unittest.TestCase):
def test_cleaning(self):
shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg')
proc = subprocess.Popen(mat2_binary + ['--inplace', './tests/data/clean.jpg'],
stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/clean.jpg'],
stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b' No metadata found in ./tests/data/clean.jpg.\n', stdout)
os.remove('./tests/data/clean.jpg')
def test_cleaning_multiple_one_fails(self):
files = ['./tests/data/clean_%d.jpg' % i for i in range(9)]
for f in files:
shutil.copy('./tests/data/dirty.jpg', f)
shutil.copy('./tests/data/dirty.torrent', './tests/data/clean_9.jpg')
proc = subprocess.Popen(mat2_binary + ['--inplace'] + files,
stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
for f in files:
p = images.JPGParser(f)
meta = p.get_meta()
self.assertEqual(meta, {})
for i in range(10):
os.remove('./tests/data/clean_%d.jpg' % i)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment