Commit d2d3a243 authored by Kali Kaneko's avatar Kali Kaneko
Browse files

move symmetric decryption of docs to be db-based too

parent 1a60f361
......@@ -3,6 +3,9 @@ simplejson
u1db
scrypt
pycryptopp
cchardet
taskthread
zope.proxy
#
# leap deps
......@@ -21,5 +24,3 @@ oauth
# pysqlite should not be a dep, see #2945
pysqlite
cchardet
taskthread
......@@ -324,7 +324,7 @@ class Soledad(object):
self._bootstrap() # might raise BootstrapSequenceError()
# initialize syncing queue encryption pool
self._sync_pool = SyncEncrypterPool(self._crypto, self._sync_db)
self._sync_enc_pool = SyncEncrypterPool(self._crypto, self._sync_db)
self._sync_watcher = TimerTask(self._encrypt_syncing_docs, delay=10)
self._sync_watcher.start()
......@@ -1145,7 +1145,7 @@ class Soledad(object):
if self._db:
return self._db.resolve_doc(doc, conflicted_doc_revs)
def sync(self):
def sync(self, decrypt_inline=False):
"""
Synchronize the local encrypted replica with a remote replica.
......@@ -1155,11 +1155,15 @@ class Soledad(object):
:param url: the url of the target replica to sync with
:type url: str
:return: the local generation before the synchronisation was
performed.
:param decrypt_inline: Whether to do the decryption of received
messages inline or not.
:type decrypt_inline: bool
:return: The local generation before the synchronisation was
performed.
:rtype: str
"""
#return
print "SYNC: inline? ", decrypt_inline
local_gen = None
if self._db:
# acquire lock before attempt to sync
......@@ -1176,8 +1180,9 @@ class Soledad(object):
local_gen = self._db.sync(
urlparse.urljoin(
self.server_url, 'user-%s' % self._uuid),
creds=self._creds, autocreate=True)
#signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
creds=self._creds, autocreate=True,
decrypt_inline=decrypt_inline)
signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
except Exception as exc:
logger.error("error during soledad sync")
logger.exception(exc)
......@@ -1388,7 +1393,7 @@ class Soledad(object):
return self._passphrase.encode('utf-8')
#
# Symmetric encryption / decryption
# Symmetric encryption of syncing docs
#
def _encrypt_syncing_docs(self):
......@@ -1396,6 +1401,8 @@ class Soledad(object):
Process the syncing queue and send the documents there
to be encrypted in the sync db. They will be read by the
SoledadSyncTarget during the sync_exchange.
Called periodical from the TimerTask self._sync_watcher.
"""
lock = self.encrypting_lock
# optional wait flag used to avoid blocking
......@@ -1406,7 +1413,7 @@ class Soledad(object):
try:
while not queue.empty():
doc = queue.get_nowait()
self._sync_pool.encrypt_doc(doc)
self._sync_enc_pool.encrypt_doc(doc)
except Exception as exc:
logger.error("Error while encrypting docs to sync")
logger.exception(exc)
......
# -*- coding: utf-8 -*-
# crypto.py
# Copyright (C) 2013,2014 LEAP
# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -24,12 +24,15 @@ import hashlib
import json
import logging
import multiprocessing
import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
from zope.proxy import sameProxiedObjects
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.crypto import (
......@@ -346,14 +349,13 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
})
# XXX change to docstr...
def decrypt_doc(crypto, doc):
def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
"""
Decrypt C{doc}'s content.
Return the JSON string representation of the document's decrypted content.
The content of the document should have the following structure:
The passed doc_dict argument should have the following structure:
{
ENC_JSON_KEY: '<enc_blob>',
......@@ -369,52 +371,67 @@ def decrypt_doc(crypto, doc):
EncryptionSchemes.SYMKEY and C{enc_method} is
EncryptionMethods.AES_256_CTR.
:param crypto: A SoledadCryto instance to perform the encryption.
:type crypto: leap.soledad.crypto.SoledadCrypto
:param doc: The document to be decrypted.
:type doc: SoledadDocument
:param doc_dict: The content of the document to be decrypted.
:type doc_dict: dict
:param doc_id: The document id.
:type doc_id: str
:param doc_rev: The document revision.
:type doc_rev: str
:param key: The key used to encrypt ``data`` (must be 256 bits long).
:type key: str
:param secret:
:type secret:
:return: The JSON serialization of the decrypted content.
:rtype: str
"""
soledad_assert(doc.is_tombstone() is False)
soledad_assert(ENC_JSON_KEY in doc.content)
soledad_assert(ENC_SCHEME_KEY in doc.content)
soledad_assert(ENC_METHOD_KEY in doc.content)
soledad_assert(MAC_KEY in doc.content)
soledad_assert(MAC_METHOD_KEY in doc.content)
# TODO where should we move these assertions, now that we're passed the
# string?
#soledad_assert(doc.is_tombstone() is False)
soledad_assert(ENC_JSON_KEY in doc_dict)
soledad_assert(ENC_SCHEME_KEY in doc_dict)
soledad_assert(ENC_METHOD_KEY in doc_dict)
soledad_assert(MAC_KEY in doc_dict)
soledad_assert(MAC_METHOD_KEY in doc_dict)
# verify MAC
ciphertext = binascii.a2b_hex( # content is stored as hex.
doc.content[ENC_JSON_KEY])
doc_dict[ENC_JSON_KEY])
mac = mac_doc(
doc.doc_id, doc.rev,
doc_id, doc_rev,
ciphertext,
doc.content[MAC_METHOD_KEY], crypto.secret)
doc_dict[MAC_METHOD_KEY], secret)
# we compare mac's hashes to avoid possible timing attacks that might
# exploit python's builtin comparison operator behaviour, which fails
# immediatelly when non-matching bytes are found.
doc_mac_hash = hashlib.sha256(
binascii.a2b_hex( # the mac is stored as hex
doc.content[MAC_KEY])).digest()
doc_dict[MAC_KEY])).digest()
calculated_mac_hash = hashlib.sha256(mac).digest()
if doc_mac_hash != calculated_mac_hash:
raise WrongMac('Could not authenticate document\'s contents.')
# decrypt doc's content
enc_scheme = doc.content[ENC_SCHEME_KEY]
enc_scheme = doc_dict[ENC_SCHEME_KEY]
plainjson = None
if enc_scheme == EncryptionSchemes.SYMKEY:
enc_method = doc.content[ENC_METHOD_KEY]
enc_method = doc_dict[ENC_METHOD_KEY]
if enc_method == EncryptionMethods.AES_256_CTR:
soledad_assert(ENC_IV_KEY in doc.content)
plainjson = crypto.decrypt_sym(
ciphertext,
crypto.doc_passphrase(doc.doc_id),
soledad_assert(ENC_IV_KEY in doc_dict)
plainjson = decrypt_sym(
ciphertext, key,
method=enc_method,
iv=doc.content[ENC_IV_KEY])
iv=doc_dict[ENC_IV_KEY])
else:
raise UnknownEncryptionMethod(enc_method)
else:
raise UnknownEncryptionScheme(enc_scheme)
print "PLAIN: ", plainjson
return plainjson
......@@ -451,6 +468,9 @@ class SyncEncryptDecryptPool(object):
:param sync_db: a database connection handle
:type sync_db: handle
:param insert_doc_cb: Optional callback for inserting doc.
:type insert_doc_cb: callable
"""
self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
......@@ -492,9 +512,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
def encrypt_doc_cb(self, result):
doc_id, doc_rev, content = result
self.insert_encrypted_doc(doc_id, doc_rev, content)
self.insert_encrypted_local_doc(doc_id, doc_rev, content)
def insert_encrypted_doc(self, doc_id, doc_rev, content):
def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
database.
......@@ -512,19 +532,216 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
self._sync_db.commit()
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
decrypted_content = decrypt_doc_dict(
content, doc_id, doc_rev, key, secret)
return doc_id, doc_rev, decrypted_content, gen, trans_id
def get_insertable_docs_by_gen(expected, got):
"""
Return a list of documents ready to be inserted. This list is computed
by aligning the expected list with the already gotten docs, and returning
the maximum number of docs that can be processed in the expected order
before finding a gap.
:param expected: A list of generations to be inserted.
:type expected: list
:param got: A dictionary whose values are the docs to be inserted.
:type got: dict
"""
ordered = [got.get(i) for i in expected]
if None in ordered:
return ordered[:ordered.index(None)]
else:
return ordered
class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Pool of workers that spawn subprocesses to execute the symmetric decryption
of documents that were received.
The decryption of the received documents is done in two steps:
1. All the encrypted docs are collected, together with their generation
and transaction-id
2. The docs are enqueued for decryption. When completed, they are
inserted following the generation order.
"""
WORKERS = 10
TABLE_NAME = "docs_received"
FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
def decrypt_doc(self, doc_id, rev):
write_encrypted_lock = threading.Lock()
def __init__(self, *args, **kwargs):
"""
Initialize the decrypter pool, and setup a dict for putting the
results of the decrypted docs until they are picked by the insert
routine that gets them in order.
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
SyncEncryptDecryptPool.__init__(self, *args)
self.decrypted_docs = {}
def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
Insert a received message with encrypted content, to be decrypted later
on.
"""
docstr = json.dumps(content)
c = self._sync_db.cursor()
sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
c.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
self._sync_db.commit()
def delete_encrypted_received_doc(self, doc_id, doc_rev):
"""
Delete a encrypted received doc after it was inserted into the local
db.
:param doc_id: Document ID.
:type doc_id: str
:param doc_rev: Document revision.
:type doc_rev: str
"""
c = self._sync_db.cursor()
sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
self.TABLE_NAME,)
c.execute(sql_del, (doc_id, doc_rev))
self._sync_db.commit()
def decrypt_doc(self, doc_id, rev, source_replica_uid):
"""
Symmetrically decrypt a document.
:param doc: The document with contents to be encrypted.
:type doc: SoledadDocument
:param doc_id: The ID for the document with contents to be encrypted.
:type doc: str
:param rev: The revision of the document.
:type rev: str
:param source_replica_uid:
:type source_replica_uid: str
"""
self.source_replica_uid = source_replica_uid
if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
None):
print self._insert_doc_cb
logger.warning("No insert_doc callback, skipping decryption.")
return
# XXX move to get_doc function...
c = self._sync_db.cursor()
sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
self.TABLE_NAME,)
c.execute(sql, (doc_id, rev))
res = c.fetchone()
if res is None:
logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
return
doc_id, rev, docstr, gen, trans_id = res
content = json.loads(docstr)
key = self._crypto.doc_passphrase(doc_id)
secret = self._crypto.secret
args = doc_id, rev, content, gen, trans_id, key, secret
try:
self._pool.apply_async(decrypt_doc_task, args,
callback=self.decrypt_doc_cb)
except Exception as exc:
logger.exception(exc)
def decrypt_doc_cb(self, result):
"""
Temporarily store the decryption result in a dictionary where it will
be picked by process_decrypted.
:param result: the result of the decryption routine.
:type result: tuple
"""
doc_id, rev, content, gen, trans_id = result
self.decrypted_docs[gen] = result
def get_docs_by_generation(self):
"""
Get all documents in the received table from the sync db,
ordered by generation.
:return: list of doc_id, rev, generation
"""
c = self._sync_db.cursor()
sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
self.TABLE_NAME,)
c.execute(sql)
return c.fetchall()
def count_received_encrypted_docs(self):
"""
Count how many documents we have in the table for received and
encrypted docs.
:return: The count of documents.
:rtype: int
"""
c = self._sync_db.cursor()
sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
c.execute(sql)
res = c.fetchone()
print "res"
if res is not None:
print ">>>>>>>>>> GOT %s received encrypted docs" % res[0]
return res[0]
else:
return 0
def decrypt_received_docs(self, source_replica_uid):
"""
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
"""
docs_by_generation = self.get_docs_by_generation()
for doc_id, rev, gen in docs_by_generation:
self.decrypt_doc(doc_id, rev, source_replica_uid)
def process_decrypted(self):
"""
Process the already decrypted documents, and insert as many documents
as can be taken from the expected order without finding a gap.
"""
# Acquire the lock to avoid processing while we're still
# getting data from the syncing stream, to avoid InvalidGeneration
# problems.
with self.write_encrypted_lock:
docs = self.get_docs_by_generation()
expected = [gen for doc_id, rev, gen in docs]
docs_to_insert = get_insertable_docs_by_gen(
expected, self.decrypted_docs)
for doc_fields in docs_to_insert:
self.insert_decrypted_local_doc(*doc_fields)
def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
Insert the decrypted document into the local sqlcipher database.
Makes use of the passed callback `return_doc_cb` passed to the caller
by u1db sync.
"""
print "TRY TO INSERT GEN --->", gen
# could pass source_replica in params for callback chain
insert_fun = self._insert_doc_cb[self.source_replica_uid]
try:
doc = SoledadDocument(doc_id, doc_rev, content)
insert_fun(doc, int(gen), trans_id)
except Exception as exc:
logger.error("Error while inserting decrypted doc into local db")
logger.exception(exc)
else:
# If no errors found, remove it from the local temporary dict
# and from the received database.
self.decrypted_docs.pop(gen)
self.delete_encrypted_received_doc(doc_id, doc_rev)
......@@ -54,6 +54,7 @@ from u1db.sync import Synchronizer
from u1db import errors as u1db_errors
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
......@@ -339,7 +340,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
crypto=crypto, raw_key=raw_key, cipher=cipher,
kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
def sync(self, url, creds=None, autocreate=True):
def sync(self, url, creds=None, autocreate=True, decrypt_inline=False):
"""
Synchronize documents with remote replica exposed at url.
......@@ -355,12 +356,21 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: int
"""
print "***********************"
print "SQLCIPHER: sync started"
print "SQLCIPHER: sync started. inline?", decrypt_inline
if not self.syncer:
self._create_syncer(url, creds=creds)
old_decrypt_inline = self.syncer.sync_target.decrypt_inline
print "SETTING TARGET decrypt_inline to ", decrypt_inline
self.syncer.sync_target.set_decrypt_inline(decrypt_inline)
try:
res = self.syncer.sync(autocreate=autocreate)
except PendingReceivedDocsSyncError:
logger.warning("Local sync db is not clear, skipping sync...")
return
except httplib.CannotSendRequest:
# raised when you reuse httplib.HTTP object for new request
# while you havn't called its getresponse()
......@@ -371,10 +381,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self._syncer = None
self._create_syncer(url, creds=creds)
print "SQLCIPHER: syncer created, about to sync..."
print "SETTING TARGET decrypt_inline to ", decrypt_inline
self.syncer.sync_target.set_decrypt_inline(decrypt_inline)
res = self.syncer.sync(autocreate=autocreate)
except Exception:
logger.error("error SQLITE sync")
raise
finally:
# restore the original decrypt inline behav
self.syncer.sync_target.set_decrypt_inline(old_decrypt_inline)
print "SQLCIPHER: sync DONE"
return res
......
......@@ -22,30 +22,33 @@ import cStringIO
import gzip
import logging
import os
import re
import sqlite3
import urllib
import simplejson as json
from collections import defaultdict
from time import sleep
import simplejson as json
from taskthread import TimerTask
from u1db.remote import utils, http_errors
from u1db.errors import BrokenSyncStream
from u1db import errors
from u1db.remote.http_target import HTTPSyncTarget
from u1db.remote.http_client import _encode_query_parameter
from zope.proxy import ProxyBase
from zope.proxy import sameProxiedObjects, setProxiedObject
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted, decrypt_doc
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.crypto import encrypt_docstr, decrypt_doc_dict
from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.common.check import leap_check
logger = logging.getLogger(__name__)
#
# Exceptions
#
def _gunzip(data):
"""
......@@ -65,10 +68,14 @@ def _gunzip(data):
return data
class PendingReceivedDocsSyncError(Exception):
pass
#
# SoledadSyncTarget
#
class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
A SyncTarget that encrypts data before sending and decrypts data after
......@@ -80,6 +87,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
written to the main database.
"""
# will later keep a reference to the insert-doc callback
# passed to sync_exchange
_insert_doc_cb = defaultdict(lambda: ProxyBase(None))
#
# Modified HTTPSyncTarget methods.
#
......@@ -109,10 +120,32 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
print "URL : ", url
self.source_replica_uid = re.findall("user-([0-9a-fA-F]+)", url)[0]
print "uid -->", self.source_replica_uid
self._sync_db = None
if sync_db_path is not None:
self._init_sync_db(sync_db_path)
# whether to bypass the received messages decryption deferral
self._decrypt_inline = False
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
self._crypto, self._sync_db,
insert_doc_cb=self._insert_doc_cb)
self._sync_watcher = TimerTask(
self._decrypt_syncing_received_docs, delay=10)
self._sync_watcher.start()
def set_decrypt_inline(self, value):
self._decrypt_inline = value
@property
def decrypt_inline(self):
return self._decrypt_inline
@staticmethod
def connect(url, crypto=None):
return SoledadSyncTarget(url, crypto=crypto)
......@@ -151,31 +184,57 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
raise BrokenSyncStream
data = parts[1:-1]
comma = False