Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tguinot/mat2
  • jvoisin/mat2
  • dachary/mat2
  • mejo-/mat2
  • LogicalDash/mat2
  • dkg/mat2
  • christian/mat2
  • Selflike323/mat2
  • fz/mat2
  • iwwmidatlanticgdc/mat2
  • Gu1nn3zz/mat2
  • smagnin/mat2
  • flashcode/mat2
  • MANCASTILLEJA/mat2
  • jboursier/mat2
  • tails/mat2
  • matiargs/mat2
  • Brolf/mat2
  • madaidan/mat2
  • Delmer84/mat2
  • yuebyzua/mat2
  • yyyyyyyan/mat2
  • rmnvgr/mat2
  • Marxism-Leninism/mat2
  • GNUtoo/mat2
  • allexj/mat2
  • b068931cc450442b63f5b3d276ea4297/mat2
  • chenrui/mat2
  • nosec13346/mat2
  • anelki/mat2
30 results
Show changes
import logging
from typing import Union, Tuple, Dict
from typing import Union, Dict, List, Tuple
from . import abstract
......@@ -15,7 +15,7 @@ class TorrentParser(abstract.AbstractParser):
if self.dict_repr is None:
raise ValueError
def get_meta(self) -> Dict[str, Union[str, dict]]:
def get_meta(self) -> Dict[str, Union[str, Dict]]:
metadata = {}
for key, value in self.dict_repr.items():
if key not in self.allowlist:
......@@ -76,7 +76,7 @@ class _BencodeHandler:
s = s[1:]
return s[colon:colon+str_len], s[colon+str_len:]
def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
def __decode_list(self, s: bytes) -> Tuple[List, bytes]:
ret = list()
s = s[1:] # skip leading `l`
while s[0] != ord('e'):
......@@ -84,7 +84,7 @@ class _BencodeHandler:
ret.append(value)
return ret, s[1:]
def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
def __decode_dict(self, s: bytes) -> Tuple[Dict, bytes]:
ret = dict()
s = s[1:] # skip leading `d`
while s[0] != ord(b'e'):
......@@ -113,10 +113,10 @@ class _BencodeHandler:
ret += self.__encode_func[type(value)](value)
return b'd' + ret + b'e'
def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
def bencode(self, s: Union[Dict, List, bytes, int]) -> bytes:
return self.__encode_func[type(s)](s)
def bdecode(self, s: bytes) -> Union[dict, None]:
def bdecode(self, s: bytes) -> Union[Dict, None]:
try:
ret, trail = self.__decode_func[s[0]](s)
except (IndexError, KeyError, ValueError) as e:
......
......@@ -3,7 +3,7 @@ import functools
import shutil
import logging
from typing import Dict, Union
from typing import Union, Dict
from . import exiftool
from . import bubblewrap
......@@ -12,7 +12,7 @@ from . import bubblewrap
class AbstractFFmpegParser(exiftool.ExiftoolParser):
""" Abstract parser for all FFmpeg-based ones, mainly for video. """
# Some fileformats have mandatory metadata fields
meta_key_value_allowlist = {} # type: Dict[str, Union[str, int]]
meta_key_value_allowlist: Dict[str, Union[str, int]] = dict()
def remove_all(self) -> bool:
if self.meta_key_value_allowlist:
......@@ -45,12 +45,12 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser):
return False
return True
def get_meta(self) -> Dict[str, Union[str, dict]]:
def get_meta(self) -> Dict[str, Union[str, Dict]]:
meta = super().get_meta()
ret = dict() # type: Dict[str, Union[str, dict]]
ret: Dict[str, Union[str, Dict]] = dict()
for key, value in meta.items():
if key in self.meta_key_value_allowlist.keys():
if key in self.meta_key_value_allowlist:
if value == self.meta_key_value_allowlist[key]:
continue
ret[key] = value
......@@ -91,11 +91,11 @@ class AVIParser(AbstractFFmpegParser):
'VideoFrameRate', 'VideoFrameCount', 'Quality',
'SampleSize', 'BMPVersion', 'ImageWidth', 'ImageHeight',
'Planes', 'BitDepth', 'Compression', 'ImageLength',
'PixelsPerMeterX', 'PixelsPerMeterY', 'NumColors',
'NumImportantColors', 'NumColors', 'NumImportantColors',
'PixelsPerMeterX', 'PixelsPerMeterY',
'NumImportantColors', 'NumColors',
'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask',
'ColorSpace', 'AudioCodec', 'AudioCodecRate',
'AudioSampleCount', 'AudioSampleCount',
'AudioSampleCount',
'AudioSampleRate', 'Encoding', 'NumChannels',
'SampleRate', 'AvgBytesPerSec', 'BitsPerSample',
'Duration', 'ImageSize', 'Megapixels'}
......@@ -135,7 +135,7 @@ class MP4Parser(AbstractFFmpegParser):
}
@functools.lru_cache()
@functools.lru_cache(maxsize=None)
def _get_ffmpeg_path() -> str: # pragma: no cover
which_path = shutil.which('ffmpeg')
if which_path:
......
from html import parser, escape
from typing import Dict, Any, List, Tuple, Set, Optional
from typing import Any, Optional, Dict, List, Tuple, Set
import re
import string
from . import abstract
assert Set
# pylint: disable=too-many-instance-attributes
......@@ -21,7 +20,7 @@ class CSSParser(abstract.AbstractParser):
content = f.read()
except UnicodeDecodeError: # pragma: no cover
raise ValueError
cleaned = re.sub(r'/\*.*?\*/', '', content, 0, self.flags)
cleaned = re.sub(r'/\*.*?\*/', '', content, count=0, flags=self.flags)
with open(self.output_filename, 'w', encoding='utf-8') as f:
f.write(cleaned)
return True
......@@ -45,10 +44,10 @@ class CSSParser(abstract.AbstractParser):
class AbstractHTMLParser(abstract.AbstractParser):
tags_blocklist = set() # type: Set[str]
tags_blocklist: Set[str] = set()
# In some html/xml-based formats some tags are mandatory,
# so we're keeping them, but are discarding their content
tags_required_blocklist = set() # type: Set[str]
tags_required_blocklist: Set[str] = set()
def __init__(self, filename):
super().__init__(filename)
......@@ -92,7 +91,7 @@ class _HTMLParser(parser.HTMLParser):
self.filename = filename
self.__textrepr = ''
self.__meta = {}
self.__validation_queue = [] # type: List[str]
self.__validation_queue: List[str] = list()
# We're using counters instead of booleans, to handle nested tags
self.__in_dangerous_but_required_tag = 0
......
......@@ -2,7 +2,7 @@
import os
import shutil
from typing import Tuple, List, Union, Set
from typing import List, Set, Dict
import sys
import mimetypes
import argparse
......@@ -13,34 +13,35 @@ import concurrent.futures
try:
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
from libmat2 import check_dependencies, UnknownMemberPolicy
except ValueError as e:
print(e)
except ValueError as ex:
print(ex)
sys.exit(1)
__version__ = '0.11.0'
# Make pyflakes happy
assert Set
assert Tuple
assert Union
__version__ = '0.13.5'
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING)
def __print_without_chars(s: str):
""" Remove control characters
We might use 'Cc' instead of 'C', but better safe than sorry
https://www.unicode.org/reports/tr44/#GC_Values_Table
"""
print(''.join(ch for ch in s if not unicodedata.category(ch).startswith('C')))
def __check_file(filename: str, mode: int = os.R_OK) -> bool:
if not os.path.exists(filename):
print("[-] %s doesn't exist." % filename)
__print_without_chars("[-] %s doesn't exist." % filename)
return False
elif not os.path.isfile(filename):
print("[-] %s is not a regular file." % filename)
__print_without_chars("[-] %s is not a regular file." % filename)
return False
elif not os.access(filename, mode):
mode_str = [] # type: List[str]
mode_str: List[str] = list()
if mode & os.R_OK:
mode_str += 'readable'
if mode & os.W_OK:
mode_str += 'writeable'
print("[-] %s is not %s." % (filename, 'nor '.join(mode_str)))
__print_without_chars("[-] %s is not %s." % (filename, 'nor '.join(mode_str)))
return False
return True
......@@ -56,8 +57,8 @@ def create_arg_parser() -> argparse.ArgumentParser:
', '.join(p.value for p in UnknownMemberPolicy))
parser.add_argument('--inplace', action='store_true',
help='clean in place, without backup')
parser.add_argument('--no-sandbox', dest='sandbox', action='store_true',
default=False, help='Disable bubblewrap\'s sandboxing')
parser.add_argument('--no-sandbox', dest='sandbox', action='store_false',
default=True, help='Disable bubblewrap\'s sandboxing')
excl_group = parser.add_mutually_exclusive_group()
excl_group.add_argument('files', nargs='*', help='the files to process',
......@@ -85,39 +86,37 @@ def show_meta(filename: str, sandbox: bool):
if not __check_file(filename):
return
p, mtype = parser_factory.get_parser(filename) # type: ignore
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when processing %s: %s" % (filename, e))
return
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return
p.sandbox = sandbox
__print_meta(filename, p.get_meta())
def __print_meta(filename: str, metadata: dict, depth: int = 1):
def __print_meta(filename: str, metadata: Dict, depth: int = 1):
padding = " " * depth*2
if not metadata:
print(padding + "No metadata found in %s." % filename)
__print_without_chars(padding + "No metadata found in %s." % filename)
return
print("[%s] Metadata for %s:" % ('+'*depth, filename))
__print_without_chars("[%s] Metadata for %s:" % ('+'*depth, filename))
for (k, v) in sorted(metadata.items()):
if isinstance(v, dict):
__print_meta(k, v, depth+1)
continue
# Remove control characters
# We might use 'Cc' instead of 'C', but better safe than sorry
# https://www.unicode.org/reports/tr44/#GC_Values_Table
try:
v = ''.join(ch for ch in v if not unicodedata.category(ch).startswith('C'))
except TypeError:
pass # for things that aren't iterable
try: # FIXME this is ugly.
print(padding + " %s: %s" % (k, v))
__print_without_chars(padding + " %s: %s" % (k, v))
except UnicodeEncodeError:
print(padding + " %s: harmful content" % k)
__print_without_chars(padding + " %s: harmful content" % k)
except TypeError:
pass # for things that aren't iterable
def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool,
......@@ -126,9 +125,13 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool
if not __check_file(filename, mode):
return False
p, mtype = parser_factory.get_parser(filename) # type: ignore
try:
p, mtype = parser_factory.get_parser(filename) # type: ignore
except ValueError as e:
__print_without_chars("[-] something went wrong when cleaning %s: %s" % (filename, e))
return False
if p is None:
print("[-] %s's format (%s) is not supported" % (filename, mtype))
__print_without_chars("[-] %s's format (%s) is not supported" % (filename, mtype))
return False
p.unknown_member_policy = policy
p.lightweight_cleaning = is_lightweight
......@@ -143,7 +146,7 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool
os.rename(p.output_filename, filename)
return ret
except RuntimeError as e:
print("[-] %s can't be cleaned: %s" % (filename, e))
__print_without_chars("[-] %s can't be cleaned: %s" % (filename, e))
return False
......@@ -165,7 +168,7 @@ def show_parsers():
def __get_files_recursively(files: List[str]) -> List[str]:
ret = set() # type: Set[str]
ret: Set[str] = set()
for f in files:
if os.path.isdir(f):
for path, _, _files in os.walk(f):
......@@ -183,16 +186,16 @@ def main() -> int:
args = arg_parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger(__name__).setLevel(logging.DEBUG)
if not args.files:
if args.list:
show_parsers()
return 0
elif args.check_dependencies:
print("Dependencies for mat2 %s:" % __version__)
__print_without_chars("Dependencies for mat2 %s:" % __version__)
for key, value in sorted(check_dependencies().items()):
print('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
__print_without_chars('- %s: %s %s' % (key, 'yes' if value['found'] else 'no',
'(optional)' if not value['required'] else ''))
else:
arg_parser.print_help()
......@@ -213,14 +216,14 @@ def main() -> int:
files = __get_files_recursively(args.files)
# We have to use Processes instead of Threads, since
# we're using tempfile.mkdtemp, which isn't thread-safe.
futures = list()
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = list()
for f in files:
future = executor.submit(clean_meta, f, args.lightweight,
inplace, args.sandbox, policy)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
for future in concurrent.futures.as_completed(futures):
no_failure &= future.result()
return 0 if no_failure is True else -1
......
# mat2's Nautilus extension
# Dependencies
- Nautilus (now known as [Files](https://wiki.gnome.org/action/show/Apps/Files))
- [nautilus-python](https://gitlab.gnome.org/GNOME/nautilus-python) >= 2.10
# Installation
Simply copy the `mat2.py` file to `~/.local/share/nautilus-python/extensions`,
and launch Nautilus; you should now have a "Remove metadata" item in the
right-click menu on supported files.
Please note: This is not needed if using a distribution provided package. It
only applies if installing from source.
#!/usr/bin/env python3
"""
Because writing GUI is non-trivial (cf. https://0xacab.org/jvoisin/mat2/issues/3),
we decided to write a Nautilus extension instead
(cf. https://0xacab.org/jvoisin/mat2/issues/2).
The code is a little bit convoluted because Gtk isn't thread-safe,
so we're not allowed to call anything Gtk-related outside of the main
thread, so we'll have to resort to using a `queue` to pass "messages" around.
"""
# pylint: disable=no-name-in-module,unused-argument,no-self-use,import-error
import queue
import threading
from typing import Tuple, Optional, List
from urllib.parse import unquote
import gi
gi.require_version('Nautilus', '3.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import Nautilus, GObject, Gtk, Gio, GLib, GdkPixbuf
from libmat2 import parser_factory
def _remove_metadata(fpath) -> Tuple[bool, Optional[str]]:
    """ Thin convenience wrapper around libmat2's parser factory.

    Returns a (success, mimetype) pair: success is False when no parser
    handles the file, otherwise the result of the parser's remove_all().
    """
    mat_parser, mimetype = parser_factory.get_parser(fpath)
    if mat_parser is None:
        return False, mimetype
    return mat_parser.remove_all(), mimetype
class Mat2Extension(GObject.GObject, Nautilus.MenuProvider, Nautilus.LocationWidgetProvider):
    """ This class adds an item to the right-click menu in Nautilus.

    Because Gtk isn't thread-safe, the actual cleaning runs in a worker
    thread and communicates with the GUI through a queue.Queue, drained
    by an idle callback on the main loop.
    """

    def __init__(self):
        super().__init__()
        self.infobar_hbox = None
        self.infobar = None
        # (filename, mimetype, reason) tuples for files we failed to clean
        self.failed_items = list()

    def __infobar_failure(self):
        """ Add an hbox to the `infobar` warning about the fact that we didn't
        manage to remove the metadata from every single file.
        """
        self.infobar.set_show_close_button(True)
        self.infobar_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)

        btn = Gtk.Button("Show")
        btn.connect("clicked", self.__cb_show_failed)
        self.infobar_hbox.pack_end(btn, False, False, 0)

        infobar_msg = Gtk.Label("Failed to clean some items")
        self.infobar_hbox.pack_start(infobar_msg, False, False, 0)

        self.infobar.get_content_area().pack_start(self.infobar_hbox, True, True, 0)
        self.infobar.show_all()

    def get_widget(self, uri, window) -> Gtk.Widget:
        """ This is the method that we have to implement (because we're
        a LocationWidgetProvider) in order to show our infobar.
        """
        self.infobar = Gtk.InfoBar()
        self.infobar.set_message_type(Gtk.MessageType.ERROR)
        self.infobar.connect("response", self.__cb_infobar_response)
        return self.infobar

    def __cb_infobar_response(self, infobar, response):
        """ Callback for the infobar close button.
        """
        if response == Gtk.ResponseType.CLOSE:
            self.infobar_hbox.destroy()
            self.infobar.hide()

    def __cb_show_failed(self, button):
        """ Callback to show a popup containing a list of files
        that we didn't manage to clean.
        """
        # FIXME this should be done only once the window is destroyed
        self.infobar_hbox.destroy()
        self.infobar.hide()

        window = Gtk.Window()
        headerbar = Gtk.HeaderBar()
        window.set_titlebar(headerbar)
        headerbar.props.title = "Metadata removal failed"

        close_button = Gtk.Button("Close")
        close_button.connect("clicked", lambda _: window.close())
        headerbar.pack_end(close_button)

        box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        window.add(box)
        box.add(self.__create_treeview())
        window.show_all()

    @staticmethod
    def __validate(fileinfo) -> Tuple[bool, str]:
        """ Validate if a given file FileInfo `fileinfo` can be processed.
        Returns a boolean, and a text reason why"""
        if fileinfo.get_uri_scheme() != "file" or fileinfo.is_directory():
            return False, "Not a file"
        elif not fileinfo.can_write():
            return False, "Not writeable"
        return True, ""

    def __create_treeview(self) -> Gtk.TreeView:
        """ Build the (icon, filename, reason) treeview listing the files
        from self.failed_items.
        """
        liststore = Gtk.ListStore(GdkPixbuf.Pixbuf, str, str)
        treeview = Gtk.TreeView(model=liststore)

        renderer_pixbuf = Gtk.CellRendererPixbuf()
        column_pixbuf = Gtk.TreeViewColumn("Icon", renderer_pixbuf, pixbuf=0)
        treeview.append_column(column_pixbuf)

        for idx, name in enumerate(['File', 'Reason']):
            renderer_text = Gtk.CellRendererText()
            column_text = Gtk.TreeViewColumn(name, renderer_text, text=idx+1)
            treeview.append_column(column_text)

        for (fname, mtype, reason) in self.failed_items:
            # This part is all about adding mimetype icons to the liststore
            icon = Gio.content_type_get_icon('text/plain' if not mtype else mtype)
            # in case we don't have the corresponding icon,
            # we're adding `text/plain`, because we have this one for sure™
            names = icon.get_names() + ['text/plain', ]
            icon_theme = Gtk.IconTheme.get_default()
            for name in names:
                try:
                    img = icon_theme.load_icon(name, Gtk.IconSize.BUTTON, 0)
                    break
                except GLib.GError:
                    pass
            # NOTE(review): if every load_icon call above failed, `img` is
            # unbound here — presumably 'text/plain' always loads; confirm.
            liststore.append([img, fname, reason])

        treeview.show_all()
        return treeview

    def __create_progressbar(self) -> Gtk.ProgressBar:
        """ Create the progressbar used to notify that files are currently
        being processed.
        """
        self.infobar.set_show_close_button(False)
        self.infobar.set_message_type(Gtk.MessageType.INFO)
        self.infobar_hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)

        progressbar = Gtk.ProgressBar()
        self.infobar_hbox.pack_start(progressbar, True, True, 0)
        progressbar.set_show_text(True)

        self.infobar.get_content_area().pack_start(self.infobar_hbox, True, True, 0)
        self.infobar.show_all()
        return progressbar

    def __update_progressbar(self, processing_queue, progressbar) -> bool:
        """ This method is run via `Glib.add_idle` to update the progressbar.

        Returning True keeps the idle callback scheduled; returning False
        removes it.
        """
        try:
            fname = processing_queue.get(block=False)
        except queue.Empty:
            return True

        # `None` is the marker put in the queue to signal that every selected
        # file was processed.
        if fname is None:
            self.infobar_hbox.destroy()
            self.infobar.hide()
            if self.failed_items:
                self.__infobar_failure()
            if not processing_queue.empty():
                print("Something went wrong, the queue isn't empty :/")
            return False

        progressbar.pulse()
        progressbar.set_text("Cleaning %s" % fname)
        progressbar.show_all()
        self.infobar_hbox.show_all()
        self.infobar.show_all()
        return True

    def __clean_files(self, files: list, processing_queue: queue.Queue) -> bool:
        """ This method is threaded in order to avoid blocking the GUI
        while cleaning up the files.
        """
        for fileinfo in files:
            fname = fileinfo.get_name()
            processing_queue.put(fname)

            valid, reason = self.__validate(fileinfo)
            if not valid:
                self.failed_items.append((fname, None, reason))
                continue

            fpath = unquote(fileinfo.get_uri()[7:])  # `len('file://') = 7`
            success, mtype = _remove_metadata(fpath)
            if not success:
                self.failed_items.append((fname, mtype, 'Unsupported/invalid'))
        processing_queue.put(None)  # signal that we processed all the files
        return True

    def __cb_menu_activate(self, menu, files):
        """ This method is called when the user clicked the "clean metadata"
        menu item.
        """
        self.failed_items = list()
        progressbar = self.__create_progressbar()
        # BUGFIX: this used to *assign* over the method
        # (`progressbar.set_pulse_step = ...`), which shadowed it instead of
        # setting the pulse step.
        progressbar.set_pulse_step(1.0 / len(files))
        self.infobar.show_all()

        processing_queue = queue.Queue()
        GLib.idle_add(self.__update_progressbar, processing_queue, progressbar)

        thread = threading.Thread(target=self.__clean_files, args=(files, processing_queue))
        thread.daemon = True
        thread.start()

    def get_background_items(self, window, file):
        """ https://bugzilla.gnome.org/show_bug.cgi?id=784278 """
        return None

    def get_file_items(self, window, files) -> Optional[List[Nautilus.MenuItem]]:
        """ This method is the one allowing us to create a menu item.
        """
        # Do not show the menu item if not a single file has a chance to be
        # processed by mat2.
        if not any(is_valid for (is_valid, _) in map(self.__validate, files)):
            return None

        item = Nautilus.MenuItem(
            name="mat2::Remove_metadata",
            label="Remove metadata",
            tip="Remove metadata"
        )
        item.connect('activate', self.__cb_menu_activate, files)
        return [item, ]
[project]
name = "mat2"
version = "0.13.5"
description = "mat2 is a metadata removal tool, supporting a wide range of commonly used file formats, written in python3: at its core, it's a library, used by an eponymous command-line interface, as well as several file manager extensions."
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.9"
dependencies = [
'mutagen',
'PyGObject',
'pycairo',
]
[project.urls]
Repository = "https://0xacab.org/jvoisin/mat2"
Issues = "https://0xacab.org/jvoisin/mat2/-/issues"
Changelog = "https://0xacab.org/jvoisin/mat2/-/blob/master/CHANGELOG.md"
[tool.ruff]
target-version = "py39"
# E501 Line too long
ignore = ["E501", "F401", "E402", "E722"]
......@@ -5,7 +5,7 @@ with open("README.md", encoding='utf-8') as fh:
setuptools.setup(
name="mat2",
version='0.11.0',
version='0.13.5',
author="Julien (jvoisin) Voisin",
author_email="julien.voisin+mat2@dustri.org",
description="A handy tool to trash your metadata",
......@@ -20,6 +20,7 @@ setuptools.setup(
'pycairo',
],
packages=setuptools.find_packages(exclude=('tests', )),
data_files = [('share/man/man1', ['doc/mat2.1'])],
classifiers=[
"Development Status :: 3 - Alpha",
"Environment :: Console",
......
File added
File added
File added
tests/data/dirty.webp

37.6 KiB

File deleted
import mimetypes
import os
import sys
sys.path.append('..')
import atheris
with atheris.instrument_imports(enable_loader_override=False):
from libmat2 import parser_factory, UNSUPPORTED_EXTENSIONS
# Build the pool of file extensions the fuzzer can pick from: every extension
# mat2 supports, minus video mimetypes and a few container types that are
# skipped here, and minus the explicitly unsupported extensions.
extensions = set()
for candidate_parser in parser_factory._get_parsers():  # type: ignore
    for mtype in candidate_parser.mimetypes:
        if mtype.startswith('video'):
            continue
        if any(fragment in mtype for fragment in ('aif', 'wav', 'gif', 'aifc')):
            continue
        extensions.update(
            ext for ext in mimetypes.guess_all_extensions(mtype)
            if ext not in UNSUPPORTED_EXTENSIONS)
extensions = list(extensions)
def TestOneInput(data):
    """ Atheris entry point: write the fuzzer-provided bytes to a temporary
    file with a randomly-picked supported extension, then run the full mat2
    pipeline (parse, get_meta, remove_all, re-parse) on it.
    """
    provider = atheris.FuzzedDataProvider(data)
    extension = provider.PickValueInList(extensions)
    payload = provider.ConsumeBytes(sys.maxsize)

    fname = '/tmp/mat2_fuzz' + extension
    with open(fname, 'wb') as f:
        f.write(payload)
    try:
        target_parser, _ = parser_factory.get_parser(fname)
        if target_parser:
            # The verifier has no bubblewrap, and sandboxing would only slow
            # the fuzzer down anyway.
            target_parser.sandbox = False
            target_parser.get_meta()
            target_parser.remove_all()
            target_parser, _ = parser_factory.get_parser(fname)
            target_parser.get_meta()
    except ValueError:
        # Malformed inputs are expected: that's the point of fuzzing.
        pass
    os.remove(fname)
# Hand control to atheris: register the entry point, then start fuzzing.
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
import sys
import random
import os
import shutil
......@@ -29,7 +28,7 @@ class TestHelp(unittest.TestCase):
self.assertIn(b' [-v] [-l]', stdout)
self.assertIn(b'[--check-dependencies]', stdout)
self.assertIn(b'[-L | -s]', stdout)
self.assertIn(b'[files [files ...]]', stdout)
self.assertIn(b'[files ...]', stdout)
def test_no_arg(self):
proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE)
......@@ -39,7 +38,7 @@ class TestHelp(unittest.TestCase):
self.assertIn(b'[--inplace]', stdout)
self.assertIn(b'[--no-sandbox]', stdout)
self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout)
self.assertIn(b'[files [files ...]]', stdout)
self.assertIn(b'[files ...]', stdout)
class TestVersion(unittest.TestCase):
......@@ -237,6 +236,11 @@ class TestGetMeta(unittest.TestCase):
self.assertIn(b'i am a : various comment', stdout)
self.assertIn(b'artist: jvoisin', stdout)
def test_webp(self):
proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/dirty.webp'],
stdout=subprocess.PIPE)
stdout, _ = proc.communicate()
self.assertIn(b'Warning: [minor] Improper EXIF header', stdout)
class TestControlCharInjection(unittest.TestCase):
def test_jpg(self):
......@@ -267,14 +271,7 @@ class TestCommandLineParallel(unittest.TestCase):
def test_different(self):
src = './tests/data/'
dst = './tests/data/parallel'
if sys.version_info >= (3, 8):
with os.scandir(src) as itr:
entries = list(itr)
shutil._copytree(entries=entries, src=src, dst=dst, symlinks=False,
ignore=None, copy_function=shutil.copy2,
ignore_dangling_symlinks=False)
else:
shutil.copytree(src, dst)
shutil.copytree(src, dst)
proc = subprocess.Popen(mat2_binary + glob.glob('./tests/data/parallel/dirty.*'),
stdout=subprocess.PIPE)
......@@ -286,7 +283,7 @@ class TestCommandLineParallel(unittest.TestCase):
self.assertIsNotNone(p)
p = parser_factory.get_parser(p.output_filename)
self.assertEqual(p.get_meta(), {})
shutil.rmtree('./tests/data/parallel')
shutil.rmtree('./tests/data/parallel/')
def test_faulty(self):
for i in range(self.iterations):
......
......@@ -14,7 +14,7 @@ from libmat2 import harmless, video, web, archive
# No need to logging messages, should something go wrong,
# the testsuite _will_ fail.
logger = logging.getLogger()
logger = logging.getLogger(__name__)
logger.setLevel(logging.FATAL)
......@@ -65,8 +65,10 @@ class TestCorruptedEmbedded(unittest.TestCase):
def test_docx(self):
shutil.copy('./tests/data/embedded_corrupted.docx', './tests/data/clean.docx')
parser, _ = parser_factory.get_parser('./tests/data/clean.docx')
self.assertFalse(parser.remove_all())
self.assertIsNotNone(parser.get_meta())
with self.assertRaises(ValueError):
parser.remove_all()
with self.assertRaises(ValueError):
self.assertIsNotNone(parser.get_meta())
os.remove('./tests/data/clean.docx')
def test_odt(self):
......@@ -120,8 +122,8 @@ class TestCorruptedFiles(unittest.TestCase):
def test_png2(self):
shutil.copy('./tests/test_libmat2.py', './tests/clean.png')
parser, _ = parser_factory.get_parser('./tests/clean.png')
self.assertIsNone(parser)
with self.assertRaises(ValueError):
parser_factory.get_parser('./tests/clean.png')
os.remove('./tests/clean.png')
def test_torrent(self):
......@@ -237,10 +239,10 @@ class TestCorruptedFiles(unittest.TestCase):
zout.write('./tests/data/embedded_corrupted.docx')
p, mimetype = parser_factory.get_parser('./tests/data/clean.zip')
self.assertEqual(mimetype, 'application/zip')
meta = p.get_meta()
self.assertEqual(meta['tests/data/dirty.flac']['comments'], 'Thank you for using MAT !')
self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
self.assertFalse(p.remove_all())
with self.assertRaises(ValueError):
p.get_meta()
with self.assertRaises(ValueError):
self.assertFalse(p.remove_all())
os.remove('./tests/data/clean.zip')
def test_html(self):
......@@ -315,10 +317,10 @@ class TestCorruptedFiles(unittest.TestCase):
zout.addfile(tarinfo, f)
p, mimetype = parser_factory.get_parser('./tests/data/clean.tar')
self.assertEqual(mimetype, 'application/x-tar')
meta = p.get_meta()
self.assertEqual(meta['./tests/data/dirty.flac']['comments'], 'Thank you for using MAT !')
self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
self.assertFalse(p.remove_all())
with self.assertRaises(ValueError):
p.get_meta()
with self.assertRaises(ValueError):
self.assertFalse(p.remove_all())
os.remove('./tests/data/clean.tar')
shutil.copy('./tests/data/dirty.png', './tests/data/clean.tar')
......
......@@ -113,6 +113,11 @@ class TestGetMeta(unittest.TestCase):
meta = p.get_meta()
self.assertEqual(meta['Comment'], 'Created with GIMP')
def test_webp(self):
p = images.WEBPParser('./tests/data/dirty.webp')
meta = p.get_meta()
self.assertEqual(meta['Warning'], '[minor] Improper EXIF header')
def test_ppm(self):
p = images.PPMParser('./tests/data/dirty.ppm')
meta = p.get_meta()
......@@ -175,14 +180,30 @@ class TestGetMeta(unittest.TestCase):
def test_zip(self):
with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout:
zout.write('./tests/data/dirty.flac')
zout.write('./tests/data/dirty.docx')
zout.write('./tests/data/dirty.jpg')
zout.write('./tests/data/dirty.flac',
compress_type = zipfile.ZIP_STORED)
zout.write('./tests/data/dirty.docx',
compress_type = zipfile.ZIP_DEFLATED)
zout.write('./tests/data/dirty.jpg',
compress_type = zipfile.ZIP_BZIP2)
zout.write('./tests/data/dirty.txt',
compress_type = zipfile.ZIP_LZMA)
p, mimetype = parser_factory.get_parser('./tests/data/dirty.zip')
self.assertEqual(mimetype, 'application/zip')
meta = p.get_meta()
self.assertEqual(meta['tests/data/dirty.flac']['comments'], 'Thank you for using MAT !')
self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
with zipfile.ZipFile('./tests/data/dirty.zip') as zipin:
members = {
'tests/data/dirty.flac' : zipfile.ZIP_STORED,
'tests/data/dirty.docx': zipfile.ZIP_DEFLATED,
'tests/data/dirty.jpg' : zipfile.ZIP_BZIP2,
'tests/data/dirty.txt' : zipfile.ZIP_LZMA,
}
for k, v in members.items():
self.assertEqual(zipin.getinfo(k).compress_type, v)
os.remove('./tests/data/dirty.zip')
def test_wmv(self):
......@@ -230,6 +251,17 @@ class TestGetMeta(unittest.TestCase):
p = images.SVGParser('./tests/data/weird.svg')
self.assertEqual(p.get_meta()['Xmlns'], 'http://www.w3.org/1337/svg')
def test_aiff(self):
p = audio.AIFFParser('./tests/data/dirty.aiff')
meta = p.get_meta()
self.assertEqual(meta['Name'], 'I am so')
def test_heic(self):
p = images.HEICParser('./tests/data/dirty.heic')
meta = p.get_meta()
self.assertEqual(meta['ProfileCopyright'], 'Public Domain')
self.assertEqual(meta['ProfileDescription'], 'GIMP built-in sRGB')
class TestRemovingThumbnails(unittest.TestCase):
def test_odt(self):
......@@ -306,12 +338,23 @@ class TestCleaning(unittest.TestCase):
'parser': images.JPGParser,
'meta': {'Comment': 'Created with GIMP'},
'expected_meta': {},
}, {
'name': 'webp',
'parser': images.WEBPParser,
'meta': {'Warning': '[minor] Improper EXIF header'},
'expected_meta': {},
}, {
'name': 'wav',
'parser': audio.WAVParser,
'meta': {'Comment': 'Zomg, a comment!'},
'expected_meta': {},
}, {
'name': 'aiff',
'parser': audio.AIFFParser,
'meta': {'Annotation': 'Thank you for using MAT !'},
'expected_meta': {},
},
{
'name': 'mp3',
'parser': audio.MP3Parser,
'meta': {'TXXX:I am a': 'various comment'},
......@@ -402,7 +445,7 @@ class TestCleaning(unittest.TestCase):
'name': 'gif',
'parser': images.GIFParser,
'meta': {'Comment': 'this is a test comment'},
'expected_meta': {},
'expected_meta': {'TransparentColor': '5'},
},{
'name': 'css',
'parser': web.CSSParser,
......@@ -418,7 +461,10 @@ class TestCleaning(unittest.TestCase):
'meta': {
'WorkDescription': "This is a test svg image for mat2's testsuite",
},
'expected_meta': {},
'expected_meta': {
'ImageSize': '128x128',
'Megapixels': '0.016',
},
} ,{
'name': 'ppm',
'parser': images.PPMParser,
......@@ -442,16 +488,25 @@ class TestCleaning(unittest.TestCase):
'Encoder': 'HandBrake 0.9.4 2009112300',
},
'expected_meta': {
'AverageBitrate': 465641,
'BufferSize': 0,
'ColorPrimaries': 'BT.709',
'ColorProfiles': 'nclx',
'ColorRepresentation': 'nclx 1 1 1',
'CompatibleBrands': ['isom', 'iso2', 'avc1', 'mp41'],
'CompressorID': 'avc1',
'CompressorName': 'JVT/AVC Coding',
'GraphicsMode': 'srcCopy',
'HandlerDescription': 'SoundHandler',
'HandlerType': 'Metadata',
'HandlerVendorID': 'Apple',
'MajorBrand': 'MP4 Base Media v1 [IS0 14496-12:2003]',
'MajorBrand': 'Base Media v1 [IS0 14496-12:2003]',
'MatrixCoefficients': 'BT.709',
'MaxBitrate': 465641,
'MediaDataOffset': 48,
'MediaDataSize': 379872,
'MediaHeaderVersion': 0,
'MediaLanguageCode': 'eng',
'MinorVersion': '0.2.0',
'MovieDataOffset': 48,
'MovieHeaderVersion': 0,
......@@ -461,7 +516,13 @@ class TestCleaning(unittest.TestCase):
'TimeScale': 1000,
'TrackHeaderVersion': 0,
'TrackID': 1,
'TrackLayer': 0},
'TrackLayer': 0,
'TransferCharacteristics': 'BT.709',
'VideoFullRangeFlag': 'Limited',
},
'extra_expected_meta': {
'VideoFullRangeFlag': 0,
}
},{
'name': 'wmv',
'ffmpeg': 1,
......@@ -470,44 +531,60 @@ class TestCleaning(unittest.TestCase):
'EncodingSettings': 'Lavf52.103.0',
},
'expected_meta': {},
},{
'name': 'heic',
'parser': images.HEICParser,
'meta': {},
'expected_meta': {
'ExifByteOrder': 'Big-endian (Motorola, MM)',
'Warning': 'Bad IFD0 directory',
},
}
]
def test_all_parametred(self):
for case in self.data:
if 'ffmpeg' in case:
try:
video._get_ffmpeg_path()
except RuntimeError:
raise unittest.SkipTest
print('[+] Testing %s' % case['name'])
target = './tests/data/clean.' + case['name']
shutil.copy('./tests/data/dirty.' + case['name'], target)
p1 = case['parser'](target)
for k, v in p1.get_meta().items():
if k not in case['meta']:
continue
if isinstance(v, dict):
for _k, _v in v.items():
if _k in case['meta'][k]:
self.assertEqual(_v, case['meta'][k][_k])
else:
self.assertEqual(v, case['meta'][k])
p1.lightweight_cleaning = True
self.assertTrue(p1.remove_all())
p2 = case['parser'](p1.output_filename)
for k, v in p2.get_meta().items():
self.assertIn(k, case['expected_meta'])
self.assertEqual(v, case['expected_meta'][k])
self.assertTrue(p2.remove_all())
os.remove(target)
os.remove(p1.output_filename)
os.remove(p2.output_filename)
with self.subTest(case=case):
if 'ffmpeg' in case:
try:
video._get_ffmpeg_path()
except RuntimeError:
raise unittest.SkipTest
print('[+] Testing %s' % case['name'])
target = './tests/data/clean.' + case['name']
shutil.copy('./tests/data/dirty.' + case['name'], target)
p1 = case['parser'](target)
for k, v in p1.get_meta().items():
if k not in case['meta']:
continue
if isinstance(v, dict):
for _k, _v in v.items():
if _k in case['meta'][k]:
self.assertEqual(_v, case['meta'][k][_k])
else:
self.assertEqual(v, case['meta'][k])
p1.lightweight_cleaning = True
self.assertTrue(p1.remove_all())
p2 = case['parser'](p1.output_filename)
meta = p2.get_meta()
if meta:
for k, v in p2.get_meta().items():
self.assertIn(k, case['expected_meta'], '"%s" is not in "%s" (%s)' % (k, case['expected_meta'], case['name']))
if str(case['expected_meta'][k]) in str(v):
continue
if 'extra_expected_meta' in case and k in case['extra_expected_meta']:
if str(case['extra_expected_meta'][k]) in str(v):
continue
self.assertTrue(False, "got a different value (%s) than excepted (%s) for %s" % (str(v), meta, k))
self.assertTrue(p2.remove_all())
os.remove(target)
os.remove(p1.output_filename)
os.remove(p2.output_filename)
def test_html(self):
......@@ -580,9 +657,14 @@ class TestCleaning(unittest.TestCase):
class TestCleaningArchives(unittest.TestCase):
def test_zip(self):
with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout:
zout.write('./tests/data/dirty.flac')
zout.write('./tests/data/dirty.docx')
zout.write('./tests/data/dirty.jpg')
zout.write('./tests/data/dirty.flac',
compress_type = zipfile.ZIP_STORED)
zout.write('./tests/data/dirty.docx',
compress_type = zipfile.ZIP_DEFLATED)
zout.write('./tests/data/dirty.jpg',
compress_type = zipfile.ZIP_BZIP2)
zout.write('./tests/data/dirty.txt',
compress_type = zipfile.ZIP_LZMA)
p = archive.ZipParser('./tests/data/dirty.zip')
meta = p.get_meta()
self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
......@@ -594,6 +676,16 @@ class TestCleaningArchives(unittest.TestCase):
self.assertEqual(p.get_meta(), {})
self.assertTrue(p.remove_all())
with zipfile.ZipFile('./tests/data/dirty.zip') as zipin:
members = {
'tests/data/dirty.flac' : zipfile.ZIP_STORED,
'tests/data/dirty.docx': zipfile.ZIP_DEFLATED,
'tests/data/dirty.jpg' : zipfile.ZIP_BZIP2,
'tests/data/dirty.txt' : zipfile.ZIP_LZMA,
}
for k, v in members.items():
self.assertEqual(zipin.getinfo(k).compress_type, v)
os.remove('./tests/data/dirty.zip')
os.remove('./tests/data/dirty.cleaned.zip')
os.remove('./tests/data/dirty.cleaned.cleaned.zip')
......@@ -787,3 +879,97 @@ class TestComplexOfficeFiles(unittest.TestCase):
os.remove(target)
os.remove(p.output_filename)
class TextDocx(unittest.TestCase):
    """Checks that MSOfficeParser strips Word comments from .docx files.

    Every test works on a scratch copy of ``./tests/data/comment.docx`` so
    the pristine fixture is never modified, and deletes its scratch files
    on success.
    """

    # Pristine fixture containing a Word comment.
    _DIRTY = './tests/data/comment.docx'
    # Scratch copy that each test cleans.
    _COPY = './tests/data/comment_clean.docx'
    # Output produced by remove_all() on the scratch copy.
    _CLEANED = './tests/data/comment_clean.cleaned.docx'

    def _clean_fixture(self):
        """Copy the dirty fixture, run a full clean on the copy, and
        return the path of the cleaned file."""
        shutil.copy(self._DIRTY, self._COPY)
        parser = office.MSOfficeParser(self._COPY)
        self.assertTrue(parser.remove_all())
        return self._CLEANED

    def _cleanup(self):
        """Remove the scratch copy and its cleaned output."""
        os.remove(self._COPY)
        os.remove(self._CLEANED)

    def test_comment_xml_is_removed(self):
        """word/comments.xml must be dropped from the cleaned archive."""
        with zipfile.ZipFile(self._DIRTY) as zipin:
            # The dirty fixture is expected to ship a comments part.
            self.assertIn('word/comments.xml', zipin.namelist())

        cleaned = self._clean_fixture()

        with zipfile.ZipFile(cleaned) as zipin:
            self.assertNotIn('word/comments.xml', zipin.namelist())

        self._cleanup()

    def test_xml_is_utf8(self):
        """Cleaning must preserve the UTF-8 XML declaration."""
        r = b'encoding=(\'|\")UTF-8(\'|\")'

        with zipfile.ZipFile(self._DIRTY) as zipin:
            # read() avoids leaving an open member file handle behind.
            content = zipin.read('word/document.xml')
            self.assertIsNotNone(re.search(r, content, re.IGNORECASE))

        cleaned = self._clean_fixture()

        with zipfile.ZipFile(cleaned) as zipin:
            content = zipin.read('word/document.xml')
            # Encoding must still be utf-8 after cleaning.
            self.assertIsNotNone(re.search(r, content, re.IGNORECASE))

        self._cleanup()

    def test_comment_references_are_removed(self):
        """All comment range/reference markers must be gone after cleaning."""
        markers = (b'w:commentRangeStart',
                   b'w:commentRangeEnd',
                   b'w:commentReference')

        with zipfile.ZipFile(self._DIRTY) as zipin:
            content = zipin.read('word/document.xml')
            for marker in markers:
                self.assertIn(marker, content)

        cleaned = self._clean_fixture()

        with zipfile.ZipFile(cleaned) as zipin:
            content = zipin.read('word/document.xml')
            for marker in markers:
                self.assertNotIn(marker, content)

        self._cleanup()

    def test_clean_document_xml_rels(self):
        """The relationship pointing at comments.xml must be removed."""
        target = b'Target="comments.xml"'

        with zipfile.ZipFile(self._DIRTY) as zipin:
            content = zipin.read('word/_rels/document.xml.rels')
            self.assertIn(target, content)

        cleaned = self._clean_fixture()

        with zipfile.ZipFile(cleaned) as zipin:
            content = zipin.read('word/_rels/document.xml.rels')
            self.assertNotIn(target, content)

        self._cleanup()
......@@ -23,6 +23,11 @@ class TestLightWeightCleaning(unittest.TestCase):
'parser': images.JPGParser,
'meta': {'Comment': 'Created with GIMP'},
'expected_meta': {},
}, {
'name': 'webp',
'parser': images.WEBPParser,
'meta': {'Warning': '[minor] Improper EXIF header'},
'expected_meta': {},
}, {
'name': 'torrent',
'parser': torrent.TorrentParser,
......@@ -33,7 +38,6 @@ class TestLightWeightCleaning(unittest.TestCase):
'parser': images.TiffParser,
'meta': {'ImageDescription': 'OLYMPUS DIGITAL CAMERA '},
'expected_meta': {
'Orientation': 'Horizontal (normal)',
'ResolutionUnit': 'inches',
'XResolution': 72,
'YResolution': 72
......
# Words to be ignored by codespell.
# Put one word per line and sort alphabetically.
process'