Commit 32293c9f authored by jvoisin

Initial implementation of inverted cleanup

parent 37145531
Pipeline #27232 failed with stages
in 24 seconds
......@@ -30,7 +30,7 @@ class AbstractParser(abc.ABC):
if fname.endswith('.tar') and len(fname) > 4:
fname, extension = fname[:-4], '.tar' + extension
self.output_filename = fname + '.cleaned' + extension
self.backup = fname + '.original.' + extension
self.lightweight_cleaning = False
@abc.abstractmethod
......@@ -38,7 +38,7 @@ class AbstractParser(abc.ABC):
"""Return all the metadata of the current file"""
@abc.abstractmethod
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
"""
Remove all the metadata of the current file
......
......@@ -144,11 +144,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
shutil.rmtree(temp_folder)
return meta
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
# pylint: disable=too-many-branches
with self.archive_class(self.filename) as zin,\
self.archive_class(self.output_filename, 'w' + self.compression) as zout:
self.archive_class(self.backup, 'w' + self.compression) as zout:
temp_folder = tempfile.mkdtemp()
abort = False
......@@ -205,14 +205,14 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
abort = True
continue
else:
if member_parser.remove_all() is False:
if member_parser.remove_all(inplace=False) is False:
logging.warning("In file %s, something went wrong \
with the cleaning of %s \
(format: %s)",
self.filename, member_name, mtype)
abort = True
continue
os.rename(member_parser.output_filename, full_path)
os.rename(member_parser.backup, full_path)
zinfo = self.member_class(member_name) # type: ignore
zinfo = self._set_member_permissions(zinfo, original_permissions)
......@@ -221,8 +221,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
shutil.rmtree(temp_folder)
if abort:
os.remove(self.output_filename)
os.remove(self.backup)
return False
if inplace is True:
os.remove(self.filename)
os.rename(self.backup, self.filename)
return True
......
......@@ -23,9 +23,11 @@ class MutagenParser(abstract.AbstractParser):
return {k:', '.join(v) for k, v in f.tags.items()}
return {}
def remove_all(self) -> bool:
shutil.copy(self.filename, self.output_filename)
f = mutagen.File(self.output_filename)
def remove_all(self, inplace:bool = False) -> bool:
output = self.filename if inplace else self.backup
if inplace:
shutil.copy(self.filename, self.backup)
f = mutagen.File(output)
f.delete()
f.save()
return True
......@@ -51,9 +53,11 @@ class OGGParser(MutagenParser):
class FLACParser(MutagenParser):
mimetypes = {'audio/flac', 'audio/x-flac'}
def remove_all(self) -> bool:
shutil.copy(self.filename, self.output_filename)
f = mutagen.File(self.output_filename)
def remove_all(self, inplace:bool = False) -> bool:
output = self.filename if inplace else self.backup
if inplace:
shutil.copy(self.filename, self.backup)
f = mutagen.File(output)
f.clear_pictures()
f.delete()
f.save(deleteid3=True)
......
......@@ -28,10 +28,11 @@ class ExiftoolParser(abstract.AbstractParser):
return meta
def _lightweight_cleanup(self) -> bool:
if os.path.exists(self.output_filename):
# TODO(jvoisin)
if os.path.exists(self.backup):
try:
# exiftool can't force output to existing files
os.remove(self.output_filename)
os.remove(self.backup)
except OSError as e: # pragma: no cover
logging.error("The output file %s is already existing and \
can't be overwritten: %s.", self.filename, e)
......@@ -46,12 +47,12 @@ class ExiftoolParser(abstract.AbstractParser):
'-Time:All=', # remove all timestamps
'-quiet', # don't show useless logs
'-CommonIFD0=', # remove IFD0 metadata
'-o', self.output_filename,
'-o', self.backup,
self.filename]
try:
subprocess.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
output_filename=self.backup)
except subprocess.CalledProcessError as e: # pragma: no cover
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
......
......@@ -10,6 +10,7 @@ class HarmlessParser(abstract.AbstractParser):
def get_meta(self) -> Dict[str, Union[str, dict]]:
return dict()
def remove_all(self) -> bool:
shutil.copy(self.filename, self.output_filename)
def remove_all(self, inplace:bool = False) -> bool:
if not inplace:
shutil.copy(self.filename, self.backup)
return True
......@@ -25,10 +25,11 @@ class SVGParser(exiftool.ExiftoolParser):
'MIMEType', 'SVGVersion', 'SourceFile', 'ViewBox'
}
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
output = self.filename if inplace else self.backup
svg = Rsvg.Handle.new_from_file(self.filename)
dimensions = svg.get_dimensions()
surface = cairo.SVGSurface(self.output_filename,
surface = cairo.SVGSurface(output,
dimensions.height,
dimensions.width)
context = cairo.Context(surface)
......@@ -66,11 +67,12 @@ class PNGParser(exiftool.ExiftoolParser):
except MemoryError: # pragma: no cover
raise ValueError
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
output = self.filename if inplace else self.backup
if self.lightweight_cleaning:
return self._lightweight_cleanup()
surface = cairo.ImageSurface.create_from_png(self.filename)
surface.write_to_png(self.output_filename)
surface.write_to_png(output)
return True
......@@ -85,7 +87,7 @@ class GIFParser(exiftool.ExiftoolParser):
'HasColorMap', 'ImageHeight', 'ImageSize', 'ImageWidth',
'MIMEType', 'Megapixels', 'SourceFile',}
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
return self._lightweight_cleanup()
......@@ -103,15 +105,16 @@ class GdkPixbufAbstractParser(exiftool.ExiftoolParser):
except GLib.GError:
raise ValueError
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
if self.lightweight_cleaning:
return self._lightweight_cleanup()
output = self.filename if inplace else self.backup
_, extension = os.path.splitext(self.filename)
pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename)
if extension.lower() == '.jpg':
extension = '.jpeg' # gdk is picky
pixbuf.savev(self.output_filename, type=extension[1:], option_keys=[], option_values=[])
pixbuf.savev(output, type=extension[1:], option_keys=[], option_values=[])
return True
......@@ -152,11 +155,15 @@ class PPMParser(abstract.AbstractParser):
meta[str(idx)] = line.lstrip().rstrip()
return meta
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
with open(self.filename) as fin:
with open(self.output_filename, 'w') as fout:
with open(self.backup, 'w') as fout:
for line in fin:
if not line.lstrip().startswith('#'):
line = re.sub(r"\s+", "", line, flags=re.UNICODE)
fout.write(line)
if inplace:
os.remove(self.filename)
os.rename(self.backup, self.filename)
return True
......@@ -38,10 +38,14 @@ class PDFParser(abstract.AbstractParser):
except GLib.GError: # Invalid PDF
raise ValueError
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
if self.lightweight_cleaning is True:
return self.__remove_all_lightweight()
return self.__remove_all_thorough()
if self.__remove_all_thorough():
if inplace:
os.remove(self.filename)
os.rename(self.backup, self.filename)
def __remove_all_lightweight(self) -> bool:
"""
......@@ -65,7 +69,7 @@ class PDFParser(abstract.AbstractParser):
pdf_context.show_page() # draw pdf_context on pdf_surface
pdf_surface.finish()
self.__remove_superficial_meta(tmp_path, self.output_filename)
self.__remove_superficial_meta(tmp_path, self.backup)
os.remove(tmp_path)
return True
......@@ -110,7 +114,7 @@ class PDFParser(abstract.AbstractParser):
pdf_surface.finish()
# Removes metadata added by Poppler
self.__remove_superficial_meta(tmp_path, self.output_filename)
self.__remove_superficial_meta(tmp_path, self.backup)
os.remove(tmp_path)
return True
......
......@@ -22,12 +22,13 @@ class TorrentParser(abstract.AbstractParser):
metadata[key.decode('utf-8')] = value
return metadata
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
cleaned = dict()
for key, value in self.dict_repr.items():
if key in self.allowlist:
cleaned[key] = value
with open(self.output_filename, 'wb') as f:
out_file = self.filename if inplace else self.backup
with open(out_file, 'wb') as f:
f.write(_BencodeHandler().bencode(cleaned))
self.dict_repr = cleaned # since we're stateful
return True
......
......@@ -13,7 +13,7 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser):
# Some fileformats have mandatory metadata fields
meta_key_value_allowlist = {} # type: Dict[str, Union[str, int]]
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
if self.meta_key_value_allowlist:
logging.warning('The format of "%s" (%s) has some mandatory '
'metadata fields; mat2 filled them with standard '
......@@ -31,14 +31,17 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser):
'-fflags', '+bitexact', # don't add any metadata
'-flags:v', '+bitexact', # don't add any metadata
'-flags:a', '+bitexact', # don't add any metadata
self.output_filename]
self.backup]
try:
subprocess.run(cmd, check=True,
input_filename=self.filename,
output_filename=self.output_filename)
output_filename=self.backup)
except subprocess.CalledProcessError as e:
logging.error("Something went wrong during the processing of %s: %s", self.filename, e)
return False
if inplace:
os.remove(self.filename)
os.rename(self.backup, self.filename)
return True
def get_meta(self) -> Dict[str, Union[str, dict]]:
......
......@@ -2,6 +2,7 @@ from html import parser, escape
from typing import Dict, Any, List, Tuple, Set, Optional
import re
import string
import shutil
from . import abstract
......@@ -15,10 +16,12 @@ class CSSParser(abstract.AbstractParser):
mimetypes = {'text/css', }
flags = re.MULTILINE | re.DOTALL
def remove_all(self) -> bool:
def remove_all(self, inplace:bool = False) -> bool:
if not inplace:
shutil.copy(self.filename, self.backup)
with open(self.filename, encoding='utf-8') as f:
cleaned = re.sub(r'/\*.*?\*/', '', f.read(), 0, self.flags)
with open(self.output_filename, 'w', encoding='utf-8') as f:
with open(self.filename, 'w', encoding='utf-8') as f:
f.write(cleaned)
return True
......@@ -53,8 +56,9 @@ class AbstractHTMLParser(abstract.AbstractParser):
def get_meta(self) -> Dict[str, Any]:
return self.__parser.get_meta()
def remove_all(self) -> bool:
return self.__parser.remove_all(self.output_filename)
def remove_all(self, inplace:bool = False) -> bool:
out_file = self.filename if inplace else self.backup
return self.__parser.remove_all(out_file)
class HTMLParser(AbstractHTMLParser):
......
......@@ -131,10 +131,7 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool,
try:
logging.debug('Cleaning %s…', filename)
ret = p.remove_all()
if inplace is True:
os.rename(p.output_filename, filename)
return ret
return p.remove_all(inplace)
except RuntimeError as e:
print("[-] %s can't be cleaned: %s" % (filename, e))
return False
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment