Verified Commit e5c567d7 by NavaL Committed by meskio

[feat] first phase of key regeneration

- if current key pair is expired, it'll be extended for a day first
- new key pair will be signed by the old key
parent 3eab1813
......@@ -330,6 +330,25 @@ class KeyManager(object):
d.addCallback(signal_finished)
return d
def regenerate_key(self):
"""
Regenerate a key bound to the user's address.
:return: A Deferred which fires with the generated EncryptionKey.
:rtype: Deferred
"""
def signal_finished(key):
emit_async(
catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
return key
self.log.info('Regenerating key for %s.' % self._address)
emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
d = self._openpgp.regenerate_key(self._address)
d.addCallback(signal_finished)
return d
#
# Setters/getters
#
......
......@@ -283,6 +283,9 @@ class OpenPGPKey(object):
value = str(value)
return key, value
def has_expired(self):
return self.expiry_date < datetime.now()
def __iter__(self):
return self
......
......@@ -22,10 +22,8 @@ Infrastructure for using OpenPGP keys in Key Manager.
import os
import re
import tempfile
import traceback
import io
from datetime import datetime
from multiprocessing import cpu_count
from twisted.internet import defer
......@@ -162,6 +160,41 @@ class OpenPGPScheme(object):
#
# Keys management
#
@defer.inlineCallbacks
def regenerate_key(self, address):
"""
Deactivate Current keypair,
Generate a new OpenPGP keypair bound to C{address},
and sign the new key with the old key.
:param address: The address bound to the key.
:type address: str
:return: A Deferred which fires with the new key bound to address.
:rtype: Deferred
"""
leap_assert(is_address(address), 'Not an user address: %s' % address)
current_sec_key = yield self.get_key(address, private=True)
with TempGPGWrapper([current_sec_key], self._gpgbinary) as gpg:
if current_sec_key.has_expired():
temporary_extension_period = '1' # extend for 1 extra day
gpg.extend_key(current_sec_key.fingerprint,
validity=temporary_extension_period)
yield self.unactivate_key(address)
new_key = yield self.gen_key(address)
gpg.import_keys(new_key.key_data)
key_signing = yield from_thread(gpg.sign_key, new_key.fingerprint)
if key_signing.status == 'ok':
fetched_keys = gpg.list_keys(secret=False)
fetched_key = filter(lambda k: k['fingerprint'] ==
new_key.fingerprint, fetched_keys)[0]
key_data = gpg.export_keys(new_key.fingerprint, secret=False)
renewed_key = self._build_key_from_gpg(
fetched_key,
key_data,
new_key.address)
yield self.put_key(renewed_key)
defer.returnValue(new_key)
def gen_key(self, address):
"""
......@@ -382,6 +415,7 @@ class OpenPGPScheme(object):
:return: A Deferred which fires when the key is in the storage.
:rtype: Deferred
"""
def merge_and_put((keydoc, activedoc)):
if not keydoc:
return put_new_key(activedoc)
......@@ -440,6 +474,7 @@ class OpenPGPScheme(object):
(keydoc, activedoc) or None if it does not exist.
:rtype: Deferred
"""
def get_key_from_active_doc(activedoc):
if not activedoc:
return (None, None)
......@@ -654,7 +689,7 @@ class OpenPGPScheme(object):
yield self.put_key(renewed_key)
defer.returnValue(renewed_key)
except Exception as e:
logger.warn('Failed to Extend Key: %s expiration date.' % str(e))
log.warn('Failed to Extend Key: %s expiration date.' % str(e))
raise errors.KeyExpiryExtensionError(str(e))
@defer.inlineCallbacks
......@@ -844,6 +879,7 @@ class OpenPGPScheme(object):
the deletions are completed
:rtype: Deferred
"""
def log_key_doc(doc):
self.log.error("\t%s: %s" % (doc.content[KEY_UIDS_KEY],
doc.content[KEY_FINGERPRINT_KEY]))
......
......@@ -21,7 +21,7 @@ import json
import urllib
import tempfile
import pkg_resources
from datetime import datetime, timedelta
from datetime import datetime, timedelta, date
from twisted.internet import defer
from twisted.trial import unittest
......@@ -554,26 +554,27 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km.send_key.assert_called_once_with()
@defer.inlineCallbacks
def test_keymanager_extend_key_expiry_date_for_key_pair(self):
def test_key_regenerate_gets_new_expiry_date_and_signed_by_old_key(self):
km = self._key_manager(user=ADDRESS_EXPIRING)
yield km._openpgp.put_raw_key(PRIVATE_EXPIRING_KEY, ADDRESS_EXPIRING)
key = yield km.get_key(ADDRESS_EXPIRING)
old_key = yield km.get_key(ADDRESS_EXPIRING)
yield km.extend_key(validity='1w')
new_key = yield km.regenerate_key()
new_expiry_date = datetime.strptime(
KEY_EXPIRING_CREATION_DATE, '%Y-%m-%d')
new_expiry_date += timedelta(weeks=1)
today = datetime.now()
new_expiry_date = date(today.year + 1, today.month, today.day)
renewed_public_key = yield km.get_key(ADDRESS_EXPIRING)
renewed_private_key = yield km.get_key(ADDRESS_EXPIRING, private=True)
self.assertEqual(new_expiry_date.date(),
self.assertEqual(new_expiry_date,
renewed_public_key.expiry_date.date())
self.assertEqual(new_expiry_date.date(),
self.assertEqual(new_expiry_date,
renewed_private_key.expiry_date.date())
self.assertEqual(key.fingerprint, renewed_public_key.fingerprint)
self.assertEqual(key.fingerprint, renewed_private_key.fingerprint)
self.assertNotEqual(old_key.fingerprint,
renewed_public_key.fingerprint)
self.assertEqual(new_key.fingerprint, renewed_public_key.fingerprint)
self.assertIn(old_key.fingerprint[-16:], renewed_public_key.signatures)
@defer.inlineCallbacks
def test_key_extension_with_invalid_period_throws_exception(self):
......
......@@ -50,7 +50,7 @@ commands =
py.test -k 'not bench' --pep8 -x {posargs}
deps =
-r{toxinidir}/pkg/requirements-testing.pip
-egit+https://github.com/pixelated/python-gnupg.git@feat_extend_key#egg=gnupg
-egit+https://github.com/pixelated/python-gnupg.git@key_extension_and_sign#egg=gnupg
-e../leap_commondev
-e../soledad
setenv =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment