Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tguinot/mat2
  • jvoisin/mat2
  • dachary/mat2
  • mejo-/mat2
  • LogicalDash/mat2
  • dkg/mat2
  • christian/mat2
  • Selflike323/mat2
  • fz/mat2
  • iwwmidatlanticgdc/mat2
  • Gu1nn3zz/mat2
  • smagnin/mat2
  • flashcode/mat2
  • MANCASTILLEJA/mat2
  • jboursier/mat2
  • tails/mat2
  • matiargs/mat2
  • Brolf/mat2
  • madaidan/mat2
  • Delmer84/mat2
  • yuebyzua/mat2
  • yyyyyyyan/mat2
  • rmnvgr/mat2
  • Marxism-Leninism/mat2
  • GNUtoo/mat2
  • allexj/mat2
  • b068931cc450442b63f5b3d276ea4297/mat2
  • chenrui/mat2
  • nosec13346/mat2
  • anelki/mat2
30 results
Show changes
"""
Wrapper around a subset of the subprocess module,
that uses bwrap (bubblewrap) when it is available.
Instead of importing subprocess, other modules should use this as follows:
from . import subprocess
"""
import os
import shutil
import subprocess
import tempfile
import functools
from typing import Optional, List
__all__ = ['PIPE', 'run', 'CalledProcessError']
PIPE = subprocess.PIPE
CalledProcessError = subprocess.CalledProcessError
# pylint: disable=subprocess-run-check
@functools.lru_cache(maxsize=None)
def _get_bwrap_path() -> str:
which_path = shutil.which('bwrap')
if which_path:
return which_path
raise RuntimeError("Unable to find bwrap") # pragma: no cover
def _get_bwrap_args(tempdir: str,
input_filename: str,
output_filename: Optional[str] = None) -> List[str]:
ro_bind_args = []
cwd = os.getcwd()
# XXX: use --ro-bind-try once all supported platforms
# have a bubblewrap recent enough to support it.
ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', '/etc/alternatives', cwd]
for bind_dir in ro_bind_dirs:
if os.path.isdir(bind_dir): # pragma: no cover
ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir])
ro_bind_files = ['/etc/ld.so.cache']
for bind_file in ro_bind_files:
if os.path.isfile(bind_file): # pragma: no cover
ro_bind_args.extend(['--ro-bind', bind_file, bind_file])
args = ro_bind_args + \
['--dev', '/dev',
'--proc', '/proc',
'--chdir', cwd,
'--unshare-user-try',
'--unshare-ipc',
'--unshare-pid',
'--unshare-net',
'--unshare-uts',
'--unshare-cgroup-try',
'--new-session',
'--cap-drop', 'all',
# XXX: enable --die-with-parent once all supported platforms have
# a bubblewrap recent enough to support it.
# '--die-with-parent',
]
if output_filename:
# Mount an empty temporary directory where the sandboxed
# process will create its output file
output_dirname = os.path.dirname(os.path.abspath(output_filename))
args.extend(['--bind', tempdir, output_dirname])
absolute_input_filename = os.path.abspath(input_filename)
args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename])
return args
def run(args: List[str],
input_filename: str,
output_filename: Optional[str] = None,
**kwargs) -> subprocess.CompletedProcess:
"""Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it
is available.
Extra supported keyword arguments:
- `input_filename`, made available read-only in the sandbox
- `output_filename`, where the file created by the sandboxed process
is copied upon successful completion; an empty temporary directory
is made visible as the parent directory of this file in the sandbox.
Optional: one valid use case is to invoke an external process
to inspect metadata present in a file.
"""
try:
bwrap_path = _get_bwrap_path()
except RuntimeError: # pragma: no cover
# bubblewrap is not installed ⇒ short-circuit
return subprocess.run(args, **kwargs)
with tempfile.TemporaryDirectory() as tempdir:
prefix_args = [bwrap_path] + \
_get_bwrap_args(input_filename=input_filename,
output_filename=output_filename,
tempdir=tempdir)
completed_process = subprocess.run(prefix_args + args, **kwargs)
if output_filename and completed_process.returncode == 0:
shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)),
output_filename)
return completed_process
import logging
import re
import uuid
import zipfile
import xml.etree.ElementTree as ET # type: ignore
from typing import Any, Dict
from . import archive, office
class EPUBParser(archive.ZipParser):
mimetypes = {'application/epub+zip', }
metadata_namespace = '{http://purl.org/dc/elements/1.1/}'
def __init__(self, filename):
super().__init__(filename)
self.files_to_keep = set(map(re.compile, { # type: ignore
'META-INF/container.xml',
'mimetype',
'OEBPS/content.opf',
'content.opf',
'hmh.opf',
'OPS/.+.xml'
}))
self.files_to_omit = set(map(re.compile, { # type: ignore
'iTunesMetadata.plist',
'META-INF/calibre_bookmarks.txt',
'OEBPS/package.opf',
}))
self.uniqid = uuid.uuid4()
def is_archive_valid(self):
super().is_archive_valid()
with zipfile.ZipFile(self.filename) as zin:
for item in self._get_all_members(zin):
member_name = self._get_member_name(item)
if member_name.endswith('META-INF/encryption.xml'):
raise ValueError('the file contains encrypted fonts')
def _specific_get_meta(self, full_path, file_path) -> Dict[str, Any]:
if not file_path.endswith('.opf'):
return {}
with open(full_path, encoding='utf-8') as f:
try:
results = re.findall(r"<((?:meta|dc|cp).+?)[^>]*>(.+)</\1>",
f.read(), re.I|re.M)
return {k:v for (k, v) in results}
except (TypeError, UnicodeDecodeError):
return {file_path: 'harmful content', }
def _specific_cleanup(self, full_path: str) -> bool:
if full_path.endswith('hmh.opf') or full_path.endswith('content.opf'):
return self.__handle_contentopf(full_path)
elif full_path.endswith('OEBPS/toc.ncx'):
return self.__handle_tocncx(full_path)
elif re.search('/OPS/[^/]+.xml$', full_path):
return self.__handle_ops_xml(full_path)
return True
def __handle_ops_xml(self, full_path: str) -> bool:
try:
tree, namespace = office._parse_xml(full_path)
except ET.ParseError: # pragma: nocover
logging.error("Unable to parse %s in %s.", full_path, self.filename)
return False
for item in tree.iterfind('.//', namespace): # pragma: nocover
if item.tag.strip().lower().endswith('head'):
item.clear()
break
tree.write(full_path, xml_declaration=True, encoding='utf-8',
short_empty_elements=False)
return True
def __handle_tocncx(self, full_path: str) -> bool:
try:
tree, namespace = office._parse_xml(full_path)
except ET.ParseError: # pragma: nocover
logging.error("Unable to parse %s in %s.", full_path, self.filename)
return False
for item in tree.iterfind('.//', namespace): # pragma: nocover
if item.tag.strip().lower().endswith('head'):
item.clear()
ET.SubElement(item, 'meta', attrib={'name': '', 'content': ''})
break
tree.write(full_path, xml_declaration=True, encoding='utf-8',
short_empty_elements=False)
return True
def __handle_contentopf(self, full_path: str) -> bool:
try:
tree, namespace = office._parse_xml(full_path)
except ET.ParseError:
logging.error("Unable to parse %s in %s.", full_path, self.filename)
return False
for item in tree.iterfind('.//', namespace): # pragma: nocover
if item.tag.strip().lower().endswith('metadata'):
item.clear()
# item with mandatory content
uniqid = ET.Element(self.metadata_namespace + 'identifier')
uniqid.text = str(self.uniqid)
uniqid.set('id', 'id')
item.append(uniqid)
# items without mandatory content
for name in ['language', 'title']:
uniqid = ET.Element(self.metadata_namespace + name)
item.append(uniqid)
break # there is only a single <metadata> block
tree.write(full_path, xml_declaration=True, encoding='utf-8')
return True
import functools
import json
import logging
import os
import shutil
import subprocess
from typing import Union, Set, Dict
from . import abstract
from . import bubblewrap
class ExiftoolParser(abstract.AbstractParser):
""" Exiftool is often the easiest way to get all the metadata
from a import file, hence why several parsers are re-using its `get_meta`
method.
"""
meta_allowlist: Set[str] = set()
def get_meta(self) -> Dict[str, Union[str, Dict]]:
try:
if self.sandbox:
out = bubblewrap.run([_get_exiftool_path(), '-json',
self.filename],
input_filename=self.filename,
check=True, stdout=subprocess.PIPE).stdout
else:
out = subprocess.run([_get_exiftool_path(), '-json',
self.filename],
check=True, stdout=subprocess.PIPE).stdout
except subprocess.CalledProcessError: # pragma: no cover
raise ValueError
meta = json.loads(out.decode('utf-8'))[0]
for key in self.meta_allowlist:
meta.pop(key, None)
return meta
def _lightweight_cleanup(self) -> bool:
if os.path.exists(self.output_filename):
try: # exiftool can't force output to existing files
os.remove(self.output_filename)
except OSError as e: # pragma: no cover
logging.error("The output file %s is already existing and \
can't be overwritten: %s.", self.filename, e)
return False
# Note: '-All=' must be followed by a known exiftool option.
# Also, '-CommonIFD0' is needed for .tiff files
cmd = [_get_exiftool_path(),
'-all=', # remove metadata
'-adobe=', # remove adobe-specific metadata
'-exif:all=', # remove all exif metadata
'-Time:All=', # remove all timestamps
'-quiet', # don't show useless logs
'-CommonIFD0=', # remove IFD0 metadata
'-o', self.output_filename,
self.filename]
try:
if self.sandbox:
bubblewrap.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e: # pragma: no cover
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
return True
@functools.lru_cache(maxsize=None)
def _get_exiftool_path() -> str: # pragma: no cover
which_path = shutil.which('exiftool')
if which_path:
return which_path
# Exiftool on Arch Linux has a weird path
if os.access('/usr/bin/vendor_perl/exiftool', os.X_OK):
return '/usr/bin/vendor_perl/exiftool'
raise RuntimeError("Unable to find exiftool")
import shutil
from typing import Union, Dict
from . import abstract
class HarmlessParser(abstract.AbstractParser):
""" This is the parser for filetypes that can not contain metadata. """
mimetypes = {'text/plain', 'image/x-ms-bmp', 'image/bmp'}
def get_meta(self) -> Dict[str, Union[str, Dict]]:
return dict()
def remove_all(self) -> bool:
shutil.copy(self.filename, self.output_filename)
return True
import os
import re
from typing import Union, Any, Dict
import cairo
import gi
gi.require_version('GdkPixbuf', '2.0')
gi.require_version('Rsvg', '2.0')
from gi.repository import GdkPixbuf, GLib, Rsvg
from . import exiftool, abstract
class SVGParser(exiftool.ExiftoolParser):
mimetypes = {'image/svg+xml', }
meta_allowlist = {'Directory', 'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'ImageHeight', 'ImageWidth',
'MIMEType', 'SVGVersion', 'SourceFile', 'ViewBox'
}
def remove_all(self) -> bool:
try:
svg = Rsvg.Handle.new_from_file(self.filename)
except GLib.GError:
raise ValueError
try:
_, _, _, _, has_viewbox, viewbox = svg.get_intrinsic_dimensions()
if has_viewbox is False:
raise ValueError
_, width, height = svg.get_intrinsic_size_in_pixels()
except AttributeError:
dimensions = svg.get_dimensions()
height, width = dimensions.height, dimensions.width
surface = cairo.SVGSurface(self.output_filename, height, width)
context = cairo.Context(surface)
try:
svg.render_document(context, viewbox)
except AttributeError:
svg.render_cairo(context)
surface.finish()
return True
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
# The namespace is mandatory, but only the …/2000/svg is valid.
ns = 'http://www.w3.org/2000/svg'
if meta.get('Xmlns') == ns:
meta.pop('Xmlns')
return meta
class PNGParser(exiftool.ExiftoolParser):
mimetypes = {'image/png', }
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', 'FileInodeChangeDate',
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'BitDepth', 'ColorType',
'Compression', 'Filter', 'Interlace', 'BackgroundColor',
'ImageSize', 'Megapixels', 'ImageHeight'}
def __init__(self, filename):
super().__init__(filename)
try: # better fail here than later
cairo.ImageSurface.create_from_png(self.filename)
except: # pragma: no cover
# Cairo is returning some weird exceptions :/
raise ValueError
def remove_all(self) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
surface = cairo.ImageSurface.create_from_png(self.filename)
surface.write_to_png(self.output_filename)
return True
class GIFParser(exiftool.ExiftoolParser):
mimetypes = {'image/gif'}
meta_allowlist = {'AnimationIterations', 'BackgroundColor', 'BitsPerPixel',
'ColorResolutionDepth', 'Directory', 'Duration',
'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'FrameCount', 'GIFVersion',
'HasColorMap', 'ImageHeight', 'ImageSize', 'ImageWidth',
'MIMEType', 'Megapixels', 'SourceFile',}
def remove_all(self) -> bool:
return self._lightweight_cleanup()
class GdkPixbufAbstractParser(exiftool.ExiftoolParser):
""" GdkPixbuf can handle a lot of surfaces, so we're rending images on it,
this has the side-effect of completely removing metadata.
"""
_type = ''
def __init__(self, filename):
super().__init__(filename)
try:
GdkPixbuf.Pixbuf.new_from_file(self.filename)
except GLib.GError:
raise ValueError
def remove_all(self) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
_, extension = os.path.splitext(self.filename)
pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename)
pixbuf = GdkPixbuf.Pixbuf.apply_embedded_orientation(pixbuf)
if extension.lower() == '.jpg':
extension = '.jpeg' # gdk is picky
elif extension.lower() == '.tif':
extension = '.tiff' # gdk is picky
try:
pixbuf.savev(self.output_filename, type=extension[1:],
option_keys=[], option_values=[])
except GLib.GError: # pragma: no cover
return False
return True
class JPGParser(GdkPixbufAbstractParser):
_type = 'jpeg'
mimetypes = {'image/jpeg'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', "FileInodeChangeDate",
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
'YResolution', 'Megapixels', 'ImageHeight', 'Orientation'}
class TiffParser(GdkPixbufAbstractParser):
_type = 'tiff'
mimetypes = {'image/tiff'}
meta_allowlist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
'FillOrder', 'PhotometricInterpretation',
'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel',
'StripByteCounts', 'StripOffsets', 'BitsPerSample',
'Directory', 'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'ImageHeight', 'ImageSize',
'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile', 'Orientation'}
class PPMParser(abstract.AbstractParser):
mimetypes = {'image/x-portable-pixmap'}
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta: Dict[str, Union[str, Dict[Any, Any]]] = dict()
with open(self.filename) as f:
for idx, line in enumerate(f):
if line.lstrip().startswith('#'):
meta[str(idx)] = line.lstrip().rstrip()
return meta
def remove_all(self) -> bool:
with open(self.filename) as fin:
with open(self.output_filename, 'w') as fout:
for line in fin:
if not line.lstrip().startswith('#'):
line = re.sub(r"\s+", "", line, flags=re.UNICODE)
fout.write(line)
return True
class HEICParser(exiftool.ExiftoolParser):
mimetypes = {'image/heic'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
'FileSize', 'FileModifyDate', 'FileAccessDate',
'FileInodeChangeDate', 'FilePermissions', 'FileType',
'FileTypeExtension', 'MIMEType', 'MajorBrand', 'MinorVersion',
'CompatibleBrands','HandlerType', 'PrimaryItemReference',
'HEVCConfigurationVersion', 'GeneralProfileSpace',
'GeneralTierFlag', 'GeneralProfileIDC',
'GenProfileCompatibilityFlags', 'ConstraintIndicatorFlags',
'GeneralLevelIDC', 'MinSpatialSegmentationIDC',
'ParallelismType','ChromaFormat', 'BitDepthLuma', 'BitDepthChroma',
'NumTemporalLayers', 'TemporalIDNested', 'ImageWidth',
'ImageHeight', 'ImageSpatialExtent', 'ImagePixelDepth',
'AverageFrameRate', 'ConstantFrameRate', 'MediaDataSize',
'MediaDataOffset','ImageSize', 'Megapixels'}
def remove_all(self) -> bool:
return self._lightweight_cleanup()
class WEBPParser(GdkPixbufAbstractParser):
mimetypes = {'image/webp'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', "FileInodeChangeDate",
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
'YResolution', 'Megapixels', 'ImageHeight', 'Orientation',
'HorizontalScale', 'VerticalScale', 'VP8Version'}
This diff is collapsed.
import glob
import os
import mimetypes
import importlib
from typing import TypeVar, Optional, List, Tuple
from . import abstract, UNSUPPORTED_EXTENSIONS
T = TypeVar('T', bound='abstract.AbstractParser')
mimetypes.add_type('application/epub+zip', '.epub')
mimetypes.add_type('application/x-dtbncx+xml', '.ncx') # EPUB Navigation Control XML File
# This should be removed after we move to python3.10
# https://github.com/python/cpython/commit/20a5b7e986377bdfd929d7e8c4e3db5847dfdb2d
mimetypes.add_type('image/heic', '.heic')
def __load_all_parsers():
""" Loads every parser in a dynamic way """
current_dir = os.path.dirname(__file__)
for fname in glob.glob(os.path.join(current_dir, '*.py')):
if fname.endswith('abstract.py'):
continue
elif fname.endswith('__init__.py'):
continue
elif fname.endswith('exiftool.py'):
continue
basename = os.path.basename(fname)
name, _ = os.path.splitext(basename)
importlib.import_module('.' + name, package='libmat2')
__load_all_parsers()
def _get_parsers() -> List[T]:
""" Get all our parsers!"""
def __get_parsers(cls):
return cls.__subclasses__() + \
[g for s in cls.__subclasses__() for g in __get_parsers(s)]
return __get_parsers(abstract.AbstractParser)
def get_parser(filename: str) -> Tuple[Optional[T], Optional[str]]:
""" Return the appropriate parser for a given filename.
:raises ValueError: Raised if the instantiation of the parser went wrong.
"""
mtype, _ = mimetypes.guess_type(filename)
_, extension = os.path.splitext(filename)
if extension.lower() in UNSUPPORTED_EXTENSIONS:
return None, mtype
if mtype == 'application/x-tar':
if extension[1:] in ('bz2', 'gz', 'xz'):
mtype = mtype + '+' + extension[1:]
for parser_class in _get_parsers(): # type: ignore
if mtype in parser_class.mimetypes:
# This instantiation might raise a ValueError on malformed files
return parser_class(filename), mtype
return None, mtype
......@@ -7,29 +7,42 @@ import re
import logging
import tempfile
import io
from typing import Union, Dict
import cairo
import gi
gi.require_version('Poppler', '0.18')
from gi.repository import Poppler
from gi.repository import Poppler, GLib
from . import abstract
logging.basicConfig(level=logging.DEBUG)
FIXED_PDF_VERSION = cairo.PDFVersion.VERSION_1_5
class PDFParser(abstract.AbstractParser):
mimetypes = {'application/pdf', }
meta_list = {'author', 'creation-date', 'creator', 'format', 'keywords',
'metadata', 'mod-date', 'producer', 'subject', 'title',
'viewer-preferences'}
'metadata', 'mod-date', 'producer', 'subject', 'title',
'viewer-preferences'}
def __init__(self, filename):
super().__init__(filename)
self.uri = 'file://' + os.path.abspath(self.filename)
self.__scale = 2 # how much precision do we want for the render
def remove_all_lightweight(self):
self.__scale = 200 / 72.0 # how much precision do we want for the render
try: # Check now that the file is valid, to avoid surprises later
Poppler.Document.new_from_file(self.uri, None)
except GLib.GError: # Invalid PDF
raise ValueError
def remove_all(self) -> bool:
if self.lightweight_cleaning is True:
try:
return self.__remove_all_lightweight()
except (cairo.Error, MemoryError) as e:
raise RuntimeError(e)
return self.__remove_all_thorough()
def __remove_all_lightweight(self) -> bool:
"""
Load the document into Poppler, render pages on a new PDFSurface.
"""
......@@ -37,7 +50,8 @@ class PDFParser(abstract.AbstractParser):
pages_count = document.get_n_pages()
tmp_path = tempfile.mkstemp()[1]
pdf_surface = cairo.PDFSurface(tmp_path, 10, 10)
pdf_surface = cairo.PDFSurface(tmp_path, 10, 10) # resized later anyway
pdf_surface.restrict_to_version(FIXED_PDF_VERSION)
pdf_context = cairo.Context(pdf_surface) # context draws on the surface
for pagenum in range(pages_count):
......@@ -56,7 +70,7 @@ class PDFParser(abstract.AbstractParser):
return True
def remove_all(self):
def __remove_all_thorough(self) -> bool:
"""
Load the document into Poppler, render pages on PNG,
and shove those PNG into a new PDF.
......@@ -66,14 +80,20 @@ class PDFParser(abstract.AbstractParser):
_, tmp_path = tempfile.mkstemp()
pdf_surface = cairo.PDFSurface(tmp_path, 32, 32) # resized later anyway
pdf_surface.restrict_to_version(FIXED_PDF_VERSION)
pdf_context = cairo.Context(pdf_surface)
for pagenum in range(pages_count):
page = document.get_page(pagenum)
if page is None: # pragma: no cover
logging.error("Unable to get PDF pages")
return False
page_width, page_height = page.get_size()
logging.info("Rendering page %d/%d", pagenum + 1, pages_count)
img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, int(page_width) * self.__scale, int(page_height) * self.__scale)
width = int(page_width * self.__scale)
height = int(page_height * self.__scale)
img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
img_context = cairo.Context(img_surface)
img_context.scale(self.__scale, self.__scale)
......@@ -86,10 +106,14 @@ class PDFParser(abstract.AbstractParser):
buf.seek(0)
img = cairo.ImageSurface.create_from_png(buf)
pdf_surface.set_size(page_width*self.__scale, page_height*self.__scale)
if cairo.version_info < (1, 12, 0):
pdf_surface.set_size(width, height)
else:
pdf_surface.set_size(page_width, page_height)
pdf_surface.set_device_scale(1 / self.__scale, 1 / self.__scale)
pdf_context.set_source_surface(img, 0, 0)
pdf_context.paint()
pdf_context.show_page()
pdf_context.show_page() # draw pdf_context on pdf_surface
pdf_surface.finish()
......@@ -99,29 +123,44 @@ class PDFParser(abstract.AbstractParser):
return True
def __remove_superficial_meta(self, in_file:str, out_file: str) -> bool:
@staticmethod
def __remove_superficial_meta(in_file: str, out_file: str) -> bool:
document = Poppler.Document.new_from_file('file://' + in_file)
document.set_producer('')
document.set_creator('')
document.set_creation_date(-1)
document.save('file://' + os.path.abspath(out_file))
return True
# Cairo adds "/Producer" and "/CreationDate", and Poppler sometimes
# fails to remove them, we have to use this terrible regex.
# It should(tm) be alright though, because cairo's output format
# for metadata is fixed.
with open(out_file, 'rb') as f:
out = re.sub(rb'<<[\s\n]*/Producer.*?>>', b' << >>', f.read(),
count=0, flags=re.DOTALL | re.IGNORECASE)
with open(out_file, 'wb') as f:
f.write(out)
return True
def __parse_metadata_field(self, data:str) -> dict:
@staticmethod
def __parse_metadata_field(data: str) -> Dict[str, str]:
metadata = {}
for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I):
metadata[key] = value
return metadata
def get_meta(self):
def get_meta(self) -> Dict[str, Union[str, Dict]]:
""" Return a dict with all the meta of the file
"""
document = Poppler.Document.new_from_file(self.uri, None)
metadata = {}
document = Poppler.Document.new_from_file(self.uri, None)
for key in self.meta_list:
if document.get_property(key):
metadata[key] = document.get_property(key)
if 'metadata' in metadata:
parsed_meta = self.__parse_metadata_field(metadata['metadata'])
return {**metadata, **parsed_meta}
parsed_meta = self.__parse_metadata_field(metadata['metadata'])
for key, value in parsed_meta.items():
metadata[key] = value
return metadata
import logging
from typing import Union, Dict, List, Tuple
from . import abstract
class TorrentParser(abstract.AbstractParser):
mimetypes = {'application/x-bittorrent', }
whitelist = {b'announce', b'announce-list', b'info'}
allowlist = {b'announce', b'announce-list', b'info'}
def get_meta(self) -> dict:
metadata = {}
def __init__(self, filename):
super().__init__(filename)
with open(self.filename, 'rb') as f:
d = _BencodeHandler().bdecode(f.read())
if d is None:
return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
for k,v in d.items():
if k not in self.whitelist:
metadata[k.decode('utf-8')] = v
return metadata
self.dict_repr = _BencodeHandler().bdecode(f.read())
if self.dict_repr is None:
raise ValueError
def get_meta(self) -> Dict[str, Union[str, Dict]]:
metadata = {}
for key, value in self.dict_repr.items():
if key not in self.allowlist:
metadata[key.decode('utf-8')] = value
return metadata
def remove_all(self) -> bool:
cleaned = dict()
with open(self.filename, 'rb') as f:
d = _BencodeHandler().bdecode(f.read())
if d is None:
return False
for k,v in d.items():
if k in self.whitelist:
cleaned[k] = v
for key, value in self.dict_repr.items():
if key in self.allowlist:
cleaned[key] = value
with open(self.output_filename, 'wb') as f:
f.write(_BencodeHandler().bencode(cleaned))
self.dict_repr = cleaned # since we're stateful
return True
class _BencodeHandler(object):
class _BencodeHandler:
"""
Since bencode isn't that hard to parse,
MAT2 comes with its own parser, based on the spec
mat2 comes with its own parser, based on the spec
https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding
"""
def __init__(self):
self.__decode_func = {
ord('d'): self.__decode_dict,
ord('i'): self.__decode_int,
ord('l'): self.__decode_list,
}
ord('d'): self.__decode_dict,
ord('i'): self.__decode_int,
ord('l'): self.__decode_list,
}
for i in range(0, 10):
self.__decode_func[ord(str(i))] = self.__decode_string
self.__encode_func = {
bytes: self.__encode_string,
dict: self.__encode_dict,
int: self.__encode_int,
list: self.__encode_list,
bytes: self.__encode_string,
dict: self.__encode_dict,
int: self.__encode_int,
list: self.__encode_list,
}
def __decode_int(self, s:str) -> (int, str):
@staticmethod
def __decode_int(s: bytes) -> Tuple[int, bytes]:
s = s[1:]
next_idx = s.index(b'e')
if s.startswith(b'-0'):
......@@ -62,63 +65,64 @@ class _BencodeHandler(object):
raise ValueError # no leading zero except for zero itself
return int(s[:next_idx]), s[next_idx+1:]
def __decode_string(self, s:str) -> (str, str):
sep = s.index(b':')
str_len = int(s[:sep])
if str_len < 0:
raise ValueError
elif s[0] == b'0' and sep != 1:
@staticmethod
def __decode_string(s: bytes) -> Tuple[bytes, bytes]:
colon = s.index(b':')
# FIXME Python3 is broken here, the call to `ord` shouldn't be needed,
# but apparently it is. This is utterly idiotic.
if (s[0] == ord('0') or s[0] == '0') and colon != 1:
raise ValueError
str_len = int(s[:colon])
s = s[1:]
return s[sep:sep+str_len], s[sep+str_len:]
return s[colon:colon+str_len], s[colon+str_len:]
def __decode_list(self, s:str) -> (list, str):
r = list()
def __decode_list(self, s: bytes) -> Tuple[List, bytes]:
ret = list()
s = s[1:] # skip leading `l`
while s[0] != ord('e'):
v, s = self.__decode_func[s[0]](s)
r.append(v)
return r, s[1:]
value, s = self.__decode_func[s[0]](s)
ret.append(value)
return ret, s[1:]
def __decode_dict(self, s:str) -> (dict, str):
r = dict()
def __decode_dict(self, s: bytes) -> Tuple[Dict, bytes]:
ret = dict()
s = s[1:] # skip leading `d`
while s[0] != ord(b'e'):
k, s = self.__decode_string(s)
r[k], s = self.__decode_func[s[0]](s)
return r, s[1:]
key, s = self.__decode_string(s)
ret[key], s = self.__decode_func[s[0]](s)
return ret, s[1:]
@staticmethod
def __encode_int(x:str) -> bytes:
def __encode_int(x: bytes) -> bytes:
return b'i' + bytes(str(x), 'utf-8') + b'e'
@staticmethod
def __encode_string(x:str) -> bytes:
def __encode_string(x: bytes) -> bytes:
return bytes((str(len(x))), 'utf-8') + b':' + x
def __encode_list(self, x:str) -> bytes:
def __encode_list(self, x: str) -> bytes:
ret = b''
for i in x:
ret += self.__encode_func[type(i)](i)
return b'l' + ret + b'e'
def __encode_dict(self, x:str) -> bytes:
def __encode_dict(self, x: dict) -> bytes:
ret = b''
for k, v in sorted(x.items()):
ret += self.__encode_func[type(k)](k)
ret += self.__encode_func[type(v)](v)
for key, value in sorted(x.items()):
ret += self.__encode_func[type(key)](key)
ret += self.__encode_func[type(value)](value)
return b'd' + ret + b'e'
def bencode(self, s:str) -> bytes:
def bencode(self, s: Union[Dict, List, bytes, int]) -> bytes:
return self.__encode_func[type(s)](s)
def bdecode(self, s:str):
def bdecode(self, s: bytes) -> Union[Dict, None]:
try:
r, l = self.__decode_func[s[0]](s)
ret, trail = self.__decode_func[s[0]](s)
except (IndexError, KeyError, ValueError) as e:
print("not a valid bencoded string: %s" % e)
logging.warning("Not a valid bencoded string: %s", e)
return None
if l != b'':
print("invalid bencoded value (data after valid prefix)")
if trail != b'':
logging.warning("Invalid bencoded value (data after valid prefix)")
return None
return r
return ret
import subprocess
import functools
import shutil
import logging
from typing import Union, Dict
from . import exiftool
from . import bubblewrap
class AbstractFFmpegParser(exiftool.ExiftoolParser):
""" Abstract parser for all FFmpeg-based ones, mainly for video. """
# Some fileformats have mandatory metadata fields
meta_key_value_allowlist: Dict[str, Union[str, int]] = dict()
def remove_all(self) -> bool:
if self.meta_key_value_allowlist:
logging.warning('The format of "%s" (%s) has some mandatory '
'metadata fields; mat2 filled them with standard '
'data.', self.filename, ', '.join(self.mimetypes))
cmd = [_get_ffmpeg_path(),
'-i', self.filename, # input file
'-y', # overwrite existing output file
'-map', '0', # copy everything all streams from input to output
'-codec', 'copy', # don't decode anything, just copy (speed!)
'-loglevel', 'panic', # Don't show log
'-hide_banner', # hide the banner
'-map_metadata', '-1', # remove supperficial metadata
'-map_chapters', '-1', # remove chapters
'-disposition', '0', # Remove dispositions (check ffmpeg's manpage)
'-fflags', '+bitexact', # don't add any metadata
'-flags:v', '+bitexact', # don't add any metadata
'-flags:a', '+bitexact', # don't add any metadata
self.output_filename]
try:
if self.sandbox:
bubblewrap.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
return True
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
ret: Dict[str, Union[str, Dict]] = dict()
for key, value in meta.items():
if key in self.meta_key_value_allowlist:
if value == self.meta_key_value_allowlist[key]:
continue
ret[key] = value
return ret
class WMVParser(AbstractFFmpegParser):
mimetypes = {'video/x-ms-wmv', }
meta_allowlist = {'AudioChannels', 'AudioCodecID', 'AudioCodecName',
'ErrorCorrectionType', 'AudioSampleRate', 'DataPackets',
'Directory', 'Duration', 'ExifToolVersion',
'FileAccessDate', 'FileInodeChangeDate', 'FileLength',
'FileModifyDate', 'FileName', 'FilePermissions',
'FileSize', 'FileType', 'FileTypeExtension',
'FrameCount', 'FrameRate', 'ImageHeight', 'ImageSize',
'ImageWidth', 'MIMEType', 'MaxBitrate', 'MaxPacketSize',
'Megapixels', 'MinPacketSize', 'Preroll', 'SendDuration',
'SourceFile', 'StreamNumber', 'VideoCodecName', }
meta_key_value_allowlist = { # some metadata are mandatory :/
'AudioCodecDescription': '',
'CreationDate': '0000:00:00 00:00:00Z',
'FileID': '00000000-0000-0000-0000-000000000000',
'Flags': 2, # FIXME: What is this? Why 2?
'ModifyDate': '0000:00:00 00:00:00',
'TimeOffset': '0 s',
'VideoCodecDescription': '',
'StreamType': 'Audio',
}
class AVIParser(AbstractFFmpegParser):
mimetypes = {'video/x-msvideo', }
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
'FileSize', 'FileModifyDate', 'FileAccessDate',
'FileInodeChangeDate', 'FilePermissions', 'FileType',
'FileTypeExtension', 'MIMEType', 'FrameRate', 'MaxDataRate',
'FrameCount', 'StreamCount', 'StreamType', 'VideoCodec',
'VideoFrameRate', 'VideoFrameCount', 'Quality',
'SampleSize', 'BMPVersion', 'ImageWidth', 'ImageHeight',
'Planes', 'BitDepth', 'Compression', 'ImageLength',
'PixelsPerMeterX', 'PixelsPerMeterY',
'NumImportantColors', 'NumColors',
'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask',
'ColorSpace', 'AudioCodec', 'AudioCodecRate',
'AudioSampleCount',
'AudioSampleRate', 'Encoding', 'NumChannels',
'SampleRate', 'AvgBytesPerSec', 'BitsPerSample',
'Duration', 'ImageSize', 'Megapixels'}
class MP4Parser(AbstractFFmpegParser):
mimetypes = {'video/mp4', }
meta_allowlist = {'AudioFormat', 'AvgBitrate', 'Balance', 'TrackDuration',
'XResolution', 'YResolution', 'ExifToolVersion',
'FileAccessDate', 'FileInodeChangeDate', 'FileModifyDate',
'FileName', 'FilePermissions', 'MIMEType', 'FileType',
'FileTypeExtension', 'Directory', 'ImageWidth',
'ImageSize', 'ImageHeight', 'FileSize', 'SourceFile',
'BitDepth', 'Duration', 'AudioChannels',
'AudioBitsPerSample', 'AudioSampleRate', 'Megapixels',
'MovieDataSize', 'VideoFrameRate', 'MediaTimeScale',
'SourceImageHeight', 'SourceImageWidth',
'MatrixStructure', 'MediaDuration'}
meta_key_value_allowlist = { # some metadata are mandatory :/
'CreateDate': '0000:00:00 00:00:00',
'CurrentTime': '0 s',
'MediaCreateDate': '0000:00:00 00:00:00',
'MediaLanguageCode': 'und',
'MediaModifyDate': '0000:00:00 00:00:00',
'ModifyDate': '0000:00:00 00:00:00',
'OpColor': '0 0 0',
'PosterTime': '0 s',
'PreferredRate': '1',
'PreferredVolume': '100.00%',
'PreviewDuration': '0 s',
'PreviewTime': '0 s',
'SelectionDuration': '0 s',
'SelectionTime': '0 s',
'TrackCreateDate': '0000:00:00 00:00:00',
'TrackModifyDate': '0000:00:00 00:00:00',
'TrackVolume': '0.00%',
}
@functools.lru_cache(maxsize=None)
def _get_ffmpeg_path() -> str: # pragma: no cover
which_path = shutil.which('ffmpeg')
if which_path:
return which_path
raise RuntimeError("Unable to find ffmpeg")
from html import parser, escape
from typing import Any, Optional, Dict, List, Tuple, Set
import re
import string
from . import abstract
# pylint: disable=too-many-instance-attributes
class CSSParser(abstract.AbstractParser):
"""There is no such things as metadata in CSS files,
only comments of the form `/* … */`, so we're removing the laters."""
mimetypes = {'text/css', }
flags = re.MULTILINE | re.DOTALL
def remove_all(self) -> bool:
with open(self.filename, encoding='utf-8') as f:
try:
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cleaned = re.sub(r'/\*.*?\*/', '', content, count=0, flags=self.flags)
with open(self.output_filename, 'w', encoding='utf-8') as f:
f.write(cleaned)
return True
def get_meta(self) -> Dict[str, Any]:
metadata = {}
with open(self.filename, encoding='utf-8') as f:
try:
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cssdoc = re.findall(r'/\*(.*?)\*/', content, self.flags)
for match in cssdoc:
for line in match.splitlines():
try:
k, v = line.split(':')
metadata[k.strip(string.whitespace + '*')] = v.strip()
except ValueError:
metadata['harmful data'] = line.strip()
return metadata
class AbstractHTMLParser(abstract.AbstractParser):
tags_blocklist: Set[str] = set()
# In some html/xml-based formats some tags are mandatory,
# so we're keeping them, but are discarding their content
tags_required_blocklist: Set[str] = set()
def __init__(self, filename):
super().__init__(filename)
self.__parser = _HTMLParser(self.filename, self.tags_blocklist,
self.tags_required_blocklist)
with open(filename, encoding='utf-8') as f:
self.__parser.feed(f.read())
self.__parser.close()
def get_meta(self) -> Dict[str, Any]:
return self.__parser.get_meta()
def remove_all(self) -> bool:
return self.__parser.remove_all(self.output_filename)
class HTMLParser(AbstractHTMLParser):
mimetypes = {'text/html', 'application/xhtml+xml'}
tags_blocklist = {'meta', }
tags_required_blocklist = {'title', }
class DTBNCXParser(AbstractHTMLParser):
mimetypes = {'application/x-dtbncx+xml', }
tags_required_blocklist = {'title', 'doctitle', 'meta'}
class _HTMLParser(parser.HTMLParser):
"""Python doesn't have a validating html parser in its stdlib, so
we're using an internal queue to track all the opening/closing tags,
and hoping for the best.
Moreover, the parser.HTMLParser call doesn't provide a get_endtag_text
method, so we have to use get_starttag_text instead, put its result in a
LIFO, and transform it in a closing tag when needed.
Also, gotcha: the `tag` parameters are always in lowercase.
"""
def __init__(self, filename, blocklisted_tags, required_blocklisted_tags):
super().__init__()
self.filename = filename
self.__textrepr = ''
self.__meta = {}
self.__validation_queue: List[str] = list()
# We're using counters instead of booleans, to handle nested tags
self.__in_dangerous_but_required_tag = 0
self.__in_dangerous_tag = 0
if required_blocklisted_tags & blocklisted_tags: # pragma: nocover
raise ValueError("There is an overlap between %s and %s" % (
required_blocklisted_tags, blocklisted_tags))
self.tag_required_blocklist = required_blocklisted_tags
self.tag_blocklist = blocklisted_tags
def error(self, message): # pragma: no cover
""" Amusingly, Python's documentation doesn't mention that this
function needs to be implemented in subclasses of the parent class
of parser.HTMLParser. This was found by fuzzing,
triggering the following exception:
NotImplementedError: subclasses of ParserBase must override error()
"""
raise ValueError(message)
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]):
# Ignore the type, because mypy is too stupid to infer
# that get_starttag_text() can't return None.
original_tag = self.get_starttag_text() # type: ignore
self.__validation_queue.append(original_tag) # type: ignore
if tag in self.tag_blocklist:
self.__in_dangerous_tag += 1
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
self.__textrepr += original_tag
if tag in self.tag_required_blocklist:
self.__in_dangerous_but_required_tag += 1
def handle_endtag(self, tag: str):
if not self.__validation_queue:
raise ValueError("The closing tag %s doesn't have a corresponding "
"opening one in %s." % (tag, self.filename))
previous_tag = self.__validation_queue.pop()
previous_tag = previous_tag[1:-1] # remove < and >
previous_tag = previous_tag.split(' ')[0] # remove attributes
if tag != previous_tag.lower():
raise ValueError("The closing tag %s doesn't match the previous "
"tag %s in %s" %
(tag, previous_tag, self.filename))
if tag in self.tag_required_blocklist:
self.__in_dangerous_but_required_tag -= 1
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
# There is no `get_endtag_text()` method :/
self.__textrepr += '</' + previous_tag + '>'
if tag in self.tag_blocklist:
self.__in_dangerous_tag -= 1
def handle_data(self, data: str):
if self.__in_dangerous_but_required_tag == 0:
if self.__in_dangerous_tag == 0:
if data.strip():
self.__textrepr += escape(data)
def handle_startendtag(self, tag: str,
attrs: List[Tuple[str, Optional[str]]]):
if tag in self.tag_required_blocklist | self.tag_blocklist:
meta = {k:v for k, v in attrs}
name = meta.get('name', 'harmful metadata')
content = meta.get('content', 'harmful data')
self.__meta[name] = content
if self.__in_dangerous_tag == 0:
if tag in self.tag_required_blocklist:
self.__textrepr += '<' + tag + ' />'
return
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
self.__textrepr += self.get_starttag_text()
def remove_all(self, output_filename: str) -> bool:
if self.__validation_queue:
raise ValueError("Some tags (%s) were left unclosed in %s" % (
', '.join(self.__validation_queue),
self.filename))
with open(output_filename, 'w', encoding='utf-8') as f:
f.write(self.__textrepr)
return True
def get_meta(self) -> Dict[str, Any]:
if self.__validation_queue:
raise ValueError("Some tags (%s) were left unclosed in %s" % (
', '.join(self.__validation_queue),
self.filename))
return self.__meta
#!/usr/bin/python3
import os
from typing import Tuple
import sys
import itertools
import mimetypes
import argparse
import multiprocessing
from src import parser_factory
def __check_file(filename:str, mode:int = os.R_OK) -> bool:
if not os.path.isfile(filename):
print("[-] %s is not a regular file." % filename)
return False
elif not os.access(filename, mode):
print("[-] %s is not readable and writeable." % filename)
return False
return True
def create_arg_parser():
parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2')
parser.add_argument('files', nargs='*')
info = parser.add_argument_group('Information')
info.add_argument('-c', '--check', action='store_true',
help='check if a file is free of harmful metadatas')
info.add_argument('-l', '--list', action='store_true',
help='list all supported fileformats')
info.add_argument('-s', '--show', action='store_true',
help='list all the harmful metadata of a file without removing them')
info.add_argument('-L', '--lightweight', action='store_true',
help='remove SOME metadata')
return parser
def show_meta(filename:str):
if not __check_file(filename):
return
p, mtype = parser_factory.get_parser(filename)
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
return
print("[+] Metadata for %s:" % filename)
for k,v in p.get_meta().items():
try: # FIXME this is ugly.
print(" %s: %s" % (k, v))
except UnicodeEncodeError:
print(" %s: harmful content" % k)
def clean_meta(params:Tuple[str, bool]) -> bool:
filename, is_lightweigth = params
if not __check_file(filename, os.R_OK|os.W_OK):
return
p, mtype = parser_factory.get_parser(filename)
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
return False
if is_lightweigth:
return p.remove_all_lightweight()
return p.remove_all()
def show_parsers():
print('[+] Supported formats:')
for parser in parser_factory._get_parsers():
for mtype in parser.mimetypes:
extensions = ', '.join(mimetypes.guess_all_extensions(mtype))
print(' - %s (%s)' % (mtype, extensions))
def __get_files_recursively(files):
for f in files:
if os.path.isfile(f):
yield f
else:
for path, _, _files in os.walk(f):
for _f in _files:
yield os.path.join(path, _f)
def main():
arg_parser = create_arg_parser()
args = arg_parser.parse_args()
if not args.files:
if not args.list:
return arg_parser.print_help()
show_parsers()
return
elif args.show:
for f in __get_files_recursively(args.files):
show_meta(f)
return
else:
p = multiprocessing.Pool()
mode = (args.lightweight is True)
l = zip(__get_files_recursively(args.files), itertools.repeat(mode))
ret = list(p.imap_unordered(clean_meta, list(l)))
return 0 if all(ret) else -1
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python3
import os
import shutil
from typing import List, Set, Dict
import sys
import mimetypes
import argparse
import logging
import unicodedata
import concurrent.futures
try:
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
from libmat2 import check_dependencies, UnknownMemberPolicy
except ValueError as ex:
print(ex)
sys.exit(1)
__version__ = '0.13.5'
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
def __print_without_chars(s: str):
""" Remove control characters
We might use 'Cc' instead of 'C', but better safe than sorry
https://www.unicode.org/reports/tr44/#GC_Values_Table
"""
print(''.join(ch for ch in s if not unicodedata.category(ch).startswith('C')))
def __check_file(filename: str, mode: int = os.R_OK) -> bool:
if not os.path.exists(filename):
__print_without_chars("[-] %s doesn't exist." % filename)
return False
elif not os.path.isfile(filename):
__print_without_chars("[-] %s is not a regular file." % filename)
return False
elif not os.access(filename, mode):
mode_str: List[str] = list()
if mode & os.R_OK:
mode_str += 'readable'
if mode & os.W_OK:
mode_str += 'writeable'
__print_without_chars("[-] %s is not %s." % (filename, 'nor '.join(mode_str)))
return False
return True
def create_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2')
parser.add_argument('-V', '--verbose', action='store_true',
help='show more verbose status information')
parser.add_argument('--unknown-members', metavar='policy', default='abort',
help='how to handle unknown members of archive-style '
'files (policy should be one of: %s) [Default: abort]' %
', '.join(p.value for p in UnknownMemberPolicy))
parser.add_argument('--inplace', action='store_true',
help='clean in place, without backup')
parser.add_argument('--no-sandbox', dest='sandbox', action='store_false',
default=True, help='Disable bubblewrap\'s sandboxing')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('files', nargs='*', help='the files to process',
default=[])
excl_group.add_argument('-v', '--version', action='version',
version='mat2 %s' % __version__)
excl_group.add_argument('-l', '--list', action='store_true', default=False,
help='list all supported fileformats')
excl_group.add_argument('--check-dependencies', action='store_true',
default=False,
help='check if mat2 has all the dependencies it '
'needs')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('-L', '--lightweight', action='store_true',
help='remove SOME metadata')
excl_group.add_argument('-s', '--show', action='store_true',
help='list harmful metadata detectable by mat2 '
'without removing them')
return parser
def show_meta(filename: str, sandbox: bool):
if not __check_file(filename):
return
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when processing %s: %s" % (filename, e))
return
if p is None:
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return
p.sandbox = sandbox
__print_meta(filename, p.get_meta())
def __print_meta(filename: str, metadata: Dict, depth: int = 1):
padding = " " * depth*2
if not metadata:
__print_without_chars(padding + "No metadata found in %s." % filename)
return
__print_without_chars("[%s] Metadata for %s:" % ('+'*depth, filename))
for (k, v) in sorted(metadata.items()):
if isinstance(v, dict):
__print_meta(k, v, depth+1)
continue
try: # FIXME this is ugly.
__print_without_chars(padding + " %s: %s" % (k, v))
except UnicodeEncodeError:
__print_without_chars(padding + " %s: harmful content" % k)
except TypeError:
pass # for things that aren't iterable
def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
policy: UnknownMemberPolicy) -> bool:
mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
if not __check_file(filename, mode):
return False
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when cleaning %s: %s" % (filename, e))
return False
if p is None:
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return False
p.unknown_member_policy = policy
p.lightweight_cleaning = is_lightweight
p.sandbox = sandbox
try:
logging.debug('Cleaning %s…', filename)
ret = p.remove_all()
if ret is True:
shutil.copymode(filename, p.output_filename)
if inplace is True:
os.rename(p.output_filename, filename)
return ret
except RuntimeError as e:
__print_without_chars("[-] %s can't be cleaned: %s" % (filename, e))
return False
def show_parsers():
print('[+] Supported formats:')
formats = set() # Set[str]
for parser in parser_factory._get_parsers(): # type: ignore
for mtype in parser.mimetypes:
extensions = set() # Set[str]
for extension in mimetypes.guess_all_extensions(mtype):
if extension not in UNSUPPORTED_EXTENSIONS:
extensions.add(extension)
if not extensions:
# we're not supporting a single extension in the current
# mimetype, so there is not point in showing the mimetype at all
continue
formats.add(' - %s (%s)' % (mtype, ', '.join(extensions)))
print('\n'.join(sorted(formats)))
def __get_files_recursively(files: List[str]) -> List[str]:
ret: Set[str] = set()
for f in files:
if os.path.isdir(f):
for path, _, _files in os.walk(f):
for _f in _files:
fname = os.path.join(path, _f)
if __check_file(fname):
ret.add(fname)
elif __check_file(f):
ret.add(f)
return list(ret)
def main() -> int:
arg_parser = create_arg_parser()
args = arg_parser.parse_args()
if args.verbose:
logging.getLogger(__name__).setLevel(logging.DEBUG)
if not args.files:
if args.list:
show_parsers()
return 0
elif args.check_dependencies:
__print_without_chars("Dependencies for mat2 %s:" % __version__)
for key, value in sorted(check_dependencies().items()):
__print_without_chars('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
'(optional)' if not value['required'] else ''))
else:
arg_parser.print_help()
return 0
elif args.show:
for f in __get_files_recursively(args.files):
show_meta(f, args.sandbox)
return 0
else:
inplace = args.inplace
policy = UnknownMemberPolicy(args.unknown_members)
if policy == UnknownMemberPolicy.KEEP:
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
no_failure = True
files = __get_files_recursively(args.files)
# We have to use Processes instead of Threads, since
# we're using tempfile.mkdtemp, which isn't thread-safe.
futures = list()
with concurrent.futures.ProcessPoolExecutor() as executor:
for f in files:
future = executor.submit(clean_meta, f, args.lightweight,
inplace, args.sandbox, policy)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
return 0 if no_failure is True else -1
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python3
import gi
gi.require_version('Nautilus', '3.0')
from gi.repository import Nautilus, GObject
class ColumnExtension(GObject.GObject, Nautilus.MenuProvider):
def menu_activate_cb(self, menu, file):
print "menu_activate_cb", file
# TODO: clean metadata here
def get_background_items(self, window, file):
""" https://bugzilla.gnome.org/show_bug.cgi?id=784278 """
return None
def get_file_items(self, window, files):
if len(files) != 1: # we're not supporting multiple files for now
return
file = files[0]
item = Nautilus.MenuItem(
name="MAT2::Remove_metadata",
label="Remove metadata from %s" % file.get_name(),
tip="Remove metadata from %s" % file.get_name()
)
item.connect('activate', self.menu_activate_cb, file)
return [item]
[project]
name = "mat2"
version = "0.13.5"
description = "mat2 is a metadata removal tool, supporting a wide range of commonly used file formats, written in python3: at its core, it's a library, used by an eponymous command-line interface, as well as several file manager extensions."
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.9"
dependencies = [
'mutagen',
'PyGObject',
'pycairo',
]
[project.urls]
Repository = "https://0xacab.org/jvoisin/mat2"
Issues = "https://0xacab.org/jvoisin/mat2/-/issues"
Changelog = "https://0xacab.org/jvoisin/mat2/-/blob/master/CHANGELOG.md"
[tool.ruff]
target-version = "py39"
# E501 Line too long
ignore = ["E501", "F401", "E402", "E722"]
import setuptools
with open("README.md", encoding='utf-8') as fh:
long_description = fh.read()
setuptools.setup(
name="mat2",
version='0.13.5',
author="Julien (jvoisin) Voisin",
author_email="julien.voisin+mat2@dustri.org",
description="A handy tool to trash your metadata",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://0xacab.org/jvoisin/mat2",
python_requires = '>=3.5.0',
scripts=['mat2'],
install_requires=[
'mutagen',
'PyGObject',
'pycairo',
],
packages=setuptools.find_packages(exclude=('tests', )),
data_files = [('share/man/man1', ['doc/mat2.1'])],
classifiers=[
"Development Status :: 3 - Alpha",
"Environment :: Console",
"License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3 :: Only",
"Topic :: Security",
"Intended Audience :: End Users/Desktop",
],
project_urls={
'bugtacker': 'https://0xacab.org/jvoisin/mat2/issues',
},
)
#!/bin/env python3
\ No newline at end of file
import abc
class AbstractParser(abc.ABC):
meta_list = set()
mimetypes = set()
def __init__(self, filename: str):
self.filename = filename
self.output_filename = filename + '.cleaned'
@abc.abstractmethod
def get_meta(self) -> dict:
pass
@abc.abstractmethod
def remove_all(self) -> bool:
pass
def remove_all_lightweight(self) -> bool:
""" Remove _SOME_ metadata. """
return self.remove_all()
import shutil
import mutagen
from . import abstract
class MutagenParser(abstract.AbstractParser):
def get_meta(self):
f = mutagen.File(self.filename)
if f.tags:
return {k:', '.join(v) for k,v in f.tags.items()}
return {}
def remove_all(self):
shutil.copy(self.filename, self.output_filename)
f = mutagen.File(self.output_filename)
f.delete()
f.save()
return True
class MP3Parser(MutagenParser):
mimetypes = {'audio/mpeg', }
def get_meta(self):
metadata = {}
meta = mutagen.File(self.filename).tags
for key in meta:
metadata[key.rstrip(' \t\r\n\0')] = ', '.join(map(str, meta[key].text))
return metadata
class OGGParser(MutagenParser):
mimetypes = {'audio/ogg', }
class FLACParser(MutagenParser):
mimetypes = {'audio/flac', }
from . import abstract
class HarmlessParser(abstract.AbstractParser):
""" This is the parser for filetypes that do not contain metadata. """
mimetypes = {'application/xml', 'text/plain', 'application/rdf+xml'}
def __init__(self, filename: str):
self.filename = filename
self.output_filename = filename
def get_meta(self):
return dict()
def remove_all(self):
return True