import os
import re
import shutil
import tempfile
import datetime
import zipfile

from . import abstract, parser_factory


class TorrentParser(abstract.AbstractParser):
    """Read and strip metadata from bencoded BitTorrent metainfo files.

    Only the top-level keys listed in ``whitelist`` are kept when cleaning;
    every other top-level key is reported by ``get_meta`` and dropped by
    ``remove_all``.

    The decoder works on ``bytes``: every ``__decode_*`` helper receives the
    remaining input (starting at the value to decode) and returns a
    ``(value, remaining_input)`` tuple. Malformed input raises ``ValueError``,
    ``IndexError`` or ``KeyError`` inside the helpers; ``__bdecode`` turns
    those into a printed message and a ``None`` result.

    NOTE(review): ``self.filename`` and ``self.output_filename`` are assumed
    to be set by ``abstract.AbstractParser.__init__`` -- confirm there.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level dictionary keys that survive ``remove_all``.
    whitelist = {b'announce', b'announce-list', b'info'}

    def __init__(self, filename):
        super().__init__(filename)
        # Dispatch on the first byte of a bencoded value.
        self.__decode_func = {
            ord('l'): self.__decode_list,
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
        }
        # Byte strings start with their decimal length.
        for digit in b'0123456789':
            self.__decode_func[digit] = self.__decode_string

        # Dispatch on the exact Python type of the value to encode.
        self.__encode_func = {
            int: self.__encode_int,
            bytes: self.__encode_string,
            list: self.__encode_list,
            dict: self.__encode_dict,
        }

    def __load(self):
        """Return the decoded top-level dictionary, or None if invalid.

        A file that is not valid bencode, or whose top-level value is not a
        dictionary, is treated as invalid.
        """
        with open(self.filename, 'rb') as f:
            decoded = self.__bdecode(f.read())
        return decoded if isinstance(decoded, dict) else None

    def get_meta(self):
        """Return the non-whitelisted top-level entries of the torrent.

        Keys are decoded as UTF-8 (undecodable bytes are replaced); values
        are returned as decoded (``bytes``, ``int``, ``list`` or ``dict``).

        Raises:
            ValueError: if the file is not a bencoded dictionary. Returning
                an empty dict instead would wrongly report the file as clean.
        """
        decoded = self.__load()
        if decoded is None:
            raise ValueError("%s is not a valid torrent file" % self.filename)
        return {
            key.decode('utf-8', errors='replace'): value
            for key, value in decoded.items()
            if key not in self.whitelist
        }

    def remove_all(self):
        """Write a copy holding only whitelisted keys to ``output_filename``.

        Returns:
            True on success, False if the input is not a bencoded dictionary
            (in which case nothing is written).
        """
        decoded = self.__load()
        if decoded is None:
            return False
        cleaned = {k: v for k, v in decoded.items() if k in self.whitelist}
        with open(self.output_filename, 'wb') as f:
            f.write(self.__bencode(cleaned))
        return True

    def __decode_int(self, s):
        """Decode ``i<digits>e``; return ``(int, rest)``."""
        end = s.index(b'e')  # ValueError when the terminator is missing
        raw = s[1:end]  # skip leading `i`
        negative = raw.startswith(b'-')
        digits = raw[1:] if negative else raw
        # int() alone would also accept `+1`, ` 1` or `1_0`.
        if not digits.isdigit():
            raise ValueError("malformed integer")
        # No negative zero, and no leading zero except for zero itself.
        if digits.startswith(b'0') and (negative or len(digits) > 1):
            raise ValueError("integer with leading zero or negative zero")
        return int(raw), s[end + 1:]

    def __decode_string(self, s):
        """Decode ``<length>:<bytes>``; return ``(bytes, rest)``."""
        colon = s.index(b':')  # ValueError when the separator is missing
        prefix = s[:colon]
        # Rejects empty, signed or otherwise non-decimal lengths; needed
        # because dictionary keys reach this method without dispatch.
        if not prefix.isdigit():
            raise ValueError("malformed string length")
        if prefix.startswith(b'0') and colon != 1:
            raise ValueError("string length with leading zero")
        start = colon + 1
        stop = start + int(prefix)
        if stop > len(s):
            raise ValueError("truncated string")
        return s[start:stop], s[stop:]

    def __decode_list(self, s):
        """Decode ``l<values>e``; return ``(list, rest)``."""
        items = []
        s = s[1:]  # skip leading `l`
        # Running out of input raises IndexError, handled by __bdecode.
        while s[0] != ord('e'):
            value, s = self.__decode_func[s[0]](s)
            items.append(value)
        return items, s[1:]  # skip trailing `e`

    def __decode_dict(self, s):
        """Decode ``d<key><value>...e``; return ``(dict, rest)``."""
        entries = {}
        s = s[1:]  # skip leading `d`
        while s[0] != ord('e'):
            # Keys are always byte strings.
            key, s = self.__decode_string(s)
            entries[key], s = self.__decode_func[s[0]](s)
        return entries, s[1:]  # skip trailing `e`

    def __bdecode(self, s):
        """Decode a complete bencoded value; return None (and print) on error."""
        try:
            value, rest = self.__decode_func[s[0]](s)
        except (IndexError, KeyError, ValueError, RecursionError) as e:
            # RecursionError: pathologically nested lists/dicts.
            print("not a valid bencoded string: %s" % e)
            return None
        if rest != b'':
            print("invalid bencoded value (data after valid prefix)")
            return None
        return value

    @staticmethod
    def __encode_int(x):
        """Encode an int as ``i<digits>e``."""
        return b'i' + str(x).encode('ascii') + b'e'

    @staticmethod
    def __encode_string(x: bytes):
        """Encode a byte string as ``<length>:<bytes>``."""
        return str(len(x)).encode('ascii') + b':' + x

    def __encode_list(self, x):
        """Encode a list as ``l<values>e``."""
        body = b''.join(self.__encode_func[type(item)](item) for item in x)
        return b'l' + body + b'e'

    def __encode_dict(self, x):
        """Encode a dict as ``d<key><value>...e`` with keys in sorted order."""
        parts = []
        for key, value in sorted(x.items()):
            parts.append(self.__encode_func[type(key)](key))
            parts.append(self.__encode_func[type(value)](value))
        return b'd' + b''.join(parts) + b'e'

    def __bencode(self, x):
        """Encode ``x`` (int, bytes, list or dict); KeyError on other types."""
        return self.__encode_func[type(x)](x)
def test_torrent(self):
    """Clean a copy of the dirty torrent and check no metadata remains."""
    shutil.copy('./tests/data/dirty.torrent', './tests/data/clean.torrent')
    p = torrent.TorrentParser('./tests/data/clean.torrent')

    meta = p.get_meta()
    self.assertEqual(meta, {'created by': b'mktorrent 1.0', 'creation date': 1522397702})

    ret = p.remove_all()
    self.assertTrue(ret)

    p = torrent.TorrentParser('./tests/data/clean.torrent.cleaned')
    self.assertEqual(p.get_meta(), {})

    os.remove('./tests/data/clean.torrent')
    # Also remove the file written by remove_all() so that the test does
    # not leave artifacts in the data directory.
    os.remove('./tests/data/clean.torrent.cleaned')