Skip to content
Snippets Groups Projects
Commit a7ebb587 authored by Julien (jvoisin) Voisin
Browse files

Handle weird permissions in tar archives

parent 14a4cddb
No related branches found
No related tags found
No related merge requests found
import abc
import stat
import zipfile
import datetime
import tarfile
......@@ -104,6 +105,12 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
full_path: str):
"""Add the file at full_path to the archive, via the given member."""
@staticmethod
def _set_member_permissions(member: ArchiveMember, permissions: int) -> ArchiveMember:
    """Return `member` untouched: the default implementation is a no-op.

    `permissions` is accepted but ignored here; TarParser overrides this
    method to store the value in the member's `mode` field.
    """
    # pylint: disable=unused-argument
    return member
def get_meta(self) -> Dict[str, Union[str, dict]]:
meta = dict() # type: Dict[str, Union[str, dict]]
......@@ -120,6 +127,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, member_name)
os.chmod(full_path, stat.S_IRUSR)
specific_meta = self._specific_get_meta(full_path, member_name)
local_meta = {**local_meta, **specific_meta}
......@@ -164,6 +172,9 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
zin.extract(member=item, path=temp_folder)
full_path = os.path.join(temp_folder, member_name)
original_permissions = os.stat(full_path).st_mode
os.chmod(full_path, original_permissions | stat.S_IWUSR | stat.S_IRUSR)
if self._specific_cleanup(full_path) is False:
logging.warning("Something went wrong during deep cleaning of %s",
member_name)
......@@ -202,6 +213,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
os.rename(member_parser.output_filename, full_path)
zinfo = self.member_class(member_name) # type: ignore
zinfo = self._set_member_permissions(zinfo, original_permissions)
clean_zinfo = self._clean_member(zinfo)
self._add_file_to_archive(zout, clean_zinfo, full_path)
......@@ -216,11 +228,11 @@ class TarParser(ArchiveBasedAbstractParser):
mimetypes = {'application/x-tar'}
def __init__(self, filename):
    """Initialise the parser for the tar archive at `filename`.

    Calls the parent initialiser, then records the callable used to open
    the archive (`tarfile.open`) and the class used to build new archive
    members (`tarfile.TarInfo`).
    """
    super().__init__(filename)
    # yes, it's tarfile.open and not tarfile.TarFile,
    # as stated in the documentation:
    # https://docs.python.org/3/library/tarfile.html#tarfile.TarFile
    # This is required to support compressed archives.
    self.archive_class = tarfile.open
    self.member_class = tarfile.TarInfo
def is_archive_valid(self):
......@@ -239,7 +251,7 @@ class TarParser(ArchiveBasedAbstractParser):
assert isinstance(member, tarfile.TarInfo) # please mypy
metadata = {}
if member.mtime != 0:
metadata['mtime'] = datetime.datetime.fromtimestamp(member.mtime)
metadata['mtime'] = str(datetime.datetime.fromtimestamp(member.mtime))
if member.uid != 0:
metadata['uid'] = str(member.uid)
if member.gid != 0:
......@@ -267,6 +279,12 @@ class TarParser(ArchiveBasedAbstractParser):
assert isinstance(member, tarfile.TarInfo) # please mypy
return member.name
@staticmethod
def _set_member_permissions(member: ArchiveMember, permissions: int) -> ArchiveMember:
    """Store `permissions` in the `mode` field of the tar member and return it."""
    assert isinstance(member, tarfile.TarInfo)  # please mypy
    member.mode = permissions
    return member
class TarGzParser(TarParser):
compression = ':gz'
......
......@@ -293,7 +293,7 @@ class TestCorruptedFiles(unittest.TestCase):
os.remove('./tests/data/clean.epub')
def test_tar(self):
with tarfile.TarFile('./tests/data/clean.tar', 'w') as zout:
with tarfile.TarFile.open('./tests/data/clean.tar', 'w') as zout:
zout.add('./tests/data/dirty.flac')
zout.add('./tests/data/dirty.docx')
zout.add('./tests/data/dirty.jpg')
......@@ -302,6 +302,7 @@ class TestCorruptedFiles(unittest.TestCase):
tarinfo.mtime = time.time()
tarinfo.uid = 1337
tarinfo.gid = 1338
tarinfo.size = os.stat('./tests/data/dirty.png').st_size
with open('./tests/data/dirty.png', 'rb') as f:
zout.addfile(tarinfo, f)
p, mimetype = parser_factory.get_parser('./tests/data/clean.tar')
......@@ -316,3 +317,26 @@ class TestCorruptedFiles(unittest.TestCase):
with self.assertRaises(ValueError):
archive.TarParser('./tests/data/clean.tar')
os.remove('./tests/data/clean.tar')
class TestReadOnlyArchiveMembers(unittest.TestCase):
    """Tar archives holding a member whose mode is 0o000."""

    def test_onlymember_tar(self):
        """Read metadata from, then clean, a tar with a mode-000 member."""
        tar_path = './tests/data/clean.tar'
        cleaned_path = './tests/data/clean.cleaned.tar'
        jpg_path = './tests/data/dirty.jpg'

        with tarfile.open(tar_path, 'w') as tar_out:
            tar_out.add('./tests/data/dirty.png')

            # The second member is built by hand so its mode can be forced
            # to 0o000 regardless of the permissions of the file on disk.
            locked_member = tarfile.TarInfo(jpg_path)
            locked_member.mtime = time.time()
            locked_member.uid = 1337
            locked_member.mode = 0o000
            locked_member.size = os.stat(jpg_path).st_size
            with open(jpg_path, 'rb') as jpg_file:
                tar_out.addfile(tarinfo=locked_member, fileobj=jpg_file)

        dirty_parser, mimetype = parser_factory.get_parser(tar_path)
        self.assertEqual(mimetype, 'application/x-tar')
        self.assertEqual(dirty_parser.get_meta()[jpg_path]['uid'], '1337')

        self.assertTrue(dirty_parser.remove_all())

        cleaned_parser = archive.TarParser(cleaned_path)
        self.assertEqual(cleaned_parser.get_meta(), {})

        for leftover in (tar_path, cleaned_path):
            os.remove(leftover)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment