Commit bbe17fd5 authored by jvoisin's avatar jvoisin
Browse files

Add support for zipfiles!

parent 5e650940
''' Take care of archives formats
'''
import datetime
import logging
import os
import shutil
......@@ -11,12 +12,17 @@ import zipfile
import mat
import parser
ZIP_EPOCH = (1980, 1, 1, 0, 0, 0)
ZIP_EPOCH_SECONDS = (datetime.datetime(1980, 1, 1, 0, 0, 0)
- datetime.datetime(1970, 1, 1, 0, 0, 0)).total_seconds()
class GenericArchiveStripper(parser.GenericParser):
''' Represent a generic archive
'''
def __init__(self, filename, parser, mime, backup, is_writable, **kwargs):
super(GenericArchiveStripper, self).__init__(filename, parser, mime, backup, is_writable, **kwargs)
super(GenericArchiveStripper, self).__init__(filename,
parser, mime, backup, is_writable, **kwargs)
self.compression = ''
self.add2archive = kwargs['add2archive']
self.tempdir = tempfile.mkdtemp()
......@@ -48,13 +54,13 @@ class GenericArchiveStripper(parser.GenericParser):
class ZipStripper(GenericArchiveStripper):
''' Represent a zip file
'''
def is_file_clean(self, fileinfo):
def __is_zipfile_clean(self, fileinfo):
''' Check if a ZipInfo object is clean of metadatas added
by zip itself, independently of the corresponding file metadatas
'''
if fileinfo.comment != '':
return False
elif fileinfo.date_time != (1980, 1, 1, 0, 0, 0):
elif fileinfo.date_time != ZIP_EPOCH:
return False
elif fileinfo.create_system != 3: # 3 is UNIX
return False
......@@ -70,83 +76,100 @@ class ZipStripper(GenericArchiveStripper):
logging.debug('%s has a comment' % self.filename)
return False
for item in zipin.infolist():
# I have not found a way to remove the crap added by zipfile :/
# if not self.is_file_clean(item):
# logging.debug('%s from %s has compromising zipinfo' %
# (item.filename, self.filename))
# return False
zipin.extract(item, self.tempdir)
name = os.path.join(self.tempdir, item.filename)
if not self.__is_zipfile_clean(item) and not list_unsupported:
logging.debug('%s from %s has compromising zipinfo' %
(item.filename, self.filename))
return False
if os.path.isfile(name):
cfile = mat.create_class_file(name, False, add2archive=self.add2archive)
if cfile:
if not cfile.is_clean():
return False
logging.debug('%s from %s has compromising zipinfo' %
(item.filename, self.filename))
if not list_unsupported:
return False
ret_list.append(item.filename)
else:
logging.info('%s\'s fileformat is not supported, or is harmless' % item.filename)
logging.info('%s\'s fileformat is not supported or harmless.'
% item.filename)
basename, ext = os.path.splitext(name)
bname = os.path.basename(item.filename)
if ext not in parser.NOMETA:
if bname != 'mimetype' and bname != '.rels':
if list_unsupported:
ret_list.append(bname)
else:
if os.path.basename(item.filename) not in ('mimetype', '.rels'):
if ext not in parser.NOMETA:
if not list_unsupported:
return False
ret_list.append(item.filename)
zipin.close()
if list_unsupported:
return ret_list
return True
def get_meta(self):
''' Return all the metadata of a ZipFile (don't return metadatas
of contained files : should it ?)
'''
''' Return all the metadata of a zip archive'''
zipin = zipfile.ZipFile(self.filename, 'r')
metadata = {}
for field in zipin.infolist():
zipmeta = {}
if field.comment != '':
zipmeta['comment'] = field.comment
if field.date_time != (1980, 1, 1, 0, 0, 0):
zipmeta['modified'] = field.date_time
if field.create_system != 3: # 3 is UNIX
zipmeta['system'] = "windows" if field.create_system == 2 else "unknown"
if zipin.comment != '':
metadata["%s comment" % self.filename] = zipin.comment
metadata['comment'] = zipin.comment
for item in zipin.infolist():
zipinfo_meta = self.__get_zipinfo_meta(item)
if zipinfo_meta != {}: # zipinfo metadata
metadata[item.filename + "'s zipinfo"] = str(zipinfo_meta)
zipin.extract(item, self.tempdir)
name = os.path.join(self.tempdir, item.filename)
if os.path.isfile(name):
cfile = mat.create_class_file(name, False, add2archive=self.add2archive)
if cfile:
cfile_meta = cfile.get_meta()
if cfile_meta != {}:
metadata[item.filename] = str(cfile_meta)
else:
logging.info('%s\'s fileformat is not supported or harmless'
% item.filename)
zipin.close()
return metadata
def remove_all(self):
''' So far, the zipfile module does not allow to write a ZipInfo
object into a zipfile (and it's a shame !) : so data added
by zipfile itself could not be removed. It's a big concern.
Is shipping a patched version of zipfile.py a good idea ?
def __get_zipinfo_meta(self, zipinfo):
    ''' Collect the metadata carried by a single ZipInfo entry.

    Returns a dict which is empty when the entry is clean; otherwise
    it reports the entry's comment, a modification time differing
    from the zip epoch, and/or a non-UNIX creating system.
    '''
    metadata = {}
    comment = zipinfo.comment
    if comment != '':
        metadata['comment'] = comment
    timestamp = zipinfo.date_time
    if timestamp != ZIP_EPOCH:
        metadata['modified'] = timestamp
    creator = zipinfo.create_system
    if creator != 3:  # 3 means UNIX; anything else leaks the creating OS
        if creator == 2:
            metadata['system'] = "windows"
        else:
            metadata['system'] = "unknown"
    return metadata
def remove_all(self, whitelist=[], beginning_blacklist=[], ending_blacklist=[]):
''' Remove all metadata from a zip archive, even thoses
added by Python's zipfile itself. It will not add
files starting with "begining_blacklist", or ending with
"ending_blacklist". This method also add files present in
whitelist to the archive.
'''
zipin = zipfile.ZipFile(self.filename, 'r')
zipout = zipfile.ZipFile(self.output, 'w', allowZip64=True)
for item in zipin.infolist():
zipin.extract(item, self.tempdir)
name = os.path.join(self.tempdir, item.filename)
if os.path.isfile(name):
try:
cfile = mat.create_class_file(name, False,
add2archive=self.add2archive)
beginning = any((True for f in beginning_blacklist if item.filename.startswith(f)))
ending = any((True for f in ending_blacklist if item.filename.endswith(f)))
if os.path.isfile(name) and not beginning and not ending:
cfile = mat.create_class_file(name, False, add2archive=self.add2archive)
if cfile is not None:
cfile.remove_all()
logging.debug('Processing %s from %s' % (item.filename,
self.filename))
zipout.write(name, item.filename)
except:
logging.info('%s\'s format is not supported or harmless' %
item.filename)
_, ext = os.path.splitext(name)
if self.add2archive or ext in parser.NOMETA:
zipout.write(name, item.filename)
logging.debug('Processing %s from %s' % (item.filename, self.filename))
elif item.filename not in whitelist:
logging.info('%s\'s format is not supported or harmless' % item.filename)
basename, ext = os.path.splitext(name)
if not (self.add2archive or ext in parser.NOMETA):
continue
os.utime(name, (ZIP_EPOCH_SECONDS, ZIP_EPOCH_SECONDS))
zipout.write(name, item.filename)
zipin.close()
for zipFile in zipout.infolist():
zipFile.orig_filename = zipFile.filename
zipFile.date_time = (1980, 1, 1, 0, 0, 0)
zipFile.create_system = 3 # 3 is UNIX
zipout.comment = ''
zipout.close()
logging.info('%s processed' % self.filename)
......@@ -167,7 +190,7 @@ class TarStripper(GenericArchiveStripper):
current_file.gname = ''
return current_file
def remove_all(self, exclude_list=[]):
def remove_all(self, whitelist=[]):
tarin = tarfile.open(self.filename, 'r' + self.compression, encoding='utf-8')
tarout = tarfile.open(self.output, 'w' + self.compression, encoding='utf-8')
for item in tarin.getmembers():
......@@ -179,8 +202,9 @@ class TarStripper(GenericArchiveStripper):
cfile.remove_all()
elif self.add2archive or os.path.splitext(item.name)[1] in parser.NOMETA:
logging.info('%s\' format is either not supported or harmless' % item.name)
elif item.name in exclude_list:
logging.debug('%s is not supported, but MAt was told to add it anyway.' % item.name)
elif item.name in whitelist:
logging.debug('%s is not supported, but MAT was told to add it anyway.'
% item.name)
else:
continue
tarout.add(complete_name, item.name, filter=self._remove)
......@@ -209,7 +233,6 @@ class TarStripper(GenericArchiveStripper):
'''
if list_unsupported:
ret_list = []
tempdir_len = len(self.tempdir) + 1 # trim the tempfile path
tarin = tarfile.open(self.filename, 'r' + self.compression)
for item in tarin.getmembers():
if not self.is_file_clean(item) and not list_unsupported:
......@@ -217,20 +240,21 @@ class TarStripper(GenericArchiveStripper):
tarin.extract(item, self.tempdir)
complete_name = os.path.join(self.tempdir, item.name)
if item.isfile():
class_file = mat.create_class_file(complete_name, False, add2archive=self.add2archive)
class_file = mat.create_class_file(complete_name,
False, add2archive=self.add2archive)
if class_file:
# We don't support nested archives
if not class_file.is_clean():
if not list_unsupported:
return False
elif isinstance(class_file, GenericArchiveStripper):
ret_list.append(complete_name[tempdir_len:])
ret_list.append(item.name)
else:
logging.error('%s\'s format is not supported or harmless' % item.name)
if os.path.splitext(complete_name)[1] not in parser.NOMETA:
if not list_unsupported:
return False
ret_list.append(complete_name[tempdir_len:])
ret_list.append(item.name)
tarin.close()
if list_unsupported:
return ret_list
......
''' Care about office's formats
'''
import os
import logging
import zipfile
import fileinput
import tempfile
import os
import shutil
import tempfile
import xml.dom.minidom as minidom
import zipfile
try:
import cairo
......@@ -16,7 +15,6 @@ except ImportError:
logging.info('office.py loaded without PDF support')
pass
import mat
import parser
import archive
......@@ -30,89 +28,83 @@ class OpenDocumentStripper(archive.ZipStripper):
''' Return a dict with all the meta of the file by
trying to read the meta.xml file.
'''
metadata = super(OpenDocumentStripper, self).get_meta()
zipin = zipfile.ZipFile(self.filename, 'r')
metadata = {}
try:
content = zipin.read('meta.xml')
dom1 = minidom.parseString(content)
elements = dom1.getElementsByTagName('office:meta')
for i in elements[0].childNodes:
if i.tagName != 'meta:document-statistic':
nodename = ''.join([k for k in i.nodeName.split(':')[1:]])
nodename = ''.join(i.nodeName.split(':')[1:])
metadata[nodename] = ''.join([j.data for j in i.childNodes])
else:
# thank you w3c for not providing a nice
# method to get all attributes of a node
pass
zipin.close()
except KeyError: # no meta.xml file found
logging.debug('%s has no opendocument metadata' % self.filename)
zipin.close()
return metadata
def remove_all(self):
''' Removes metadata
'''
FIXME ?
There is a patch implementing the Zipfile.remove()
method here : http://bugs.python.org/issue6818
return super(OpenDocumentStripper, self).remove_all(ending_blacklist=['meta.xml'])
def is_clean(self):
''' Check if the file is clean from harmful metadatas
'''
clean_super = super(OpenDocumentStripper, self).is_clean()
if clean_super is False:
return False
zipin = zipfile.ZipFile(self.filename, 'r')
zipout = zipfile.ZipFile(self.output, 'w', allowZip64=True)
try:
zipin.getinfo('meta.xml')
except KeyError: # no meta.xml in the file
return True
zipin.close()
return False
for item in zipin.namelist():
name = os.path.join(self.tempdir, item)
_, ext = os.path.splitext(name)
if item.endswith('manifest.xml'):
# contain the list of all files present in the archive
zipin.extract(item, self.tempdir)
for line in fileinput.input(name, inplace=1):
# remove the line which contains "meta.xml"
line = line.strip()
if not 'meta.xml' in line:
print line
zipout.write(name, item)
class OpenXmlStripper(archive.ZipStripper):
''' Represent an office openxml document, which is like
an opendocument format, with some tricky stuff added.
It contains mostly xml, but can have media blobs, crap, ...
(I don't like this format.)
'''
def remove_all(self):
    ''' Remove all metadata from the archive.

    Entries under "docProps/" only carry metadata, so they are
    blacklisted from the output; ".rels" is whitelisted because it is
    required for the document to remain valid.
    '''
    # BUGFIX: the trailing commas are essential. ('docProps/') is just
    # the string 'docProps/': iterating over it in the parent's
    # remove_all() would blacklist every file whose name starts with
    # any single character of it, and "item.filename not in '.rels'"
    # would be a substring test instead of a membership test.
    return super(OpenXmlStripper, self).remove_all(
        beginning_blacklist=('docProps/',), whitelist=('.rels',))
elif ext in parser.NOMETA or item == 'mimetype':
# keep NOMETA files, and the "manifest" file
if item != 'meta.xml': # contains the metadata
zipin.extract(item, self.tempdir)
zipout.write(name, item)
def is_clean(self):
''' Check if the file is clean from harmful metadatas.
This implementation is faster than something like
"return this.get_meta() == {}".
'''
clean_super = super(OpenXmlStripper, self).is_clean()
if clean_super is False:
return False
else:
zipin.extract(item, self.tempdir)
if os.path.isfile(name):
try:
cfile = mat.create_class_file(name, False,
add2archive=self.add2archive)
cfile.remove_all()
logging.debug('Processing %s from %s' % (item,
self.filename))
zipout.write(name, item)
except:
logging.info('%s\'s fileformat is not supported' % item)
if self.add2archive:
zipout.write(name, item)
zipout.comment = ''
logging.info('%s processed' % self.filename)
zipin = zipfile.ZipFile(self.filename, 'r')
for item in zipin.namelist():
if item.startswith('docProps/'):
return False
zipin.close()
zipout.close()
self.do_backup()
return True
def is_clean(self):
''' Check if the file is clean from harmful metadatas
def get_meta(self):
''' Return a dict with all the meta of the file
'''
metadata = super(OpenXmlStripper, self).get_meta()
zipin = zipfile.ZipFile(self.filename, 'r')
try:
zipin.getinfo('meta.xml')
except KeyError: # no meta.xml in the file
czf = archive.ZipStripper(self.filename, self.parser,
'application/zip', False, True, add2archive=self.add2archive)
if czf.is_clean():
zipin.close()
return True
for item in zipin.namelist():
if item.startswith('docProps/'):
metadata[item] = 'harmful content'
zipin.close()
return False
return metadata
class PdfStripper(parser.GenericParser):
......@@ -128,8 +120,8 @@ class PdfStripper(parser.GenericParser):
self.pdf_quality = False
self.document = Poppler.Document.new_from_file(uri, self.password)
self.meta_list = frozenset(['title', 'author', 'subject', 'keywords', 'creator',
'producer', 'metadata'])
self.meta_list = frozenset(['title', 'author', 'subject',
'keywords', 'creator', 'producer', 'metadata'])
def is_clean(self):
''' Check if the file is clean from harmful metadatas
......@@ -168,7 +160,7 @@ class PdfStripper(parser.GenericParser):
surface.finish()
shutil.move(output, self.output)
except:
logging.error('Something went wrong when cleaning %s. File not cleaned' % self.filename)
logging.error('Something went wrong when cleaning %s.' % self.filename)
return False
try:
......@@ -182,8 +174,7 @@ class PdfStripper(parser.GenericParser):
writer.write(self.output)
self.do_backup()
except:
logging.error('Unable to remove all metadata from %s, please install\
pdfrw' % self.output)
logging.error('Unable to remove all metadata from %s, please install pdfrw' % self.output)
return False
return True
......@@ -195,73 +186,3 @@ pdfrw' % self.output)
if self.document.get_property(key):
metadata[key] = self.document.get_property(key)
return metadata
class OpenXmlStripper(archive.GenericArchiveStripper):
    '''
    Represent an office openxml document, which is like
    an opendocument format, with some tricky stuff added.
    It contains mostly xml, but can have media blobs, crap, ...
    (I don't like this format.)
    '''
    def remove_all(self):
        ''' Remove all metadata from the archive.

        Metadata-only entries (everything under "docProps/") are
        dropped, harmless files (parser.NOMETA extensions and the
        mandatory ".rels" index) are copied verbatim, and every other
        file is cleaned by its own stripper before being re-added.

        FIXME ?
        There is a patch implementing the Zipfile.remove()
        method here : http://bugs.python.org/issue6818
        '''
        zipin = zipfile.ZipFile(self.filename, 'r')
        zipout = zipfile.ZipFile(self.output, 'w',
            allowZip64=True)
        for item in zipin.namelist():
            name = os.path.join(self.tempdir, item)
            _, ext = os.path.splitext(name)
            if item.startswith('docProps/'):  # metadatas
                pass
            elif ext in parser.NOMETA or item == '.rels':
                # keep parser.NOMETA files, and the file named ".rels"
                zipin.extract(item, self.tempdir)
                zipout.write(name, item)
            else:
                zipin.extract(item, self.tempdir)
                if os.path.isfile(name):  # don't care about folders
                    try:
                        cfile = mat.create_class_file(name, False,
                            add2archive=self.add2archive)
                        cfile.remove_all()
                        logging.debug('Processing %s from %s' % (item,
                            self.filename))
                        zipout.write(name, item)
                    except Exception:
                        # mat.create_class_file() returns None for
                        # unsupported formats, so cfile.remove_all()
                        # raises AttributeError and lands here.
                        # BUGFIX: a bare "except:" also swallowed
                        # KeyboardInterrupt and SystemExit.
                        logging.info('%s\'s fileformat is not supported' % item)
                        if self.add2archive:
                            zipout.write(name, item)
        zipout.comment = ''
        logging.info('%s processed' % self.filename)
        zipin.close()
        zipout.close()
        self.do_backup()
        return True

    def is_clean(self):
        ''' Check if the file is clean from harmful metadatas.

        A single entry under "docProps/" marks the file as dirty;
        otherwise the archive-level check of ZipStripper decides.
        '''
        zipin = zipfile.ZipFile(self.filename, 'r')
        try:
            for item in zipin.namelist():
                if item.startswith('docProps/'):
                    return False
        finally:
            # BUGFIX: the original returned False from inside the loop
            # without closing zipin, leaking the file handle.
            zipin.close()
        czf = archive.ZipStripper(self.filename, self.parser,
            'application/zip', False, True, add2archive=self.add2archive)
        return czf.is_clean()

    def get_meta(self):
        ''' Return a dict with all the meta of the file:
        every "docProps/" entry is reported as harmful content.
        '''
        zipin = zipfile.ZipFile(self.filename, 'r')
        metadata = {}
        for item in zipin.namelist():
            if item.startswith('docProps/'):
                metadata[item] = 'harmful content'
        zipin.close()
        return metadata
......@@ -14,6 +14,8 @@ import subprocess
STRIPPERS = {
'application/x-tar': archive.TarStripper,
'application/x-bzip2': archive.Bzip2Stripper,
'application/x-gzip': archive.GzipStripper,
'application/zip': archive.ZipStripper,
'audio/mpeg': audio.MpegAudioStripper,
'application/x-bittorrent': misc.TorrentStripper,
'application/opendocument': office.OpenDocumentStripper,
......
......@@ -410,7 +410,7 @@ non-anonymised) file to output archive'))
unsupported_list = self.liststore[line][0].file.list_unsupported()
if unsupported_list:
list_to_add = self.__popup_archive(unsupported_list)
if self.liststore[line][0].file.remove_all(list_to_add):
if self.liststore[line][0].file.remove_all(whitelist=list_to_add):
self.liststore[line][2] = _('Clean')
elif self.liststore[line][0].file.remove_all():
self.liststore[line][2] = _('Clean')
......
No preview for this file type
No preview for this file type
......@@ -99,6 +99,7 @@ class TestSecureRemove(unittest.TestCase):
'''
self.assertRaises(MAT.exceptions.UnableToRemoveFile, MAT.mat.secure_remove, '/NOTREMOVABLE')
class TestArchiveProcessing(test.MATTest):
''' Test archives processing
'''
......@@ -107,7 +108,7 @@ class TestArchiveProcessing(test.MATTest):
'''
tarpath = os.path.join(self.tmpdir, "test.tar.bz2")
tar = tarfile.open(tarpath, "w:bz2")
for clean,dirty in self.file_list:
for clean, dirty in self.file_list:
tar.add(dirty)
tar.add(clean)
tar.close()
......@@ -121,7 +122,7 @@ class TestArchiveProcessing(test.MATTest):
'''
tarpath = os.path.join(self.tmpdir, "test.tar")
tar = tarfile.open(tarpath, "w")
for clean,dirty in self.file_list:
for clean, dirty in self.file_list:
tar.add(dirty)
tar.add(clean)
tar.close()
......@@ -135,7 +136,7 @@ class TestArchiveProcessing(test.MATTest):
'''
tarpath = os.path.join(self.tmpdir, "test.tar.gz")
tar = tarfile.open(tarpath, "w")
for clean,dirty in self.file_list:
for clean, dirty in self.file_list:
tar.add(dirty)
tar.add(clean)
tar.close()
......@@ -156,6 +157,7 @@ class TestArchiveProcessing(test.MATTest):
unsupported_files = set(current_file.is_clean(list_unsupported=True))
self.assertEqual(unsupported_files, set(('mat.desktop', 'README.security', 'setup.py')))
def get_tests():
''' Returns every libtests'''
suite = unittest.TestSuite()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment