Commit 42f81219 authored by jvoisin's avatar jvoisin

Add support for inplace cleaning

This is done in mat2 and not in libmat2 to make the
implementation as simple as possible.
parent 49e0c43a
Pipeline #26041 passed with stages
in 3 minutes and 52 seconds
......@@ -49,6 +49,8 @@ def create_arg_parser() -> argparse.ArgumentParser:
parser.add_argument('files', nargs='*', help='the files to process')
parser.add_argument('-v', '--version', action='version',
version='MAT2 %s' % __version__)
parser.add_argument('--inplace', action='store_true',
help='clean the files in-place')
parser.add_argument('-l', '--list', action='store_true',
help='list all supported fileformats')
parser.add_argument('--check-dependencies', action='store_true',
......@@ -107,8 +109,10 @@ def __print_meta(filename: str, metadata: dict, depth: int = 1):
print(padding + " %s: harmful content" % k)
def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy) -> bool:
if not __check_file(filename, os.R_OK):
def clean_meta(filename: str, is_lightweight: bool,
policy: UnknownMemberPolicy, inplace: bool) -> bool:
mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
if not __check_file(filename, mode):
return False
p, mtype = parser_factory.get_parser(filename) # type: ignore
......@@ -120,7 +124,10 @@ def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy)
try:
logging.debug('Cleaning %s…', filename)
return p.remove_all()
ret = p.remove_all()
if inplace is True:
os.rename(p.output_filename, filename)
return ret
except RuntimeError as e:
print("[-] %s can't be cleaned: %s" % (filename, e))
return False
......@@ -183,6 +190,7 @@ def main() -> int:
return 0
else:
inplace = args.inplace
policy = UnknownMemberPolicy(args.unknown_members)
if policy == UnknownMemberPolicy.KEEP:
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
......@@ -194,7 +202,7 @@ def main() -> int:
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = list()
for f in files:
future = executor.submit(clean_meta, f, args.lightweight, policy)
future = executor.submit(clean_meta, f, args.lightweight, policy, inplace)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
......
......@@ -20,14 +20,14 @@ class TestHelp(unittest.TestCase):
def test_help(self):
proc = subprocess.Popen(mat2_binary + ['--help'], stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b'usage: mat2 [-h] [-v] [-l] [--check-dependencies] [-V]',
self.assertIn(b'usage: mat2 [-h]',
stdout)
self.assertIn(b'[--unknown-members policy] [-s | -L]', stdout)
def test_no_arg(self):
proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b'usage: mat2 [-h] [-v] [-l] [--check-dependencies] [-V]',
self.assertIn(b'usage: mat2 [-h]',
stdout)
self.assertIn(b'[--unknown-members policy] [-s | -L]', stdout)
......@@ -239,3 +239,15 @@ class TestCommandLineParallel(unittest.TestCase):
os.remove('./tests/data/dirty_%d.cleaned.jpg' % i)
os.remove(path)
os.remove('./tests/data/dirty_%d.docx' % i)
class TestInplaceCleaning(unittest.TestCase):
def test_cleaning(self):
shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg')
proc = subprocess.Popen(mat2_binary + ['--inplace', './tests/data/clean.jpg'],
stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/clean.jpg'],
stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b' No metadata found in ./tests/data/clean.jpg.\n', stdout)
os.remove('./tests/data/clean.jpg')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment