Skip to content
Snippets Groups Projects
Commit fea84d37 authored by Trile Ro's avatar Trile Ro
Browse files

[FCB] Retry heartbeats since that they fail due to member/peer re-connection

parent 9aaca717
No related branches found
No related tags found
No related merge requests found
package acab.devcon0.input.bootstrap package acab.devcon0.input.bootstrap
import java.util.concurrent.TimeUnit import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import acab.devcon0.domain.ports.input.federation.HeartbeatCommand import acab.devcon0.domain.ports.input.federation.HeartbeatCommand
import acab.devcon0.domain.ports.input.federation.HeartbeatCommandHandler import acab.devcon0.domain.ports.input.federation.HeartbeatCommandHandler
import acab.devcon0.domain.ports.input.federation.HeartbeatCommandImplicits.HeartbeatCommandEventFlattenOps import acab.devcon0.domain.ports.input.federation.HeartbeatCommandImplicits.HeartbeatCommandEventFlattenOps
import acab.devcon0.trile.utils.EffectsUtils
import acab.devcon0.trile.utils.IORetry
import cats.effect.IO import cats.effect.IO
import cats.implicits.catsSyntaxMonadError
import org.typelevel.log4cats.Logger import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger import org.typelevel.log4cats.slf4j.Slf4jLogger
class HeartbeatAdvertiser(heartbeatCommandHandler: HeartbeatCommandHandler) { class HeartbeatAdvertiser(heartbeatCommandHandler: HeartbeatCommandHandler) {
private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
private val oneMinuteDuration: FiniteDuration = Duration(55, TimeUnit.SECONDS) private val halfMinute: FiniteDuration = 30.seconds
def run(): IO[Unit] = { def run(): IO[Unit] = {
logger.info(s"Heartbeat-er started") >> triggerHeartbeat logger.info(s"Heartbeat-er started") >> triggerHeartbeat
...@@ -23,13 +24,16 @@ class HeartbeatAdvertiser(heartbeatCommandHandler: HeartbeatCommandHandler) { ...@@ -23,13 +24,16 @@ class HeartbeatAdvertiser(heartbeatCommandHandler: HeartbeatCommandHandler) {
private def triggerHeartbeat: IO[Unit] = { private def triggerHeartbeat: IO[Unit] = {
val command: HeartbeatCommand = HeartbeatCommand() val command: HeartbeatCommand = HeartbeatCommand()
heartbeatCommandHandler triggerHeartbeat(command)
.handle(command) .attemptTap(EffectsUtils.attemptTLog)
.flattenEvents
.onError(throwable => logger.error(s"throwable=$throwable"))
.recoverWith(_ => IO.unit.delayBy(oneMinuteDuration) >> run())
.flatTap(_ => logger.debug(s"Heartbeat sent to the federation members")) .flatTap(_ => logger.debug(s"Heartbeat sent to the federation members"))
.flatTap(_ => IO.unit.delayBy(oneMinuteDuration)) .recoverWith(_ => IO.unit)
.flatTap(_ => IO.sleep(halfMinute))
.foreverM .foreverM
} }
private def triggerHeartbeat(command: HeartbeatCommand): IO[Unit] = {
val heartbeatIO: IO[Unit] = heartbeatCommandHandler.handle(command).flattenEvents
IORetry.fibonacci(heartbeatIO)
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment