Verified Commit 0454dc35 by NavaL Committed by meskio

[feat] reset sign_used flag after regenerating key pair

- this flag is used by leap.mail to attach the new key
parent e5c567d7
......@@ -330,6 +330,7 @@ class KeyManager(object):
d.addCallback(signal_finished)
return d
@defer.inlineCallbacks
def regenerate_key(self):
"""
Regenerate a key bound to the user's address.
......@@ -345,9 +346,12 @@ class KeyManager(object):
self.log.info('Regenerating key for %s.' % self._address)
emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
d = self._openpgp.regenerate_key(self._address)
d.addCallback(signal_finished)
return d
new_key = yield self._openpgp.regenerate_key(self._address)
yield self._openpgp.reset_all_keys_sign_used()
emit_async(
catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
defer.returnValue(new_key)
#
# Setters/getters
......
......@@ -184,7 +184,7 @@ class OpenPGPKey(object):
return False
def merge(self, newkey):
def merge(self, newkey, key_renewal=False):
if newkey.fingerprint != self.fingerprint:
self.log.critical(
"Can't put a key whith the same key_id and different "
......@@ -216,7 +216,8 @@ class OpenPGPKey(object):
if newkey.last_audited_at > self.last_audited_at:
self.validation = newkey.last_audited_at
self.encr_used = newkey.encr_used or self.encr_used
self.sign_used = newkey.sign_used or self.sign_used
if not key_renewal:
self.sign_used = newkey.sign_used or self.sign_used
self.refreshed_at = datetime.now()
def get_json(self):
......
......@@ -30,6 +30,7 @@ from twisted.internet import defer
from twisted.internet.threads import deferToThread
from twisted.logger import Logger
from leap.bitmask.keymanager.migrator import KeyDocumentsMigrator
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.bitmask.keymanager import errors
from leap.bitmask.keymanager.wrapper import TempGPGWrapper
......@@ -120,7 +121,6 @@ class OpenPGPScheme(object):
self._wait_indexes("get_key", "put_key", "get_all_keys")
def _migrate_documents_schema(self, _):
from leap.bitmask.keymanager.migrator import KeyDocumentsMigrator
migrator = KeyDocumentsMigrator(self._soledad)
return migrator.migrate()
......@@ -149,6 +149,7 @@ class OpenPGPScheme(object):
d.addCallback(lambda _: self.stored[method](*args, **kw))
self.waiting.append(d)
return d
return wrapper
for method in methods:
......@@ -175,12 +176,14 @@ class OpenPGPScheme(object):
"""
leap_assert(is_address(address), 'Not an user address: %s' % address)
current_sec_key = yield self.get_key(address, private=True)
current_pub_key = yield self.get_key(address, private=False)
with TempGPGWrapper([current_sec_key], self._gpgbinary) as gpg:
if current_sec_key.has_expired():
temporary_extension_period = '1' # extend for 1 extra day
gpg.extend_key(current_sec_key.fingerprint,
validity=temporary_extension_period)
yield self.unactivate_key(address)
yield self.unactivate_key(address) # only one priv key allowed
yield self.delete_key(current_pub_key)
new_key = yield self.gen_key(address)
gpg.import_keys(new_key.key_data)
key_signing = yield from_thread(gpg.sign_key, new_key.fingerprint)
......@@ -405,7 +408,7 @@ class OpenPGPScheme(object):
d.addCallback(put_key, openpgp_privkey)
return d
def put_key(self, key):
def put_key(self, key, key_renewal=False):
"""
Put C{key} in local storage.
......@@ -425,7 +428,7 @@ class OpenPGPScheme(object):
active_content = activedoc.content
oldkey = build_key_from_dict(keydoc.content, active_content)
key.merge(oldkey)
key.merge(oldkey, key_renewal)
keydoc.set_json(key.get_json())
d = self._soledad.put_doc(keydoc)
d.addCallback(put_active, activedoc)
......@@ -578,6 +581,27 @@ class OpenPGPScheme(object):
active_doc = yield self._get_active_doc_from_address(address, False)
yield self._soledad.delete_doc(active_doc)
@defer.inlineCallbacks
def reset_all_keys_sign_used(self):
"""
Reset sign_used flag for all keys in storage, to False...
to indicate that the key pair has not interacted with all
keys in the key ring yet.
This should only be used when regenerating the key pair.
"""
all_keys = yield self.get_all_keys(private=False)
deferreds = []
@defer.inlineCallbacks
def reset_sign_used(key):
key.sign_used = False
yield self.put_key(key, key_renewal=True)
for open_pgp_key in all_keys:
deferreds.append(reset_sign_used(open_pgp_key))
yield defer.gatherResults(deferreds)
#
# Data encryption, decryption, signing and verifying
#
......
......@@ -577,6 +577,18 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
self.assertIn(old_key.fingerprint[-16:], renewed_public_key.signatures)
@defer.inlineCallbacks
def test_key_regenerate_resets_all_public_key_sign_used(self):
km = self._key_manager(user=ADDRESS_EXPIRING)
yield km._openpgp.put_raw_key(PRIVATE_EXPIRING_KEY, ADDRESS_EXPIRING)
yield km._openpgp.put_raw_key(PUBLIC_KEY_2, ADDRESS_2)
km._openpgp.reset_all_keys_sign_used = mock.Mock()
yield km.regenerate_key()
km._openpgp.reset_all_keys_sign_used.assert_called_once()
@defer.inlineCallbacks
def test_key_extension_with_invalid_period_throws_exception(self):
km = self._key_manager(user=ADDRESS_EXPIRING)
......
......@@ -100,6 +100,33 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
yield self._assert_key_not_found(pgp, ADDRESS)
@inlineCallbacks
def test_reset_sign_used_for_all_keys(self):
pgp = openpgp.OpenPGPScheme(
self._soledad, gpgbinary=self.gpg_binary_path)
yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
yield pgp.put_raw_key(PUBLIC_KEY_2, ADDRESS_2)
pubkey = yield pgp.get_key(ADDRESS)
pubkey.sign_used = True
yield pgp.put_key(pubkey)
pubkey2 = yield pgp.get_key(ADDRESS_2)
pubkey2.sign_used = True
yield pgp.put_key(pubkey2)
yield pgp.reset_all_keys_sign_used()
pubkey_refetched = yield pgp.get_key(ADDRESS)
pubkey2_refetched = yield pgp.get_key(ADDRESS_2)
self.assertEqual(False, pubkey_refetched.sign_used)
self.assertEqual(False, pubkey2_refetched.sign_used)
self.assertEqual(pubkey.fingerprint, pubkey_refetched.fingerprint)
self.assertEqual(pubkey.key_data, pubkey_refetched.key_data)
self.assertEqual(pubkey2.fingerprint, pubkey2_refetched.fingerprint)
self.assertEqual(pubkey2.key_data, pubkey2_refetched.key_data)
@inlineCallbacks
def test_openpgp_encrypt_decrypt(self):
data = 'data'
pgp = openpgp.OpenPGPScheme(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment