Compare revisions

Showing 1573 additions and 595 deletions
"""
Wrapper around a subset of the subprocess module,
that uses bwrap (bubblewrap) when it is available.
Instead of importing subprocess, other modules should use this one as follows:

from . import bubblewrap
"""
import os
import shutil
import subprocess
import tempfile
import functools
from typing import Optional, List
__all__ = ['PIPE', 'run', 'CalledProcessError']
PIPE = subprocess.PIPE
CalledProcessError = subprocess.CalledProcessError
# pylint: disable=subprocess-run-check
@functools.lru_cache(maxsize=None)
def _get_bwrap_path() -> str:
which_path = shutil.which('bwrap')
if which_path:
return which_path
raise RuntimeError("Unable to find bwrap") # pragma: no cover
def _get_bwrap_args(tempdir: str,
input_filename: str,
output_filename: Optional[str] = None) -> List[str]:
ro_bind_args = []
cwd = os.getcwd()
# XXX: use --ro-bind-try once all supported platforms
# have a bubblewrap recent enough to support it.
ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', '/etc/alternatives', cwd]
for bind_dir in ro_bind_dirs:
if os.path.isdir(bind_dir): # pragma: no cover
ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir])
ro_bind_files = ['/etc/ld.so.cache']
for bind_file in ro_bind_files:
if os.path.isfile(bind_file): # pragma: no cover
ro_bind_args.extend(['--ro-bind', bind_file, bind_file])
args = ro_bind_args + \
['--dev', '/dev',
'--proc', '/proc',
'--chdir', cwd,
'--unshare-user-try',
'--unshare-ipc',
'--unshare-pid',
'--unshare-net',
'--unshare-uts',
'--unshare-cgroup-try',
'--new-session',
'--cap-drop', 'all',
# XXX: enable --die-with-parent once all supported platforms have
# a bubblewrap recent enough to support it.
# '--die-with-parent',
]
if output_filename:
# Mount an empty temporary directory where the sandboxed
# process will create its output file
output_dirname = os.path.dirname(os.path.abspath(output_filename))
args.extend(['--bind', tempdir, output_dirname])
absolute_input_filename = os.path.abspath(input_filename)
args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename])
return args
def run(args: List[str],
input_filename: str,
output_filename: Optional[str] = None,
**kwargs) -> subprocess.CompletedProcess:
"""Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it
is available.
Extra supported keyword arguments:
- `input_filename`, made available read-only in the sandbox
- `output_filename`, where the file created by the sandboxed process
is copied upon successful completion; an empty temporary directory
is made visible as the parent directory of this file in the sandbox.
Optional: one valid use case is to invoke an external process
to inspect metadata present in a file.
"""
try:
bwrap_path = _get_bwrap_path()
except RuntimeError: # pragma: no cover
# bubblewrap is not installed ⇒ short-circuit
return subprocess.run(args, **kwargs)
with tempfile.TemporaryDirectory() as tempdir:
prefix_args = [bwrap_path] + \
_get_bwrap_args(input_filename=input_filename,
output_filename=output_filename,
tempdir=tempdir)
completed_process = subprocess.run(prefix_args + args, **kwargs)
if output_filename and completed_process.returncode == 0:
shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)),
output_filename)
return completed_process
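For illustration, here is how a caller might use this wrapper. A minimal sketch, assuming the package is importable as libmat2 and that exiftool and picture.jpg exist; both names are placeholders, not taken from the diff:

from libmat2 import bubblewrap

# Inspect a file read-only inside the sandbox; if bwrap is missing,
# run() silently falls back to a plain subprocess.run().
completed = bubblewrap.run(['exiftool', '-json', 'picture.jpg'],
                           input_filename='picture.jpg',
                           check=True, stdout=bubblewrap.PIPE)
print(completed.stdout.decode('utf-8'))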
import logging
import re
import uuid
import zipfile
import xml.etree.ElementTree as ET # type: ignore
from typing import Any, Dict
from . import archive, office
class EPUBParser(archive.ZipParser):
mimetypes = {'application/epub+zip', }
metadata_namespace = '{http://purl.org/dc/elements/1.1/}'
def __init__(self, filename):
super().__init__(filename)
self.files_to_keep = set(map(re.compile, { # type: ignore
'META-INF/container.xml',
'mimetype',
'OEBPS/content.opf',
'content.opf',
'hmh.opf',
'OPS/.+.xml'
}))
self.files_to_omit = set(map(re.compile, { # type: ignore
'iTunesMetadata.plist',
'META-INF/calibre_bookmarks.txt',
'OEBPS/package.opf',
}))
self.uniqid = uuid.uuid4()
def is_archive_valid(self):
super().is_archive_valid()
with zipfile.ZipFile(self.filename) as zin:
for item in self._get_all_members(zin):
member_name = self._get_member_name(item)
if member_name.endswith('META-INF/encryption.xml'):
raise ValueError('the file contains encrypted fonts')
def _specific_get_meta(self, full_path, file_path) -> Dict[str, Any]:
if not file_path.endswith('.opf'):
return {}
with open(full_path, encoding='utf-8') as f:
try:
results = re.findall(r"<((?:meta|dc|cp).+?)[^>]*>(.+)</\1>",
f.read(), re.I|re.M)
return {k:v for (k, v) in results}
except (TypeError, UnicodeDecodeError):
return {file_path: 'harmful content', }
def _specific_cleanup(self, full_path: str) -> bool:
if full_path.endswith('hmh.opf') or full_path.endswith('content.opf'):
return self.__handle_contentopf(full_path)
elif full_path.endswith('OEBPS/toc.ncx'):
return self.__handle_tocncx(full_path)
elif re.search('/OPS/[^/]+.xml$', full_path):
return self.__handle_ops_xml(full_path)
return True
def __handle_ops_xml(self, full_path: str) -> bool:
try:
tree, namespace = office._parse_xml(full_path)
except ET.ParseError: # pragma: nocover
logging.error("Unable to parse %s in %s.", full_path, self.filename)
return False
for item in tree.iterfind('.//', namespace): # pragma: nocover
if item.tag.strip().lower().endswith('head'):
item.clear()
break
tree.write(full_path, xml_declaration=True, encoding='utf-8',
short_empty_elements=False)
return True
def __handle_tocncx(self, full_path: str) -> bool:
try:
tree, namespace = office._parse_xml(full_path)
except ET.ParseError: # pragma: nocover
logging.error("Unable to parse %s in %s.", full_path, self.filename)
return False
for item in tree.iterfind('.//', namespace): # pragma: nocover
if item.tag.strip().lower().endswith('head'):
item.clear()
ET.SubElement(item, 'meta', attrib={'name': '', 'content': ''})
break
tree.write(full_path, xml_declaration=True, encoding='utf-8',
short_empty_elements=False)
return True
def __handle_contentopf(self, full_path: str) -> bool:
try:
tree, namespace = office._parse_xml(full_path)
except ET.ParseError:
logging.error("Unable to parse %s in %s.", full_path, self.filename)
return False
for item in tree.iterfind('.//', namespace): # pragma: nocover
if item.tag.strip().lower().endswith('metadata'):
item.clear()
# item with mandatory content
uniqid = ET.Element(self.metadata_namespace + 'identifier')
uniqid.text = str(self.uniqid)
uniqid.set('id', 'id')
item.append(uniqid)
# items without mandatory content
for name in ['language', 'title']:
uniqid = ET.Element(self.metadata_namespace + name)
item.append(uniqid)
break # there is only a single <metadata> block
tree.write(full_path, xml_declaration=True, encoding='utf-8')
return True
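The `_specific_get_meta` regex above is easier to grasp on a concrete tag. A standalone sketch; the sample string is an editor's invention:

import re

sample = '<dc:creator id="id">Jane Doe</dc:creator>'
found = re.findall(r"<((?:meta|dc|cp).+?)[^>]*>(.+)</\1>", sample, re.I | re.M)
print(dict(found))  # {'dc:creator': 'Jane Doe'}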
import functools
import json
import logging
import os
import shutil
import subprocess
from typing import Union, Set, Dict
from . import abstract
from . import bubblewrap
class ExiftoolParser(abstract.AbstractParser):
""" Exiftool is often the easiest way to get all the metadata
    from a file, hence why several parsers are re-using its `get_meta`
method.
"""
meta_allowlist: Set[str] = set()
def get_meta(self) -> Dict[str, Union[str, Dict]]:
try:
if self.sandbox:
out = bubblewrap.run([_get_exiftool_path(), '-json',
self.filename],
input_filename=self.filename,
check=True, stdout=subprocess.PIPE).stdout
else:
out = subprocess.run([_get_exiftool_path(), '-json',
self.filename],
check=True, stdout=subprocess.PIPE).stdout
except subprocess.CalledProcessError: # pragma: no cover
raise ValueError
meta = json.loads(out.decode('utf-8'))[0]
for key in self.meta_allowlist:
meta.pop(key, None)
return meta
def _lightweight_cleanup(self) -> bool:
if os.path.exists(self.output_filename):
try: # exiftool can't force output to existing files
os.remove(self.output_filename)
except OSError as e: # pragma: no cover
logging.error("The output file %s is already existing and \
can't be overwritten: %s.", self.filename, e)
return False
# Note: '-All=' must be followed by a known exiftool option.
# Also, '-CommonIFD0' is needed for .tiff files
cmd = [_get_exiftool_path(),
'-all=', # remove metadata
'-adobe=', # remove adobe-specific metadata
'-exif:all=', # remove all exif metadata
'-Time:All=', # remove all timestamps
'-quiet', # don't show useless logs
'-CommonIFD0=', # remove IFD0 metadata
'-o', self.output_filename,
self.filename]
try:
if self.sandbox:
bubblewrap.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e: # pragma: no cover
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
return True
@functools.lru_cache(maxsize=None)
def _get_exiftool_path() -> str: # pragma: no cover
which_path = shutil.which('exiftool')
if which_path:
return which_path
# Exiftool on Arch Linux has a weird path
if os.access('/usr/bin/vendor_perl/exiftool', os.X_OK):
return '/usr/bin/vendor_perl/exiftool'
raise RuntimeError("Unable to find exiftool")
import shutil
from typing import Dict
from typing import Union, Dict
from . import abstract
class HarmlessParser(abstract.AbstractParser):
""" This is the parser for filetypes that can not contain metadata. """
mimetypes = {'text/plain', 'image/x-ms-bmp'}
mimetypes = {'text/plain', 'image/x-ms-bmp', 'image/bmp'}
def get_meta(self) -> Dict[str, str]:
def get_meta(self) -> Dict[str, Union[str, Dict]]:
return dict()
def remove_all(self) -> bool:
......
import subprocess
import imghdr
import json
import os
import shutil
import tempfile
import re
from typing import Set
from typing import Union, Any, Dict
import cairo
import gi
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import GdkPixbuf
from . import abstract
# Make pyflakes happy
assert Set
class _ImageParser(abstract.AbstractParser):
""" Since we use `exiftool` to get metadata from
all images fileformat, `get_meta` is implemented in this class,
and all the image-handling ones are inheriting from it."""
meta_whitelist = set() # type: Set[str]
@staticmethod
def __handle_problematic_filename(filename: str, callback) -> str:
""" This method takes a filename with a problematic name,
and safely applies it a `callback`."""
tmpdirname = tempfile.mkdtemp()
fname = os.path.join(tmpdirname, "temp_file")
shutil.copy(filename, fname)
out = callback(fname)
shutil.rmtree(tmpdirname)
return out
def get_meta(self):
""" There is no way to escape the leading(s) dash(es) of the current
self.filename to prevent parameter injections, so we need to take care
of this.
"""
fun = lambda f: subprocess.check_output(['/usr/bin/exiftool', '-json', f])
if re.search('^[a-z0-9/]', self.filename) is None:
out = self.__handle_problematic_filename(self.filename, fun)
else:
out = fun(self.filename)
meta = json.loads(out.decode('utf-8'))[0]
for key in self.meta_whitelist:
meta.pop(key, None)
gi.require_version('Rsvg', '2.0')
from gi.repository import GdkPixbuf, GLib, Rsvg
from . import exiftool, abstract
class SVGParser(exiftool.ExiftoolParser):
mimetypes = {'image/svg+xml', }
meta_allowlist = {'Directory', 'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'ImageHeight', 'ImageWidth',
'MIMEType', 'SVGVersion', 'SourceFile', 'ViewBox'
}
def remove_all(self) -> bool:
try:
svg = Rsvg.Handle.new_from_file(self.filename)
except GLib.GError:
raise ValueError
try:
_, _, _, _, has_viewbox, viewbox = svg.get_intrinsic_dimensions()
if has_viewbox is False:
raise ValueError
_, width, height = svg.get_intrinsic_size_in_pixels()
except AttributeError:
dimensions = svg.get_dimensions()
height, width = dimensions.height, dimensions.width
        surface = cairo.SVGSurface(self.output_filename, width, height)
context = cairo.Context(surface)
try:
svg.render_document(context, viewbox)
except AttributeError:
svg.render_cairo(context)
surface.finish()
return True
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
# The namespace is mandatory, but only the …/2000/svg is valid.
ns = 'http://www.w3.org/2000/svg'
if meta.get('Xmlns') == ns:
meta.pop('Xmlns')
return meta
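The render-to-a-fresh-surface trick is what actually drops the metadata: only the drawing survives the round-trip. A reduced sketch of the same idea, assuming python-gobject with Rsvg and pycairo are available; 'in.svg' and 'out.svg' are placeholders, and the older pre-viewbox API is used for brevity:

import cairo
import gi
gi.require_version('Rsvg', '2.0')
from gi.repository import Rsvg

svg = Rsvg.Handle.new_from_file('in.svg')
dim = svg.get_dimensions()
surface = cairo.SVGSurface('out.svg', dim.width, dim.height)
svg.render_cairo(cairo.Context(surface))  # re-draw: comments/metadata don't survive
surface.finish()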
class PNGParser(_ImageParser):
class PNGParser(exiftool.ExiftoolParser):
mimetypes = {'image/png', }
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', 'FileInodeChangeDate',
'FilePermissions', 'FileType', 'FileTypeExtension',
@@ -62,54 +67,85 @@ class PNGParser(_ImageParser):
def __init__(self, filename):
super().__init__(filename)
try: # better fail here than later
cairo.ImageSurface.create_from_png(self.filename)
except MemoryError:
except: # pragma: no cover
# Cairo is returning some weird exceptions :/
raise ValueError
def remove_all(self):
def remove_all(self) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
surface = cairo.ImageSurface.create_from_png(self.filename)
surface.write_to_png(self.output_filename)
return True
class GdkPixbufAbstractParser(_ImageParser):
class GIFParser(exiftool.ExiftoolParser):
mimetypes = {'image/gif'}
meta_allowlist = {'AnimationIterations', 'BackgroundColor', 'BitsPerPixel',
'ColorResolutionDepth', 'Directory', 'Duration',
'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'FrameCount', 'GIFVersion',
'HasColorMap', 'ImageHeight', 'ImageSize', 'ImageWidth',
'MIMEType', 'Megapixels', 'SourceFile',}
def remove_all(self) -> bool:
return self._lightweight_cleanup()
class GdkPixbufAbstractParser(exiftool.ExiftoolParser):
""" GdkPixbuf can handle a lot of surfaces, so we're rending images on it,
this has the side-effect of completely removing metadata.
"""
_type = ''
def remove_all(self):
def __init__(self, filename):
super().__init__(filename)
try:
GdkPixbuf.Pixbuf.new_from_file(self.filename)
except GLib.GError:
raise ValueError
def remove_all(self) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
_, extension = os.path.splitext(self.filename)
pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename)
if extension == '.jpg':
pixbuf = GdkPixbuf.Pixbuf.apply_embedded_orientation(pixbuf)
if extension.lower() == '.jpg':
extension = '.jpeg' # gdk is picky
pixbuf.savev(self.output_filename, extension[1:], [], [])
elif extension.lower() == '.tif':
extension = '.tiff' # gdk is picky
try:
pixbuf.savev(self.output_filename, type=extension[1:],
option_keys=[], option_values=[])
except GLib.GError: # pragma: no cover
return False
return True
def __init__(self, filename):
super().__init__(filename)
if imghdr.what(filename) != self._type: # better safe than sorry
raise ValueError
class JPGParser(GdkPixbufAbstractParser):
_type = 'jpeg'
mimetypes = {'image/jpeg'}
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', "FileInodeChangeDate",
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
'YResolution', 'Megapixels', 'ImageHeight'}
'YResolution', 'Megapixels', 'ImageHeight', 'Orientation'}
class TiffParser(GdkPixbufAbstractParser):
_type = 'tiff'
mimetypes = {'image/tiff'}
meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
meta_allowlist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
'FillOrder', 'PhotometricInterpretation',
'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel',
'StripByteCounts', 'StripOffsets', 'BitsPerSample',
@@ -117,4 +153,58 @@ class TiffParser(GdkPixbufAbstractParser):
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'ImageHeight', 'ImageSize',
'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile'}
'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile', 'Orientation'}
class PPMParser(abstract.AbstractParser):
mimetypes = {'image/x-portable-pixmap'}
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta: Dict[str, Union[str, Dict[Any, Any]]] = dict()
with open(self.filename) as f:
for idx, line in enumerate(f):
if line.lstrip().startswith('#'):
meta[str(idx)] = line.lstrip().rstrip()
return meta
def remove_all(self) -> bool:
with open(self.filename) as fin:
with open(self.output_filename, 'w') as fout:
for line in fin:
if not line.lstrip().startswith('#'):
line = re.sub(r"\s+", "", line, flags=re.UNICODE)
fout.write(line)
return True
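Since PPM treats any whitespace as a separator, collapsing runs of whitespace (newlines included) to single spaces keeps the pixel data valid while comment lines are dropped. A self-contained illustration with a made-up image:

import re

sample = "P3\n# created by GIMP\n2 1 255\n255 0 0  0 255 0\n"
cleaned = ''.join(re.sub(r"\s+", " ", line)
                  for line in sample.splitlines(True)
                  if not line.lstrip().startswith('#'))
print(cleaned)  # 'P3 2 1 255 255 0 0 0 255 0 '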
class HEICParser(exiftool.ExiftoolParser):
mimetypes = {'image/heic'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
'FileSize', 'FileModifyDate', 'FileAccessDate',
'FileInodeChangeDate', 'FilePermissions', 'FileType',
'FileTypeExtension', 'MIMEType', 'MajorBrand', 'MinorVersion',
'CompatibleBrands','HandlerType', 'PrimaryItemReference',
'HEVCConfigurationVersion', 'GeneralProfileSpace',
'GeneralTierFlag', 'GeneralProfileIDC',
'GenProfileCompatibilityFlags', 'ConstraintIndicatorFlags',
'GeneralLevelIDC', 'MinSpatialSegmentationIDC',
'ParallelismType','ChromaFormat', 'BitDepthLuma', 'BitDepthChroma',
'NumTemporalLayers', 'TemporalIDNested', 'ImageWidth',
'ImageHeight', 'ImageSpatialExtent', 'ImagePixelDepth',
'AverageFrameRate', 'ConstantFrameRate', 'MediaDataSize',
'MediaDataOffset','ImageSize', 'Megapixels'}
def remove_all(self) -> bool:
return self._lightweight_cleanup()
class WEBPParser(GdkPixbufAbstractParser):
mimetypes = {'image/webp'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', "FileInodeChangeDate",
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
'YResolution', 'Megapixels', 'ImageHeight', 'Orientation',
'HorizontalScale', 'VerticalScale', 'VP8Version'}
@@ -2,14 +2,20 @@ import glob
import os
import mimetypes
import importlib
from typing import TypeVar, List, Tuple, Optional
from typing import TypeVar, Optional, List, Tuple
from . import abstract, UNSUPPORTED_EXTENSIONS
assert Tuple # make pyflakes happy
T = TypeVar('T', bound='abstract.AbstractParser')
mimetypes.add_type('application/epub+zip', '.epub')
mimetypes.add_type('application/x-dtbncx+xml', '.ncx') # EPUB Navigation Control XML File
# This should be removed after we move to python3.10
# https://github.com/python/cpython/commit/20a5b7e986377bdfd929d7e8c4e3db5847dfdb2d
mimetypes.add_type('image/heic', '.heic')
def __load_all_parsers():
""" Loads every parser in a dynamic way """
current_dir = os.path.dirname(__file__)
@@ -18,12 +24,16 @@ def __load_all_parsers():
continue
elif fname.endswith('__init__.py'):
continue
elif fname.endswith('exiftool.py'):
continue
basename = os.path.basename(fname)
name, _ = os.path.splitext(basename)
importlib.import_module('.' + name, package='libmat2')
__load_all_parsers()
def _get_parsers() -> List[T]:
""" Get all our parsers!"""
def __get_parsers(cls):
@@ -33,16 +43,22 @@ def _get_parsers() -> List[T]:
def get_parser(filename: str) -> Tuple[Optional[T], Optional[str]]:
""" Return the appropriate parser for a given filename.
:raises ValueError: Raised if the instantiation of the parser went wrong.
"""
mtype, _ = mimetypes.guess_type(filename)
_, extension = os.path.splitext(filename)
if extension in UNSUPPORTED_EXTENSIONS:
if extension.lower() in UNSUPPORTED_EXTENSIONS:
return None, mtype
if mtype == 'application/x-tar':
if extension[1:] in ('bz2', 'gz', 'xz'):
mtype = mtype + '+' + extension[1:]
for parser_class in _get_parsers(): # type: ignore
if mtype in parser_class.mimetypes:
try:
return parser_class(filename), mtype
except ValueError:
return None, mtype
# This instantiation might raise a ValueError on malformed files
return parser_class(filename), mtype
return None, mtype
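A usage sketch of the factory, where the file name is a placeholder; note that since the try/except moved out of the loop, callers are now expected to handle ValueError themselves:

from libmat2 import parser_factory

try:
    parser, mtype = parser_factory.get_parser('example.jpg')
except ValueError:  # malformed file
    parser, mtype = None, None
if parser is not None:
    print(mtype, parser.get_meta())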
@@ -7,7 +7,7 @@ import re
import logging
import tempfile
import io
from distutils.version import LooseVersion
from typing import Union, Dict
import cairo
import gi
@@ -16,12 +16,7 @@ from gi.repository import Poppler, GLib
from . import abstract
logging.basicConfig(level=logging.ERROR)
poppler_version = Poppler.get_version()
if LooseVersion(poppler_version) < LooseVersion('0.46'): # pragma: no cover
raise ValueError("MAT2 needs at least Poppler version 0.46 to work. \
The installed version is %s." % poppler_version) # pragma: no cover
FIXED_PDF_VERSION = cairo.PDFVersion.VERSION_1_5
class PDFParser(abstract.AbstractParser):
@@ -33,13 +28,21 @@ class PDFParser(abstract.AbstractParser):
def __init__(self, filename):
super().__init__(filename)
self.uri = 'file://' + os.path.abspath(self.filename)
self.__scale = 2 # how much precision do we want for the render
        self.__scale = 200 / 72.0  # render at ~200dpi (PDF user space is 72 units per inch)
try: # Check now that the file is valid, to avoid surprises later
Poppler.Document.new_from_file(self.uri, None)
except GLib.GError: # Invalid PDF
raise ValueError
def remove_all_lightweight(self):
def remove_all(self) -> bool:
if self.lightweight_cleaning is True:
try:
return self.__remove_all_lightweight()
except (cairo.Error, MemoryError) as e:
raise RuntimeError(e)
return self.__remove_all_thorough()
def __remove_all_lightweight(self) -> bool:
"""
Load the document into Poppler, render pages on a new PDFSurface.
"""
@@ -48,6 +51,7 @@
tmp_path = tempfile.mkstemp()[1]
pdf_surface = cairo.PDFSurface(tmp_path, 10, 10) # resized later anyway
pdf_surface.restrict_to_version(FIXED_PDF_VERSION)
pdf_context = cairo.Context(pdf_surface) # context draws on the surface
for pagenum in range(pages_count):
@@ -66,7 +70,7 @@
return True
def remove_all(self):
def __remove_all_thorough(self) -> bool:
"""
Load the document into Poppler, render pages on PNG,
and shove those PNG into a new PDF.
@@ -76,15 +80,19 @@
_, tmp_path = tempfile.mkstemp()
pdf_surface = cairo.PDFSurface(tmp_path, 32, 32) # resized later anyway
pdf_surface.restrict_to_version(FIXED_PDF_VERSION)
pdf_context = cairo.Context(pdf_surface)
for pagenum in range(pages_count):
page = document.get_page(pagenum)
if page is None: # pragma: no cover
logging.error("Unable to get PDF pages")
return False
page_width, page_height = page.get_size()
logging.info("Rendering page %d/%d", pagenum + 1, pages_count)
width = int(page_width) * self.__scale
height = int(page_height) * self.__scale
width = int(page_width * self.__scale)
height = int(page_height * self.__scale)
img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
img_context = cairo.Context(img_surface)
@@ -98,7 +106,11 @@
buf.seek(0)
img = cairo.ImageSurface.create_from_png(buf)
pdf_surface.set_size(page_width*self.__scale, page_height*self.__scale)
if cairo.version_info < (1, 12, 0):
pdf_surface.set_size(width, height)
else:
pdf_surface.set_size(page_width, page_height)
pdf_surface.set_device_scale(1 / self.__scale, 1 / self.__scale)
pdf_context.set_source_surface(img, 0, 0)
pdf_context.paint()
pdf_context.show_page() # draw pdf_context on pdf_surface
@@ -118,17 +130,27 @@
document.set_creator('')
document.set_creation_date(-1)
document.save('file://' + os.path.abspath(out_file))
return True
# Cairo adds "/Producer" and "/CreationDate", and Poppler sometimes
        # fails to remove them, so we have to use this terrible regex.
# It should(tm) be alright though, because cairo's output format
# for metadata is fixed.
with open(out_file, 'rb') as f:
out = re.sub(rb'<<[\s\n]*/Producer.*?>>', b' << >>', f.read(),
count=0, flags=re.DOTALL | re.IGNORECASE)
with open(out_file, 'wb') as f:
f.write(out)
return True
@staticmethod
def __parse_metadata_field(data: str) -> dict:
def __parse_metadata_field(data: str) -> Dict[str, str]:
metadata = {}
for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I):
metadata[key] = value
return metadata
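The field regex above, applied to a tiny sample packet (editor's input, not from the diff):

import re

data = '<xmp:CreatorTool>LibreOffice</xmp:CreatorTool>'
print({k: v for _, k, v in
       re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I)})
# {'CreatorTool': 'LibreOffice'}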
def get_meta(self):
def get_meta(self) -> Dict[str, Union[str, Dict]]:
""" Return a dict with all the meta of the file
"""
metadata = {}
......
import logging
from typing import Union, Tuple, Dict
from typing import Union, Dict, List, Tuple
from . import abstract
logging.basicConfig(level=logging.ERROR)
class TorrentParser(abstract.AbstractParser):
mimetypes = {'application/x-bittorrent', }
whitelist = {b'announce', b'announce-list', b'info'}
allowlist = {b'announce', b'announce-list', b'info'}
def __init__(self, filename):
super().__init__(filename)
@@ -17,18 +15,17 @@ class TorrentParser(abstract.AbstractParser):
if self.dict_repr is None:
raise ValueError
def get_meta(self) -> Dict[str, str]:
def get_meta(self) -> Dict[str, Union[str, Dict]]:
metadata = {}
for key, value in self.dict_repr.items():
if key not in self.whitelist:
if key not in self.allowlist:
metadata[key.decode('utf-8')] = value
return metadata
def remove_all(self) -> bool:
cleaned = dict()
for key, value in self.dict_repr.items():
if key in self.whitelist:
if key in self.allowlist:
cleaned[key] = value
with open(self.output_filename, 'wb') as f:
f.write(_BencodeHandler().bencode(cleaned))
@@ -36,10 +33,10 @@
return True
class _BencodeHandler(object):
class _BencodeHandler:
"""
Since bencode isn't that hard to parse,
MAT2 comes with its own parser, based on the spec
mat2 comes with its own parser, based on the spec
https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding
"""
def __init__(self):
@@ -79,7 +76,7 @@ class _BencodeHandler(object):
s = s[1:]
return s[colon:colon+str_len], s[colon+str_len:]
def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
def __decode_list(self, s: bytes) -> Tuple[List, bytes]:
ret = list()
s = s[1:] # skip leading `l`
while s[0] != ord('e'):
@@ -87,7 +84,7 @@ class _BencodeHandler(object):
ret.append(value)
return ret, s[1:]
def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
def __decode_dict(self, s: bytes) -> Tuple[Dict, bytes]:
ret = dict()
s = s[1:] # skip leading `d`
while s[0] != ord(b'e'):
@@ -116,16 +113,16 @@ class _BencodeHandler(object):
ret += self.__encode_func[type(value)](value)
return b'd' + ret + b'e'
def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
def bencode(self, s: Union[Dict, List, bytes, int]) -> bytes:
return self.__encode_func[type(s)](s)
def bdecode(self, s: bytes) -> Union[dict, None]:
def bdecode(self, s: bytes) -> Union[Dict, None]:
try:
ret, trail = self.__decode_func[s[0]](s)
except (IndexError, KeyError, ValueError) as e:
logging.debug("Not a valid bencoded string: %s", e)
logging.warning("Not a valid bencoded string: %s", e)
return None
if trail != b'':
logging.debug("Invalid bencoded value (data after valid prefix)")
logging.warning("Invalid bencoded value (data after valid prefix)")
return None
return ret
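A round-trip sketch, assuming (as the surrounding diff suggests) that the handler lives in libmat2/torrent.py:

from libmat2.torrent import _BencodeHandler

handler = _BencodeHandler()
encoded = handler.bencode({b'announce': b'http://example.org'})
print(encoded)                   # b'd8:announce18:http://example.orge'
print(handler.bdecode(encoded))  # {b'announce': b'http://example.org'}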
import subprocess
import functools
import shutil
import logging
from typing import Union, Dict
from . import exiftool
from . import bubblewrap
class AbstractFFmpegParser(exiftool.ExiftoolParser):
""" Abstract parser for all FFmpeg-based ones, mainly for video. """
# Some fileformats have mandatory metadata fields
meta_key_value_allowlist: Dict[str, Union[str, int]] = dict()
def remove_all(self) -> bool:
if self.meta_key_value_allowlist:
logging.warning('The format of "%s" (%s) has some mandatory '
'metadata fields; mat2 filled them with standard '
'data.', self.filename, ', '.join(self.mimetypes))
cmd = [_get_ffmpeg_path(),
'-i', self.filename, # input file
'-y', # overwrite existing output file
               '-map', '0',  # copy all streams from input to output
'-codec', 'copy', # don't decode anything, just copy (speed!)
'-loglevel', 'panic', # Don't show log
'-hide_banner', # hide the banner
               '-map_metadata', '-1',  # remove superficial metadata
'-map_chapters', '-1', # remove chapters
'-disposition', '0', # Remove dispositions (check ffmpeg's manpage)
'-fflags', '+bitexact', # don't add any metadata
'-flags:v', '+bitexact', # don't add any metadata
'-flags:a', '+bitexact', # don't add any metadata
self.output_filename]
try:
if self.sandbox:
bubblewrap.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
return True
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
ret: Dict[str, Union[str, Dict]] = dict()
for key, value in meta.items():
if key in self.meta_key_value_allowlist:
if value == self.meta_key_value_allowlist[key]:
continue
ret[key] = value
return ret
class WMVParser(AbstractFFmpegParser):
mimetypes = {'video/x-ms-wmv', }
meta_allowlist = {'AudioChannels', 'AudioCodecID', 'AudioCodecName',
'ErrorCorrectionType', 'AudioSampleRate', 'DataPackets',
'Directory', 'Duration', 'ExifToolVersion',
'FileAccessDate', 'FileInodeChangeDate', 'FileLength',
'FileModifyDate', 'FileName', 'FilePermissions',
'FileSize', 'FileType', 'FileTypeExtension',
'FrameCount', 'FrameRate', 'ImageHeight', 'ImageSize',
'ImageWidth', 'MIMEType', 'MaxBitrate', 'MaxPacketSize',
'Megapixels', 'MinPacketSize', 'Preroll', 'SendDuration',
'SourceFile', 'StreamNumber', 'VideoCodecName', }
meta_key_value_allowlist = { # some metadata are mandatory :/
'AudioCodecDescription': '',
'CreationDate': '0000:00:00 00:00:00Z',
'FileID': '00000000-0000-0000-0000-000000000000',
'Flags': 2, # FIXME: What is this? Why 2?
'ModifyDate': '0000:00:00 00:00:00',
'TimeOffset': '0 s',
'VideoCodecDescription': '',
'StreamType': 'Audio',
}
class AVIParser(AbstractFFmpegParser):
mimetypes = {'video/x-msvideo', }
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
'FileSize', 'FileModifyDate', 'FileAccessDate',
'FileInodeChangeDate', 'FilePermissions', 'FileType',
'FileTypeExtension', 'MIMEType', 'FrameRate', 'MaxDataRate',
'FrameCount', 'StreamCount', 'StreamType', 'VideoCodec',
'VideoFrameRate', 'VideoFrameCount', 'Quality',
'SampleSize', 'BMPVersion', 'ImageWidth', 'ImageHeight',
'Planes', 'BitDepth', 'Compression', 'ImageLength',
'PixelsPerMeterX', 'PixelsPerMeterY',
'NumImportantColors', 'NumColors',
'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask',
'ColorSpace', 'AudioCodec', 'AudioCodecRate',
'AudioSampleCount',
'AudioSampleRate', 'Encoding', 'NumChannels',
'SampleRate', 'AvgBytesPerSec', 'BitsPerSample',
'Duration', 'ImageSize', 'Megapixels'}
class MP4Parser(AbstractFFmpegParser):
mimetypes = {'video/mp4', }
meta_allowlist = {'AudioFormat', 'AvgBitrate', 'Balance', 'TrackDuration',
'XResolution', 'YResolution', 'ExifToolVersion',
'FileAccessDate', 'FileInodeChangeDate', 'FileModifyDate',
'FileName', 'FilePermissions', 'MIMEType', 'FileType',
'FileTypeExtension', 'Directory', 'ImageWidth',
'ImageSize', 'ImageHeight', 'FileSize', 'SourceFile',
'BitDepth', 'Duration', 'AudioChannels',
'AudioBitsPerSample', 'AudioSampleRate', 'Megapixels',
'MovieDataSize', 'VideoFrameRate', 'MediaTimeScale',
'SourceImageHeight', 'SourceImageWidth',
'MatrixStructure', 'MediaDuration'}
meta_key_value_allowlist = { # some metadata are mandatory :/
'CreateDate': '0000:00:00 00:00:00',
'CurrentTime': '0 s',
'MediaCreateDate': '0000:00:00 00:00:00',
'MediaLanguageCode': 'und',
'MediaModifyDate': '0000:00:00 00:00:00',
'ModifyDate': '0000:00:00 00:00:00',
'OpColor': '0 0 0',
'PosterTime': '0 s',
'PreferredRate': '1',
'PreferredVolume': '100.00%',
'PreviewDuration': '0 s',
'PreviewTime': '0 s',
'SelectionDuration': '0 s',
'SelectionTime': '0 s',
'TrackCreateDate': '0000:00:00 00:00:00',
'TrackModifyDate': '0000:00:00 00:00:00',
'TrackVolume': '0.00%',
}
@functools.lru_cache(maxsize=None)
def _get_ffmpeg_path() -> str: # pragma: no cover
which_path = shutil.which('ffmpeg')
if which_path:
return which_path
raise RuntimeError("Unable to find ffmpeg")
from html import parser, escape
from typing import Any, Optional, Dict, List, Tuple, Set
import re
import string
from . import abstract
# pylint: disable=too-many-instance-attributes
class CSSParser(abstract.AbstractParser):
"""There is no such things as metadata in CSS files,
only comments of the form `/* … */`, so we're removing the laters."""
mimetypes = {'text/css', }
flags = re.MULTILINE | re.DOTALL
def remove_all(self) -> bool:
with open(self.filename, encoding='utf-8') as f:
try:
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cleaned = re.sub(r'/\*.*?\*/', '', content, count=0, flags=self.flags)
with open(self.output_filename, 'w', encoding='utf-8') as f:
f.write(cleaned)
return True
def get_meta(self) -> Dict[str, Any]:
metadata = {}
with open(self.filename, encoding='utf-8') as f:
try:
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cssdoc = re.findall(r'/\*(.*?)\*/', content, self.flags)
for match in cssdoc:
for line in match.splitlines():
try:
k, v = line.split(':')
metadata[k.strip(string.whitespace + '*')] = v.strip()
except ValueError:
metadata['harmful data'] = line.strip()
return metadata
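The comment-stripping regex in action on a sample stylesheet (editor's input):

import re

sample = "/* Author: Jane */\nbody { color: red; } /* TODO */"
print(re.sub(r'/\*.*?\*/', '', sample, count=0, flags=re.MULTILINE | re.DOTALL))
# prints a blank line followed by 'body { color: red; } '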
class AbstractHTMLParser(abstract.AbstractParser):
tags_blocklist: Set[str] = set()
# In some html/xml-based formats some tags are mandatory,
    # so we're keeping them, but discarding their content
tags_required_blocklist: Set[str] = set()
def __init__(self, filename):
super().__init__(filename)
self.__parser = _HTMLParser(self.filename, self.tags_blocklist,
self.tags_required_blocklist)
with open(filename, encoding='utf-8') as f:
self.__parser.feed(f.read())
self.__parser.close()
def get_meta(self) -> Dict[str, Any]:
return self.__parser.get_meta()
def remove_all(self) -> bool:
return self.__parser.remove_all(self.output_filename)
class HTMLParser(AbstractHTMLParser):
mimetypes = {'text/html', 'application/xhtml+xml'}
tags_blocklist = {'meta', }
tags_required_blocklist = {'title', }
class DTBNCXParser(AbstractHTMLParser):
mimetypes = {'application/x-dtbncx+xml', }
tags_required_blocklist = {'title', 'doctitle', 'meta'}
class _HTMLParser(parser.HTMLParser):
"""Python doesn't have a validating html parser in its stdlib, so
we're using an internal queue to track all the opening/closing tags,
and hoping for the best.
    Moreover, the parser.HTMLParser class doesn't provide a get_endtag_text
    method, so we have to use get_starttag_text instead, push its result onto
    a LIFO, and transform it into a closing tag when needed.
Also, gotcha: the `tag` parameters are always in lowercase.
"""
def __init__(self, filename, blocklisted_tags, required_blocklisted_tags):
super().__init__()
self.filename = filename
self.__textrepr = ''
self.__meta = {}
self.__validation_queue: List[str] = list()
# We're using counters instead of booleans, to handle nested tags
self.__in_dangerous_but_required_tag = 0
self.__in_dangerous_tag = 0
if required_blocklisted_tags & blocklisted_tags: # pragma: nocover
raise ValueError("There is an overlap between %s and %s" % (
required_blocklisted_tags, blocklisted_tags))
self.tag_required_blocklist = required_blocklisted_tags
self.tag_blocklist = blocklisted_tags
def error(self, message): # pragma: no cover
""" Amusingly, Python's documentation doesn't mention that this
function needs to be implemented in subclasses of the parent class
of parser.HTMLParser. This was found by fuzzing,
triggering the following exception:
NotImplementedError: subclasses of ParserBase must override error()
"""
raise ValueError(message)
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]):
# Ignore the type, because mypy is too stupid to infer
# that get_starttag_text() can't return None.
original_tag = self.get_starttag_text() # type: ignore
self.__validation_queue.append(original_tag) # type: ignore
if tag in self.tag_blocklist:
self.__in_dangerous_tag += 1
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
self.__textrepr += original_tag
if tag in self.tag_required_blocklist:
self.__in_dangerous_but_required_tag += 1
def handle_endtag(self, tag: str):
if not self.__validation_queue:
raise ValueError("The closing tag %s doesn't have a corresponding "
"opening one in %s." % (tag, self.filename))
previous_tag = self.__validation_queue.pop()
previous_tag = previous_tag[1:-1] # remove < and >
previous_tag = previous_tag.split(' ')[0] # remove attributes
if tag != previous_tag.lower():
raise ValueError("The closing tag %s doesn't match the previous "
"tag %s in %s" %
(tag, previous_tag, self.filename))
if tag in self.tag_required_blocklist:
self.__in_dangerous_but_required_tag -= 1
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
# There is no `get_endtag_text()` method :/
self.__textrepr += '</' + previous_tag + '>'
if tag in self.tag_blocklist:
self.__in_dangerous_tag -= 1
def handle_data(self, data: str):
if self.__in_dangerous_but_required_tag == 0:
if self.__in_dangerous_tag == 0:
if data.strip():
self.__textrepr += escape(data)
def handle_startendtag(self, tag: str,
attrs: List[Tuple[str, Optional[str]]]):
if tag in self.tag_required_blocklist | self.tag_blocklist:
meta = {k:v for k, v in attrs}
name = meta.get('name', 'harmful metadata')
content = meta.get('content', 'harmful data')
self.__meta[name] = content
if self.__in_dangerous_tag == 0:
if tag in self.tag_required_blocklist:
self.__textrepr += '<' + tag + ' />'
return
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
self.__textrepr += self.get_starttag_text()
def remove_all(self, output_filename: str) -> bool:
if self.__validation_queue:
raise ValueError("Some tags (%s) were left unclosed in %s" % (
', '.join(self.__validation_queue),
self.filename))
with open(output_filename, 'w', encoding='utf-8') as f:
f.write(self.__textrepr)
return True
def get_meta(self) -> Dict[str, Any]:
if self.__validation_queue:
raise ValueError("Some tags (%s) were left unclosed in %s" % (
', '.join(self.__validation_queue),
self.filename))
return self.__meta
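A usage sketch of the HTML cleaner, assuming (as in mat2) that these classes live in libmat2/web.py; the file names are placeholders:

from libmat2.web import HTMLParser

p = HTMLParser('page.html')  # parsing raises ValueError on mismatched tags
print(p.get_meta())          # e.g. {'author': 'Jane'} for <meta name="author" content="Jane"/>
p.remove_all()               # writes the stripped markup to p.output_filename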
#!/usr/bin/python3
#!/usr/bin/env python3
import os
from typing import Tuple
import shutil
from typing import List, Set, Dict
import sys
import itertools
import mimetypes
import argparse
import multiprocessing
import logging
import unicodedata
import concurrent.futures
try:
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS, check_dependencies
except ValueError as e:
print(e)
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
from libmat2 import check_dependencies, UnknownMemberPolicy
except ValueError as ex:
print(ex)
sys.exit(1)
__version__ = '0.2.0'
__version__ = '0.13.5'
def __check_file(filename: str, mode: int=os.R_OK) -> bool:
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
def __print_without_chars(s: str):
""" Remove control characters
We might use 'Cc' instead of 'C', but better safe than sorry
https://www.unicode.org/reports/tr44/#GC_Values_Table
"""
print(''.join(ch for ch in s if not unicodedata.category(ch).startswith('C')))
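Why this matters: a hostile filename can embed terminal escape sequences, and the 'C' categories cover exactly those control/format characters. For instance:

import unicodedata

s = 'evil\x1b[2Jname'  # \x1b[2J would clear the terminal if printed raw
print(''.join(ch for ch in s if not unicodedata.category(ch).startswith('C')))
# evil[2Jname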
def __check_file(filename: str, mode: int = os.R_OK) -> bool:
if not os.path.exists(filename):
print("[-] %s is doesn't exist." % filename)
__print_without_chars("[-] %s doesn't exist." % filename)
return False
elif not os.path.isfile(filename):
print("[-] %s is not a regular file." % filename)
__print_without_chars("[-] %s is not a regular file." % filename)
return False
elif not os.access(filename, mode):
print("[-] %s is not readable and writeable." % filename)
        mode_str: List[str] = list()
        if mode & os.R_OK:
            mode_str.append('readable')
        if mode & os.W_OK:
            mode_str.append('writeable')
        __print_without_chars("[-] %s is not %s." % (filename, ' nor '.join(mode_str)))
return False
return True
def create_arg_parser():
def create_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2')
parser.add_argument('files', nargs='*', help='the files to process')
parser.add_argument('-v', '--version', action='version',
version='MAT2 %s' % __version__)
parser.add_argument('-l', '--list', action='store_true',
help='list all supported fileformats')
parser.add_argument('-c', '--check-dependencies', action='store_true',
help='check if MAT2 has all the dependencies it needs')
info = parser.add_mutually_exclusive_group()
info.add_argument('-s', '--show', action='store_true',
help='list harmful metadata detectable by MAT2 without removing them')
info.add_argument('-L', '--lightweight', action='store_true',
help='remove SOME metadata')
parser.add_argument('-V', '--verbose', action='store_true',
help='show more verbose status information')
parser.add_argument('--unknown-members', metavar='policy', default='abort',
help='how to handle unknown members of archive-style '
'files (policy should be one of: %s) [Default: abort]' %
', '.join(p.value for p in UnknownMemberPolicy))
parser.add_argument('--inplace', action='store_true',
help='clean in place, without backup')
parser.add_argument('--no-sandbox', dest='sandbox', action='store_false',
default=True, help='Disable bubblewrap\'s sandboxing')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('files', nargs='*', help='the files to process',
default=[])
excl_group.add_argument('-v', '--version', action='version',
version='mat2 %s' % __version__)
excl_group.add_argument('-l', '--list', action='store_true', default=False,
help='list all supported fileformats')
excl_group.add_argument('--check-dependencies', action='store_true',
default=False,
help='check if mat2 has all the dependencies it '
'needs')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('-L', '--lightweight', action='store_true',
help='remove SOME metadata')
excl_group.add_argument('-s', '--show', action='store_true',
help='list harmful metadata detectable by mat2 '
'without removing them')
return parser
def show_meta(filename: str):
def show_meta(filename: str, sandbox: bool):
if not __check_file(filename):
return
p, mtype = parser_factory.get_parser(filename) # type: ignore
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when processing %s: %s" % (filename, e))
return
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return
p.sandbox = sandbox
__print_meta(filename, p.get_meta())
def __print_meta(filename: str, metadata: Dict, depth: int = 1):
padding = " " * depth*2
if not metadata:
__print_without_chars(padding + "No metadata found in %s." % filename)
return
print("[+] Metadata for %s:" % filename)
for k, v in p.get_meta().items():
__print_without_chars("[%s] Metadata for %s:" % ('+'*depth, filename))
for (k, v) in sorted(metadata.items()):
if isinstance(v, dict):
__print_meta(k, v, depth+1)
continue
try: # FIXME this is ugly.
print(" %s: %s" % (k, v))
__print_without_chars(padding + " %s: %s" % (k, v))
except UnicodeEncodeError:
print(" %s: harmful content" % k)
__print_without_chars(padding + " %s: harmful content" % k)
except TypeError:
pass # for things that aren't iterable
def clean_meta(params: Tuple[str, bool]) -> bool:
filename, is_lightweigth = params
if not __check_file(filename, os.R_OK|os.W_OK):
def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
policy: UnknownMemberPolicy) -> bool:
mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
if not __check_file(filename, mode):
return False
p, mtype = parser_factory.get_parser(filename) # type: ignore
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when cleaning %s: %s" % (filename, e))
return False
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return False
if is_lightweigth:
return p.remove_all_lightweight()
return p.remove_all()
p.unknown_member_policy = policy
p.lightweight_cleaning = is_lightweight
p.sandbox = sandbox
try:
logging.debug('Cleaning %s…', filename)
ret = p.remove_all()
if ret is True:
shutil.copymode(filename, p.output_filename)
if inplace is True:
os.rename(p.output_filename, filename)
return ret
except RuntimeError as e:
__print_without_chars("[-] %s can't be cleaned: %s" % (filename, e))
return False
def show_parsers():
print('[+] Supported formats:')
formats = list()
for parser in parser_factory._get_parsers():
formats = set() # Set[str]
for parser in parser_factory._get_parsers(): # type: ignore
for mtype in parser.mimetypes:
extensions = set()
extensions = set() # Set[str]
for extension in mimetypes.guess_all_extensions(mtype):
if extension[1:] not in UNSUPPORTED_EXTENSIONS: # skip the dot
if extension not in UNSUPPORTED_EXTENSIONS:
extensions.add(extension)
if not extensions:
                # we're not supporting a single extension in the current
                # mimetype, so there is no point in showing the mimetype at all
continue
formats.append(' - %s (%s)' % (mtype, ', '.join(extensions)))
formats.add(' - %s (%s)' % (mtype, ', '.join(extensions)))
print('\n'.join(sorted(formats)))
def __get_files_recursively(files):
def __get_files_recursively(files: List[str]) -> List[str]:
ret: Set[str] = set()
for f in files:
if os.path.isdir(f):
for path, _, _files in os.walk(f):
for _f in _files:
fname = os.path.join(path, _f)
if __check_file(fname):
yield fname
ret.add(fname)
elif __check_file(f):
yield f
ret.add(f)
return list(ret)
def main():
def main() -> int:
arg_parser = create_arg_parser()
args = arg_parser.parse_args()
if args.verbose:
logging.getLogger(__name__).setLevel(logging.DEBUG)
if not args.files:
if args.list:
show_parsers()
return 0
elif args.check_dependencies:
print("Dependencies required for MAT2 %s:" % __version__)
__print_without_chars("Dependencies for mat2 %s:" % __version__)
for key, value in sorted(check_dependencies().items()):
print('- %s: %s' % (key, 'yes' if value else 'no'))
__print_without_chars('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
'(optional)' if not value['required'] else ''))
else:
return arg_parser.print_help()
arg_parser.print_help()
return 0
elif args.show:
for f in __get_files_recursively(args.files):
show_meta(f)
show_meta(f, args.sandbox)
return 0
else:
p = multiprocessing.Pool()
mode = (args.lightweight is True)
l = zip(__get_files_recursively(args.files), itertools.repeat(mode))
inplace = args.inplace
policy = UnknownMemberPolicy(args.unknown_members)
if policy == UnknownMemberPolicy.KEEP:
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
no_failure = True
files = __get_files_recursively(args.files)
# We have to use Processes instead of Threads, since
# we're using tempfile.mkdtemp, which isn't thread-safe.
futures = list()
with concurrent.futures.ProcessPoolExecutor() as executor:
for f in files:
future = executor.submit(clean_meta, f, args.lightweight,
inplace, args.sandbox, policy)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
return 0 if no_failure is True else -1
ret = list(p.imap_unordered(clean_meta, list(l)))
return 0 if all(ret) else -1
if __name__ == '__main__':
sys.exit(main())
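For comparison, the same cleaning can be driven from Python without the CLI. A sketch where 'doc.pdf' is a placeholder and error handling is elided:

from libmat2 import parser_factory

p, mtype = parser_factory.get_parser('doc.pdf')
if p is not None and p.remove_all():
    print('cleaned copy written to', p.output_filename)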
File deleted
#!/usr/bin/env python3
"""
Because writing a GUI is non-trivial (cf. https://0xacab.org/jvoisin/mat2/issues/3),
we decided to write a Nautilus extension instead
(cf. https://0xacab.org/jvoisin/mat2/issues/2).

The code is a little bit convoluted because Gtk isn't thread-safe:
we're not allowed to call anything Gtk-related outside of the main
thread, so we have to resort to using a `queue` to pass "messages" around.
"""
# pylint: disable=no-name-in-module,unused-argument,no-self-use,import-error
import queue
import threading
from typing import Tuple
from urllib.parse import unquote
import gi
gi.require_version('Nautilus', '3.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import Nautilus, GObject, Gtk, Gio, GLib, GdkPixbuf
from libmat2 import parser_factory
# make pyflakes happy
assert Tuple
def _remove_metadata(fpath):
""" This is a simple wrapper around libmat2, because it's
easier and cleaner this way.
"""
parser, mtype = parser_factory.get_parser(fpath)
if parser is None:
return False, mtype
return parser.remove_all(), mtype
class ColumnExtension(GObject.GObject, Nautilus.MenuProvider, Nautilus.LocationWidgetProvider):
""" This class adds an item to the right-clic menu in Nautilus. """
def __init__(self):
super().__init__()
self.infobar_hbox = None
self.infobar = None
self.failed_items = list()
def __infobar_failure(self):
""" Add an hbox to the `infobar` warning about the fact that we didn't
manage to remove the metadata from every single file.
"""
self.infobar.set_show_close_button(True)
self.infobar_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
btn = Gtk.Button("Show")
btn.connect("clicked", self.__cb_show_failed)
self.infobar_hbox.pack_end(btn, False, False, 0)
infobar_msg = Gtk.Label("Failed to clean some items")
self.infobar_hbox.pack_start(infobar_msg, False, False, 0)
self.infobar.get_content_area().pack_start(self.infobar_hbox, True, True, 0)
self.infobar.show_all()
def get_widget(self, uri, window):
""" This is the method that we have to implement (because we're
a LocationWidgetProvider) in order to show our infobar.
"""
self.infobar = Gtk.InfoBar()
self.infobar.set_message_type(Gtk.MessageType.ERROR)
self.infobar.connect("response", self.__cb_infobar_response)
return self.infobar
def __cb_infobar_response(self, infobar, response):
""" Callback for the infobar close button.
"""
if response == Gtk.ResponseType.CLOSE:
self.infobar_hbox.destroy()
self.infobar.hide()
def __cb_show_failed(self, button):
""" Callback to show a popup containing a list of files
that we didn't manage to clean.
"""
# FIXME this should be done only once the window is destroyed
self.infobar_hbox.destroy()
self.infobar.hide()
window = Gtk.Window()
headerbar = Gtk.HeaderBar()
window.set_titlebar(headerbar)
headerbar.props.title = "Metadata removal failed"
close_buton = Gtk.Button("Close")
close_buton.connect("clicked", lambda _: window.close())
headerbar.pack_end(close_buton)
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
window.add(box)
box.add(self.__create_treeview())
window.show_all()
@staticmethod
def __validate(fileinfo) -> Tuple[bool, str]:
""" Validate if a given file FileInfo `fileinfo` can be processed.
Returns a boolean, and a textreason why"""
if fileinfo.get_uri_scheme() != "file" or fileinfo.is_directory():
return False, "Not a file"
elif not fileinfo.can_write():
return False, "Not writeable"
return True, ""
def __create_treeview(self) -> Gtk.TreeView:
liststore = Gtk.ListStore(GdkPixbuf.Pixbuf, str, str)
treeview = Gtk.TreeView(model=liststore)
renderer_pixbuf = Gtk.CellRendererPixbuf()
column_pixbuf = Gtk.TreeViewColumn("Icon", renderer_pixbuf, pixbuf=0)
treeview.append_column(column_pixbuf)
for idx, name in enumerate(['File', 'Reason']):
renderer_text = Gtk.CellRendererText()
column_text = Gtk.TreeViewColumn(name, renderer_text, text=idx+1)
treeview.append_column(column_text)
for (fname, mtype, reason) in self.failed_items:
# This part is all about adding mimetype icons to the liststore
icon = Gio.content_type_get_icon('text/plain' if not mtype else mtype)
# in case we don't have the corresponding icon,
# we're adding `text/plain`, because we have this one for sure™
names = icon.get_names() + ['text/plain', ]
icon_theme = Gtk.IconTheme.get_default()
for name in names:
try:
img = icon_theme.load_icon(name, Gtk.IconSize.BUTTON, 0)
break
except GLib.GError:
pass
liststore.append([img, fname, reason])
treeview.show_all()
return treeview
def __create_progressbar(self) -> Gtk.ProgressBar:
""" Create the progressbar used to notify that files are currently
being processed.
"""
self.infobar.set_show_close_button(False)
self.infobar.set_message_type(Gtk.MessageType.INFO)
self.infobar_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
progressbar = Gtk.ProgressBar()
self.infobar_hbox.pack_start(progressbar, True, True, 0)
progressbar.set_show_text(True)
self.infobar.get_content_area().pack_start(self.infobar_hbox, True, True, 0)
self.infobar.show_all()
return progressbar
def __update_progressbar(self, processing_queue, progressbar) -> bool:
""" This method is run via `Glib.add_idle` to update the progressbar."""
try:
fname = processing_queue.get(block=False)
except queue.Empty:
return True
# `None` is the marker put in the queue to signal that every selected
# file was processed.
if fname is None:
self.infobar_hbox.destroy()
self.infobar.hide()
if len(self.failed_items):
self.__infobar_failure()
if not processing_queue.empty():
print("Something went wrong, the queue isn't empty :/")
return False
progressbar.pulse()
progressbar.set_text("Cleaning %s" % fname)
progressbar.show_all()
self.infobar_hbox.show_all()
self.infobar.show_all()
return True
def __clean_files(self, files: list, processing_queue: queue.Queue) -> bool:
""" This method is threaded in order to avoid blocking the GUI
while cleaning up the files.
"""
for fileinfo in files:
fname = fileinfo.get_name()
processing_queue.put(fname)
valid, reason = self.__validate(fileinfo)
if not valid:
self.failed_items.append((fname, None, reason))
continue
fpath = unquote(fileinfo.get_uri()[7:]) # `len('file://') = 7`
success, mtype = _remove_metadata(fpath)
if not success:
self.failed_items.append((fname, mtype, 'Unsupported/invalid'))
processing_queue.put(None) # signal that we processed all the files
return True
def __cb_menu_activate(self, menu, files):
""" This method is called when the user clicked the "clean metadata"
menu item.
"""
self.failed_items = list()
progressbar = self.__create_progressbar()
        progressbar.set_pulse_step(1.0 / len(files))
self.infobar.show_all()
processing_queue = queue.Queue()
GLib.idle_add(self.__update_progressbar, processing_queue, progressbar)
thread = threading.Thread(target=self.__clean_files, args=(files, processing_queue))
thread.daemon = True
thread.start()
def get_background_items(self, window, file):
""" https://bugzilla.gnome.org/show_bug.cgi?id=784278 """
return None
def get_file_items(self, window, files):
""" This method is the one allowing us to create a menu item.
"""
# Do not show the menu item if not a single file has a chance to be
# processed by mat2.
if not any([is_valid for (is_valid, _) in map(self.__validate, files)]):
return None
item = Nautilus.MenuItem(
name="MAT2::Remove_metadata",
label="Remove metadata",
tip="Remove metadata"
)
item.connect('activate', self.__cb_menu_activate, files)
return [item, ]