diff --git a/docker-compose-loadtest.yml b/docker-compose-loadtest.yml index b9f36251b3a03e0783b57bac11b142c02d6d21c2..f271a255edf433ab37cdb91e6930616d7c468e96 100644 --- a/docker-compose-loadtest.yml +++ b/docker-compose-loadtest.yml @@ -33,6 +33,11 @@ volumes: loadtest_prometheus_data: services: + cache: + image: redis:6 + container_name: signalc_cache + ports: + - 0.0.0.0:6379:6379 db: image: postgres:12 @@ -52,11 +57,13 @@ services: environment: SIGNALC_ENV: ${SIGNALC_ENV:-load} SIGNALC_DB_NAME: loadtest_receiver_signalc + SIGNALC_CACHE_NAME: receiver GRADLE_USER_HOME: /home/gradle/.gradle LOG_LEVEL: ${LOG_LEVEL:-debug} DEBUG_MODE: ${DEBUG_MODE:-0} DB_HOST: db depends_on: + - cache - db ports: - 0.0.0.0:5005:5005 # for remote debugger @@ -160,11 +167,13 @@ services: environment: SIGNALC_ENV: ${SIGNALC_ENV:-load} SIGNALC_DB_NAME: loadtest_sender_signalc + SIGNALC_CACHE_NAME: sender GRADLE_USER_HOME: /home/gradle/.gradle LOG_LEVEL: ${LOG_LEVEL:-debug} DEBUG_MODE: ${DEBUG_MODE:-0} DB_HOST: db depends_on: + - cache - db ports: - 0.0.0.0:5006:5005 # for remote debugger diff --git a/docker-compose-sc.yml b/docker-compose-sc.yml index 88e5e22e488252e9d6de60cd42fef9271ebde800..7a997012886dbf4ef83b1f7cd4ca4317fc7eddcd 100644 --- a/docker-compose-sc.yml +++ b/docker-compose-sc.yml @@ -92,6 +92,12 @@ services: - ./bin/db/:/bin/db/ environment: POSTGRES_HOST_AUTH_METHOD: trust + + cache: + image: redis:6 + container_name: signalc_cache + ports: + - 0.0.0.0:6379:6379 ngrok: container_name: signalboost_ngrok # todo: this should just be ngrok? diff --git a/signalc/build.gradle.kts b/signalc/build.gradle.kts index 794140ef0aa33932fcd9133d208696ee37d0d95d..d88818430197122875f748455f4de09087f4198f 100644 --- a/signalc/build.gradle.kts +++ b/signalc/build.gradle.kts @@ -78,6 +78,7 @@ object Versions { const val kotlin = "1.4.30" const val h2 = "1.4.199" const val hikariCp = "4.0.3" + const val lettuce = "6.1.2.RELEASE" const val libsignal = "2.15.3_unofficial_19_sb_3" const val liquibase = "4.2.2" const val liquibasePlugin = "2.0.4" @@ -127,6 +128,8 @@ dependencies { implementation("io.prometheus:simpleclient_hotspot:0.9.0") implementation("io.prometheus:simpleclient_httpserver:0.9.0") + implementation("io.lettuce:lettuce-core:${Versions.lettuce}") + // migrations implementation("org.liquibase:liquibase-core:${Versions.liquibase}") implementation("org.liquibase:liquibase-gradle-plugin:${Versions.liquibasePlugin}") diff --git a/signalc/src/main/kotlin/info/signalboost/signalc/Application.kt b/signalc/src/main/kotlin/info/signalboost/signalc/Application.kt index 5a929b4a21176c44eb39a5308e6e3dfd8df14261..4a8f569029c9256c660c385ee85a00711d726b4c 100644 --- a/signalc/src/main/kotlin/info/signalboost/signalc/Application.kt +++ b/signalc/src/main/kotlin/info/signalboost/signalc/Application.kt @@ -8,6 +8,9 @@ import info.signalboost.signalc.logic.* import info.signalboost.signalc.metrics.Metrics import info.signalboost.signalc.store.AccountStore import info.signalboost.signalc.store.ProtocolStore +import io.lettuce.core.RedisClient +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.api.sync.RedisCommands import io.mockk.coEvery import io.mockk.mockk import kotlinx.coroutines.* @@ -152,10 +155,14 @@ class Application(val config: Config.App){ lateinit var protocolStore: ProtocolStore private lateinit var dataSource: HikariDataSource - private val db by lazy { + val db by lazy { Database.connect(dataSource) } + private lateinit var redisClient: RedisClient + private lateinit var connection: StatefulRedisConnection<String, String> + private lateinit var cacheClient: RedisCommands<String, String> + /************** * LIFECYCLE *************/ @@ -177,13 +184,19 @@ class Application(val config: Config.App){ } ) + private fun initializeCache(): RedisCommands<String, String> { + redisClient = RedisClient.create("redis://cache:6379/0") + connection = redisClient.connect() + return connection.sync() + } + private inline fun <reified T: Any>initializeStore( component: KClass<T>, mockAnswers: T.() -> Unit = {} ): T = if(config.mocked.contains(component)) mockk(block = mockAnswers) - else (component.primaryConstructor!!::call)(arrayOf(db)) + else (component.primaryConstructor!!::call)(arrayOf(db, cacheClient)) private inline fun <reified T: Any>initializeColdComponent( @@ -243,6 +256,7 @@ class Application(val config: Config.App){ // storage resources dataSource = initializeDataSource(Mocks.dataSource) + cacheClient = initializeCache() accountStore = initializeStore(AccountStore::class) protocolStore = initializeStore(ProtocolStore::class, Mocks.protocolStore) @@ -304,6 +318,8 @@ class Application(val config: Config.App){ // then shutdown all resources... socketServer.stop() dataSource.closeQuietly() + connection.close() + redisClient.shutdown() logger.info { "...application stopped!"} logger.info { "<@3<@3<@3<@3<@3<@3<@3<@3"} diff --git a/signalc/src/main/kotlin/info/signalboost/signalc/store/AccountStore.kt b/signalc/src/main/kotlin/info/signalboost/signalc/store/AccountStore.kt index 5fe5bb86dc0605c62fe6b7ed73b59442b5e2cfb4..631fa35aacf415c63ba26da17cc6887b27402c8e 100644 --- a/signalc/src/main/kotlin/info/signalboost/signalc/store/AccountStore.kt +++ b/signalc/src/main/kotlin/info/signalboost/signalc/store/AccountStore.kt @@ -6,12 +6,13 @@ import info.signalboost.signalc.model.Account import info.signalboost.signalc.model.NewAccount import info.signalboost.signalc.model.RegisteredAccount import info.signalboost.signalc.model.VerifiedAccount +import io.lettuce.core.api.sync.RedisCommands import mu.KLogging import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction import java.util.* -class AccountStore(private val db: Database) { +class AccountStore(private val db: Database, private val cache: RedisCommands<String, String>) { companion object: KLogging() enum class Status(val asString: String) { diff --git a/signalc/src/main/kotlin/info/signalboost/signalc/store/ProtocolStore.kt b/signalc/src/main/kotlin/info/signalboost/signalc/store/ProtocolStore.kt index 540b4bd9dbdf2f909b810368bfbc7f0eb6f733c6..0c49731e47649a9d08991b0d0d510c8a16a2f61e 100644 --- a/signalc/src/main/kotlin/info/signalboost/signalc/store/ProtocolStore.kt +++ b/signalc/src/main/kotlin/info/signalboost/signalc/store/ProtocolStore.kt @@ -4,13 +4,12 @@ package info.signalboost.signalc.store import info.signalboost.signalc.db.* import info.signalboost.signalc.db.AccountWithAddress.Companion.deleteByAddress import info.signalboost.signalc.db.AccountWithAddress.Companion.findByAddress -import info.signalboost.signalc.db.AccountWithAddress.Companion.updateByAddress import info.signalboost.signalc.db.Identities.identityKeyBytes -import info.signalboost.signalc.db.Identities.isTrusted import info.signalboost.signalc.db.Identities.name -import info.signalboost.signalc.db.Sessions.sessionBytes import info.signalboost.signalc.util.KeyUtil import info.signalboost.signalc.model.Account +import io.lettuce.core.api.sync.RedisCommands +import mu.KLoggable import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.transaction import org.whispersystems.libsignal.IdentityKey @@ -19,11 +18,13 @@ import org.whispersystems.libsignal.InvalidKeyException import org.whispersystems.libsignal.SignalProtocolAddress import org.whispersystems.libsignal.protocol.CiphertextMessage import org.whispersystems.libsignal.state.* +import org.whispersystems.libsignal.util.Hex import org.whispersystems.signalservice.api.SignalServiceProtocolStore import org.whispersystems.signalservice.api.push.SignalServiceAddress +import java.util.* -class ProtocolStore(private val db: Database) { - fun of(account: Account): AccountProtocolStore = AccountProtocolStore(db, account.username) +class ProtocolStore constructor(private val db: Database, private val cache: RedisCommands<String, String>) { + fun of(account: Account): AccountProtocolStore = AccountProtocolStore(db, account.username, cache) fun countOwnIdentities(): Long = transaction(db) { OwnIdentities.selectAll().count() } @@ -31,8 +32,13 @@ class ProtocolStore(private val db: Database) { class AccountProtocolStore( private val db: Database, private val accountId: String, + private val cache: RedisCommands<String, String> ): SignalServiceProtocolStore { + companion object: Any(), KLoggable { + override val logger = logger() + } + /********* IDENTITIES *********/ override fun getIdentityKeyPair(): IdentityKeyPair = @@ -77,51 +83,67 @@ class ProtocolStore(private val db: Database) { } } - override fun saveIdentity(address: SignalProtocolAddress, identityKey: IdentityKey): Boolean = - // Insert or update an idenity key and: - // - trust the first identity key seen for a given address - // - deny trust for subsequent identity keys for same address + private fun cacheName(): String = System.getenv("SIGNALC_CACHE_NAME") + + private fun hashIdentityKey(accountId: String, address: SignalProtocolAddress): String { + return "${cacheName()}-identity-$accountId-${address.name}-${address.deviceId}" + } + + private fun hashIdentityValue(identityKey: IdentityKey, isTrusted: Boolean): String { + val encodedIdentityKey = Hex.toHexString(identityKey.serialize()) + val encodedIsTrusted = if(isTrusted) "t" else "f" + return "${encodedIdentityKey}-${encodedIsTrusted}" + } + + private fun parseIdentityValue(value: String): Pair<ByteArray, Boolean> { + val values = value.split("-") + val decodedIdentityKey = Hex.fromStringCondensed(values[0]) + val decodedIsTrusted = "t" == values[1] + return Pair(decodedIdentityKey, decodedIsTrusted) + } + + override fun saveIdentity(address: SignalProtocolAddress, identityKey: IdentityKey): Boolean { + // Insert or update an idenity key and: + // - trust the first identity key seen for a given address + // - deny trust for subsequent identity keys for same address // Returns true if this save was an update to an existing record, false otherwise - transaction(db) { - Identities.findByAddress(accountId, address) - ?.let { existingKey -> - Identities.updateByAddress(accountId, address) { - // store the new existingKey key in all cases - it[identityKeyBytes] = identityKey.serialize() - // only trust it if it matches the existing key - it[isTrusted] = existingKey[identityKeyBytes] contentEquals identityKey.serialize() - } - true - } ?: run { - Identities.insert { - it[accountId] = this@AccountProtocolStore.accountId - it[name] = address.name - it[deviceId] = address.deviceId - it[identityKeyBytes] = identityKey.serialize() - } - false - } + + logger.info { "saveIdentity for $accountId:${address.name}" } + + cache.get(hashIdentityKey(accountId, address))?.let { + val (cachedKey, _) = parseIdentityValue(it) + val shouldTrust = cachedKey contentEquals identityKey.serialize() + cache.set(hashIdentityKey(accountId, address), hashIdentityValue(identityKey, shouldTrust)) + return true } + cache.set(hashIdentityKey(accountId, address), hashIdentityValue(identityKey, true)) + return false + } override fun isTrustedIdentity( address: SignalProtocolAddress, identityKey: IdentityKey, direction: IdentityKeyStore.Direction - ): Boolean = transaction(db) { + ): Boolean { + logger.info { "isTrustedIdentity for $accountId:${address.name}" } + // trust a key if... - Identities.findByAddress(accountId, address)?.let{ - // it matches a key we have seen before - it[identityKeyBytes] contentEquals identityKey.serialize() && - // and we have not flagged it as untrusted - it[isTrusted] - } ?: true // or it is the first key we ever seen for a person (TOFU!) + cache.get(hashIdentityKey(accountId, address))?.let { + val (cachedKey, cachedIsTrusted) = parseIdentityValue(it) + // it matches a key we have seen before and we have not flagged it as untrusted + return cachedKey contentEquals identityKey.serialize() && cachedIsTrusted + } + // or it is the first key we ever seen for a person (TOFU!) + return true } - override fun getIdentity(address: SignalProtocolAddress): IdentityKey? = - transaction(db) { + override fun getIdentity(address: SignalProtocolAddress): IdentityKey? { + logger.info { "getIdentity for $accountId:${address.name}" } + return transaction(db) { Identities.findByAddress(accountId, address)?.let{ IdentityKey(it[identityKeyBytes], 0) } } + } fun removeIdentity(address: SignalProtocolAddress) { transaction(db) { @@ -238,14 +260,20 @@ class ProtocolStore(private val db: Database) { /********* SESSIONS *********/ - override fun loadSession(address: SignalProtocolAddress): SessionRecord = - transaction(db) { - Sessions.findByAddress(accountId, address)?.let { - SessionRecord(it[Sessions.sessionBytes]) - } ?: SessionRecord() + private fun hashSessionKey(accountId: String, address: SignalProtocolAddress): String { + return "${cacheName()}-session-$accountId-${address.name}-${address.deviceId}" + } + + override fun loadSession(address: SignalProtocolAddress): SessionRecord { + logger.info { "loadSession for $accountId:${address.name}" } + cache.get(hashSessionKey(accountId, address))?.let { + return SessionRecord(Base64.getDecoder().decode(it)) } + return SessionRecord() + } override fun getSubDeviceSessions(name: String): MutableList<Int> { + logger.info { "getSubDeviceSessions for $accountId:$name" } return transaction(db) { Sessions.select{ Sessions.accountId eq accountId and (Sessions.name eq name) @@ -254,40 +282,29 @@ class ProtocolStore(private val db: Database) { } override fun storeSession(address: SignalProtocolAddress, record: SessionRecord) { - // upsert the session record for a given address - transaction(db) { - Sessions.updateByAddress(accountId, address) { - it[sessionBytes] = record.serialize() - }.let { numUpdated -> - if(numUpdated == 0) { - Sessions.insert { - it[accountId] = this@AccountProtocolStore.accountId - it[name] = address.name - it[deviceId] = address.deviceId - it[sessionBytes] = record.serialize() - } - } - } - } + logger.info { "storeSession for $accountId:${address.name}" } + cache.set(hashSessionKey(accountId, address), Base64.getEncoder().encodeToString(record.serialize())) } - override fun containsSession(address: SignalProtocolAddress): Boolean = - transaction(db) { - Sessions.findByAddress(accountId, address)?.let { - val sessionRecord = SessionRecord(it[Sessions.sessionBytes]) - sessionRecord.hasSenderChain() - && sessionRecord.sessionVersion == CiphertextMessage.CURRENT_VERSION; - } ?: false + override fun containsSession(address: SignalProtocolAddress): Boolean { + logger.info { "containsSession for $accountId:${address.name}" } + cache.get(hashSessionKey(accountId, address))?.let { + val sessionRecord = SessionRecord(Base64.getDecoder().decode(it)) + return sessionRecord.hasSenderChain() + && sessionRecord.sessionVersion == CiphertextMessage.CURRENT_VERSION } - + return false + } override fun deleteSession(address: SignalProtocolAddress) { + logger.info { "deleteSession for $accountId:${address.name}" } transaction(db) { Sessions.deleteByAddress(accountId, address) } } override fun deleteAllSessions(name: String) { + logger.info { "deleteAllSessions for $accountId:${name}" } transaction(db) { Sessions.deleteWhere { Sessions.accountId eq accountId and (Sessions.name eq name) @@ -296,6 +313,7 @@ class ProtocolStore(private val db: Database) { } override fun archiveSession(address: SignalProtocolAddress) { + logger.info { "archiveSession for $accountId:${address.name}" } transaction(db) { loadSession(address).let { it.archiveCurrentState() diff --git a/simulator/testLag.js b/simulator/testLag.js index 50635e06517ecea3ed9f925de43ba11e2f3fff36..784067db9a086b9b5ed779f65a45a230b547e519 100644 --- a/simulator/testLag.js +++ b/simulator/testLag.js @@ -5,7 +5,7 @@ const { signalcPhoneNumbers, signaldPhoneNumbers } = require('./constants') const { nowTimestamp, loggerOf, wait } = require('../app/util') const logger = loggerOf('testLag') -const numRecipients = 100 +const numRecipients = 1000 ;(async () => { logger.log('STARTING LOAD TEST...')