Skip to content
Snippets Groups Projects
Commit acff1ca6 authored by smagnin
Browse files

Add ZIP archives support

parent 4ed30b5e
No related branches found
No related tags found
No related merge requests found
Pipeline #19495 failed
...@@ -2,6 +2,7 @@ import zipfile ...@@ -2,6 +2,7 @@ import zipfile
import datetime import datetime
import tempfile import tempfile
import os import os
import sys
import logging import logging
import shutil import shutil
from typing import Dict, Set, Pattern from typing import Dict, Set, Pattern
...@@ -49,10 +50,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): ...@@ -49,10 +50,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
return zipinfo return zipinfo
@staticmethod @staticmethod
def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]: def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
metadata = {} metadata = {}
if zipinfo.create_system == 3: # this is Linux if zipinfo.create_system == 3: # this is Linux
pass metadata['create_system'] = 'Linux'
elif zipinfo.create_system == 2: elif zipinfo.create_system == 2:
metadata['create_system'] = 'Windows' metadata['create_system'] = 'Windows'
else: else:
...@@ -64,11 +65,15 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): ...@@ -64,11 +65,15 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time)) metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
ret = self._parse_files()
metadata_files = ret[0]
for name, _ in metadata_files.items():
metadata[name] = metadata_files
return metadata return metadata
def remove_all(self) -> bool: def _parse_files(self) -> tuple:
# pylint: disable=too-many-branches metadata = {} # type: dict
caller = sys._getframe(1).f_code.co_name
with zipfile.ZipFile(self.filename) as zin,\ with zipfile.ZipFile(self.filename) as zin,\
zipfile.ZipFile(self.output_filename, 'w') as zout: zipfile.ZipFile(self.output_filename, 'w') as zout:
...@@ -84,11 +89,12 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): ...@@ -84,11 +89,12 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
zin.extract(member=item, path=temp_folder) zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, item.filename) full_path = os.path.join(temp_folder, item.filename)
if self._specific_cleanup(full_path) is False: if caller == "remove_all":
logging.warning("Something went wrong during deep cleaning of %s", if self._specific_cleanup(full_path) is False:
item.filename) logging.warning("Something went wrong during deep cleaning of %s",
abort = True item.filename)
continue abort = True
continue
if any(map(lambda r: r.search(item.filename), self.files_to_keep)): if any(map(lambda r: r.search(item.filename), self.files_to_keep)):
# those files aren't supported, but we want to add them anyway # those files aren't supported, but we want to add them anyway
...@@ -112,8 +118,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): ...@@ -112,8 +118,11 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
abort = True abort = True
continue continue
if tmp_parser: if tmp_parser:
tmp_parser.remove_all() if caller == "remove_all":
os.rename(tmp_parser.output_filename, full_path) tmp_parser.remove_all()
os.rename(tmp_parser.output_filename, full_path)
else:
metadata[item.filename] = tmp_parser.get_meta()
zinfo = zipfile.ZipInfo(item.filename) # type: ignore zinfo = zipfile.ZipInfo(item.filename) # type: ignore
clean_zinfo = self._clean_zipinfo(zinfo) clean_zinfo = self._clean_zipinfo(zinfo)
...@@ -123,5 +132,21 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): ...@@ -123,5 +132,21 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
shutil.rmtree(temp_folder) shutil.rmtree(temp_folder)
if abort: if abort:
os.remove(self.output_filename) os.remove(self.output_filename)
return False return metadata, abort
return True # pylint: disable=too-many-branches
def remove_all(self) -> bool:
    """Run the archive pass in cleaning mode and report whether it succeeded.

    `_parse_files` inspects the name of the function that called it and
    only performs the cleaning steps when that name is "remove_all", so
    the call has to stay directly in this method's body (not in a lambda,
    comprehension or helper).

    Returns:
        True when `_parse_files` did not flag an abort, False otherwise.
    """
    # Second element of the returned pair is the abort flag.
    return not self._parse_files()[1]
class ZIPParser(ArchiveBasedAbstractParser):
    """Parser for plain ZIP archives (mimetype ``application/zip``)."""

    mimetypes = {'application/zip'}

    def get_meta(self) -> Dict[str, str]:
        """Gather the metadata reported for every member of the archive.

        Each member of `self.filename` is passed to `_get_zipinfo_meta`
        and the resulting mappings are merged into a single dict. When
        two members report the same key (e.g. 'create_system'), the
        value from the member listed last wins.

        Returns:
            The merged metadata mapping; empty when nothing was reported.
        """
        metadata = {}  # type: Dict[str, str]
        # The context manager guarantees the archive handle is released
        # even if extracting the metadata of one member raises.
        with zipfile.ZipFile(self.filename) as zipin:
            for item in zipin.infolist():
                # `_get_zipinfo_meta` is declared as a staticmethod that
                # takes the parser explicitly, hence the extra `self`.
                metadata.update(self._get_zipinfo_meta(self, item))
        return metadata
...@@ -312,7 +312,7 @@ class MSOfficeParser(ArchiveBasedAbstractParser): ...@@ -312,7 +312,7 @@ class MSOfficeParser(ArchiveBasedAbstractParser):
metadata[key] = value metadata[key] = value
except (TypeError, UnicodeDecodeError): # We didn't manage to parse the xml file except (TypeError, UnicodeDecodeError): # We didn't manage to parse the xml file
metadata[item.filename] = 'harmful content' metadata[item.filename] = 'harmful content'
for key, value in self._get_zipinfo_meta(item).items(): for key, value in self._get_zipinfo_meta(self, item).items():
metadata[key] = value metadata[key] = value
zipin.close() zipin.close()
return metadata return metadata
...@@ -397,7 +397,7 @@ class LibreOfficeParser(ArchiveBasedAbstractParser): ...@@ -397,7 +397,7 @@ class LibreOfficeParser(ArchiveBasedAbstractParser):
metadata[key] = value metadata[key] = value
except (TypeError, UnicodeDecodeError): # We didn't manage to parse the xml file except (TypeError, UnicodeDecodeError): # We didn't manage to parse the xml file
metadata[item.filename] = 'harmful content' metadata[item.filename] = 'harmful content'
for key, value in self._get_zipinfo_meta(item).items(): for key, value in self._get_zipinfo_meta(self, item).items():
metadata[key] = value metadata[key] = value
zipin.close() zipin.close()
return metadata return metadata
...@@ -73,10 +73,22 @@ def show_meta(filename: str): ...@@ -73,10 +73,22 @@ def show_meta(filename: str):
return return
for k, v in metadata: for k, v in metadata:
zipmeta = v
try: # FIXME this is ugly. try: # FIXME this is ugly.
print(" %s: %s" % (k, v)) if not isinstance(zipmeta, dict):
print(" %s: %s" % (k, v))
except UnicodeEncodeError: except UnicodeEncodeError:
print(" %s: harmful content" % k) print(" %s: harmful content" % k)
if mtype == "application/zip":
print("[+] Metadata for files inside the archive :")
if isinstance(zipmeta, dict):
for name, metas in zipmeta.items():
try: # FIXME this is ugly.
print(" %s" % name)
for meta_name, meta in metas.items():
print(" %s: %s" % (meta_name, meta))
except UnicodeEncodeError:
print(" %s: harmful content" % k)
def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy) -> bool: def clean_meta(filename: str, is_lightweight: bool, policy: UnknownMemberPolicy) -> bool:
if not __check_file(filename, os.R_OK|os.W_OK): if not __check_file(filename, os.R_OK|os.W_OK):
......
...@@ -65,7 +65,10 @@ class TestCorruptedEmbedded(unittest.TestCase): ...@@ -65,7 +65,10 @@ class TestCorruptedEmbedded(unittest.TestCase):
os.remove('./tests/data/clean.docx') os.remove('./tests/data/clean.docx')
def test_odt(self): def test_odt(self):
expected = { expected = {'Pictures/100002010000021D0000039CFEBF39BEE21A25FB.png':
{'PixelUnits': 'meters',
'PixelsPerUnitX': 341,
'PixelsPerUnitY': 341},
'create_system': 'Weird', 'create_system': 'Weird',
'date_time': '2018-06-10 17:18:18', 'date_time': '2018-06-10 17:18:18',
'meta.xml': 'harmful content' 'meta.xml': 'harmful content'
......
...@@ -41,7 +41,7 @@ class TestZipMetadata(unittest.TestCase): ...@@ -41,7 +41,7 @@ class TestZipMetadata(unittest.TestCase):
self.assertTrue(ret) self.assertTrue(ret)
p = office.MSOfficeParser('./tests/data/clean.cleaned.docx') p = office.MSOfficeParser('./tests/data/clean.cleaned.docx')
self.assertEqual(p.get_meta(), {}) self.assertEqual(p.get_meta(), {'create_system': 'Linux', 'word/media/image1.png': {'word/media/image1.png': {}}})
self.__check_zip_meta(p) self.__check_zip_meta(p)
self.__check_deep_meta(p) self.__check_deep_meta(p)
...@@ -60,7 +60,7 @@ class TestZipMetadata(unittest.TestCase): ...@@ -60,7 +60,7 @@ class TestZipMetadata(unittest.TestCase):
self.assertTrue(ret) self.assertTrue(ret)
p = office.LibreOfficeParser('./tests/data/clean.cleaned.odt') p = office.LibreOfficeParser('./tests/data/clean.cleaned.odt')
self.assertEqual(p.get_meta(), {}) self.assertEqual(p.get_meta(), {'Pictures/1000000000000032000000311EC5314D.png': {'Pictures/1000000000000032000000311EC5314D.png': {}}, 'create_system': 'Linux'})
self.__check_zip_meta(p) self.__check_zip_meta(p)
self.__check_deep_meta(p) self.__check_deep_meta(p)
......
...@@ -336,13 +336,30 @@ class TestCleaning(unittest.TestCase): ...@@ -336,13 +336,30 @@ class TestCleaning(unittest.TestCase):
self.assertTrue(ret) self.assertTrue(ret)
p = office.MSOfficeParser('./tests/data/clean.cleaned.docx') p = office.MSOfficeParser('./tests/data/clean.cleaned.docx')
self.assertEqual(p.get_meta(), {}) self.assertEqual(p.get_meta(), {'create_system': 'Linux', 'word/media/image1.png': {'word/media/image1.png': {}}})
self.assertTrue(p.remove_all())
os.remove('./tests/data/clean.docx') os.remove('./tests/data/clean.docx')
os.remove('./tests/data/clean.cleaned.docx') os.remove('./tests/data/clean.cleaned.docx')
os.remove('./tests/data/clean.cleaned.cleaned.docx') os.remove('./tests/data/clean.cleaned.cleaned.docx')
def test_zip(self):
    """Clean a copy of the sample ZIP archive and check the remaining metadata."""
    # Work on a copy so the reference fixture is left untouched.
    shutil.copy('./tests/data/test.zip', './tests/data/clean.zip')

    # NOTE(review): the archive is opened with MSOfficeParser, not a
    # ZIP-specific parser -- confirm this is intended.
    dirty_parser = office.MSOfficeParser('./tests/data/clean.zip')
    self.assertIsNotNone(dirty_parser.get_meta())
    self.assertTrue(dirty_parser.remove_all())

    # NOTE(review): every member name maps to the full per-file mapping
    # rather than to its own entry -- verify against `_get_zipinfo_meta`.
    leftover_meta = {
        'create_system': 'Linux',
        'dirty.mp3': {'dirty.mp3': {}, 'dirty.png': {}},
        'dirty.png': {'dirty.mp3': {}, 'dirty.png': {}},
    }
    cleaned_parser = office.MSOfficeParser('./tests/data/clean.cleaned.zip')
    self.assertEqual(cleaned_parser.get_meta(), leftover_meta)

    # Remove both the working copy and the cleaned output.
    os.remove('./tests/data/clean.zip')
    os.remove('./tests/data/clean.cleaned.zip')
def test_libreoffice(self): def test_libreoffice(self):
shutil.copy('./tests/data/dirty.odt', './tests/data/clean.odt') shutil.copy('./tests/data/dirty.odt', './tests/data/clean.odt')
p = office.LibreOfficeParser('./tests/data/clean.odt') p = office.LibreOfficeParser('./tests/data/clean.odt')
...@@ -354,7 +371,7 @@ class TestCleaning(unittest.TestCase): ...@@ -354,7 +371,7 @@ class TestCleaning(unittest.TestCase):
self.assertTrue(ret) self.assertTrue(ret)
p = office.LibreOfficeParser('./tests/data/clean.cleaned.odt') p = office.LibreOfficeParser('./tests/data/clean.cleaned.odt')
self.assertEqual(p.get_meta(), {}) self.assertEqual(p.get_meta(), {'Pictures/1000000000000032000000311EC5314D.png': {'Pictures/1000000000000032000000311EC5314D.png': {}}, 'create_system': 'Linux'})
self.assertTrue(p.remove_all()) self.assertTrue(p.remove_all())
os.remove('./tests/data/clean.odt') os.remove('./tests/data/clean.odt')
...@@ -426,8 +443,8 @@ class TestCleaning(unittest.TestCase): ...@@ -426,8 +443,8 @@ class TestCleaning(unittest.TestCase):
self.assertTrue(ret) self.assertTrue(ret)
p = office.LibreOfficeParser('./tests/data/clean.cleaned.odf') p = office.LibreOfficeParser('./tests/data/clean.cleaned.odf')
self.assertEqual(p.get_meta(), {})
self.assertTrue(p.remove_all()) self.assertTrue(p.remove_all())
self.assertEqual(p.get_meta(), {'create_system': 'Linux'})
os.remove('./tests/data/clean.odf') os.remove('./tests/data/clean.odf')
os.remove('./tests/data/clean.cleaned.odf') os.remove('./tests/data/clean.cleaned.odf')
...@@ -444,8 +461,7 @@ class TestCleaning(unittest.TestCase): ...@@ -444,8 +461,7 @@ class TestCleaning(unittest.TestCase):
self.assertTrue(ret) self.assertTrue(ret)
p = office.LibreOfficeParser('./tests/data/clean.cleaned.odg') p = office.LibreOfficeParser('./tests/data/clean.cleaned.odg')
self.assertEqual(p.get_meta(), {}) self.assertEqual(p.get_meta(), {'create_system': 'Linux'})
self.assertTrue(p.remove_all())
os.remove('./tests/data/clean.odg') os.remove('./tests/data/clean.odg')
os.remove('./tests/data/clean.cleaned.odg') os.remove('./tests/data/clean.cleaned.odg')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment