diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/bootstrap/HeartbeatAdvertiser.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/bootstrap/HeartbeatAdvertiser.scala index d3204b09ee8ca7e6f765ecda94975374f3335728..873282d99ebfaa305a1b5387219152dd9a7da321 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/bootstrap/HeartbeatAdvertiser.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/bootstrap/HeartbeatAdvertiser.scala @@ -1,21 +1,22 @@ package acab.devcon0.input.bootstrap -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration +import scala.concurrent.duration.DurationInt import scala.concurrent.duration.FiniteDuration import acab.devcon0.domain.ports.input.federation.HeartbeatCommand import acab.devcon0.domain.ports.input.federation.HeartbeatCommandHandler import acab.devcon0.domain.ports.input.federation.HeartbeatCommandImplicits.HeartbeatCommandEventFlattenOps +import acab.devcon0.trile.utils.EffectsUtils +import acab.devcon0.trile.utils.IORetry import cats.effect.IO +import cats.implicits.catsSyntaxMonadError import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger class HeartbeatAdvertiser(heartbeatCommandHandler: HeartbeatCommandHandler) { - private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] - private val oneMinuteDuration: FiniteDuration = Duration(55, TimeUnit.SECONDS) + private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + private val halfMinute: FiniteDuration = 30.seconds def run(): IO[Unit] = { logger.info(s"Heartbeat-er started") >> triggerHeartbeat @@ -23,13 +24,16 @@ class HeartbeatAdvertiser(heartbeatCommandHandler: HeartbeatCommandHandler) { private def triggerHeartbeat: IO[Unit] = { val command: HeartbeatCommand = HeartbeatCommand() - heartbeatCommandHandler - .handle(command) - .flattenEvents - .onError(throwable => logger.error(s"throwable=$throwable")) - .recoverWith(_ => IO.unit.delayBy(oneMinuteDuration) >> run()) + triggerHeartbeat(command) + .attemptTap(EffectsUtils.attemptTLog) .flatTap(_ => logger.debug(s"Heartbeat sent to the federation members")) - .flatTap(_ => IO.unit.delayBy(oneMinuteDuration)) + .recoverWith(_ => IO.unit) + .flatTap(_ => IO.sleep(halfMinute)) .foreverM } + + private def triggerHeartbeat(command: HeartbeatCommand): IO[Unit] = { + val heartbeatIO: IO[Unit] = heartbeatCommandHandler.handle(command).flattenEvents + IORetry.fibonacci(heartbeatIO) + } }