Verified Commit c59b9abd authored by aguestuser's avatar aguestuser

[347] redeem channels from dispatcher.dispatch

* check if any incoming message is from a channel with a pending
  recycle request
* if so, redeem it (and notify admins/maintainers) right away!

side-effect:

* refactor dispatcher.dispatch to extract handling of early-returning
  and non-early returning side-efffects just before `relay`
  * tuck all logic that detects actionable state from system messages
    and possibly returns early after processing it into a
    `detectInterventions` helper (which returns an `intervetnion` func
    and returns early after calling it)
  * tuck all logic that detects and takes action but does not require
    returning early into a `detectSideEffects` function, which returns
    an array of side-effects to perform before proceeding to `relay`
parent 7ea2b5e6
......@@ -2,6 +2,7 @@ const signal = require('../signal')
const callbacks = require('../signal/callbacks')
const channelRepository = require('../db/repositories/channel')
const membershipRepository = require('../db/repositories/membership')
const phoneNumberRegistrar = require('../registrar/phoneNumber')
const safetyNumbers = require('../registrar/safetyNumbers')
const diagnostics = require('../diagnostics')
const { memberTypes } = membershipRepository
......@@ -9,11 +10,16 @@ const executor = require('./commands')
const messenger = require('./messenger')
const resend = require('./resend')
const logger = require('./logger')
const util = require('../util')
const { messagesIn } = require('./strings/messages')
const { get, isEmpty, isNumber } = require('lodash')
const metrics = require('../metrics')
const { counters, errorTypes } = metrics
const { emphasize, redact } = require('../util')
const metrics = require('../metrics')
const {
counters: { SIGNALD_MESSAGES, RELAYABLE_MESSAGES, ERRORS },
errorTypes,
messageDirection: { INBOUND },
} = metrics
const {
defaultLanguage,
signal: { diagnosticsPhoneNumber },
......@@ -54,14 +60,15 @@ const {
// string -> Promise<SignalBoostStatus>
const dispatch = async msg => {
logger.debug(emphasize(redact(msg)))
// parse basic info from message
const inboundMsg = parseInboundSignaldMessage(msg)
const channelPhoneNumber = get(inboundMsg, 'data.username', 'SYSTEM')
metrics.incrementCounter(counters.SIGNALD_MESSAGES, [
inboundMsg.type,
channelPhoneNumber,
metrics.messageDirection.INBOUND,
])
// retrieve db info we need for dispatching...
// count what kind of message we are processing
metrics.incrementCounter(SIGNALD_MESSAGES, [inboundMsg.type, channelPhoneNumber, INBOUND])
// retrieve db info we need for dispatching
const [channel, sender] = _isMessage(inboundMsg)
? await Promise.all([
channelRepository.findDeep(inboundMsg.data.username),
......@@ -72,35 +79,61 @@ const dispatch = async msg => {
])
: []
// detect and handle callbacks if any
// handle callbacks for messages that have request/response semantics
callbacks.handle(inboundMsg)
// detect system-created messages, handle them, and return early
// maybe return early if we receive system messages that prompt interventions...
const interventions = await detectInterventions(channel, sender, inboundMsg)
if (interventions) return interventions()
// ...or if we get a non-relayable messsage:
if (!_isMessage(inboundMsg) || _isEmpty(inboundMsg)) return Promise.resolve()
// else, follow the happy path!
const sideEffects = await detectAndPerformSideEffects(channel, sender, inboundMsg)
await util.sequence(sideEffects)
return relay(channel, sender, inboundMsg)
}
/**********************
* DISPATCH HELPERS
**********************/
// (Channel | null, Sender | null, SdMessage) -> Promise<function | null>
const detectInterventions = async (channel, sender, inboundMsg) => {
const healthcheckId = detectHealthcheck(inboundMsg)
if (healthcheckId) return diagnostics.respondToHealthcheck(channelPhoneNumber, healthcheckId)
if (healthcheckId)
return () => diagnostics.respondToHealthcheck(channel.phoneNumber, healthcheckId)
// return early from healthcheck responses to avoid infinite feedback loops!
const isHealthcheckResponse = detectHealthcheckResponse(inboundMsg)
if (isHealthcheckResponse) return Promise.resolve()
if (isHealthcheckResponse) return () => Promise.resolve()
const rateLimitedMessage = detectRateLimitedMessage(inboundMsg)
if (rateLimitedMessage) return logAndResendRateLimitedMessage(rateLimitedMessage)
if (rateLimitedMessage) return () => logAndResendRateLimitedMessage(rateLimitedMessage)
const updatableFingerprint = await detectUpdatableFingerprint(inboundMsg)
if (updatableFingerprint) return safetyNumbers.updateFingerprint(updatableFingerprint)
if (updatableFingerprint) return () => safetyNumbers.updateFingerprint(updatableFingerprint)
}
const detectAndPerformSideEffects = async (channel, sender, inboundMsg) => {
let sideEffects = []
// Don't return early here b/c that would prevent processing of HELLO commands on channels w/ disappearing messages
const newExpiryTime = detectUpdatableExpiryTime(inboundMsg, channel)
// GOTCHA: Don't return early here b/c that would prevent HELLO commands on channels w/
// disappearing messages from ever being processed!
if (isNumber(newExpiryTime)) await updateExpiryTime(sender, channel, newExpiryTime)
if (isNumber(newExpiryTime))
sideEffects.push(() => updateExpiryTime(sender, channel, newExpiryTime))
// dispatch user-created messages
if (shouldRelay(inboundMsg)) return relay(channel, sender, inboundMsg)
// Don't return early here b/c the person "redeemed" channel by sending normal message that should be processed!
if (channel && channel.recycleRequest)
sideEffects.push(() => phoneNumberRegistrar.redeem(channel))
return sideEffects
}
const relay = async (channel, sender, inboundMsg) => {
const sdMessage = signal.parseOutboundSdMessage(inboundMsg)
try {
metrics.incrementCounter(counters.RELAYABLE_MESSAGES, [channel.phoneNumber])
metrics.incrementCounter(RELAYABLE_MESSAGES, [channel.phoneNumber])
const dispatchable = { channel, sender, sdMessage }
const commandResult = await executor.processCommand(dispatchable)
return messenger.dispatch({ dispatchable, commandResult })
......@@ -109,6 +142,10 @@ const relay = async (channel, sender, inboundMsg) => {
}
}
/******************
* INTERVENTIONS
*****************/
// InboundSdMessage => void
const logAndResendRateLimitedMessage = rateLimitedMessage => {
const _channelPhoneNumber = rateLimitedMessage.username
......@@ -119,7 +156,7 @@ const logAndResendRateLimitedMessage = rateLimitedMessage => {
resendInterval,
),
)
metrics.incrementCounter(counters.ERRORS, [
metrics.incrementCounter(ERRORS, [
resendInterval ? errorTypes.RATE_LIMIT_RESENDING : errorTypes.RATE_LIMIT_ABORTING,
_channelPhoneNumber,
])
......@@ -162,8 +199,6 @@ const parseInboundSignaldMessage = inboundMsg => {
}
}
const shouldRelay = inboundMsg => _isMessage(inboundMsg) && !_isEmpty(inboundMsg)
const _isMessage = inboundMsg =>
inboundMsg.type === signal.messageTypes.MESSAGE && get(inboundMsg, 'data.dataMessage')
......
......@@ -8,10 +8,10 @@ volumes:
# you can uncomment the below if you'd like to run unit tests behind a VPN.
# do not leave it uncommented as that will mess up CI!
#networks:
# default:
# external:
# name: localdev
networks:
default:
external:
name: localdev
services:
......
......@@ -4,10 +4,10 @@ version: '3'
# you can uncomment the below if you'd like to run unit tests behind a VPN.
# do not leave it uncommented as that will mess up CI!
#networks:
# default:
# external:
# name: localdev
networks:
default:
external:
name: localdev
volumes:
postgres_data:
......
......@@ -103,10 +103,14 @@ describe('recycleablePhoneNumber repository', () => {
expect(await recycleRequestRepository.destroyMany(toBeDeleted)).to.eql(toBeDeleted.length)
expect(await db.recycleRequest.count()).to.eql(recycleRequestCount - toBeDeleted.length)
expect(
await db.recycleRequest.findAll({ where: { channelPhoneNumber: { [Op.in]: toBeDeleted } } }),
await db.recycleRequest.findAll({
where: { channelPhoneNumber: { [Op.in]: toBeDeleted } },
}),
).to.have.length(0)
expect(
await db.recycleRequest.findAll({ where: { channelPhoneNumber: { [Op.in]: toBeIgnored } } }),
await db.recycleRequest.findAll({
where: { channelPhoneNumber: { [Op.in]: toBeIgnored } },
}),
).to.have.length(toBeIgnored.length)
})
})
......
......@@ -8,6 +8,7 @@ import { dispatch } from '../../../app/dispatcher'
import channelRepository, { getAllAdminsExcept } from '../../../app/db/repositories/channel'
import membershipRepository from '../../../app/db/repositories/membership'
import safetyNumbers from '../../../app/registrar/safetyNumbers'
import phoneNumberRegistrar from '../../../app/registrar/phoneNumber'
import signal, { messageTypes } from '../../../app/signal'
import executor from '../../../app/dispatcher/commands'
import messenger from '../../../app/dispatcher/messenger'
......@@ -359,5 +360,23 @@ describe('dispatcher module', () => {
])
})
})
describe('messages from a channel with a pending recycle request', () => {
let redeemStub
beforeEach(() => {
findDeepStub.returns(
Promise.resolve({
...channel,
recycleRequest: { channelPhoneNumber: channel.phoneNumber },
}),
)
redeemStub = sinon.stub(phoneNumberRegistrar, 'redeem').returns(Promise.resolve())
})
it('redeems the channel and relays the message', async () => {
await dispatch(JSON.stringify(sdInMessage))
expect(redeemStub.getCall(0).args).to.eql([channel])
})
})
})
})
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment