Skip to content
Snippets Groups Projects
mat2 8.01 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python3
    
    import os
    
    import shutil
    
    from typing import Tuple, List, Union, Set
    
    import mimetypes
    
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
    import argparse
    
    dkg's avatar
    dkg committed
    import logging
    
    import concurrent.futures
    
        from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
        from libmat2 import check_dependencies, UnknownMemberPolicy
    
    except ValueError as e:
        print(e)
        sys.exit(1)
    
    __version__ = '0.11.0'
    
    # Make pyflakes happy
    
    assert Set
    
    assert Union
    
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
    
    
    def __check_file(filename: str, mode: int = os.R_OK) -> bool:
    
            print("[-] %s doesn't exist." % filename)
    
            return False
        elif not os.path.isfile(filename):
    
            print("[-] %s is not a regular file." % filename)
            return False
        elif not os.access(filename, mode):
    
            mode_str = []  # type: List[str]
            if mode & os.R_OK:
                mode_str += 'readable'
            if mode & os.W_OK:
                mode_str += 'writeable'
            print("[-] %s is not %s." % (filename, 'nor '.join(mode_str)))
    
            return False
        return True
    
    def create_arg_parser() -> argparse.ArgumentParser:
    
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
        parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2')
    
    dkg's avatar
    dkg committed
        parser.add_argument('-V', '--verbose', action='store_true',
                            help='show more verbose status information')
    
        parser.add_argument('--unknown-members', metavar='policy', default='abort',
    
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
                            help='how to handle unknown members of archive-style '
                            'files (policy should be one of: %s) [Default: abort]' %
                            ', '.join(p.value for p in UnknownMemberPolicy))
    
        parser.add_argument('--inplace', action='store_true',
                            help='clean in place, without backup')
    
        parser.add_argument('--no-sandbox', dest='sandbox', action='store_true',
    
                            default=False, help='Disable bubblewrap\'s sandboxing')
    
        excl_group = parser.add_mutually_exclusive_group()
        excl_group.add_argument('files', nargs='*', help='the files to process',
                                default=[])
        excl_group.add_argument('-v', '--version', action='version',
    
    georg's avatar
    georg committed
                                version='mat2 %s' % __version__)
    
        excl_group.add_argument('-l', '--list', action='store_true', default=False,
                                help='list all supported fileformats')
        excl_group.add_argument('--check-dependencies', action='store_true',
                                default=False,
    
    georg's avatar
    georg committed
                                help='check if mat2 has all the dependencies it '
    
                                'needs')
    
        excl_group = parser.add_mutually_exclusive_group()
        excl_group.add_argument('-L', '--lightweight', action='store_true',
                                help='remove SOME metadata')
        excl_group.add_argument('-s', '--show', action='store_true',
    
    georg's avatar
    georg committed
                                help='list harmful metadata detectable by mat2 '
    
                                'without removing them')
    
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
    
        return parser
    
    
    def show_meta(filename: str, sandbox: bool):
    
        if not __check_file(filename):
            return
    
    
        p, mtype = parser_factory.get_parser(filename)  # type: ignore
    
        if p is None:
    
            print("[-] %s's format (%s) is not supported" % (filename, mtype))
    
        p.sandbox = sandbox
    
        __print_meta(filename, p.get_meta())
    
    def __print_meta(filename: str, metadata: dict, depth: int = 1):
    
        padding = " " * depth*2
    
            print(padding + "No metadata found in %s." % filename)
    
        print("[%s] Metadata for %s:" % ('+'*depth, filename))
    
    
        for (k, v) in sorted(metadata.items()):
    
            if isinstance(v, dict):
    
                __print_meta(k, v, depth+1)
                continue
    
    
            # Remove control characters
            # We might use 'Cc' instead of 'C', but better safe than sorry
            # https://www.unicode.org/reports/tr44/#GC_Values_Table
            try:
                v = ''.join(ch for ch in v if not unicodedata.category(ch).startswith('C'))
            except TypeError:
                pass  # for things that aren't iterable
    
    
            try:  # FIXME this is ugly.
                print(padding + "  %s: %s" % (k, v))
            except UnicodeEncodeError:
                print(padding + "  %s: harmful content" % k)
    
    def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
    
                   policy: UnknownMemberPolicy) -> bool:
        mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
        if not __check_file(filename, mode):
    
            return False
    
        p, mtype = parser_factory.get_parser(filename)  # type: ignore
    
        if p is None:
            print("[-] %s's format (%s) is not supported" % (filename, mtype))
    
        p.unknown_member_policy = policy
    
        p.lightweight_cleaning = is_lightweight
    
        p.sandbox = sandbox
    
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
            logging.debug('Cleaning %s…', filename)
    
            ret = p.remove_all()
    
            if ret is True:
                shutil.copymode(filename, p.output_filename)
                if inplace is True:
                    os.rename(p.output_filename, filename)
    
        except RuntimeError as e:
            print("[-] %s can't be cleaned: %s" % (filename, e))
        return False
    
    
        print('[+] Supported formats:')
    
        formats = set()  # Set[str]
        for parser in parser_factory._get_parsers():  # type: ignore
    
            for mtype in parser.mimetypes:
    
                extensions = set()  # Set[str]
    
                for extension in mimetypes.guess_all_extensions(mtype):
    
                    if extension not in UNSUPPORTED_EXTENSIONS:
    
                        extensions.add(extension)
                if not extensions:
                    # we're not supporting a single extension in the current
                    # mimetype, so there is not point in showing the mimetype at all
                    continue
    
                formats.add('  - %s (%s)' % (mtype, ', '.join(extensions)))
    
        print('\n'.join(sorted(formats)))
    
    def __get_files_recursively(files: List[str]) -> List[str]:
        ret = set()  # type: Set[str]
    
                for path, _, _files in os.walk(f):
                    for _f in _files:
    
                        fname = os.path.join(path, _f)
                        if __check_file(fname):
    
                            ret.add(fname)
    
                ret.add(f)
        return list(ret)
    
    def main() -> int:
    
        arg_parser = create_arg_parser()
        args = arg_parser.parse_args()
    
    dkg's avatar
    dkg committed
        if args.verbose:
    
            logging.getLogger().setLevel(logging.DEBUG)
    
    dkg's avatar
    dkg committed
    
    
            elif args.check_dependencies:
    
    georg's avatar
    georg committed
                print("Dependencies for mat2 %s:" % __version__)
    
                for key, value in sorted(check_dependencies().items()):
    
                    print('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
                                           '(optional)' if not value['required'] else ''))
    
                arg_parser.print_help()
    
            return 0
    
        elif args.show:
    
            for f in __get_files_recursively(args.files):
    
                show_meta(f, args.sandbox)
    
            return 0
    
            inplace = args.inplace
    
            policy = UnknownMemberPolicy(args.unknown_members)
            if policy == UnknownMemberPolicy.KEEP:
    
                logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
    
            no_failure = True
    
            files = __get_files_recursively(args.files)
            # We have to use Processes instead of Threads, since
            # we're using tempfile.mkdtemp, which isn't thread-safe.
            with concurrent.futures.ProcessPoolExecutor() as executor:
                futures = list()
                for f in files:
    
                    future = executor.submit(clean_meta, f, args.lightweight,
    
                                             inplace, args.sandbox, policy)
    
                    futures.append(future)
                for future in concurrent.futures.as_completed(futures):
                    no_failure &= future.result()
    
            return 0 if no_failure is True else -1
    
    Julien (jvoisin) Voisin's avatar
    Julien (jvoisin) Voisin committed
    
    if __name__ == '__main__':