Skip to content
Snippets Groups Projects

Add ZIP archives support

Closed Simon Magnin requested to merge smagnin/mat2:archive into master
2 unresolved threads
Files
5
+ 39
15
@@ -2,6 +2,7 @@ import zipfile
import datetime
import tempfile
import os
import sys
import logging
import shutil
from typing import Dict, Set, Pattern
@@ -48,11 +49,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be
return zipinfo
@staticmethod
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
metadata = {}
if zipinfo.create_system == 3: # this is Linux
pass
metadata['create_system'] = 'Linux'
elif zipinfo.create_system == 2:
metadata['create_system'] = 'Windows'
else:
@@ -64,11 +64,16 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
ret = self._parse_files()
metadata_files = ret[0]
for name, _ in metadata_files.items():
metadata[name] = metadata_files
return metadata
def remove_all(self) -> bool:
# pylint: disable=too-many-branches
# pylint: disable=too-many-branches
def _parse_files(self) -> tuple:
metadata = {} # type: dict
caller = sys._getframe(1).f_code.co_name
with zipfile.ZipFile(self.filename) as zin,\
zipfile.ZipFile(self.output_filename, 'w') as zout:
@@ -84,11 +89,12 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, item.filename)
if self._specific_cleanup(full_path) is False:
logging.warning("Something went wrong during deep cleaning of %s",
item.filename)
abort = True
continue
if caller == "remove_all":
if self._specific_cleanup(full_path) is False:
logging.warning("Something went wrong during deep cleaning of %s",
item.filename)
abort = True
continue
if any(map(lambda r: r.search(item.filename), self.files_to_keep)):
# those files aren't supported, but we want to add them anyway
@@ -112,8 +118,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
abort = True
continue
if tmp_parser:
tmp_parser.remove_all()
os.rename(tmp_parser.output_filename, full_path)
if caller == "remove_all":
tmp_parser.remove_all()
os.rename(tmp_parser.output_filename, full_path)
else:
metadata[item.filename] = tmp_parser.get_meta()
zinfo = zipfile.ZipInfo(item.filename) # type: ignore
clean_zinfo = self._clean_zipinfo(zinfo)
@@ -123,5 +132,20 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
shutil.rmtree(temp_folder)
if abort:
os.remove(self.output_filename)
return False
return True
return metadata, abort
def remove_all(self) -> bool:
ret = self._parse_files()
return not ret[1]
class ZIPParser(ArchiveBasedAbstractParser):
mimetypes = {'application/zip'}
def get_meta(self) -> Dict[str, str]:
metadata = {}
zipin = zipfile.ZipFile(self.filename)
for item in zipin.infolist():
for key, value in self._get_zipinfo_meta(item).items():
metadata[key] = value
zipin.close()
return metadata
Loading