Skip to content
Snippets Groups Projects

Add tar archive support

Merged jvoisin requested to merge add_archive_support into master
Files
5
+ 184
51
import abc
import zipfile
import datetime
import tarfile
import tempfile
import os
import logging
import shutil
from typing import Dict, Set, Pattern, Union, Any, List
from typing import Dict, Set, Pattern, Union, Any, List, Text
from . import abstract, UnknownMemberPolicy, parser_factory
# Make pyflakes happy
assert Set
assert Pattern
assert List
assert Union
# pylint: disable=not-callable,assignment-from-no-return
ArchiveClass = Union[zipfile.ZipFile, tarfile.TarFile]
ArchiveMember = Union[zipfile.ZipInfo, tarfile.TarInfo]
class ArchiveBasedAbstractParser(abstract.AbstractParser):
""" Office files (.docx, .odt, …) are zipped files. """
"""Base class for all archive-based formats.
Welcome to a world of frustrating complexity and tediouness:
- A lot of file formats (docx, odt, epubs, …) are archive-based,
so we need to add callbacks erverywhere to allow their respective
parsers to apply specific cleanup to the required files.
- Python has two different modules to deal with .tar and .zip files,
with similar-but-yet-o-so-different API, so we need to write
a ghetto-wrapper to avoid duplicating everything
- The combination of @staticmethod and @abstractstaticmethod is
required because for now, mypy doesn't know that
@abstractstaticmethod is, indeed, a static method.
- Mypy is too dumb (yet) to realise that a type A is valid under
the Union[A, B] constrain, hence the weird `# type: ignore`
annotations.
"""
def __init__(self, filename):
super().__init__(filename)
self.archive_class = None # type: Optional[ArchiveClass]
self.member_class = None # type: Optional[ArchiveMember]
# Those are the files that have a format that _isn't_
# supported by MAT2, but that we want to keep anyway.
@@ -32,10 +54,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
# the archive?
self.unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy
try: # better fail here than later
zipfile.ZipFile(self.filename)
except zipfile.BadZipFile:
raise ValueError
self.is_archive_valid()
def is_archive_valid(self):
"""Raise a ValueError is the current file isn't valid."""
def _specific_cleanup(self, full_path: str) -> bool:
""" This method can be used to apply specific treatment
@@ -50,49 +72,52 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
return {} # pragma: no cover
@staticmethod
def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
zipinfo.create_system = 3 # Linux
zipinfo.comment = b''
zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
return zipinfo
@abc.abstractstaticmethod
def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
"""Return all the members of the archive."""
@staticmethod
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
metadata = {}
if zipinfo.create_system == 3: # this is Linux
pass
elif zipinfo.create_system == 2:
metadata['create_system'] = 'Windows'
else:
metadata['create_system'] = 'Weird'
@abc.abstractstaticmethod
def _clean_member(member: ArchiveMember) -> ArchiveMember:
"""Remove all the metadata for a given member."""
if zipinfo.comment:
metadata['comment'] = zipinfo.comment # type: ignore
@staticmethod
@abc.abstractstaticmethod
def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
"""Return all the metadata of a given member."""
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
@staticmethod
@abc.abstractstaticmethod
def _get_member_name(member: ArchiveMember) -> Text:
"""Return the name of the given member."""
return metadata
@staticmethod
@abc.abstractstaticmethod
def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
full_path: Text):
"""Add the file at full_path to the archive, via the given member."""
def get_meta(self) -> Dict[str, Union[str, dict]]:
meta = dict() # type: Dict[str, Union[str, dict]]
with zipfile.ZipFile(self.filename) as zin:
with self.archive_class(self.filename) as zin:
temp_folder = tempfile.mkdtemp()
for item in zin.infolist():
for item in self._get_all_members(zin):
local_meta = dict() # type: Dict[str, Union[str, Dict]]
for k, v in self._get_zipinfo_meta(item).items():
for k, v in self._get_member_meta(item).items():
local_meta[k] = v
if item.filename[-1] == '/': # pragma: no cover
member_name = self._get_member_name(item)
if member_name[-1] == '/': # pragma: no cover
# `is_dir` is added in Python3.6
continue # don't keep empty folders
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, item.filename)
full_path = os.path.join(temp_folder, member_name)
specific_meta = self._specific_get_meta(full_path, item.filename)
specific_meta = self._specific_get_meta(full_path, member_name)
for (k, v) in specific_meta.items():
local_meta[k] = v
@@ -102,7 +127,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
local_meta[k] = v
if local_meta:
meta[item.filename] = local_meta
meta[member_name] = local_meta
shutil.rmtree(temp_folder)
return meta
@@ -110,17 +135,20 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
def remove_all(self) -> bool:
# pylint: disable=too-many-branches
with zipfile.ZipFile(self.filename) as zin,\
zipfile.ZipFile(self.output_filename, 'w') as zout:
with self.archive_class(self.filename) as zin,\
self.archive_class(self.output_filename, 'w') as zout:
temp_folder = tempfile.mkdtemp()
abort = False
items = list() # type: List[zipfile.ZipInfo]
for item in sorted(zin.infolist(), key=lambda z: z.filename):
# Sort the items to process, to reduce fingerprinting,
# and keep them in the `items` variable.
items = list() # type: List[ArchiveMember]
for item in sorted(self._get_all_members(zin), key=self._get_member_name):
# Some fileformats do require to have the `mimetype` file
# as the first file in the archive.
if item.filename == 'mimetype':
if self._get_member_name(item) == 'mimetype':
items = [item] + items
else:
items.append(item)
@@ -128,37 +156,38 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
# Since files order is a fingerprint factor,
# we're iterating (and thus inserting) them in lexicographic order.
for item in items:
if item.filename[-1] == '/': # `is_dir` is added in Python3.6
member_name = self._get_member_name(item)
if member_name[-1] == '/': # `is_dir` is added in Python3.6
continue # don't keep empty folders
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, item.filename)
full_path = os.path.join(temp_folder, member_name)
if self._specific_cleanup(full_path) is False:
logging.warning("Something went wrong during deep cleaning of %s",
item.filename)
member_name)
abort = True
continue
if any(map(lambda r: r.search(item.filename), self.files_to_keep)):
if any(map(lambda r: r.search(member_name), self.files_to_keep)):
# those files aren't supported, but we want to add them anyway
pass
elif any(map(lambda r: r.search(item.filename), self.files_to_omit)):
elif any(map(lambda r: r.search(member_name), self.files_to_omit)):
continue
else: # supported files that we want to first clean, then add
tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
if not tmp_parser:
if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
logging.warning("In file %s, omitting unknown element %s (format: %s)",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
continue
elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
logging.warning("In file %s, keeping unknown element %s (format: %s)",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
else:
logging.error("In file %s, element %s's format (%s) " \
"isn't supported",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
abort = True
continue
if tmp_parser:
@@ -166,15 +195,14 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
logging.warning("In file %s, something went wrong \
with the cleaning of %s \
(format: %s)",
self.filename, item.filename, mtype)
self.filename, member_name, mtype)
abort = True
continue
os.rename(tmp_parser.output_filename, full_path)
zinfo = zipfile.ZipInfo(item.filename) # type: ignore
clean_zinfo = self._clean_zipinfo(zinfo)
with open(full_path, 'rb') as f:
zout.writestr(clean_zinfo, f.read())
zinfo = self.member_class(member_name) # type: ignore
clean_zinfo = self._clean_member(zinfo)
self._add_file_to_archive(zout, clean_zinfo, full_path)
shutil.rmtree(temp_folder)
if abort:
@@ -183,6 +211,111 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
return True
class TarParser(ArchiveBasedAbstractParser):
mimetypes = {'application/x-tar'}
def __init__(self, filename):
super().__init__(filename)
self.archive_class = tarfile.TarFile
self.member_class = tarfile.TarInfo
def is_archive_valid(self):
if tarfile.is_tarfile(self.filename) is False:
raise ValueError
@staticmethod
def _clean_member(member: ArchiveMember) -> ArchiveMember:
assert isinstance(member, tarfile.TarInfo) # please mypy
member.mtime = member.uid = member.gid = 0
member.uname = member.gname = ''
return member
@staticmethod
def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
assert isinstance(member, tarfile.TarInfo) # please mypy
metadata = {}
if member.mtime != 0:
metadata['mtime'] = str(member.mtime)
if member.uid != 0:
metadata['uid'] = str(member.uid)
if member.gid != 0:
metadata['gid'] = str(member.gid)
if member.uname != '':
metadata['uname'] = member.uname
if member.gname != '':
metadata['gname'] = member.gname
return metadata
@staticmethod
def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
full_path: Text):
assert isinstance(member, tarfile.TarInfo) # please mypy
assert isinstance(archive, tarfile.TarFile) # please mypy
archive.add(full_path, member.name, filter=TarParser._clean_member) # type: ignore
@staticmethod
def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
assert isinstance(archive, tarfile.TarFile) # please mypy
return archive.getmembers() # type: ignore
@staticmethod
def _get_member_name(member: ArchiveMember) -> Text:
assert isinstance(member, tarfile.TarInfo) # please mypy
return member.name
class ZipParser(ArchiveBasedAbstractParser):
mimetypes = {'application/zip'}
def __init__(self, filename):
super().__init__(filename)
self.archive_class = zipfile.ZipFile
self.member_class = zipfile.ZipInfo
def is_archive_valid(self):
try:
zipfile.ZipFile(self.filename)
except zipfile.BadZipFile:
raise ValueError
@staticmethod
def _clean_member(member: ArchiveMember) -> ArchiveMember:
assert isinstance(member, zipfile.ZipInfo) # please mypy
member.create_system = 3 # Linux
member.comment = b''
member.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
return member
@staticmethod
def _get_member_meta(member: ArchiveMember) -> Dict[str, str]:
assert isinstance(member, zipfile.ZipInfo) # please mypy
metadata = {}
if member.create_system == 3: # this is Linux
pass
elif member.create_system == 2:
metadata['create_system'] = 'Windows'
else:
metadata['create_system'] = 'Weird'
if member.comment:
metadata['comment'] = member.comment # type: ignore
if member.date_time != (1980, 1, 1, 0, 0, 0):
metadata['date_time'] = str(datetime.datetime(*member.date_time))
return metadata
@staticmethod
def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember,
full_path: Text):
assert isinstance(archive, zipfile.ZipFile) # please mypy
assert isinstance(member, zipfile.ZipInfo) # please mypy
with open(full_path, 'rb') as f:
archive.writestr(member, f.read())
@staticmethod
def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]:
assert isinstance(archive, zipfile.ZipFile) # please mypy
return archive.infolist() # type: ignore
@staticmethod
def _get_member_name(member: ArchiveMember) -> Text:
assert isinstance(member, zipfile.ZipInfo) # please mypy
return member.filename
Loading