
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing with 1034 additions and 485 deletions
import subprocess
import imghdr
import json
import os
import shutil
import tempfile
import re
from typing import Set
from typing import Union, Any, Dict
import cairo
import gi
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import GdkPixbuf
from . import abstract, _get_exiftool_path
# Make pyflakes happy
assert Set
class _ImageParser(abstract.AbstractParser):
""" Since we use `exiftool` to get metadata from
all images fileformat, `get_meta` is implemented in this class,
and all the image-handling ones are inheriting from it."""
meta_whitelist = set() # type: Set[str]
@staticmethod
def __handle_problematic_filename(filename: str, callback) -> str:
""" This method takes a filename with a problematic name,
and safely applies it a `callback`."""
tmpdirname = tempfile.mkdtemp()
fname = os.path.join(tmpdirname, "temp_file")
shutil.copy(filename, fname)
out = callback(fname)
shutil.rmtree(tmpdirname)
return out
def get_meta(self):
""" There is no way to escape the leading(s) dash(es) of the current
self.filename to prevent parameter injections, so we need to take care
of this.
"""
fun = lambda f: subprocess.check_output([_get_exiftool_path(), '-json', f])
if re.search('^[a-z0-9/]', self.filename) is None:
out = self.__handle_problematic_filename(self.filename, fun)
else:
out = fun(self.filename)
meta = json.loads(out.decode('utf-8'))[0]
for key in self.meta_whitelist:
meta.pop(key, None)
return meta
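
The regex guard above exists because exiftool, like most command-line tools, would parse a filename starting with a dash as an option. A minimal standalone sketch of the same copy-to-a-safe-name workaround, simplified to a plain startswith() check (the helper name is hypothetical):

import os
import shutil
import subprocess
import tempfile

def safe_exiftool_json(filename: str) -> bytes:
    """ Copy a file whose name could be parsed as an option
    (e.g. '-recurse.jpg') to a neutral name before calling exiftool. """
    if filename.startswith('-'):
        with tempfile.TemporaryDirectory() as tmpdirname:
            fname = os.path.join(tmpdirname, 'temp_file')
            shutil.copy(filename, fname)
            return subprocess.check_output(['exiftool', '-json', fname])
    return subprocess.check_output(['exiftool', '-json', filename])
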
gi.require_version('Rsvg', '2.0')
from gi.repository import GdkPixbuf, GLib, Rsvg
from . import exiftool, abstract
class SVGParser(exiftool.ExiftoolParser):
mimetypes = {'image/svg+xml', }
meta_allowlist = {'Directory', 'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'ImageHeight', 'ImageWidth',
'MIMEType', 'SVGVersion', 'SourceFile', 'ViewBox'
}
def remove_all(self) -> bool:
try:
svg = Rsvg.Handle.new_from_file(self.filename)
except GLib.GError:
raise ValueError
try:
_, _, _, _, has_viewbox, viewbox = svg.get_intrinsic_dimensions()
if has_viewbox is False:
raise ValueError
_, width, height = svg.get_intrinsic_size_in_pixels()
except AttributeError:
dimensions = svg.get_dimensions()
height, width = dimensions.height, dimensions.width
surface = cairo.SVGSurface(self.output_filename, height, width)
context = cairo.Context(surface)
try:
svg.render_document(context, viewbox)
except AttributeError:
svg.render_cairo(context)
surface.finish()
return True
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
# The namespace is mandatory, but only the …/2000/svg is valid.
ns = 'http://www.w3.org/2000/svg'
if meta.get('Xmlns') == ns:
meta.pop('Xmlns')
return meta
class PNGParser(_ImageParser):
class PNGParser(exiftool.ExiftoolParser):
mimetypes = {'image/png', }
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', 'FileInodeChangeDate',
'FilePermissions', 'FileType', 'FileTypeExtension',
@@ -63,57 +68,84 @@ class PNGParser(_ImageParser):
def __init__(self, filename):
super().__init__(filename)
if imghdr.what(filename) != 'png':
raise ValueError
try: # better fail here than later
cairo.ImageSurface.create_from_png(self.filename)
except MemoryError: # pragma: no cover
except: # pragma: no cover
# Cairo is returning some weird exceptions :/
raise ValueError
def remove_all(self):
def remove_all(self) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
surface = cairo.ImageSurface.create_from_png(self.filename)
surface.write_to_png(self.output_filename)
return True
class GdkPixbufAbstractParser(_ImageParser):
class GIFParser(exiftool.ExiftoolParser):
mimetypes = {'image/gif'}
meta_allowlist = {'AnimationIterations', 'BackgroundColor', 'BitsPerPixel',
'ColorResolutionDepth', 'Directory', 'Duration',
'ExifToolVersion', 'FileAccessDate',
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'FrameCount', 'GIFVersion',
'HasColorMap', 'ImageHeight', 'ImageSize', 'ImageWidth',
'MIMEType', 'Megapixels', 'SourceFile',}
def remove_all(self) -> bool:
return self._lightweight_cleanup()
class GdkPixbufAbstractParser(exiftool.ExiftoolParser):
""" GdkPixbuf can handle a lot of surfaces, so we're rending images on it,
this has the side-effect of completely removing metadata.
"""
_type = ''
def remove_all(self):
def __init__(self, filename):
super().__init__(filename)
try:
GdkPixbuf.Pixbuf.new_from_file(self.filename)
except GLib.GError:
raise ValueError
def remove_all(self) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
_, extension = os.path.splitext(self.filename)
pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename)
pixbuf = GdkPixbuf.Pixbuf.apply_embedded_orientation(pixbuf)
if extension.lower() == '.jpg':
extension = '.jpeg' # gdk is picky
pixbuf.savev(self.output_filename, extension[1:], [], [])
elif extension.lower() == '.tif':
extension = '.tiff' # gdk is picky
try:
pixbuf.savev(self.output_filename, type=extension[1:],
option_keys=[], option_values=[])
except GLib.GError: # pragma: no cover
return False
return True
def __init__(self, filename):
super().__init__(filename)
if imghdr.what(filename) != self._type: # better safe than sorry
raise ValueError
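
Adding a new GdkPixbuf-backed format amounts to declaring what it accepts; validation, re-rendering, and lightweight cleanup are all inherited. A hypothetical BMP parser, purely to illustrate the extension point (mat2 does not actually ship one):

class BMPParser(GdkPixbufAbstractParser):
    """ Hypothetical example: the abstract parent does the work,
    the subclass only declares what it accepts. """
    _type = 'bmp'  # what imghdr.what() must report for the file
    mimetypes = {'image/bmp', }
    meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
                      'Directory', 'FileSize', 'FileType', 'MIMEType'}
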
class JPGParser(GdkPixbufAbstractParser):
_type = 'jpeg'
mimetypes = {'image/jpeg'}
meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName',
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', "FileInodeChangeDate",
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
'YResolution', 'Megapixels', 'ImageHeight'}
'YResolution', 'Megapixels', 'ImageHeight', 'Orientation'}
class TiffParser(GdkPixbufAbstractParser):
_type = 'tiff'
mimetypes = {'image/tiff'}
meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
meta_allowlist = {'Compression', 'ExifByteOrder', 'ExtraSamples',
'FillOrder', 'PhotometricInterpretation',
'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel',
'StripByteCounts', 'StripOffsets', 'BitsPerSample',
@@ -121,4 +153,58 @@ class TiffParser(GdkPixbufAbstractParser):
'FileInodeChangeDate', 'FileModifyDate', 'FileName',
'FilePermissions', 'FileSize', 'FileType',
'FileTypeExtension', 'ImageHeight', 'ImageSize',
'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile'}
'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile', 'Orientation'}
class PPMParser(abstract.AbstractParser):
mimetypes = {'image/x-portable-pixmap'}
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta: Dict[str, Union[str, Dict[Any, Any]]] = dict()
with open(self.filename) as f:
for idx, line in enumerate(f):
if line.lstrip().startswith('#'):
meta[str(idx)] = line.lstrip().rstrip()
return meta
def remove_all(self) -> bool:
with open(self.filename) as fin:
with open(self.output_filename, 'w') as fout:
for line in fin:
if not line.lstrip().startswith('#'):
line = re.sub(r"\s+", "", line, flags=re.UNICODE)
fout.write(line)
return True
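
PPM is a plain-text format in which metadata can only live in '#' comment lines, which is why get_meta simply indexes them by line number. The same scan, rehearsed on an inline sample:

sample = '''P3
# Created by GIMP version 2.10
2 1
255
255 0 0 0 255 0
'''
meta = {str(idx): line.strip() for idx, line in enumerate(sample.splitlines())
        if line.lstrip().startswith('#')}
print(meta)  # {'1': '# Created by GIMP version 2.10'}
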
class HEICParser(exiftool.ExiftoolParser):
mimetypes = {'image/heic'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
'FileSize', 'FileModifyDate', 'FileAccessDate',
'FileInodeChangeDate', 'FilePermissions', 'FileType',
'FileTypeExtension', 'MIMEType', 'MajorBrand', 'MinorVersion',
'CompatibleBrands','HandlerType', 'PrimaryItemReference',
'HEVCConfigurationVersion', 'GeneralProfileSpace',
'GeneralTierFlag', 'GeneralProfileIDC',
'GenProfileCompatibilityFlags', 'ConstraintIndicatorFlags',
'GeneralLevelIDC', 'MinSpatialSegmentationIDC',
'ParallelismType','ChromaFormat', 'BitDepthLuma', 'BitDepthChroma',
'NumTemporalLayers', 'TemporalIDNested', 'ImageWidth',
'ImageHeight', 'ImageSpatialExtent', 'ImagePixelDepth',
'AverageFrameRate', 'ConstantFrameRate', 'MediaDataSize',
'MediaDataOffset','ImageSize', 'Megapixels'}
def remove_all(self) -> bool:
return self._lightweight_cleanup()
class WEBPParser(GdkPixbufAbstractParser):
mimetypes = {'image/webp'}
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName',
'Directory', 'FileSize', 'FileModifyDate',
'FileAccessDate', "FileInodeChangeDate",
'FilePermissions', 'FileType', 'FileTypeExtension',
'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample',
'ColorComponents', 'EncodingProcess', 'JFIFVersion',
'ResolutionUnit', 'XResolution', 'YCbCrSubSampling',
'YResolution', 'Megapixels', 'ImageHeight', 'Orientation',
'HorizontalScale', 'VerticalScale', 'VP8Version'}
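
Typical library usage of these image parsers, as a sketch ('dirty.png' is a hypothetical input file; output_filename is derived from the input name by the abstract parser):

from libmat2.images import PNGParser

p = PNGParser('dirty.png')     # raises ValueError if the file isn't a PNG
print(p.get_meta())            # exiftool's output minus the allowlisted keys
p.lightweight_cleaning = True  # strip tags in place instead of re-rendering
if p.remove_all():
    print('cleaned copy written to', p.output_filename)
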
@@ -2,14 +2,20 @@ import glob
import os
import mimetypes
import importlib
from typing import TypeVar, List, Tuple, Optional
from typing import TypeVar, Optional, List, Tuple
from . import abstract, UNSUPPORTED_EXTENSIONS
assert Tuple # make pyflakes happy
T = TypeVar('T', bound='abstract.AbstractParser')
mimetypes.add_type('application/epub+zip', '.epub')
mimetypes.add_type('application/x-dtbncx+xml', '.ncx') # EPUB Navigation Control XML File
# This should be removed after we move to python3.10
# https://github.com/python/cpython/commit/20a5b7e986377bdfd929d7e8c4e3db5847dfdb2d
mimetypes.add_type('image/heic', '.heic')
def __load_all_parsers():
""" Loads every parser in a dynamic way """
current_dir = os.path.dirname(__file__)
@@ -18,12 +24,16 @@ def __load_all_parsers():
continue
elif fname.endswith('__init__.py'):
continue
elif fname.endswith('exiftool.py'):
continue
basename = os.path.basename(fname)
name, _ = os.path.splitext(basename)
importlib.import_module('.' + name, package='libmat2')
__load_all_parsers()
def _get_parsers() -> List[T]:
""" Get all our parsers!"""
def __get_parsers(cls):
@@ -33,17 +43,22 @@ def _get_parsers() -> List[T]:
def get_parser(filename: str) -> Tuple[Optional[T], Optional[str]]:
""" Return the appropriate parser for a giver filename. """
""" Return the appropriate parser for a given filename.
:raises ValueError: Raised if the instantiation of the parser went wrong.
"""
mtype, _ = mimetypes.guess_type(filename)
_, extension = os.path.splitext(filename)
if extension.lower() in UNSUPPORTED_EXTENSIONS:
return None, mtype
if mtype == 'application/x-tar':
if extension[1:] in ('bz2', 'gz', 'xz'):
mtype = mtype + '+' + extension[1:]
for parser_class in _get_parsers(): # type: ignore
if mtype in parser_class.mimetypes:
try:
return parser_class(filename), mtype
except ValueError:
return None, mtype
# This instantiation might raise a ValueError on malformed files
return parser_class(filename), mtype
return None, mtype
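
The application/x-tar special case exists because mimetypes.guess_type() reports compressed tarballs as plain tar with the compression in a separate encoding slot; mat2 folds it back into the type. A quick illustration of the guessing logic:

import mimetypes
import os

for name in ('archive.tar.gz', 'archive.tar.xz', 'movie.mp4'):
    mtype, _ = mimetypes.guess_type(name)
    _, extension = os.path.splitext(name)
    if mtype == 'application/x-tar' and extension[1:] in ('bz2', 'gz', 'xz'):
        mtype = mtype + '+' + extension[1:]
    print(name, '->', mtype)
# archive.tar.gz -> application/x-tar+gz
# archive.tar.xz -> application/x-tar+xz
# movie.mp4 -> video/mp4
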
@@ -7,7 +7,7 @@ import re
import logging
import tempfile
import io
from distutils.version import LooseVersion
from typing import Union, Dict
import cairo
import gi
@@ -16,10 +16,7 @@ from gi.repository import Poppler, GLib
from . import abstract
poppler_version = Poppler.get_version()
if LooseVersion(poppler_version) < LooseVersion('0.46'): # pragma: no cover
raise ValueError("MAT2 needs at least Poppler version 0.46 to work. \
The installed version is %s." % poppler_version) # pragma: no cover
FIXED_PDF_VERSION = cairo.PDFVersion.VERSION_1_5
class PDFParser(abstract.AbstractParser):
@@ -31,13 +28,21 @@ class PDFParser(abstract.AbstractParser):
def __init__(self, filename):
super().__init__(filename)
self.uri = 'file://' + os.path.abspath(self.filename)
self.__scale = 2 # how much precision do we want for the render
self.__scale = 200 / 72.0 # how much precision do we want for the render
try: # Check now that the file is valid, to avoid surprises later
Poppler.Document.new_from_file(self.uri, None)
except GLib.GError: # Invalid PDF
raise ValueError
def remove_all_lightweight(self):
def remove_all(self) -> bool:
if self.lightweight_cleaning is True:
try:
return self.__remove_all_lightweight()
except (cairo.Error, MemoryError) as e:
raise RuntimeError(e)
return self.__remove_all_thorough()
def __remove_all_lightweight(self) -> bool:
"""
Load the document into Poppler, render pages on a new PDFSurface.
"""
@@ -46,6 +51,7 @@ class PDFParser(abstract.AbstractParser):
tmp_path = tempfile.mkstemp()[1]
pdf_surface = cairo.PDFSurface(tmp_path, 10, 10) # resized later anyway
pdf_surface.restrict_to_version(FIXED_PDF_VERSION)
pdf_context = cairo.Context(pdf_surface) # context draws on the surface
for pagenum in range(pages_count):
@@ -64,7 +70,7 @@ class PDFParser(abstract.AbstractParser):
return True
def remove_all(self):
def __remove_all_thorough(self) -> bool:
"""
Load the document into Poppler, render pages on PNG,
and shove those PNG into a new PDF.
@@ -74,15 +80,19 @@ class PDFParser(abstract.AbstractParser):
_, tmp_path = tempfile.mkstemp()
pdf_surface = cairo.PDFSurface(tmp_path, 32, 32) # resized later anyway
pdf_surface.restrict_to_version(FIXED_PDF_VERSION)
pdf_context = cairo.Context(pdf_surface)
for pagenum in range(pages_count):
page = document.get_page(pagenum)
if page is None: # pragma: no cover
logging.error("Unable to get PDF pages")
return False
page_width, page_height = page.get_size()
logging.info("Rendering page %d/%d", pagenum + 1, pages_count)
width = int(page_width) * self.__scale
height = int(page_height) * self.__scale
width = int(page_width * self.__scale)
height = int(page_height * self.__scale)
img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
img_context = cairo.Context(img_surface)
@@ -96,7 +106,11 @@ class PDFParser(abstract.AbstractParser):
buf.seek(0)
img = cairo.ImageSurface.create_from_png(buf)
pdf_surface.set_size(page_width*self.__scale, page_height*self.__scale)
if cairo.version_info < (1, 12, 0):
pdf_surface.set_size(width, height)
else:
pdf_surface.set_size(page_width, page_height)
pdf_surface.set_device_scale(1 / self.__scale, 1 / self.__scale)
pdf_context.set_source_surface(img, 0, 0)
pdf_context.paint()
pdf_context.show_page() # draw pdf_context on pdf_surface
@@ -116,16 +130,27 @@ class PDFParser(abstract.AbstractParser):
document.set_creator('')
document.set_creation_date(-1)
document.save('file://' + os.path.abspath(out_file))
# Cairo adds "/Producer" and "/CreationDate", and Poppler sometimes
# fails to remove them, so we have to use this terrible regex.
# It should(tm) be alright though, because cairo's output format
# for metadata is fixed.
with open(out_file, 'rb') as f:
out = re.sub(rb'<<[\s\n]*/Producer.*?>>', b' << >>', f.read(),
count=0, flags=re.DOTALL | re.IGNORECASE)
with open(out_file, 'wb') as f:
f.write(out)
return True
@staticmethod
def __parse_metadata_field(data: str) -> dict:
def __parse_metadata_field(data: str) -> Dict[str, str]:
metadata = {}
for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", data, re.I):
metadata[key] = value
return metadata
def get_meta(self):
def get_meta(self) -> Dict[str, Union[str, Dict]]:
""" Return a dict with all the meta of the file
"""
metadata = {}
......
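
__parse_metadata_field() pulls XMP-style key/value pairs out of the raw PDF with a back-referencing regex; here is the same expression exercised on a fabricated XMP snippet:

import re

xmp = ('<pdf:Producer>LibreOffice 7.5</pdf:Producer>\n'
       '<xmp:CreateDate>2018-01-01</xmp:CreateDate>')
pairs = re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)</\1:\2>", xmp, re.I)
print({key: value for _, key, value in pairs})
# {'Producer': 'LibreOffice 7.5', 'CreateDate': '2018-01-01'}
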
import logging
from typing import Union, Tuple, Dict
from typing import Union, Dict, List, Tuple
from . import abstract
class TorrentParser(abstract.AbstractParser):
mimetypes = {'application/x-bittorrent', }
whitelist = {b'announce', b'announce-list', b'info'}
allowlist = {b'announce', b'announce-list', b'info'}
def __init__(self, filename):
super().__init__(filename)
@@ -14,17 +15,17 @@ class TorrentParser(abstract.AbstractParser):
if self.dict_repr is None:
raise ValueError
def get_meta(self) -> Dict[str, str]:
def get_meta(self) -> Dict[str, Union[str, Dict]]:
metadata = {}
for key, value in self.dict_repr.items():
if key not in self.whitelist:
if key not in self.allowlist:
metadata[key.decode('utf-8')] = value
return metadata
def remove_all(self) -> bool:
cleaned = dict()
for key, value in self.dict_repr.items():
if key in self.whitelist:
if key in self.allowlist:
cleaned[key] = value
with open(self.output_filename, 'wb') as f:
f.write(_BencodeHandler().bencode(cleaned))
@@ -32,10 +33,10 @@ class TorrentParser(abstract.AbstractParser):
return True
class _BencodeHandler(object):
class _BencodeHandler:
"""
Since bencode isn't that hard to parse,
MAT2 comes with its own parser, based on the spec
mat2 comes with its own parser, based on the spec
https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding
"""
def __init__(self):
@@ -75,7 +76,7 @@ class _BencodeHandler(object):
s = s[1:]
return s[colon:colon+str_len], s[colon+str_len:]
def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
def __decode_list(self, s: bytes) -> Tuple[List, bytes]:
ret = list()
s = s[1:] # skip leading `l`
while s[0] != ord('e'):
@@ -83,7 +84,7 @@ class _BencodeHandler(object):
ret.append(value)
return ret, s[1:]
def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
def __decode_dict(self, s: bytes) -> Tuple[Dict, bytes]:
ret = dict()
s = s[1:] # skip leading `d`
while s[0] != ord(b'e'):
@@ -112,10 +113,10 @@ class _BencodeHandler(object):
ret += self.__encode_func[type(value)](value)
return b'd' + ret + b'e'
def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
def bencode(self, s: Union[Dict, List, bytes, int]) -> bytes:
return self.__encode_func[type(s)](s)
def bdecode(self, s: bytes) -> Union[dict, None]:
def bdecode(self, s: bytes) -> Union[Dict, None]:
try:
ret, trail = self.__decode_func[s[0]](s)
except (IndexError, KeyError, ValueError) as e:
......
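
For readers unfamiliar with bencoding, the format _BencodeHandler implements is small enough to sketch from the spec: integers are i<n>e, byte strings are <length>:<bytes>, lists are l…e, and dicts are d…e with sorted keys. A self-contained encoder in the same spirit (not mat2's actual handler):

def bencode(value) -> bytes:
    if isinstance(value, int):
        return b'i%de' % value
    if isinstance(value, bytes):
        return b'%d:%s' % (len(value), value)
    if isinstance(value, list):
        return b'l' + b''.join(map(bencode, value)) + b'e'
    if isinstance(value, dict):  # keys must be sorted
        return b'd' + b''.join(bencode(k) + bencode(v)
                               for k, v in sorted(value.items())) + b'e'
    raise TypeError(type(value))

print(bencode({b'announce': b'http://tracker.example/a', b'length': 42}))
# b'd8:announce24:http://tracker.example/a6:lengthi42ee'
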
import subprocess
import functools
import shutil
import logging
from typing import Union, Dict
from . import exiftool
from . import bubblewrap
class AbstractFFmpegParser(exiftool.ExiftoolParser):
""" Abstract parser for all FFmpeg-based ones, mainly for video. """
# Some fileformats have mandatory metadata fields
meta_key_value_allowlist: Dict[str, Union[str, int]] = dict()
def remove_all(self) -> bool:
if self.meta_key_value_allowlist:
logging.warning('The format of "%s" (%s) has some mandatory '
'metadata fields; mat2 filled them with standard '
'data.', self.filename, ', '.join(self.mimetypes))
cmd = [_get_ffmpeg_path(),
'-i', self.filename, # input file
'-y', # overwrite existing output file
'-map', '0', # copy all streams from input to output
'-codec', 'copy', # don't decode anything, just copy (speed!)
'-loglevel', 'panic', # Don't show log
'-hide_banner', # hide the banner
'-map_metadata', '-1', # remove superficial metadata
'-map_chapters', '-1', # remove chapters
'-disposition', '0', # Remove dispositions (check ffmpeg's manpage)
'-fflags', '+bitexact', # don't add any metadata
'-flags:v', '+bitexact', # don't add any metadata
'-flags:a', '+bitexact', # don't add any metadata
self.output_filename]
try:
if self.sandbox:
bubblewrap.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
else:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
return True
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
ret: Dict[str, Union[str, Dict]] = dict()
for key, value in meta.items():
if key in self.meta_key_value_allowlist:
if value == self.meta_key_value_allowlist[key]:
continue
ret[key] = value
return ret
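
The interplay between meta_key_value_allowlist and get_meta deserves a worked example: a mandatory field is hidden only while it still carries the scrubbed default value, so a genuine date would still be reported. The same filtering, rehearsed on fake exiftool output:

allowlist = {'CreateDate': '0000:00:00 00:00:00'}  # mandatory field, scrubbed default
fake_meta = {'CreateDate': '0000:00:00 00:00:00',  # matches the default: hidden
             'ModifyDate': '2018:03:20 12:00:00'}  # anything else: reported
shown = {k: v for k, v in fake_meta.items() if allowlist.get(k) != v}
print(shown)  # {'ModifyDate': '2018:03:20 12:00:00'}
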
class WMVParser(AbstractFFmpegParser):
mimetypes = {'video/x-ms-wmv', }
meta_allowlist = {'AudioChannels', 'AudioCodecID', 'AudioCodecName',
'ErrorCorrectionType', 'AudioSampleRate', 'DataPackets',
'Directory', 'Duration', 'ExifToolVersion',
'FileAccessDate', 'FileInodeChangeDate', 'FileLength',
'FileModifyDate', 'FileName', 'FilePermissions',
'FileSize', 'FileType', 'FileTypeExtension',
'FrameCount', 'FrameRate', 'ImageHeight', 'ImageSize',
'ImageWidth', 'MIMEType', 'MaxBitrate', 'MaxPacketSize',
'Megapixels', 'MinPacketSize', 'Preroll', 'SendDuration',
'SourceFile', 'StreamNumber', 'VideoCodecName', }
meta_key_value_allowlist = { # some metadata are mandatory :/
'AudioCodecDescription': '',
'CreationDate': '0000:00:00 00:00:00Z',
'FileID': '00000000-0000-0000-0000-000000000000',
'Flags': 2, # FIXME: What is this? Why 2?
'ModifyDate': '0000:00:00 00:00:00',
'TimeOffset': '0 s',
'VideoCodecDescription': '',
'StreamType': 'Audio',
}
class AVIParser(AbstractFFmpegParser):
mimetypes = {'video/x-msvideo', }
meta_allowlist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory',
'FileSize', 'FileModifyDate', 'FileAccessDate',
'FileInodeChangeDate', 'FilePermissions', 'FileType',
'FileTypeExtension', 'MIMEType', 'FrameRate', 'MaxDataRate',
'FrameCount', 'StreamCount', 'StreamType', 'VideoCodec',
'VideoFrameRate', 'VideoFrameCount', 'Quality',
'SampleSize', 'BMPVersion', 'ImageWidth', 'ImageHeight',
'Planes', 'BitDepth', 'Compression', 'ImageLength',
'PixelsPerMeterX', 'PixelsPerMeterY',
'NumImportantColors', 'NumColors',
'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask',
'ColorSpace', 'AudioCodec', 'AudioCodecRate',
'AudioSampleCount',
'AudioSampleRate', 'Encoding', 'NumChannels',
'SampleRate', 'AvgBytesPerSec', 'BitsPerSample',
'Duration', 'ImageSize', 'Megapixels'}
class MP4Parser(AbstractFFmpegParser):
mimetypes = {'video/mp4', }
meta_allowlist = {'AudioFormat', 'AvgBitrate', 'Balance', 'TrackDuration',
'XResolution', 'YResolution', 'ExifToolVersion',
'FileAccessDate', 'FileInodeChangeDate', 'FileModifyDate',
'FileName', 'FilePermissions', 'MIMEType', 'FileType',
'FileTypeExtension', 'Directory', 'ImageWidth',
'ImageSize', 'ImageHeight', 'FileSize', 'SourceFile',
'BitDepth', 'Duration', 'AudioChannels',
'AudioBitsPerSample', 'AudioSampleRate', 'Megapixels',
'MovieDataSize', 'VideoFrameRate', 'MediaTimeScale',
'SourceImageHeight', 'SourceImageWidth',
'MatrixStructure', 'MediaDuration'}
meta_key_value_allowlist = { # some metadata are mandatory :/
'CreateDate': '0000:00:00 00:00:00',
'CurrentTime': '0 s',
'MediaCreateDate': '0000:00:00 00:00:00',
'MediaLanguageCode': 'und',
'MediaModifyDate': '0000:00:00 00:00:00',
'ModifyDate': '0000:00:00 00:00:00',
'OpColor': '0 0 0',
'PosterTime': '0 s',
'PreferredRate': '1',
'PreferredVolume': '100.00%',
'PreviewDuration': '0 s',
'PreviewTime': '0 s',
'SelectionDuration': '0 s',
'SelectionTime': '0 s',
'TrackCreateDate': '0000:00:00 00:00:00',
'TrackModifyDate': '0000:00:00 00:00:00',
'TrackVolume': '0.00%',
}
@functools.lru_cache(maxsize=None)
def _get_ffmpeg_path() -> str: # pragma: no cover
which_path = shutil.which('ffmpeg')
if which_path:
return which_path
raise RuntimeError("Unable to find ffmpeg")
from html import parser, escape
from typing import Any, Optional, Dict, List, Tuple, Set
import re
import string
from . import abstract
# pylint: disable=too-many-instance-attributes
class CSSParser(abstract.AbstractParser):
"""There is no such things as metadata in CSS files,
only comments of the form `/* … */`, so we're removing the laters."""
mimetypes = {'text/css', }
flags = re.MULTILINE | re.DOTALL
def remove_all(self) -> bool:
with open(self.filename, encoding='utf-8') as f:
try:
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cleaned = re.sub(r'/\*.*?\*/', '', content, count=0, flags=self.flags)
with open(self.output_filename, 'w', encoding='utf-8') as f:
f.write(cleaned)
return True
def get_meta(self) -> Dict[str, Any]:
metadata = {}
with open(self.filename, encoding='utf-8') as f:
try:
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cssdoc = re.findall(r'/\*(.*?)\*/', content, self.flags)
for match in cssdoc:
for line in match.splitlines():
try:
k, v = line.split(':')
metadata[k.strip(string.whitespace + '*')] = v.strip()
except ValueError:
metadata['harmful data'] = line.strip()
return metadata
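
Both methods rely on the same non-greedy /\*.*?\*/ pattern: remove_all() deletes the matches, while get_meta() keeps and dissects them. Exercised on a two-comment sample:

import re

css = "/* author: jvoisin */\nbody { color: red; } /* a harmless comment */"
flags = re.MULTILINE | re.DOTALL
print(re.findall(r'/\*(.*?)\*/', css, flags))
# [' author: jvoisin ', ' a harmless comment ']
print(re.sub(r'/\*.*?\*/', '', css, count=0, flags=flags).strip())
# body { color: red; }
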
class AbstractHTMLParser(abstract.AbstractParser):
tags_blocklist: Set[str] = set()
# In some html/xml-based formats some tags are mandatory,
# so we're keeping them, but are discarding their content
tags_required_blocklist: Set[str] = set()
def __init__(self, filename):
super().__init__(filename)
self.__parser = _HTMLParser(self.filename, self.tags_blocklist,
self.tags_required_blocklist)
with open(filename, encoding='utf-8') as f:
self.__parser.feed(f.read())
self.__parser.close()
def get_meta(self) -> Dict[str, Any]:
return self.__parser.get_meta()
def remove_all(self) -> bool:
return self.__parser.remove_all(self.output_filename)
class HTMLParser(AbstractHTMLParser):
mimetypes = {'text/html', 'application/xhtml+xml'}
tags_blocklist = {'meta', }
tags_required_blocklist = {'title', }
class DTBNCXParser(AbstractHTMLParser):
mimetypes = {'application/x-dtbncx+xml', }
tags_required_blocklist = {'title', 'doctitle', 'meta'}
class _HTMLParser(parser.HTMLParser):
"""Python doesn't have a validating html parser in its stdlib, so
we're using an internal queue to track all the opening/closing tags,
and hoping for the best.
Moreover, the parser.HTMLParser class doesn't provide a get_endtag_text
method, so we have to use get_starttag_text instead, put its result in a
LIFO, and transform it into a closing tag when needed.
Also, gotcha: the `tag` parameters are always in lowercase.
"""
def __init__(self, filename, blocklisted_tags, required_blocklisted_tags):
super().__init__()
self.filename = filename
self.__textrepr = ''
self.__meta = {}
self.__validation_queue: List[str] = list()
# We're using counters instead of booleans, to handle nested tags
self.__in_dangerous_but_required_tag = 0
self.__in_dangerous_tag = 0
if required_blocklisted_tags & blocklisted_tags: # pragma: nocover
raise ValueError("There is an overlap between %s and %s" % (
required_blocklisted_tags, blocklisted_tags))
self.tag_required_blocklist = required_blocklisted_tags
self.tag_blocklist = blocklisted_tags
def error(self, message): # pragma: no cover
""" Amusingly, Python's documentation doesn't mention that this
function needs to be implemented in subclasses of the parent class
of parser.HTMLParser. This was found by fuzzing,
triggering the following exception:
NotImplementedError: subclasses of ParserBase must override error()
"""
raise ValueError(message)
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]):
# Ignore the type, because mypy is too stupid to infer
# that get_starttag_text() can't return None.
original_tag = self.get_starttag_text() # type: ignore
self.__validation_queue.append(original_tag) # type: ignore
if tag in self.tag_blocklist:
self.__in_dangerous_tag += 1
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
self.__textrepr += original_tag
if tag in self.tag_required_blocklist:
self.__in_dangerous_but_required_tag += 1
def handle_endtag(self, tag: str):
if not self.__validation_queue:
raise ValueError("The closing tag %s doesn't have a corresponding "
"opening one in %s." % (tag, self.filename))
previous_tag = self.__validation_queue.pop()
previous_tag = previous_tag[1:-1] # remove < and >
previous_tag = previous_tag.split(' ')[0] # remove attributes
if tag != previous_tag.lower():
raise ValueError("The closing tag %s doesn't match the previous "
"tag %s in %s" %
(tag, previous_tag, self.filename))
if tag in self.tag_required_blocklist:
self.__in_dangerous_but_required_tag -= 1
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
# There is no `get_endtag_text()` method :/
self.__textrepr += '</' + previous_tag + '>'
if tag in self.tag_blocklist:
self.__in_dangerous_tag -= 1
def handle_data(self, data: str):
if self.__in_dangerous_but_required_tag == 0:
if self.__in_dangerous_tag == 0:
if data.strip():
self.__textrepr += escape(data)
def handle_startendtag(self, tag: str,
attrs: List[Tuple[str, Optional[str]]]):
if tag in self.tag_required_blocklist | self.tag_blocklist:
meta = {k:v for k, v in attrs}
name = meta.get('name', 'harmful metadata')
content = meta.get('content', 'harmful data')
self.__meta[name] = content
if self.__in_dangerous_tag == 0:
if tag in self.tag_required_blocklist:
self.__textrepr += '<' + tag + ' />'
return
if self.__in_dangerous_tag == 0:
if self.__in_dangerous_but_required_tag == 0:
self.__textrepr += self.get_starttag_text()
def remove_all(self, output_filename: str) -> bool:
if self.__validation_queue:
raise ValueError("Some tags (%s) were left unclosed in %s" % (
', '.join(self.__validation_queue),
self.filename))
with open(output_filename, 'w', encoding='utf-8') as f:
f.write(self.__textrepr)
return True
def get_meta(self) -> Dict[str, Any]:
if self.__validation_queue:
raise ValueError("Some tags (%s) were left unclosed in %s" % (
', '.join(self.__validation_queue),
self.filename))
return self.__meta
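
Putting the HTML machinery together, a sketch of the intended call sequence (the libmat2.web module path is an assumption, and page.html is a hypothetical local file containing something like <meta name="author" content="jvoisin"/>):

from libmat2.web import HTMLParser

p = HTMLParser('page.html')  # raises ValueError on unbalanced tags
print(p.get_meta())          # e.g. {'author': 'jvoisin'}
p.remove_all()               # writes the same document, minus the <meta>
                             # tags, to p.output_filename
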
#!/usr/bin/env python3
import os
from typing import Tuple, Generator, List
import shutil
from typing import List, Set, Dict
import sys
import mimetypes
import argparse
import logging
import unicodedata
import concurrent.futures
try:
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
from libmat2 import check_dependencies, UnknownMemberPolicy
except ValueError as e:
print(e)
except ValueError as ex:
print(ex)
sys.exit(1)
__version__ = '0.4.0'
__version__ = '0.13.5'
# Make pyflakes happy
assert Tuple
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
def __print_without_chars(s: str):
""" Remove control characters
We might use 'Cc' instead of 'C', but better safe than sorry
https://www.unicode.org/reports/tr44/#GC_Values_Table
"""
print(''.join(ch for ch in s if not unicodedata.category(ch).startswith('C')))
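
Stripping every Unicode category-C character neutralises filenames that try to smuggle terminal escape sequences into mat2's output (the tests/data/control_chars.jpg fixture added below exercises exactly this). A quick demonstration:

import unicodedata

tainted = 'evil\x1b]0;pwned\x07.jpg'  # ESC and BEL buried in a filename
clean = ''.join(ch for ch in tainted
                if not unicodedata.category(ch).startswith('C'))
print(clean)  # evil]0;pwned.jpg (the control characters are gone)
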
def __check_file(filename: str, mode: int=os.R_OK) -> bool:
def __check_file(filename: str, mode: int = os.R_OK) -> bool:
if not os.path.exists(filename):
print("[-] %s is doesn't exist." % filename)
__print_without_chars("[-] %s doesn't exist." % filename)
return False
elif not os.path.isfile(filename):
print("[-] %s is not a regular file." % filename)
__print_without_chars("[-] %s is not a regular file." % filename)
return False
elif not os.access(filename, mode):
print("[-] %s is not readable and writeable." % filename)
mode_str: List[str] = list()
if mode & os.R_OK:
mode_str.append('readable')
if mode & os.W_OK:
mode_str.append('writeable')
__print_without_chars("[-] %s is not %s." % (filename, ' nor '.join(mode_str)))
return False
return True
def create_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2')
parser.add_argument('files', nargs='*', help='the files to process')
parser.add_argument('-v', '--version', action='version',
version='MAT2 %s' % __version__)
parser.add_argument('-l', '--list', action='store_true',
help='list all supported fileformats')
parser.add_argument('--check-dependencies', action='store_true',
help='check if MAT2 has all the dependencies it needs')
parser.add_argument('-V', '--verbose', action='store_true',
help='show more verbose status information')
parser.add_argument('--unknown-members', metavar='policy', default='abort',
help='how to handle unknown members of archive-style files (policy should' +
' be one of: %s)' % ', '.join(p.value for p in UnknownMemberPolicy))
help='how to handle unknown members of archive-style '
'files (policy should be one of: %s) [Default: abort]' %
', '.join(p.value for p in UnknownMemberPolicy))
parser.add_argument('--inplace', action='store_true',
help='clean in place, without backup')
parser.add_argument('--no-sandbox', dest='sandbox', action='store_false',
default=True, help='Disable bubblewrap\'s sandboxing')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('files', nargs='*', help='the files to process',
default=[])
excl_group.add_argument('-v', '--version', action='version',
version='mat2 %s' % __version__)
excl_group.add_argument('-l', '--list', action='store_true', default=False,
help='list all supported fileformats')
excl_group.add_argument('--check-dependencies', action='store_true',
default=False,
help='check if mat2 has all the dependencies it '
'needs')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('-L', '--lightweight', action='store_true',
help='remove SOME metadata')
excl_group.add_argument('-s', '--show', action='store_true',
help='list harmful metadata detectable by mat2 '
'without removing them')
info = parser.add_mutually_exclusive_group()
info.add_argument('-s', '--show', action='store_true',
help='list harmful metadata detectable by MAT2 without removing them')
info.add_argument('-L', '--lightweight', action='store_true',
help='remove SOME metadata')
return parser
def show_meta(filename: str):
def show_meta(filename: str, sandbox: bool):
if not __check_file(filename):
return
p, mtype = parser_factory.get_parser(filename) # type: ignore
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when processing %s: %s" % (filename, e))
return
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return
p.sandbox = sandbox
__print_meta(filename, p.get_meta())
print("[+] Metadata for %s:" % filename)
metadata = p.get_meta().items()
def __print_meta(filename: str, metadata: Dict, depth: int = 1):
padding = " " * depth*2
if not metadata:
print(" No metadata found")
__print_without_chars(padding + "No metadata found in %s." % filename)
return
for k, v in metadata:
__print_without_chars("[%s] Metadata for %s:" % ('+'*depth, filename))
for (k, v) in sorted(metadata.items()):
if isinstance(v, dict):
__print_meta(k, v, depth+1)
continue
try: # FIXME this is ugly.
print(" %s: %s" % (k, v))
__print_without_chars(padding + " %s: %s" % (k, v))
except UnicodeEncodeError:
print(" %s: harmful content" % k)
__print_without_chars(padding + " %s: harmful content" % k)
except TypeError:
pass # for things that aren't iterable
def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy) -> bool:
if not __check_file(filename, os.R_OK|os.W_OK):
def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
policy: UnknownMemberPolicy) -> bool:
mode = (os.R_OK | os.W_OK) if inplace else os.R_OK
if not __check_file(filename, mode):
return False
p, mtype = parser_factory.get_parser(filename) # type: ignore
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when cleaning %s: %s" % (filename, e))
return False
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return False
p.unknown_member_policy = policy
if is_lightweight:
return p.remove_all_lightweight()
return p.remove_all()
p.lightweight_cleaning = is_lightweight
p.sandbox = sandbox
try:
logging.debug('Cleaning %s…', filename)
ret = p.remove_all()
if ret is True:
shutil.copymode(filename, p.output_filename)
if inplace is True:
os.rename(p.output_filename, filename)
return ret
except RuntimeError as e:
__print_without_chars("[-] %s can't be cleaned: %s" % (filename, e))
return False
def show_parsers():
print('[+] Supported formats:')
formats = set()
for parser in parser_factory._get_parsers():
formats = set() # Set[str]
for parser in parser_factory._get_parsers(): # type: ignore
for mtype in parser.mimetypes:
extensions = set()
extensions = set() # Set[str]
for extension in mimetypes.guess_all_extensions(mtype):
if extension not in UNSUPPORTED_EXTENSIONS:
extensions.add(extension)
@@ -109,49 +167,63 @@ def show_parsers():
print('\n'.join(sorted(formats)))
def __get_files_recursively(files: List[str]) -> Generator[str, None, None]:
def __get_files_recursively(files: List[str]) -> List[str]:
ret: Set[str] = set()
for f in files:
if os.path.isdir(f):
for path, _, _files in os.walk(f):
for _f in _files:
fname = os.path.join(path, _f)
if __check_file(fname):
yield fname
ret.add(fname)
elif __check_file(f):
yield f
ret.add(f)
return list(ret)
def main():
def main() -> int:
arg_parser = create_arg_parser()
args = arg_parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.INFO)
logging.getLogger(__name__).setLevel(logging.DEBUG)
if not args.files:
if args.list:
show_parsers()
return 0
elif args.check_dependencies:
print("Dependencies required for MAT2 %s:" % __version__)
__print_without_chars("Dependencies for mat2 %s:" % __version__)
for key, value in sorted(check_dependencies().items()):
print('- %s: %s' % (key, 'yes' if value else 'no'))
__print_without_chars('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
'(optional)' if not value['required'] else ''))
else:
return arg_parser.print_help()
arg_parser.print_help()
return 0
elif args.show:
for f in __get_files_recursively(args.files):
show_meta(f)
show_meta(f, args.sandbox)
return 0
else:
inplace = args.inplace
policy = UnknownMemberPolicy(args.unknown_members)
if policy == UnknownMemberPolicy.KEEP:
logging.warning('Keeping unknown member files may leak metadata in the resulting file!')
no_failure = True
for f in __get_files_recursively(args.files):
if clean_meta(f, args.lightweight, policy) is False:
no_failure = False
files = __get_files_recursively(args.files)
# We have to use Processes instead of Threads, since
# we're using tempfile.mkdtemp, which isn't thread-safe.
futures = list()
with concurrent.futures.ProcessPoolExecutor() as executor:
for f in files:
future = executor.submit(clean_meta, f, args.lightweight,
inplace, args.sandbox, policy)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
return 0 if no_failure is True else -1
......
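
The new main() fans the per-file work out to a process pool and folds the boolean results into the exit status. The skeleton of that pattern, with a stand-in for clean_meta():

import concurrent.futures

def clean(path: str) -> bool:
    # stand-in for clean_meta(); pretend only PDFs clean successfully
    return path.endswith('.pdf')

if __name__ == '__main__':
    files = ['a.png', 'b.pdf']  # hypothetical input
    no_failure = True
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(clean, f) for f in files]
        for future in concurrent.futures.as_completed(futures):
            no_failure &= future.result()
    print(0 if no_failure else -1)  # mat2's exit-status convention
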
#!/usr/bin/env python3
"""
Because writing GUI is non-trivial (cf. https://0xacab.org/jvoisin/mat2/issues/3),
we decided to write a Nautilus extension instead
(cf. https://0xacab.org/jvoisin/mat2/issues/2).
The code is a little bit convoluted because Gtk isn't thread-safe: we're not
allowed to call anything Gtk-related outside of the main thread, so we have
to resort to using a `queue` to pass "messages" around.
"""
# pylint: disable=no-name-in-module,unused-argument,no-self-use,import-error
import queue
import threading
from typing import Tuple
from urllib.parse import unquote
import gi
gi.require_version('Nautilus', '3.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import Nautilus, GObject, Gtk, Gio, GLib, GdkPixbuf
from libmat2 import parser_factory
# make pyflakes happy
assert Tuple
def _remove_metadata(fpath):
""" This is a simple wrapper around libmat2, because it's
easier and cleaner this way.
"""
parser, mtype = parser_factory.get_parser(fpath)
if parser is None:
return False, mtype
return parser.remove_all(), mtype
class ColumnExtension(GObject.GObject, Nautilus.MenuProvider, Nautilus.LocationWidgetProvider):
""" This class adds an item to the right-clic menu in Nautilus. """
def __init__(self):
super().__init__()
self.infobar_hbox = None
self.infobar = None
self.failed_items = list()
def __infobar_failure(self):
""" Add an hbox to the `infobar` warning about the fact that we didn't
manage to remove the metadata from every single file.
"""
self.infobar.set_show_close_button(True)
self.infobar_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
btn = Gtk.Button("Show")
btn.connect("clicked", self.__cb_show_failed)
self.infobar_hbox.pack_end(btn, False, False, 0)
infobar_msg = Gtk.Label("Failed to clean some items")
self.infobar_hbox.pack_start(infobar_msg, False, False, 0)
self.infobar.get_content_area().pack_start(self.infobar_hbox, True, True, 0)
self.infobar.show_all()
def get_widget(self, uri, window):
""" This is the method that we have to implement (because we're
a LocationWidgetProvider) in order to show our infobar.
"""
self.infobar = Gtk.InfoBar()
self.infobar.set_message_type(Gtk.MessageType.ERROR)
self.infobar.connect("response", self.__cb_infobar_response)
return self.infobar
def __cb_infobar_response(self, infobar, response):
""" Callback for the infobar close button.
"""
if response == Gtk.ResponseType.CLOSE:
self.infobar_hbox.destroy()
self.infobar.hide()
def __cb_show_failed(self, button):
""" Callback to show a popup containing a list of files
that we didn't manage to clean.
"""
# FIXME this should be done only once the window is destroyed
self.infobar_hbox.destroy()
self.infobar.hide()
window = Gtk.Window()
headerbar = Gtk.HeaderBar()
window.set_titlebar(headerbar)
headerbar.props.title = "Metadata removal failed"
close_button = Gtk.Button("Close")
close_button.connect("clicked", lambda _: window.close())
headerbar.pack_end(close_button)
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
window.add(box)
box.add(self.__create_treeview())
window.show_all()
@staticmethod
def __validate(fileinfo) -> Tuple[bool, str]:
""" Validate if a given file FileInfo `fileinfo` can be processed.
Returns a boolean, and a textreason why"""
if fileinfo.get_uri_scheme() != "file" or fileinfo.is_directory():
return False, "Not a file"
elif not fileinfo.can_write():
return False, "Not writeable"
return True, ""
def __create_treeview(self) -> Gtk.TreeView:
liststore = Gtk.ListStore(GdkPixbuf.Pixbuf, str, str)
treeview = Gtk.TreeView(model=liststore)
renderer_pixbuf = Gtk.CellRendererPixbuf()
column_pixbuf = Gtk.TreeViewColumn("Icon", renderer_pixbuf, pixbuf=0)
treeview.append_column(column_pixbuf)
for idx, name in enumerate(['File', 'Reason']):
renderer_text = Gtk.CellRendererText()
column_text = Gtk.TreeViewColumn(name, renderer_text, text=idx+1)
treeview.append_column(column_text)
for (fname, mtype, reason) in self.failed_items:
# This part is all about adding mimetype icons to the liststore
icon = Gio.content_type_get_icon('text/plain' if not mtype else mtype)
# in case we don't have the corresponding icon,
# we're adding `text/plain`, because we have this one for sure™
names = icon.get_names() + ['text/plain', ]
icon_theme = Gtk.IconTheme.get_default()
for name in names:
try:
img = icon_theme.load_icon(name, Gtk.IconSize.BUTTON, 0)
break
except GLib.GError:
pass
liststore.append([img, fname, reason])
treeview.show_all()
return treeview
def __create_progressbar(self) -> Gtk.ProgressBar:
""" Create the progressbar used to notify that files are currently
being processed.
"""
self.infobar.set_show_close_button(False)
self.infobar.set_message_type(Gtk.MessageType.INFO)
self.infobar_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
progressbar = Gtk.ProgressBar()
self.infobar_hbox.pack_start(progressbar, True, True, 0)
progressbar.set_show_text(True)
self.infobar.get_content_area().pack_start(self.infobar_hbox, True, True, 0)
self.infobar.show_all()
return progressbar
def __update_progressbar(self, processing_queue, progressbar) -> bool:
""" This method is run via `Glib.add_idle` to update the progressbar."""
try:
fname = processing_queue.get(block=False)
except queue.Empty:
return True
# `None` is the marker put in the queue to signal that every selected
# file was processed.
if fname is None:
self.infobar_hbox.destroy()
self.infobar.hide()
if len(self.failed_items):
self.__infobar_failure()
if not processing_queue.empty():
print("Something went wrong, the queue isn't empty :/")
return False
progressbar.pulse()
progressbar.set_text("Cleaning %s" % fname)
progressbar.show_all()
self.infobar_hbox.show_all()
self.infobar.show_all()
return True
def __clean_files(self, files: list, processing_queue: queue.Queue) -> bool:
""" This method is threaded in order to avoid blocking the GUI
while cleaning up the files.
"""
for fileinfo in files:
fname = fileinfo.get_name()
processing_queue.put(fname)
valid, reason = self.__validate(fileinfo)
if not valid:
self.failed_items.append((fname, None, reason))
continue
fpath = unquote(fileinfo.get_uri()[7:]) # `len('file://') = 7`
success, mtype = _remove_metadata(fpath)
if not success:
self.failed_items.append((fname, mtype, 'Unsupported/invalid'))
processing_queue.put(None) # signal that we processed all the files
return True
def __cb_menu_activate(self, menu, files):
""" This method is called when the user clicked the "clean metadata"
menu item.
"""
self.failed_items = list()
progressbar = self.__create_progressbar()
progressbar.set_pulse_step(1.0 / len(files))
self.infobar.show_all()
processing_queue = queue.Queue()
GLib.idle_add(self.__update_progressbar, processing_queue, progressbar)
thread = threading.Thread(target=self.__clean_files, args=(files, processing_queue))
thread.daemon = True
thread.start()
def get_background_items(self, window, file):
""" https://bugzilla.gnome.org/show_bug.cgi?id=784278 """
return None
def get_file_items(self, window, files):
""" This method is the one allowing us to create a menu item.
"""
# Do not show the menu item if not a single file has a chance to be
# processed by mat2.
if not any([is_valid for (is_valid, _) in map(self.__validate, files)]):
return None
item = Nautilus.MenuItem(
name="MAT2::Remove_metadata",
label="Remove metadata",
tip="Remove metadata"
)
item.connect('activate', self.__cb_menu_activate, files)
return [item, ]
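
The extension's threading dance (a worker thread feeds a queue.Queue, the main thread drains it, None marks completion) is independent of GTK. The same pattern with the GLib.idle_add polling replaced by a blocking drain, as a standalone sketch:

import queue
import threading

def worker(files, q: queue.Queue):
    for fname in files:
        q.put(fname)  # tell the UI thread what is being cleaned
    q.put(None)       # sentinel: every file was processed

def drain(q: queue.Queue):
    # In the extension this runs via GLib.idle_add and must not block;
    # a plain script can simply wait on the queue.
    while (item := q.get()) is not None:
        print('Cleaning', item)

q = queue.Queue()
threading.Thread(target=worker, args=(['a.png', 'b.pdf'], q),
                 daemon=True).start()
drain(q)
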
[project]
name = "mat2"
version = "0.13.5"
description = "mat2 is a metadata removal tool, supporting a wide range of commonly used file formats, written in python3: at its core, it's a library, used by an eponymous command-line interface, as well as several file manager extensions."
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.9"
dependencies = [
'mutagen',
'PyGObject',
'pycairo',
]
[project.urls]
Repository = "https://0xacab.org/jvoisin/mat2"
Issues = "https://0xacab.org/jvoisin/mat2/-/issues"
Changelog = "https://0xacab.org/jvoisin/mat2/-/blob/master/CHANGELOG.md"
[tool.ruff]
target-version = "py39"
# E501 Line too long
ignore = ["E501", "F401", "E402", "E722"]
import setuptools
with open("README.md", "r") as fh:
with open("README.md", encoding='utf-8') as fh:
long_description = fh.read()
setuptools.setup(
name="mat2",
version='0.4.0',
version='0.13.5',
author="Julien (jvoisin) Voisin",
author_email="julien.voisin+mat2@dustri.org",
description="A handy tool to trash your metadata",
@@ -20,6 +20,7 @@ setuptools.setup(
'pycairo',
],
packages=setuptools.find_packages(exclude=('tests', )),
data_files = [('share/man/man1', ['doc/mat2.1'])],
classifiers=[
"Development Status :: 3 - Alpha",
"Environment :: Console",
......
File added: tests/data/control_chars.jpg (1.88 KiB)
File added
File added
/**
* This is my super css framework
* version: 1.0
* author : jvoisin
*/
body {
color: red;
background-color: blue;
}
.underline {
text-decoration: underline; /* underline is cool */
}
File added: tests/data/dirty.gif (1.1 KiB, no preview for this file type)
File added