diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala index 04cd5d4cd72364f4983c2628d490eb8de82824f3..31ab197a4f6a5c74518589f414ca4500893494d0 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala @@ -21,11 +21,23 @@ class IpfsCidDeltaUpdateCommandHandlerImpl( override def handle(cmd: IpfsCidDeltaUpdateCommand): IO[IpfsCidDeltaUpdateEvent[?]] = { for - _ <- ipfsCidFacadeService.deleteFlats(cmd.delta.removals) - _ <- ipfsCidCopiesService.decrement(cmd.delta.removals.toSet) + _ <- handleAdditions(cmd) + _ <- handleRemovals(cmd) + yield IpfsCidDeltaUpdateSuccessEvent() + } + + private def handleAdditions(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = { + for _ <- ipfsCidCopiesService.increment(cmd.delta.additions.toSet) _ <- ipfsClusterService.addPins(cmd.delta.additions) + yield () + } + + private def handleRemovals(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = { + for + _ <- ipfsCidFacadeService.deleteFlats(cmd.delta.removals) + _ <- ipfsCidCopiesService.decrement(cmd.delta.removals.toSet) _ <- ipfsClusterService.removePins(cmd.delta.removals) - yield IpfsCidDeltaUpdateSuccessEvent() + yield () } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala index 8b2c9d65180167f4c01a789418668cef143a6493..971160b6c71168c38f176993abcb69f9c2b88014 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala @@ -1,8 +1,7 @@ package acab.devcon0.domain.adapters.federationmember import acab.devcon0.domain.dtos._ -import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus -import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus.PINNED +import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus._ import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberSharingFolderUpdateMessage import acab.devcon0.domain.ports.input._ import acab.devcon0.domain.ports.input.federationmember._ @@ -80,7 +79,7 @@ class SharingFolderUpdateCommandHandlerImpl(changelogService: ChangelogService[I private def buildFederationMemberChangelogItem( message: FederationMemberSharingFolderUpdateMessage ): IO[Option[FederationMemberChangelogItem]] = { - IO.pure(Some(FederationMemberChangelogItem(message.ipfsCid, message.timestamp, IpfsCidPinStatus.PINNED))) + IO.pure(Some(FederationMemberChangelogItem(message.ipfsCid, message.timestamp, PINNED))) } private def logFilterNotPassedCase( diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala index 2a084f884ce0035d6f1affcb5cd2e4754519ba7b..3c2719e9163056f86dfd08dcee7a635f287623a6 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala @@ -6,7 +6,6 @@ import io.circe._ import io.circe.generic.auto._ import io.circe.generic.semiauto.deriveEncoder import io.circe.jawn.decode -import io.circe.syntax.EncoderOps object IpfsCodecs { @@ -50,13 +49,8 @@ object IpfsCodecs { } object Encoders { - implicit val ipfsPubSubPongMessage: Encoder[IpfsPubSubPongMessage] = deriveEncoder implicit val ipfsLsLinkResponse: Encoder[IpfsLsLinkResponse] = deriveEncoder implicit val ipfsLsResponse: Encoder[IpfsLsResponse] = deriveEncoder implicit val ipfsNamePublishHttpResponse: Encoder[IpfsNamePublishHttpResponse] = deriveEncoder - - object IpfsPubSubPongMessage { - def apply(dto: IpfsPubSubPongMessage): IO[String] = IO(EncoderOps[IpfsPubSubPongMessage](dto).asJson.noSpaces) - } } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala index b9ca796626c72be35a3be8b3a9ecc9e8e9818dbc..ee11b8c44c3dbd8e9ffe4d1ddc3e6aca547040ab 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala @@ -57,7 +57,7 @@ class FacadeServiceImpl( else flatRepository .deleteAll(ipfsCids) - .flatTap(_ => logger.info(s"deleteFlats for ipfsCids=$ipfsCids succeeded")) + .flatTap(_ => logger.debug(s"deleteFlats for ipfsCids=$ipfsCids succeeded")) .onError(logError) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala index dde6e3fa42ed8679ec6af5dc9407f1716d42eec8..acf92c14c97bb0a1efa99999b6e0fb9d573772fc 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala @@ -4,9 +4,10 @@ import acab.devcon0.domain.codecs.FederationCodecs import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateCommand import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateCommandHandler -import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateEvent import acab.devcon0.output.repository.redisutils.Redis +import acab.devcon0.trile.utils.IORetry import cats.effect._ +import cats.effect.std.Supervisor import cats.effect.unsafe.IORuntime import dev.profunktor.redis4cats.data._ import fs2.Pipe @@ -18,7 +19,7 @@ class FederationIpfsCidDeltaUpdateListener( ipfsCidDeltaUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler ) { - private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global + cats.effect.unsafe.IORuntime.global implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val channel: RedisChannel[String] = Redis.Channels.federationIpfsCidsDeltaUpdate @@ -29,25 +30,23 @@ class FederationIpfsCidDeltaUpdateListener( private def processMessage(): Pipe[IO, String, Unit] = stream => { stream .evalMap(FederationCodecs.Decoders.IpfsCidsDeltaUpdateMessage(_)) - .evalTap(logReceivedMessage) .evalTap(processMessageInner) + .evalTap(logReceivedMessage) .evalMap(* => IO.unit) } private def processMessageInner(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = { - (for _ <- triggerCommand(message) - yield ()).unsafeRunAndForget() - IO.unit - } - - private def triggerCommand(message: FederationIpfsCidsDeltaUpdateMessage): IO[IpfsCidDeltaUpdateEvent[?]] = { - val cmd: IpfsCidDeltaUpdateCommand = IpfsCidDeltaUpdateCommand(delta = message.delta) - ipfsCidDeltaUpdateCommandHandler.handle(cmd) + Supervisor[IO](await = true) + .use(_ => { + val cmd: IpfsCidDeltaUpdateCommand = IpfsCidDeltaUpdateCommand(delta = message.delta) + IORetry.fibonacci(ipfsCidDeltaUpdateCommandHandler.handle(cmd)) + }) + .void } private def logReceivedMessage(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = { logger.info( - s"FederationIpfsCidDeltaUpdateListener received over Redis pub/sub. $message" + s"Message received & processed over Redis pub/sub. additions=${message.delta.additions.size} removals=${message.delta.removals.size}" ) } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala index acb3c2f11a5fab34760f35f8f819f4335f54df00..cf2749180a6db1376a9a7012e03da4781643c119 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala @@ -2,27 +2,31 @@ package acab.devcon0.input.redispubsub import acab.devcon0.domain.codecs.FederationMemberCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs.Decoders.ChangelogUpdateMessage +import acab.devcon0.domain.dtos.FederationMemberIpfsCidDelta import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberChangelogUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdateMessage import acab.devcon0.domain.ports.input.federationmember._ import acab.devcon0.domain.ports.output.publisher.RedisPublisher import acab.devcon0.output.repository.redisutils.Redis import cats.effect._ +import cats.effect.std.Supervisor import cats.effect.unsafe.IORuntime +import cats.implicits.toTraverseOps import dev.profunktor.redis4cats.data._ import fs2.Pipe import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger class FederationMemberChangelogUpdateEventListener( - redisListener: Listener, - ipfsCidUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler, - redisPublisher: RedisPublisher[IO] + redisListener: Listener, + ipfsCidUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler, + redisPublisher: RedisPublisher[IO] ) { private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val redisChannel: RedisChannel[String] = Redis.Channels.federationMemberChangelogUpdate + private val chunkSize: Int = 500 def run(): Unit = { redisListener.run(redisChannel, processMessage()) @@ -40,12 +44,41 @@ class FederationMemberChangelogUpdateEventListener( (for event <- triggerIpfsCidFileTreeUpdateCommand(message) _ <- event match - case IpfsCidDeltaUpdateSuccessEvent(delta) => - redisPublisher.publish(FederationMemberIpfsCidsDeltaUpdateMessage(delta)) - case IpfsCidDeltaUpdateErrorEvent() => IO.unit + case IpfsCidDeltaUpdateSuccessEvent(delta) => report(delta) + case IpfsCidDeltaUpdateErrorEvent() => IO.unit yield ()).unsafeRunAndForget() IO.unit } + + private def report(delta: FederationMemberIpfsCidDelta): IO[Unit] = { + for + _ <- reportAdditions(delta) + _ <- reportRemovals(delta) + yield () + } + + private def reportAdditions(delta: FederationMemberIpfsCidDelta): IO[Unit] = { + Supervisor[IO](await = true).use { supervisor => + val messages: List[FederationMemberIpfsCidsDeltaUpdateMessage] = delta.additions + .grouped(chunkSize) + .map(cids => FederationMemberIpfsCidDelta(delta.federationMemberId, cids, List())) + .map(newDelta => FederationMemberIpfsCidsDeltaUpdateMessage(newDelta)) + .toList + supervisor.supervise(messages.traverse(redisPublisher.publish)) + }.void + } + + private def reportRemovals(delta: FederationMemberIpfsCidDelta): IO[Unit] = { + Supervisor[IO](await = true).use { supervisor => + val messages = delta.removals + .grouped(chunkSize) + .map(cids => FederationMemberIpfsCidDelta(delta.federationMemberId, List(), cids)) + .map(newDelta => FederationMemberIpfsCidsDeltaUpdateMessage(newDelta)) + .toList + supervisor.supervise(messages.traverse(redisPublisher.publish)) + }.void + } + private def triggerIpfsCidFileTreeUpdateCommand( message: FederationMemberChangelogUpdateMessage ): IO[IpfsCidDeltaUpdateEvent[?]] = { @@ -57,8 +90,6 @@ class FederationMemberChangelogUpdateEventListener( } private def logReceivedMessage(message: FederationMemberChangelogUpdateMessage): IO[Unit] = { - logger.info( - s"FederationMemberChangelogUpdateEventListener received over Redis pub/sub. $message" - ) + logger.info(s"Message received & processed over Redis pub/sub. message=$message") } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala index 30fa04790cd67acb6fd885ddbd3a48eb966946ef..3f7e67e930b75f24104ad5a7702a8a1b2c30c607 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala @@ -9,7 +9,8 @@ import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaQueryHandler import acab.devcon0.domain.ports.output.publisher.RedisPublisher import acab.devcon0.output.repository.redisutils.Redis import cats.effect._ -import cats.effect.unsafe.IORuntime +import cats.effect.std.Supervisor +import cats.implicits.toTraverseOps import dev.profunktor.redis4cats.data._ import fs2.Pipe import org.typelevel.log4cats.Logger @@ -21,9 +22,10 @@ class FederationMemberIpfsCidDeltaUpdateListener( redisPublisher: RedisPublisher[IO] ) { - private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global + cats.effect.unsafe.IORuntime.global implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val channel: RedisChannel[String] = Redis.Channels.federationMemberIpfsCidsDeltaUpdate + private val chunkSize = 500 def run(): Unit = { redisListener.run(channel, processMessage()) @@ -33,17 +35,36 @@ class FederationMemberIpfsCidDeltaUpdateListener( stream .evalMap(FederationMemberCodecs.Decoders.IpfsCidsDeltaUpdateMessage(_)) .evalTap(logReceivedMessage) - .evalTap(reportFederationIpfsCidDelta) + .evalTap(reportDelta) .evalMap(* => IO.unit) } - private def reportFederationIpfsCidDelta(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[Unit] = { - (for - federationIpfsCidDelta <- getFederationDelta(message) - message = FederationIpfsCidsDeltaUpdateMessage(delta = federationIpfsCidDelta) - _ <- redisPublisher.publish(message) - yield ()).unsafeRunAndForget() - IO.unit + private def reportDelta(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[Unit] = { + for + delta <- getFederationDelta(message) + _ <- reportAdditions(delta) + _ <- reportRemovals(delta) + yield () + } + + private def reportAdditions(delta: FederationIpfsCidDelta): IO[Unit] = { + Supervisor[IO](await = true).use { supervisor => + val messages = delta.additions + .grouped(chunkSize) + .map(cids => FederationIpfsCidsDeltaUpdateMessage(FederationIpfsCidDelta(additions = cids, removals = List()))) + .toList + supervisor.supervise(messages.traverse(redisPublisher.publish)) + }.void + } + + private def reportRemovals(delta: FederationIpfsCidDelta): IO[Unit] = { + Supervisor[IO](await = true).use { supervisor => + val messages = delta.removals + .grouped(chunkSize) + .map(cids => FederationIpfsCidsDeltaUpdateMessage(FederationIpfsCidDelta(additions = List(), removals = cids))) + .toList + supervisor.supervise(messages.traverse(redisPublisher.publish)) + }.void } private def getFederationDelta(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[FederationIpfsCidDelta] = { @@ -54,8 +75,8 @@ class FederationMemberIpfsCidDeltaUpdateListener( } private def logReceivedMessage(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[Unit] = { - logger.info( - s"FederationMemberIpfsCidDeltaUpdateListener received over Redis pub/sub. $message" - ) + val additions: Int = message.delta.additions.size + val removals: Int = message.delta.removals.size + logger.info(s"Message received & processed over Redis pub/sub. additions=$additions removals=$removals") } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala index fadc5a9e51d495699534dc736cbda7a09f11c3c2..69fe4e5ccaf2d3c674f5c788ee28f682dab9902e 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala @@ -15,7 +15,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger class Listener(redisClient: Resource[IO, RedisClient]) { private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global - implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8 def run(channel: RedisChannel[String], pipe: Pipe[IO, String, Unit]): Unit = { diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala index d3b8c1a177b2ea9a294d76c2fba8817ff8e7d5bd..677f39ec4b48f6bea507cd2f1d3960515019f04d 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala @@ -3,6 +3,8 @@ package acab.devcon0.output.publisher import acab.devcon0.domain.codecs.FederationCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs.Encoders.RedisSharingFolderUpdateMessage +import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidAdditionMessage +import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidRemovalMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberChangelogUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdateMessage @@ -49,6 +51,18 @@ class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPu .flatMap(publishInner(_, Redis.Channels.federationIpfsCidsDeltaUpdate)) } + override def publish(message: FederationIpfsCidAdditionMessage): IO[Unit] = { + FederationCodecs.Encoders + .IpfsCidsAdditionMessage(message) + .flatMap(publishInner(_, Redis.Channels.federationIpfsCidAddition)) + } + + override def publish(message: FederationIpfsCidRemovalMessage): IO[Unit] = { + FederationCodecs.Encoders + .IpfsCidsRemovalMessage(message) + .flatMap(publishInner(_, Redis.Channels.federationIpfsCidRemoval)) + } + private def publishInner(event: String, redisChannel: RedisChannel[String]): IO[Unit] = { { for diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala index 3079e2309d3848aaa30662daded9334ac89c41f3..35b8cee1f8e8767a94b610046050bc8fc3af280e 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala @@ -22,8 +22,11 @@ object Redis { val ipfsClusterNodeId: String = "IPFS_CLUSTER_NODE_ID" } object Channels { + val federationIpfsCidsDeltaUpdate: RedisChannel[String] = RedisChannel("FEDERATION_IPFS_DELTA_UPDATE") + val federationIpfsCidAddition: RedisChannel[String] = RedisChannel("FEDERATION_IPFS_ADDITION") + val federationIpfsCidRemoval: RedisChannel[String] = RedisChannel("FEDERATION_IPFS_REMOVAL") + val federationMemberChangelogUpdate: RedisChannel[String] = RedisChannel("FEDERATION_MEMBER_CHANGELOG_UPDATE") - val federationIpfsCidsDeltaUpdate: RedisChannel[String] = RedisChannel("FEDERATION_MEMBER_IPFS_DELTA_UPDATE") val federationMemberSharingFolderUpdate: RedisChannel[String] = RedisChannel( "FEDERATION_MEMBER_SHARING_FOLDER_UPDATE" )