Verified Commit b7cf7b97 by NavaL Committed by meskio

[bug] decryption will be tried for all inactive keys

parent bf9fa332
......@@ -208,46 +208,24 @@ class KeyManager(object):
raw_key, address=address, validation=validation_level)
@defer.inlineCallbacks
def get_key(self, address, private=False, active=True, fetch_remote=True):
def get_inactive_private_keys(self):
"""
Return a key bound to address.
For an active key: first, search for the key in local storage.
If it is not available, then try to fetch from nickserver.
The inactive key is fetched locally, for the case of multiple keys
for the same address. This can be used to attempt decryption
from multiple keys.
:param address: The address bound to the key.
:type address: str
:param private: Look for a private key instead of a public one?
:type private: bool
:param active: Look for the current active key
:type private: bool
:param fetch_remote: If key not found in local storage try to fetch
from nickserver
:type fetch_remote: bool
Return all inactive private keys bound to address, that can are
stored locally.
This can be used to attempt decryption from multiple keys.
:return: A Deferred which fires with an EncryptionKey bound to address,
or which fails with KeyNotFound if no key was found neither
locally or in keyserver or fail with KeyVersionError if the
key has a format not supported by this version of KeyManager
:return: A Deferred which fires the list of inactive keys sorted
according to their expiry dates.
:rtype: Deferred
:raise UnsupportedKeyTypeError: if invalid key type
"""
if active:
key = yield self._get_key(address, private, fetch_remote)
defer.returnValue(key)
all_keys = yield self.get_all_keys(private)
all_keys = yield self.get_all_keys(private=True)
inactive_keys = filter(lambda _key: not _key.is_active(), all_keys)
if inactive_keys:
inactive_keys = sorted(inactive_keys,
key=lambda _key: _key.expiry_date)
defer.returnValue(inactive_keys[-1])
def _get_key(self, address, private=False, fetch_remote=True):
inactive_keys = \
sorted(inactive_keys, key=lambda _key: _key.expiry_date)
defer.returnValue(inactive_keys)
def get_key(self, address, private=False, fetch_remote=True):
"""
Return a key bound to address.
......@@ -549,21 +527,30 @@ class KeyManager(object):
defer.returnValue((decrypted, signature))
@defer.inlineCallbacks
def decrypt_with_inactive_key(keys, original_decrypt_error):
verify_key, active_key = keys
inactive_key = yield self.get_key(address, private=True,
active=False)
if inactive_key:
def decrypt_with_inactive_keys(inactive_keys, verify_key,
original_decrypt_err):
if not inactive_keys:
# when there are no more keys to go through
raise original_decrypt_err
try:
inactive_key = inactive_keys.pop()
result = yield _decrypt([verify_key, inactive_key])
defer.returnValue(result)
raise original_decrypt_error
except keymanager_errors.DecryptError:
result = yield decrypt_with_inactive_keys(inactive_keys,
verify_key,
original_decrypt_err)
defer.returnValue(result)
@defer.inlineCallbacks
def decrypt(keys):
try:
result = yield _decrypt(keys)
except keymanager_errors.DecryptError as e:
result = yield decrypt_with_inactive_key(keys, e)
verify_key, active_key = keys
inactive_keys = yield self.get_inactive_private_keys()
result = yield decrypt_with_inactive_keys(inactive_keys,
verify_key, e)
defer.returnValue(result)
dpriv = self.get_key(address, private=True)
......
......@@ -195,22 +195,22 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
self.assertTrue(key.private)
@defer.inlineCallbacks
def test_create_and_get_two_private_keys_sets_first_key_inactive(self):
def test_get_inactive_private_key(self):
km = self._key_manager()
yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
yield km._openpgp.put_raw_key(DIFFERENT_PRIVATE_KEY, ADDRESS)
# get the key
inactive_key = yield km.get_key(ADDRESS, private=True,
active=False, fetch_remote=False)
active_key = yield km.get_key(ADDRESS, private=True,
active=True, fetch_remote=False)
inactive_keys = yield km.get_inactive_private_keys()
active_key = yield km.get_key(
ADDRESS, private=True, fetch_remote=False)
self.assertEqual(1, len(inactive_keys))
self.assertEqual(
inactive_key.fingerprint.lower(), KEY_FINGERPRINT.lower())
inactive_keys[0].fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertEqual(
active_key.fingerprint.lower(), DIFFERENT_KEY_FPR.lower())
self.assertTrue(inactive_key.private)
self.assertTrue(inactive_keys[0].private)
self.assertTrue(active_key.private)
self.assertFalse(inactive_key.is_active())
self.assertFalse(inactive_keys[0].is_active())
self.assertTrue(active_key.is_active())
@defer.inlineCallbacks
......@@ -603,10 +603,11 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
old_key = yield km.get_key(ADDRESS_EXPIRING)
new_key = yield km.regenerate_key()
retrieved_old_key = yield km.get_key(ADDRESS_EXPIRING,
private=True, active=False)
inactive_private_keys = yield km.get_inactive_private_keys()
renewed_public_key = yield km.get_key(ADDRESS_EXPIRING, private=False)
self.assertEqual(1, len(inactive_private_keys))
retrieved_old_key = inactive_private_keys[0]
self.assertEqual(old_key.fingerprint,
retrieved_old_key.fingerprint)
self.assertNotEqual(old_key.fingerprint,
......@@ -668,7 +669,7 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(signingkey.fingerprint, key.fingerprint)
@defer.inlineCallbacks
def test_keymanager_openpgp_decryption_tries_inactive_valid_key(self):
def test_keymanager_decryption_tries_inactive_valid_key(self):
km = self._key_manager()
# put raw private key
yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
......@@ -689,6 +690,21 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
self.assertEqual(signingkey.fingerprint, key.fingerprint)
@defer.inlineCallbacks
def test_decrypt_throws_error_when_all_keys_fails(self):
km = self._key_manager()
# put raw private key
yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
# renew key -- deactivate current key
yield km.regenerate_key()
# decrypt
with self.assertRaises(errors.DecryptError):
yield km.decrypt(ENCRYPTED_MESSAGE_FOR_DIFFERENT_KEY,
ADDRESS, verify=ADDRESS_2, fetch_remote=False)
@defer.inlineCallbacks
def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
km = self._key_manager()
# put raw keys
......@@ -797,3 +813,21 @@ QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
=gDzy
-----END PGP PUBLIC KEY BLOCK-----
"""
ENCRYPTED_MESSAGE_FOR_DIFFERENT_KEY = """
-----BEGIN PGP MESSAGE-----
hQEMAyIGRJSVjm17AQf/fyQrbcUKhy4Zv0UBsMFNdLj3h6YYkhkDecupmNeJzgSc
IeW8E5Un5thGpJRCF1iC3XirzybQxCEDCqVZdibXY/K0D5eQAE95m3Bc2euZN3sm
br4Ro/ybf/+0mt+cyPrvoaU/c/RKCWAXGDrTNCLe9f4UkwdiRj5tBpdC6WNEgsTD
SJfpZF5xP+NMc0cBRmSnUZHMgspbBK1OYmQurxn8vjyxDXwJuJ9sWl+FrWop3WMW
l/IMSSgyaJUjHvau6WNzRhKLujhuqyZKWo0WuJdBT0lPM0aQCJls4QVpDwE9mTZy
Vm2M4VnrxP9/IMqCrevwJXQTIKgIz9ANif+iZdHWYNLALAGgnH+45wXeguhFP1vD
x3SVIgOp8aAW7Plf5IO/bRQBs/LTvS1HWkD07WW14NJ29eMTPgoSR/lTGNMbHGYH
EgqRxJIsH93A+fN+CQoPboaEW/0hhQVf0WO/b8soxhVwZDDPMI3qGAQBwrBD3N9z
ksEUD5XNT+6mtMpTSpPr/0j0W7LjqR5QT+Bf2lUiFLH8XwekO0JK/vq5XkTydiAw
ZZBCPpoBxM/gH3cMuFafZNbqE6KDd7UziKxZCR17SrDFjrK/BLMrRKXRSnZOQNsb
WuLF0jIGxN6NiaduJ77gmrOieuBu0wKqv0iAvo8s
=G2sp
-----END PGP MESSAGE-----
"""
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment