Commit b752cad6 authored by aguestuser's avatar aguestuser

Merge branch '347-send-channel-redemption-notices-instantly' into 'main'

[#347] Resolve "send channel redemption notifications instantly"

Closes #347

See merge request !389
parents 7a39de62 3d3b37f1
......@@ -157,7 +157,7 @@ dev.restart.metrics: ## force stop and start the app again (with prometheus/graf
#############
test.all: ## run all unit and e2e tests
npx eslint app && ./bin/test/unit && ./bin/test/integration && ./bin/test/e2e
npx eslint app && ./bin/test/unit && ./bin/test/integration
test.unit: ## run unit tests
./bin/test/unit
......
'use strict';
module.exports = {
up: async (queryInterface, Sequelize) => {
await queryInterface.renameColumn('recycleRequests', 'phoneNumber', 'channelPhoneNumber')
return queryInterface.changeColumn('recycleRequests', 'channelPhoneNumber', {
type: Sequelize.STRING,
primaryKey: true,
allowNull: false,
unique: true,
// old --^
// new --v
references: {
model: {
tableName: 'channels',
},
key: 'phoneNumber',
}
})
},
down: async (queryInterface, Sequelize) => {
await queryInterface.changeColumn('recycleRequests', 'channelPhoneNumber', {
type: Sequelize.STRING,
primaryKey: true,
allowNull: false,
unique: true,
})
return queryInterface.renameColumn('recycleRequests', 'channelPhoneNumber', 'phoneNumber')
}
};
......@@ -44,31 +44,18 @@ const channelOf = (sequelize, DataTypes) => {
},
})
channel.associate = db => {
channel.hasMany(db.membership, {
hooks: true,
onDelete: 'cascade',
})
channel.hasMany(db.deauthorization, {
hooks: true,
onDelete: 'cascade',
})
channel.hasMany(db.invite, {
hooks: true,
onDelete: 'cascade',
})
channel.hasOne(db.messageCount, {
hooks: true,
onDelete: 'cascade',
})
const associationDefaults = {
hooks: true,
onDelete: 'cascade',
}
channel.hasMany(db.hotlineMessage, {
hooks: true,
onDelete: 'cascade',
})
channel.associate = db => {
channel.hasMany(db.deauthorization, associationDefaults)
channel.hasMany(db.hotlineMessage, associationDefaults)
channel.hasMany(db.invite, associationDefaults)
channel.hasMany(db.membership, associationDefaults)
channel.hasOne(db.messageCount, associationDefaults)
channel.hasOne(db.recycleRequest, associationDefaults)
}
return channel
......
const { isPhoneNumber } = require('../validations')
const recycleRequestOf = (sequelize, Sequelize) =>
sequelize.define('recycleRequest', {
phoneNumber: {
const recycleRequestOf = (sequelize, Sequelize) => {
const recycleRequest = sequelize.define('recycleRequest', {
channelPhoneNumber: {
type: Sequelize.STRING,
primaryKey: true,
allowNull: false,
unique: true,
validate: isPhoneNumber,
references: {
model: {
tableName: 'channels',
},
key: 'phoneNumber',
},
},
createdAt: {
type: Sequelize.DATE,
......@@ -21,4 +27,11 @@ const recycleRequestOf = (sequelize, Sequelize) =>
},
})
recycleRequest.associate = db => {
recycleRequest.belongsTo(db.channel)
}
return recycleRequest
}
module.exports = { recycleRequestOf }
......@@ -49,6 +49,7 @@ const findAllDeep = () =>
{ model: app.db.invite },
{ model: app.db.membership },
{ model: app.db.messageCount },
{ model: app.db.recycleRequest },
],
})
......@@ -60,6 +61,7 @@ const findManyDeep = phoneNumbers =>
{ model: app.db.invite },
{ model: app.db.membership },
{ model: app.db.messageCount },
{ model: app.db.recycleRequest },
],
})
......@@ -73,6 +75,7 @@ const findDeep = phoneNumber =>
{ model: app.db.invite },
{ model: app.db.membership },
{ model: app.db.messageCount },
{ model: app.db.recycleRequest },
],
})
......
const { Op } = require('sequelize')
const app = require('../../../app')
const util = require('../../util')
const { map, partition } = require('lodash')
const { map } = require('lodash')
const {
job: { recycleGracePeriod },
} = require('../../config')
// (string) -> Promise<{ recycleRequest: RecycleRequest, wasCreated: boolean }>
const requestToRecycle = phoneNumber =>
const requestToRecycle = channelPhoneNumber =>
app.db.recycleRequest
.findOrCreate({ where: { phoneNumber } })
.findOrCreate({ where: { channelPhoneNumber } })
.then(([recycleRequest, wasCreated]) => ({
recycleRequest,
wasCreated,
}))
// (Array<string>) -> Promise<void>
const destroyMany = phoneNumbers =>
app.db.recycleRequest.destroy({
where: { phoneNumber: { [Op.in]: phoneNumbers } },
})
const destroy = channelPhoneNumber =>
app.db.recycleRequest.destroy({ where: { channelPhoneNumber } })
const evaluateRecycleRequests = async () => {
// channel admins have a 1 day grace period to redeem a channel slated for recycling
// by using it. calculate when that grace period started...
const gracePeriodStart = util.now().subtract(parseInt(recycleGracePeriod), 'ms')
// (Array<string>) -> Promise<void>
const destroyMany = channelPhoneNumbers =>
app.db.recycleRequest.destroy({ where: { channelPhoneNumber: { [Op.in]: channelPhoneNumbers } } })
// find all the requests issued before the start of the grace period, indicating
// channels which should be considered for recycling (b/c their grace period has passed)
// () => Promise<string>
const getMatureRecycleRequests = async () => {
// Admins have a 24hr grace period to redeem a channel slated for recycling by using it.
// Here, we find all the requests issued before the start of the grace period, and return their
// phone numbers to the caller for recycling. We may safely assume they can be recycled, becuase
// if redeemed (in dispatcher.dispatch)
const gracePeriodStart = util.now().subtract(parseInt(recycleGracePeriod), 'ms')
const matureRequests = await app.db.recycleRequest.findAll({
where: { createdAt: { [Op.lte]: gracePeriodStart } },
})
// make lists of redeemed and unredeemed channel phone numbers, where "redeemed" channels
// have been used since the start of the grace period, and thus should not be recycled
const [redeemed, toRecycle] = partition(
await app.db.messageCount.findAll({
where: { channelPhoneNumber: { [Op.in]: map(matureRequests, 'phoneNumber') } },
}),
messageCount => messageCount.updatedAt > gracePeriodStart,
)
// pluck the channel phone numbers and return them for processing!
return {
redeemed: map(redeemed, 'channelPhoneNumber'),
toRecycle: map(toRecycle, 'channelPhoneNumber'),
}
return map(matureRequests, 'channelPhoneNumber')
}
module.exports = { requestToRecycle, evaluateRecycleRequests, destroyMany }
module.exports = { requestToRecycle, getMatureRecycleRequests, destroy, destroyMany }
......@@ -2,6 +2,7 @@ const signal = require('../signal')
const callbacks = require('../signal/callbacks')
const channelRepository = require('../db/repositories/channel')
const membershipRepository = require('../db/repositories/membership')
const phoneNumberRegistrar = require('../registrar/phoneNumber')
const safetyNumbers = require('../registrar/safetyNumbers')
const diagnostics = require('../diagnostics')
const { memberTypes } = membershipRepository
......@@ -9,11 +10,16 @@ const executor = require('./commands')
const messenger = require('./messenger')
const resend = require('./resend')
const logger = require('./logger')
const util = require('../util')
const { messagesIn } = require('./strings/messages')
const { get, isEmpty, isNumber } = require('lodash')
const metrics = require('../metrics')
const { counters, errorTypes } = metrics
const { emphasize, redact } = require('../util')
const metrics = require('../metrics')
const {
counters: { SIGNALD_MESSAGES, RELAYABLE_MESSAGES, ERRORS },
errorTypes,
messageDirection: { INBOUND },
} = metrics
const {
defaultLanguage,
signal: { diagnosticsPhoneNumber },
......@@ -54,14 +60,15 @@ const {
// string -> Promise<SignalBoostStatus>
const dispatch = async msg => {
logger.debug(emphasize(redact(msg)))
// parse basic info from message
const inboundMsg = parseInboundSignaldMessage(msg)
const channelPhoneNumber = get(inboundMsg, 'data.username', 'SYSTEM')
metrics.incrementCounter(counters.SIGNALD_MESSAGES, [
inboundMsg.type,
channelPhoneNumber,
metrics.messageDirection.INBOUND,
])
// retrieve db info we need for dispatching...
// count what kind of message we are processing
metrics.incrementCounter(SIGNALD_MESSAGES, [inboundMsg.type, channelPhoneNumber, INBOUND])
// retrieve db info we need for dispatching
const [channel, sender] = _isMessage(inboundMsg)
? await Promise.all([
channelRepository.findDeep(inboundMsg.data.username),
......@@ -72,35 +79,61 @@ const dispatch = async msg => {
])
: []
// detect and handle callbacks if any
// handle callbacks for messages that have request/response semantics
callbacks.handle(inboundMsg)
// detect system-created messages, handle them, and return early
// maybe return early if we receive system messages that prompt interventions...
const interventions = await detectInterventions(channel, sender, inboundMsg)
if (interventions) return interventions()
// ...or if we get a non-relayable messsage:
if (!_isMessage(inboundMsg) || _isEmpty(inboundMsg)) return Promise.resolve()
// else, follow the happy path!
const sideEffects = await detectAndPerformSideEffects(channel, sender, inboundMsg)
await util.sequence(sideEffects)
return relay(channel, sender, inboundMsg)
}
/**********************
* DISPATCH HELPERS
**********************/
// (Channel | null, Sender | null, SdMessage) -> Promise<function | null>
const detectInterventions = async (channel, sender, inboundMsg) => {
const healthcheckId = detectHealthcheck(inboundMsg)
if (healthcheckId) return diagnostics.respondToHealthcheck(channelPhoneNumber, healthcheckId)
if (healthcheckId)
return () => diagnostics.respondToHealthcheck(channel.phoneNumber, healthcheckId)
// return early from healthcheck responses to avoid infinite feedback loops!
const isHealthcheckResponse = detectHealthcheckResponse(inboundMsg)
if (isHealthcheckResponse) return Promise.resolve()
if (isHealthcheckResponse) return () => Promise.resolve()
const rateLimitedMessage = detectRateLimitedMessage(inboundMsg)
if (rateLimitedMessage) return logAndResendRateLimitedMessage(rateLimitedMessage)
if (rateLimitedMessage) return () => logAndResendRateLimitedMessage(rateLimitedMessage)
const updatableFingerprint = await detectUpdatableFingerprint(inboundMsg)
if (updatableFingerprint) return safetyNumbers.updateFingerprint(updatableFingerprint)
if (updatableFingerprint) return () => safetyNumbers.updateFingerprint(updatableFingerprint)
}
const detectAndPerformSideEffects = async (channel, sender, inboundMsg) => {
let sideEffects = []
// Don't return early here b/c that would prevent processing of HELLO commands on channels w/ disappearing messages
const newExpiryTime = detectUpdatableExpiryTime(inboundMsg, channel)
// GOTCHA: Don't return early here b/c that would prevent HELLO commands on channels w/
// disappearing messages from ever being processed!
if (isNumber(newExpiryTime)) await updateExpiryTime(sender, channel, newExpiryTime)
if (isNumber(newExpiryTime))
sideEffects.push(() => updateExpiryTime(sender, channel, newExpiryTime))
// dispatch user-created messages
if (shouldRelay(inboundMsg)) return relay(channel, sender, inboundMsg)
// Don't return early here b/c the person "redeemed" channel by sending normal message that should be processed!
if (channel && channel.recycleRequest)
sideEffects.push(() => phoneNumberRegistrar.redeem(channel))
return sideEffects
}
const relay = async (channel, sender, inboundMsg) => {
const sdMessage = signal.parseOutboundSdMessage(inboundMsg)
try {
metrics.incrementCounter(counters.RELAYABLE_MESSAGES, [channel.phoneNumber])
metrics.incrementCounter(RELAYABLE_MESSAGES, [channel.phoneNumber])
const dispatchable = { channel, sender, sdMessage }
const commandResult = await executor.processCommand(dispatchable)
return messenger.dispatch({ dispatchable, commandResult })
......@@ -109,6 +142,10 @@ const relay = async (channel, sender, inboundMsg) => {
}
}
/******************
* INTERVENTIONS
*****************/
// InboundSdMessage => void
const logAndResendRateLimitedMessage = rateLimitedMessage => {
const _channelPhoneNumber = rateLimitedMessage.username
......@@ -119,7 +156,7 @@ const logAndResendRateLimitedMessage = rateLimitedMessage => {
resendInterval,
),
)
metrics.incrementCounter(counters.ERRORS, [
metrics.incrementCounter(ERRORS, [
resendInterval ? errorTypes.RATE_LIMIT_RESENDING : errorTypes.RATE_LIMIT_ABORTING,
_channelPhoneNumber,
])
......@@ -162,8 +199,6 @@ const parseInboundSignaldMessage = inboundMsg => {
}
}
const shouldRelay = inboundMsg => _isMessage(inboundMsg) && !_isEmpty(inboundMsg)
const _isMessage = inboundMsg =>
inboundMsg.type === signal.messageTypes.MESSAGE && get(inboundMsg, 'data.dataMessage')
......
......@@ -4,7 +4,7 @@ const { errors } = require('./common')
const { destroy } = require('./destroy')
const { list } = require('./present')
const { provisionN } = require('./provision')
const { requestToRecycle, recycle, processRecycleRequests } = require('./recycle')
const { requestToRecycle, processRecycleRequests, recycle, redeem } = require('./recycle')
const { register, registerAllPurchased, registerAllUnregistered } = require('./register')
const { handleSms } = require('./sms')
const { purchase, purchaseN } = require('./purchase')
......@@ -25,6 +25,7 @@ module.exports = {
processRecycleRequests,
requestToRecycle,
recycle,
redeem,
register,
registerAllPurchased,
registerAllUnregistered,
......
......@@ -47,22 +47,15 @@ const requestToRecycle = async phoneNumbers => {
// () -> Promise<Array<string>>
const processRecycleRequests = async () => {
try {
const { redeemed, toRecycle } = await recycleRequestRepository.evaluateRecycleRequests()
const recycleResults = await Promise.all(toRecycle.map(recycle))
await recycleRequestRepository.destroyMany([...redeemed, ...toRecycle])
const redeemedChannels = await channelRepository.findManyDeep(redeemed)
const numProcessed = redeemed.length + toRecycle.length
const phoneNumbersToRecycle = await recycleRequestRepository.getMatureRecycleRequests()
const recycleResults = await Promise.all(phoneNumbersToRecycle.map(recycle))
await recycleRequestRepository.destroyMany(phoneNumbersToRecycle)
return Promise.all([
...redeemedChannels.map(channel =>
notifier.notifyAdmins(channel, notificationKeys.CHANNEL_REDEEMED),
),
numProcessed === 0
phoneNumbersToRecycle.length === 0
? Promise.resolve()
: notifier.notifyMaintainers(
`${redeemed.length + toRecycle.length} recycle requests processed:\n\n` +
`${redeemed.map(r => `${r} redeemed by admins.`).join('\n')}` +
'\n' +
`${phoneNumbersToRecycle.length} recycle requests processed:\n\n` +
`${map(recycleResults, 'message').join('\n')}`,
),
])
......@@ -71,6 +64,21 @@ const processRecycleRequests = async () => {
}
}
// (Channel) -> Promise<void>
const redeem = async channel => {
try {
await recycleRequestRepository.destroy(channel.phoneNumber)
await Promise.all([
notifier.notifyAdmins(channel, notificationKeys.CHANNEL_REDEEMED),
notifier.notifyMaintainers(
`${channel.phoneNumber} had been scheduled for recycling, but was just redeemed.`,
),
])
} catch (err) {
return notifier.notifyMaintainers(`Error redeeming ${channel.phoneNumber}: ${err}`)
}
}
// (string) -> SignalboostStatus
const recycle = async phoneNumber => {
const channel = await channelRepository.findDeep(phoneNumber)
......@@ -93,4 +101,4 @@ const recycle = async phoneNumber => {
}
}
module.exports = { requestToRecycle, processRecycleRequests, recycle }
module.exports = { requestToRecycle, processRecycleRequests, recycle, redeem }
......@@ -8,10 +8,10 @@ volumes:
# you can uncomment the below if you'd like to run unit tests behind a VPN.
# do not leave it uncommented as that will mess up CI!
#networks:
# default:
# external:
# name: localdev
networks:
default:
external:
name: localdev
services:
......
......@@ -4,10 +4,10 @@ version: '3'
# you can uncomment the below if you'd like to run unit tests behind a VPN.
# do not leave it uncommented as that will mess up CI!
#networks:
# default:
# external:
# name: localdev
networks:
default:
external:
name: localdev
volumes:
postgres_data:
......
......@@ -28,6 +28,7 @@ export const deepChannelFactory = attrs => {
messageCount: messageCountFactory({ channelPhoneNumber }),
invites: times(2, () => inviteFactory({ channelPhoneNumber })),
deauthorizations: [deauthorizationFactory({ channelPhoneNumber })],
recycleRequest: { channelPhoneNumber },
...attrs,
}
}
......
......@@ -69,6 +69,17 @@ describe('channel model', () => {
},
)
const createChannelWithRecycleRequest = () =>
db.channel.create(
{
...channelFactory(),
recycleRequest: {},
},
{
include: [{ model: db.recycleRequest }],
},
)
before(async () => {
db = await run()
})
......@@ -78,6 +89,7 @@ describe('channel model', () => {
db.messageCount.destroy({ where: {}, force: true }),
db.membership.destroy({ where: {}, force: true }),
db.hotlineMessage.destroy({ where: {}, force: true }),
db.recycleRequest.destroy({ where: {}, force: true }),
])
await db.channel.destroy({ where: {}, force: true })
})
......@@ -248,5 +260,28 @@ describe('channel model', () => {
expect(await db.hotlineMessage.count()).to.eql(hotlineMessageCount - 2)
})
})
describe('recycle request', () => {
let recycleRequest
beforeEach(async () => {
channel = await createChannelWithRecycleRequest()
recycleRequest = await channel.getRecycleRequest()
})
it('has one recycle request', () => {
expect(recycleRequest).to.be.an('object')
})
it('deletes the recycle reqeust when it deletes channel', async () => {
const recycleRequestCount = await db.recycleRequest.count()
await channel.destroy()
expect(await db.recycleRequest.count()).to.eql(recycleRequestCount - 1)
})
it('returns null if no recycle requests exist for the account', async () => {
channel = await db.channel.create(channelFactory())
expect(await channel.getRecycleRequest()).to.be.null
})
})
})
})
import { expect } from 'chai'
import { describe, it, before, after, afterEach } from 'mocha'
import { describe, it, before, beforeEach, after, afterEach } from 'mocha'
import { run } from '../../../../app/db/index'
import { genPhoneNumber } from '../../../support/factories/phoneNumber'
import { channelFactory } from '../../../support/factories/channel'
describe('recycleRequest model', () => {
let db
let phoneNumber = genPhoneNumber()
let db, channel
let channelPhoneNumber = genPhoneNumber()
before(async () => (db = await run()))
afterEach(async () => await db.recycleRequest.destroy({ where: {} }))
beforeEach(async () => {
channel = await db.channel.create(channelFactory({ phoneNumber: channelPhoneNumber }))
})
afterEach(async () => {
await db.recycleRequest.destroy({ where: {} })
await db.channel.destroy({ where: {} })
})
after(async () => await db.stop())
it('has the correct fields', async () => {
const recycleRequest = await db.recycleRequest.create({ phoneNumber })
expect(recycleRequest.phoneNumber).to.be.a('string')
const recycleRequest = await db.recycleRequest.create({ channelPhoneNumber })
expect(recycleRequest.channelPhoneNumber).to.be.a('string')
expect(recycleRequest.createdAt).to.be.a('Date')
expect(recycleRequest.updatedAt).to.be.a('Date')
})
describe('validations', () => {
it('requires a phoneNumber', async () => {
const err = await db.recycleRequest.create({ phoneNumber: undefined }).catch(e => e)
expect(err.message).to.include('phoneNumber cannot be null')
it('requires a channelPhoneNumber', async () => {
const err = await db.recycleRequest.create({ channelPhoneNumber: undefined }).catch(e => e)
expect(err.message).to.include('channelPhoneNumber cannot be null')
})
it('requires phone number to have valid e164 format', async () => {
const err = await db.recycleRequest.create({ phoneNumber: 'foobar' }).catch(e => e)
const err = await db.recycleRequest.create({ channelPhoneNumber: 'foobar' }).catch(e => e)
expect(err.message).to.include('Validation error')
})
it("doesn't allow the same phone number to be enqueued twice", async () => {
await db.recycleRequest.create({ phoneNumber })
const err = await db.recycleRequest.create({ phoneNumber }).catch(e => e)
await db.recycleRequest.create({ channelPhoneNumber })
const err = await db.recycleRequest.create({ channelPhoneNumber }).catch(e => e)
expect(err.name).to.equal('SequelizeUniqueConstraintError')
})
it("doesn't allow a recycle request for a channel that does not exist", async () => {
const err = await db.recycleRequest
.create({ channelPhoneNumber: genPhoneNumber() })
.catch(e => e)
expect(err.name).to.equal('SequelizeForeignKeyConstraintError')
})
})
describe('associations', () => {
it('belongs to a channel', async () => {
const recycleRequest = await db.recycleRequest.create({
channelPhoneNumber: channel.phoneNumber,
})
expect((await recycleRequest.getChannel()).dataValues).to.eql(channel.dataValues)
})
})
})
import { expect } from 'chai'
import { describe, it, before, beforeEach, after, afterEach } from 'mocha'
import { deepChannelFactory } from '../../../support/factories/channel'
import { channelFactory, deepChannelFactory } from '../../../support/factories/channel'
import { genPhoneNumber } from '../../../support/factories/phoneNumber'
import { omit, keys, times, map } from 'lodash'
import channelRepository, { isSysadmin } from '../../../../app/db/repositories/channel'
......@@ -21,12 +21,13 @@ describe('channel repository', () => {
})