Commit d2b2a54a authored by jvoisin's avatar jvoisin
Browse files

MAT2's cli now uses meaningful return codes

- Simplify the multiprocessing by using a Pool
- Use some functional () constructions to exit
  with a return code
- Add some tests to prove that we're doing things
  that are working correctly
parent a79c9410
Pipeline #14686 passed with stage
in 1 minute and 14 seconds
import os
from typing import Tuple
import sys
import itertools
import mimetypes
import argparse
from threading import Thread
import multiprocessing
from queue import Queue
from src import parser_factory
......@@ -52,14 +53,15 @@ def show_meta(filename:str):
print(" %s: harmful content" % k)
def clean_meta(filename:str, is_lightweigth:bool) -> bool:
def clean_meta(params:Tuple[str, bool]) -> bool:
filename, is_lightweigth = params
if not __check_file(filename, os.R_OK|os.W_OK):
p, mtype = parser_factory.get_parser(filename)
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
return False
if is_lightweigth:
return p.remove_all_lightweight()
return p.remove_all()
......@@ -82,15 +84,6 @@ def __get_files_recursively(files):
for _f in _files:
yield os.path.join(path, _f)
def __do_clean_async(is_lightweigth, q):
while True:
f = q.get()
if f is None: # nothing more to process
clean_meta(f, is_lightweigth)
def main():
arg_parser = create_arg_parser()
args = arg_parser.parse_args()
......@@ -106,24 +99,13 @@ def main():
else: # Thread the cleaning
p = multiprocessing.Pool()
mode = (args.lightweight is True)
q = Queue(maxsize=0)
threads = list()
for f in __get_files_recursively(args.files):
for _ in range(multiprocessing.cpu_count()):
worker = Thread(target=__do_clean_async, args=(mode, q))
for _ in range(multiprocessing.cpu_count()):
q.put(None) # stop the threads
for worker in threads:
l = zip(__get_files_recursively(args.files), itertools.repeat(mode))
ret = list(p.imap_unordered(clean_meta, list(l)))
return 0 if all(ret) else -1
if __name__ == '__main__':
......@@ -16,6 +16,22 @@ class TestHelp(unittest.TestCase):
self.assertIn(b'usage: [-h] [-c] [-l] [-s] [-L] [files [files ...]]', stdout)
class TestReturnValue(unittest.TestCase):
def test_nonzero(self):
ret =['./', './'], stdout=subprocess.DEVNULL)
self.assertEqual(255, ret)
ret =['./', '--whololo'], stderr=subprocess.DEVNULL)
self.assertEqual(2, ret)
def test_zero(self):
ret =['./'], stdout=subprocess.DEVNULL)
self.assertEqual(0, ret)
ret =['./', '--show', './'], stdout=subprocess.DEVNULL)
self.assertEqual(0, ret)
class TestCleanMeta(unittest.TestCase):
def test_jpg(self):
shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment