Skip to content
Snippets Groups Projects
Commit 1d5c2884 authored by Julien (jvoisin) Voisin's avatar Julien (jvoisin) Voisin
Browse files

Add support for tar files

parent 20ed5eb7
No related branches found
No related tags found
No related merge requests found
Pipeline #24277 failed
import abc
import zipfile
import datetime
import tarfile
import tempfile
import os
import logging
import shutil
from typing import Dict, Set, Pattern, Union, Any, List
from typing import Dict, Set, Pattern, Union, Any, List, Text
from . import abstract, UnknownMemberPolicy, parser_factory
......@@ -13,12 +15,27 @@ assert Set
assert Pattern
assert List
assert Union
assert Text
ArchiveClass = Union[zipfile.ZipFile, tarfile.TarFile]
ArchiveMember = Union[zipfile.ZipInfo, tarfile.TarInfo]
class ArchiveBasedAbstractParser(abstract.AbstractParser):
""" Office files (.docx, .odt, …) are zipped files. """
"""Base class for all archive-based formats.
Welcome to a world of frustrating complexity and tediousness:
- A lot of file formats (docx, odt, epubs, …) are archive-based,
so we need to add callbacks everywhere to allow their respective
parsers to apply specific cleanup to the required files.
- Python has two different modules to deal with .tar and .zip files,
with similar-but-yet-o-so-different API, so we need to write
a ghetto-wrapper to avoid duplicating everything
"""
def __init__(self, filename):
super().__init__(filename)
self.archive_class = None # type: Optional[ArchiveClass]
self.member_class = None # type: Optional[ArchiveMember]
# Those are the files that have a format that _isn't_
# supported by MAT2, but that we want to keep anyway.
......@@ -32,10 +49,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
# the archive?
self.unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy
try: # better fail here than later
zipfile.ZipFile(self.filename)
except zipfile.BadZipFile:
raise ValueError
self.is_archive_valid()
def is_archive_valid(self):
    """Raise a ValueError if the current file isn't valid.

    This default implementation performs no check at all and simply
    returns None; format-specific parsers override it with a real test.
    """
    return None
def _specific_cleanup(self, full_path: str) -> bool:
""" This method can be used to apply specific treatment
......@@ -49,50 +67,48 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
# pylint: disable=unused-argument,no-self-use
return {} # pragma: no cover
@staticmethod
def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
zipinfo.create_system = 3 # Linux
zipinfo.comment = b''
zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
return zipinfo
@staticmethod
@abc.abstractmethod
def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
    """Return all the members of the archive.

    Abstract: every concrete archive parser has to provide it.
    `abc.abstractstaticmethod` is deprecated since Python 3.3, hence the
    `staticmethod` + `abc.abstractmethod` stack (abstractmethod innermost).

    :param archive: the opened archive object to enumerate.
    :return: one member object per entry of the archive.
    """
@staticmethod
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
metadata = {}
if zipinfo.create_system == 3: # this is Linux
pass
elif zipinfo.create_system == 2:
metadata['create_system'] = 'Windows'
else:
metadata['create_system'] = 'Weird'
@staticmethod
@abc.abstractmethod
def _clean_member(member: ArchiveMember) -> ArchiveMember:
    """Remove all the metadata for a given member.

    Abstract: every concrete archive parser has to provide it.
    `abc.abstractstaticmethod` is deprecated since Python 3.3, hence the
    `staticmethod` + `abc.abstractmethod` stack (abstractmethod innermost).

    :param member: the archive member whose metadata must be removed.
    :return: the member, stripped of its metadata.
    """
if zipinfo.comment:
metadata['comment'] = zipinfo.comment # type: ignore
@staticmethod
@abc.abstractmethod
def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
    """Return all the metadata of a given member.

    Abstract: every concrete archive parser has to provide it.
    `abc.abstractstaticmethod` is deprecated since Python 3.3, hence the
    `staticmethod` + `abc.abstractmethod` stack (abstractmethod innermost).

    :param member: the archive member to inspect.
    :return: a mapping of metadata field names to their values.
    """
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
@staticmethod
@abc.abstractmethod
def _get_member_name(member: ArchiveMember) -> Text:
    """Return the name of the given member.

    Abstract: every concrete archive parser has to provide it.
    `abc.abstractstaticmethod` is deprecated since Python 3.3, hence the
    `staticmethod` + `abc.abstractmethod` stack (abstractmethod innermost).

    :param member: the archive member to name.
    :return: the member's name inside the archive.
    """
return metadata
@staticmethod
@abc.abstractmethod
def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
                         full_path: Text):
    """Add the file at full_path to the archive, via the given member.

    Abstract: every concrete archive parser has to provide it.
    `abc.abstractstaticmethod` is deprecated since Python 3.3, hence the
    `staticmethod` + `abc.abstractmethod` stack (abstractmethod innermost).

    :param archive: the archive, opened for writing, that receives the file.
    :param member: the member object describing the entry to create.
    :param full_path: path on disk of the file whose content is added.
    """
def get_meta(self) -> Dict[str, Union[str, dict]]:
meta = dict() # type: Dict[str, Union[str, dict]]
with zipfile.ZipFile(self.filename) as zin:
with self.archive_class(self.filename) as zin:
temp_folder = tempfile.mkdtemp()
for item in zin.infolist():
for item in self._get_all_members(zin):
local_meta = dict() # type: Dict[str, Union[str, Dict]]
for k, v in self._get_zipinfo_meta(item).items():
for k, v in self._get_member_meta(item).items():
local_meta[k] = v
if item.filename[-1] == '/': # pragma: no cover
member_name = self._get_member_name(item)
if member_name[-1] == '/': # pragma: no cover
# `is_dir` is added in Python3.6
continue # don't keep empty folders
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, item.filename)
full_path = os.path.join(temp_folder, member_name)
specific_meta = self._specific_get_meta(full_path, item.filename)
specific_meta = self._specific_get_meta(full_path, member_name)
for (k, v) in specific_meta.items():
local_meta[k] = v
......@@ -102,7 +118,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
local_meta[k] = v
if local_meta:
meta[item.filename] = local_meta
meta[member_name] = local_meta
shutil.rmtree(temp_folder)
return meta
......@@ -110,17 +126,22 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
def remove_all(self) -> bool:
# pylint: disable=too-many-branches
with zipfile.ZipFile(self.filename) as zin,\
zipfile.ZipFile(self.output_filename, 'w') as zout:
with self.archive_class(self.filename) as zin,\
self.archive_class(self.output_filename, 'w') as zout:
temp_folder = tempfile.mkdtemp()
abort = False
items = list() # type: List[zipfile.ZipInfo]
for item in sorted(zin.infolist(), key=lambda z: z.filename):
# Sort the items to process, to reduce fingerprinting,
# and keep them in the `items` variable.
items = list() # type: List[ArchiveMember]
members = self._get_all_members(zin)
sort_key = lambda z: self._get_member_name(z)
for item in sorted(members, key=sort_key):
# Some fileformats do require to have the `mimetype` file
# as the first file in the archive.
if item.filename == 'mimetype':
if self._get_member_name(item) == 'mimetype':
items = [item] + items
else:
items.append(item)
......@@ -128,37 +149,38 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
# Since files order is a fingerprint factor,
# we're iterating (and thus inserting) them in lexicographic order.
for item in items:
if item.filename[-1] == '/': # `is_dir` is added in Python3.6
member_name = self._get_member_name(item)
if member_name[-1] == '/': # `is_dir` is added in Python3.6
continue # don't keep empty folders
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, item.filename)
full_path = os.path.join(temp_folder, member_name)
if self._specific_cleanup(full_path) is False:
logging.warning("Something went wrong during deep cleaning of %s",
item.filename)
member_name)
abort = True
continue
if any(map(lambda r: r.search(item.filename), self.files_to_keep)):
if any(map(lambda r: r.search(member_name), self.files_to_keep)):
# those files aren't supported, but we want to add them anyway
pass
elif any(map(lambda r: r.search(item.filename), self.files_to_omit)):
elif any(map(lambda r: r.search(member_name), self.files_to_omit)):
continue
else: # supported files that we want to first clean, then add
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
if not tmp_parser:
if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
logging.warning("In file %s, omitting unknown element %s (format: %s)",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
continue
elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
logging.warning("In file %s, keeping unknown element %s (format: %s)",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
else:
logging.error("In file %s, element %s's format (%s) " \
"isn't supported",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
abort = True
continue
if tmp_parser:
......@@ -166,15 +188,14 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
logging.warning("In file %s, something went wrong \
with the cleaning of %s \
(format: %s)",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
abort = True
continue
os.rename(tmp_parser.output_filename, full_path)
zinfo = zipfile.ZipInfo(item.filename) # type: ignore
clean_zinfo = self._clean_zipinfo(zinfo)
with open(full_path, 'rb') as f:
zout.writestr(clean_zinfo, f.read())
zinfo = self.member_class(member_name) # type: ignore
clean_zinfo = self._clean_member(zinfo)
self._add_file_to_archive(zout, clean_zinfo, full_path)
shutil.rmtree(temp_folder)
if abort:
......@@ -183,6 +204,99 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
return True
class TarParser(ArchiveBasedAbstractParser):
    """Archive parser for tar files, built on the `tarfile` module.

    It plugs `tarfile.TarFile` / `tarfile.TarInfo` into the generic
    archive machinery and implements the tar-specific member callbacks.
    """
    mimetypes = {'application/x-tar'}

    def __init__(self, filename):
        """Validate `filename` (through the parent) and select the tar classes."""
        super().__init__(filename)
        self.archive_class = tarfile.TarFile
        self.member_class = tarfile.TarInfo

    def is_archive_valid(self):
        """Raise a ValueError if the current file isn't a tar archive."""
        # NOTE(review): is_tarfile() may accept compressed tarballs that the
        # plain TarFile constructor used as `archive_class` cannot open --
        # confirm whether those need to be rejected here.
        if not tarfile.is_tarfile(self.filename):
            raise ValueError

    @staticmethod
    def _clean_member(member: ArchiveMember) -> ArchiveMember:
        """Reset the timestamp and ownership fields of `member`, in place.

        :return: the same member object, for chaining.
        """
        member.mtime = member.uid = member.gid = 0
        member.uname = member.gname = ''
        return member

    @staticmethod
    def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
        """Return the fields of `member` that differ from their cleaned value.

        Numeric fields are stringified so that the result honours the
        declared `Dict[str, str]` contract.
        """
        metadata = {}
        # Numeric fields: 0 is the value written by `_clean_member`.
        for field in ('mtime', 'uid', 'gid'):
            value = getattr(member, field)
            if value != 0:
                metadata[field] = str(value)
        # Textual fields: the empty string is the cleaned value.
        for field in ('uname', 'gname'):
            value = getattr(member, field)
            if value != '':
                metadata[field] = value
        return metadata

    @staticmethod
    def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
                             full_path: Text):
        """Add the file at `full_path` to `archive` under `member.name`.

        The TarInfo built by `archive.add` is passed through
        `_clean_member`, so the stored entry carries no timestamp or
        ownership information.
        """
        archive.add(full_path, member.name, filter=TarParser._clean_member)

    @staticmethod
    def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
        """Return all the members of the tar archive."""
        return archive.getmembers()

    @staticmethod
    def _get_member_name(member: ArchiveMember) -> Text:
        """Return the name of the given tar member."""
        return member.name
class ZipParser(ArchiveBasedAbstractParser):
    """Archive parser for zip files, built on the `zipfile` module.

    It plugs `zipfile.ZipFile` / `zipfile.ZipInfo` into the generic
    archive machinery and implements the zip-specific member callbacks.
    """
    mimetypes = {'application/zip'}

    def __init__(self, filename):
        """Validate `filename` (through the parent) and select the zip classes."""
        super().__init__(filename)
        self.archive_class = zipfile.ZipFile
        self.member_class = zipfile.ZipInfo

    def is_archive_valid(self):
        """Raise a ValueError if the current file isn't a valid zip archive."""
        try:
            # Open inside a `with` so the probe doesn't leak a file handle.
            with zipfile.ZipFile(self.filename):
                pass
        except zipfile.BadZipFile as exc:
            raise ValueError from exc

    @staticmethod
    def _clean_member(member: ArchiveMember) -> ArchiveMember:
        """Reset the identifying fields of `member`, in place.

        :return: the same member object, for chaining.
        """
        member.create_system = 3  # Linux
        member.comment = b''
        member.date_time = (1980, 1, 1, 0, 0, 0)  # this is as early as a zipfile can be
        return member

    @staticmethod
    def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
        """Return the fields of `member` that differ from their cleaned value."""
        metadata = {}
        # NOTE(review): the ZIP APPNOTE lists host system 0 as MS-DOS/FAT and
        # 2 as OpenVMS -- confirm that 2 is the intended code for 'Windows'.
        if member.create_system == 3:  # this is Linux
            pass
        elif member.create_system == 2:
            metadata['create_system'] = 'Windows'
        else:
            metadata['create_system'] = 'Weird'

        if member.comment:
            metadata['comment'] = member.comment  # type: ignore

        # (1980, 1, 1, 0, 0, 0) is the value written by `_clean_member`.
        if member.date_time != (1980, 1, 1, 0, 0, 0):
            metadata['date_time'] = str(datetime.datetime(*member.date_time))

        return metadata

    @staticmethod
    def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
                             full_path: Text):
        """Write the content of the file at `full_path` to `archive` as `member`."""
        with open(full_path, 'rb') as f:
            archive.writestr(member, f.read())

    @staticmethod
    def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
        """Return all the members of the zip archive."""
        return archive.infolist()

    @staticmethod
    def _get_member_name(member: ArchiveMember) -> Text:
        """Return the name of the given zip member."""
        return member.filename
......@@ -5,7 +5,7 @@ import xml.etree.ElementTree as ET # type: ignore
from . import archive, office
class EPUBParser(archive.ArchiveBasedAbstractParser):
class EPUBParser(archive.ZipParser):
mimetypes = {'application/epub+zip', }
metadata_namespace = '{http://purl.org/dc/elements/1.1/}'
......
......@@ -6,7 +6,7 @@ from typing import Dict, Set, Pattern, Tuple, Any
import xml.etree.ElementTree as ET # type: ignore
from .archive import ArchiveBasedAbstractParser
from .archive import ZipParser
# pylint: disable=line-too-long
......@@ -43,7 +43,7 @@ def _sort_xml_attributes(full_path: str) -> bool:
return True
class MSOfficeParser(ArchiveBasedAbstractParser):
class MSOfficeParser(ZipParser):
mimetypes = {
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
......@@ -312,7 +312,7 @@ class MSOfficeParser(ArchiveBasedAbstractParser):
return {file_path: 'harmful content', }
class LibreOfficeParser(ArchiveBasedAbstractParser):
class LibreOfficeParser(ZipParser):
mimetypes = {
'application/vnd.oasis.opendocument.text',
'application/vnd.oasis.opendocument.spreadsheet',
......
......@@ -4,6 +4,7 @@ import unittest
import shutil
import os
import re
import tarfile
import zipfile
from libmat2 import pdf, images, audio, office, parser_factory, torrent, harmless
......@@ -195,6 +196,19 @@ class TestGetMeta(unittest.TestCase):
self.assertEqual(meta['version'], '1.0')
self.assertEqual(meta['harmful data'], 'underline is cool')
def test_tar(self):
    """get_meta() on a tar archive exposes the metadata of its members."""
    tar_path = './tests/data/dirty.tar'
    try:
        # Build the fixture on the fly from already-dirty sample files.
        with tarfile.TarFile(tar_path, 'w') as tout:
            tout.add('./tests/data/dirty.flac')
            tout.add('./tests/data/dirty.docx')
            tout.add('./tests/data/dirty.jpg')
        p, mimetype = parser_factory.get_parser(tar_path)
        self.assertEqual(mimetype, 'application/x-tar')
        meta = p.get_meta()
        self.assertEqual(meta['./tests/data/dirty.flac']['comments'], 'Thank you for using MAT !')
        self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')
    finally:
        # Remove the fixture even when an assertion above fails, so a
        # failing run doesn't leave a stray tarball in tests/data.
        if os.path.exists(tar_path):
            os.remove(tar_path)
class TestRemovingThumbnails(unittest.TestCase):
def test_odt(self):
shutil.copy('./tests/data/revision.odt', './tests/data/clean.odt')
......@@ -702,3 +716,23 @@ class TestCleaning(unittest.TestCase):
os.remove('./tests/data/clean.css')
os.remove('./tests/data/clean.cleaned.css')
os.remove('./tests/data/clean.cleaned.cleaned.css')
def test_tar(self):
    """Cleaning a tar archive removes its metadata, and can be repeated."""
    # Imported locally: the `from libmat2 import ...` line visible at the
    # top of this module does not bring `archive` into scope.
    from libmat2 import archive

    dirty = './tests/data/dirty.tar'
    cleaned = './tests/data/dirty.cleaned.tar'
    recleaned = './tests/data/dirty.cleaned.cleaned.tar'
    try:
        # Build the fixture on the fly from already-dirty sample files.
        with tarfile.TarFile(dirty, 'w') as zout:
            zout.add('./tests/data/dirty.flac')
            zout.add('./tests/data/dirty.docx')
            zout.add('./tests/data/dirty.jpg')
        p = archive.TarParser(dirty)
        meta = p.get_meta()
        self.assertEqual(meta['./tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!')

        ret = p.remove_all()
        self.assertTrue(ret)

        # The cleaned archive must be metadata-free and cleanable again.
        p = archive.TarParser(cleaned)
        self.assertEqual(p.get_meta(), {})
        self.assertTrue(p.remove_all())
        self.assertTrue(os.path.exists(recleaned))
    finally:
        # Remove every generated file, even when an assertion above fails.
        for path in (dirty, cleaned, recleaned):
            if os.path.exists(path):
                os.remove(path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment