Skip to content
Snippets Groups Projects
mat2 8.01 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python3
    import os
    import shutil
    from typing import Tuple, List, Union, Set
    import mimetypes
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
    import argparse
    dkg's avatar
    dkg committed
    import logging
    import concurrent.futures
        from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
        from libmat2 import check_dependencies, UnknownMemberPolicy
    except ValueError as e:
    # Version string; interpolated into the `--version` output and the
    # "Dependencies for mat2 ..." header printed by main().
    __version__ = '0.11.0'
    # Make pyflakes happy
    # (bare references to the typing names so their imports are not reported
    # as unused; Set, for instance, only appears in `# type:` comments)
    assert Set
    assert Union
    # Log records are rendered as "LEVEL: message"; the default threshold is
    # WARNING, so logging.debug() calls stay silent unless the level is lowered.
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
    def __check_file(filename: str, mode: int = os.R_OK) -> bool:
        """Tell whether `filename` is an existing regular file accessible with `mode`.

        On failure, a one-line reason prefixed with "[-]" is printed to stdout.

        :param filename: path of the file to check
        :param mode: access bitmask handed to `os.access` (`os.R_OK`, `os.W_OK`, …)
        :return: True when the file passes every check, False otherwise
        """
        if not os.path.exists(filename):
            print("[-] %s doesn't exist." % filename)
            return False
        if not os.path.isfile(filename):
            print("[-] %s is not a regular file." % filename)
            return False
        if not os.access(filename, mode):
            mode_str = []  # type: List[str]
            # append() rather than `+=`: extending a list with a str would add
            # the word one character at a time and garble the message.
            if mode & os.R_OK:
                mode_str.append('readable')
            if mode & os.W_OK:
                mode_str.append('writeable')
            print("[-] %s is not %s." % (filename, ' nor '.join(mode_str)))
            return False
        return True
    def create_arg_parser() -> argparse.ArgumentParser:
        """Build the command-line parser for mat2.

        General options: `-V/--verbose`, `--unknown-members`, `--inplace` and
        `--no-sandbox`.  Two mutually exclusive groups follow: the first holds
        the `files` positional together with `--version`, `--list` and
        `--check-dependencies`; the second holds `--lightweight` and `--show`.

        :return: the configured parser
        """
        parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2')

        parser.add_argument('-V', '--verbose', action='store_true',
                            help='show more verbose status information')
        parser.add_argument('--unknown-members', metavar='policy', default='abort',
                            help='how to handle unknown members of archive-style '
                            'files (policy should be one of: %s) [Default: abort]' %
                            ', '.join(p.value for p in UnknownMemberPolicy))
        parser.add_argument('--inplace', action='store_true',
                            help='clean in place, without backup')
        # `sandbox` must be True unless the user opts out: with store_true and
        # default=False the flag did the opposite of what its name and help say.
        parser.add_argument('--no-sandbox', dest='sandbox', action='store_false',
                            default=True, help='Disable bubblewrap\'s sandboxing')

        excl_group = parser.add_mutually_exclusive_group()
        # A `nargs='*'` positional only counts as optional (which a mutually
        # exclusive group requires) when it is given an explicit default.
        excl_group.add_argument('files', nargs='*', help='the files to process',
                                default=[])
        excl_group.add_argument('-v', '--version', action='version',
                                version='mat2 %s' % __version__)
        excl_group.add_argument('-l', '--list', action='store_true', default=False,
                                help='list all supported fileformats')
        # NOTE(review): the tail of this help string was cut off in the
        # reviewed excerpt; 'needs' is the assumed ending — confirm.
        excl_group.add_argument('--check-dependencies', action='store_true',
                                default=False,
                                help='check if mat2 has all the dependencies it '
                                'needs')

        excl_group = parser.add_mutually_exclusive_group()
        excl_group.add_argument('-L', '--lightweight', action='store_true',
                                help='remove SOME metadata')
        excl_group.add_argument('-s', '--show', action='store_true',
                                help='list harmful metadata detectable by mat2 '
                                'without removing them')

        return parser
    def show_meta(filename: str, sandbox: bool):
        """Print the metadata that the matching parser finds in `filename`.

        Nothing is shown when the file fails `__check_file` (which prints its
        own reason) or when no parser handles its format.

        :param filename: path of the file to inspect
        :param sandbox: value assigned to the parser's `sandbox` attribute
        """
        if not __check_file(filename):
            return

        p, mtype = parser_factory.get_parser(filename)  # type: ignore
        if p is None:
            print("[-] %s's format (%s) is not supported" % (filename, mtype))
            # Bail out: there is no parser object to configure or query.
            return
        p.sandbox = sandbox
        __print_meta(filename, p.get_meta())
    def __print_meta(filename: str, metadata: dict, depth: int = 1):
        """Pretty-print `metadata` to stdout, recursing into nested dicts.

        :param filename: label used in the header line (for a nested dict this
            is the key under which it was stored)
        :param metadata: mapping of metadata names to values
        :param depth: nesting level; sets the indentation (two spaces per
            level) and the number of '+' characters in the header
        """
        padding = " " * depth*2
        if not metadata:
            print(padding + "No metadata found in %s." % filename)
            return

        print("[%s] Metadata for %s:" % ('+'*depth, filename))

        for (k, v) in sorted(metadata.items()):
            if isinstance(v, dict):
                __print_meta(k, v, depth+1)
                # The nested dict was printed as its own section.
                continue

            # Remove control characters
            # We might use 'Cc' instead of 'C', but better safe than sorry
            try:
                v = ''.join(ch for ch in v if not unicodedata.category(ch).startswith('C'))
            except TypeError:
                pass  # for things that aren't iterable

            try:  # FIXME this is ugly.
                print(padding + "  %s: %s" % (k, v))
            except UnicodeEncodeError:
                # stdout's encoding cannot represent the value; print a placeholder.
                print(padding + "  %s: harmful content" % k)
    def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
                   policy: UnknownMemberPolicy) -> bool:
        """Remove the metadata of `filename` using the parser matching its format.

        :param filename: path of the file to clean
        :param is_lightweight: value assigned to the parser's `lightweight_cleaning`
        :param inplace: when True, the cleaned output replaces `filename`
            (this also requires the file to be writeable)
        :param sandbox: value assigned to the parser's `sandbox` attribute
        :param policy: value assigned to the parser's `unknown_member_policy`
        :return: the result of the parser's `remove_all()`; False when the
            file fails the access check, its format is unsupported, or a
            RuntimeError is raised while cleaning
        """
        mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
        if not __check_file(filename, mode):
            return False

        p, mtype = parser_factory.get_parser(filename)  # type: ignore
        if p is None:
            print("[-] %s's format (%s) is not supported" % (filename, mtype))
            return False
        p.unknown_member_policy = policy
        p.lightweight_cleaning = is_lightweight
        p.sandbox = sandbox

        try:
            logging.debug('Cleaning %s…', filename)
            ret = p.remove_all()
            if ret is True:
                # Give the cleaned copy the permission bits of the source file.
                shutil.copymode(filename, p.output_filename)
                if inplace is True:
                    os.rename(p.output_filename, filename)
            return ret
        except RuntimeError as e:
            print("[-] %s can't be cleaned: %s" % (filename, e))
        return False
        print('[+] Supported formats:')
        formats = set()  # Set[str]
        for parser in parser_factory._get_parsers():  # type: ignore
            for mtype in parser.mimetypes:
                extensions = set()  # Set[str]
                for extension in mimetypes.guess_all_extensions(mtype):
                    if extension not in UNSUPPORTED_EXTENSIONS:
                if not extensions:
                    # we're not supporting a single extension in the current
                    # mimetype, so there is not point in showing the mimetype at all
                formats.add('  - %s (%s)' % (mtype, ', '.join(extensions)))
    def __get_files_recursively(files: List[str]) -> List[str]:
        """Expand `files` into a de-duplicated list of processable file paths.

        Directories are walked with `os.walk`; every candidate path is kept
        only if it passes `__check_file` (which prints why a path is rejected).

        :param files: file and/or directory paths
        :return: the accepted file paths, sorted so the order is reproducible
        """
        ret = set()  # type: Set[str]
        for f in files:
            if os.path.isdir(f):
                for path, _, _files in os.walk(f):
                    for _f in _files:
                        fname = os.path.join(path, _f)
                        if __check_file(fname):
                            ret.add(fname)
            # NOTE(review): the non-directory branch is not visible in the
            # reviewed excerpt; plain paths are assumed to go through the same
            # check — confirm.
            elif __check_file(f):
                ret.add(f)
        return sorted(ret)
    def main() -> int:
        """Command-line entry point: parse the arguments and dispatch.

        Without files: list the supported formats, print the dependency
        status, or show the help.  With `--show`: print the metadata of every
        file.  Otherwise: clean every file, one `clean_meta` task per file in
        a process pool.

        :return: 0 on success, -1 if at least one file could not be cleaned
        """
        arg_parser = create_arg_parser()
        args = arg_parser.parse_args()

        if args.verbose:
            # NOTE(review): this line is missing from the reviewed excerpt;
            # lowering the root logger to DEBUG is the assumed effect of
            # --verbose — confirm.
            logging.getLogger().setLevel(logging.DEBUG)

        if not args.files:
            if args.list:
                # NOTE(review): this branch and the helper's name are not
                # visible in the excerpt; `show_parsers` is assumed — confirm.
                show_parsers()
                return 0
            elif args.check_dependencies:
                print("Dependencies for mat2 %s:" % __version__)
                for key, value in sorted(check_dependencies().items()):
                    print('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
                                           '(optional)' if not value['required'] else ''))
            else:
                # NOTE(review): assumed fallback when nothing was requested — confirm.
                arg_parser.print_help()
            return 0

        elif args.show:
            for f in __get_files_recursively(args.files):
                show_meta(f, args.sandbox)
            return 0

        else:
            inplace = args.inplace
            policy = UnknownMemberPolicy(args.unknown_members)
            if policy == UnknownMemberPolicy.KEEP:
                logging.warning('Keeping unknown member files may leak metadata in the resulting file!')

            no_failure = True
            files = __get_files_recursively(args.files)
            # We have to use Processes instead of Threads, since
            # we're using tempfile.mkdtemp, which isn't thread-safe.
            with concurrent.futures.ProcessPoolExecutor() as executor:
                futures = []
                for f in files:
                    future = executor.submit(clean_meta, f, args.lightweight,
                                             inplace, args.sandbox, policy)
                    # Keep the future, otherwise its result is never collected
                    # and failures go unnoticed.
                    futures.append(future)
                for future in concurrent.futures.as_completed(futures):
                    no_failure &= future.result()
            return 0 if no_failure is True else -1
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
    if __name__ == '__main__':