From 6fc246cbae6760892497149e4a562c6bcb71f46e Mon Sep 17 00:00:00 2001
From: trilero <trile@riseup.net>
Date: Fri, 12 Apr 2024 14:50:20 +0200
Subject: [PATCH] [+][FCB] Cats effects was giving warning on starvation,
 increase throughput performance for delta messages via chunks

+ 1st it was attempted to make it fully granular, but...it does not behave well. Keep it in a branch and check another time.
+ It feels like every message on the redis stream opens a whole IO thing and the controller was eating all the memory very fast.
+ By grouping/chunking the problem gets put under certain control.
+ Testing showed to handle 20K delta items with grace, not instant, but gracefully. Felt like a good balance for redis and the controller.
+ Smaller chunks was giving starvation warnings, larger it's quite impactful on memory due to deser.
+ Maybe it's time to do binary streams, specially in this particular area of the code.
+ In any case, 500 chunks handles 20K properly. Still not the desired performance, but also, IPFS cluster is quite a bottleneck here.

It's kind of a tricky problem. Newcomers, specially big members with over >100K files might find it unstable to put their shared folder online and this would be their very first experience with the software...
But also...what are the chances that such a big member would it find unacceptable to have to chunk their file population?
Still, no compromise, aim for handling millions of files with no problem.

What's left then is:
+ Checking why does the CPU starvation happens. In >100K cases, the problem will still be there.
+ Binary streams. RPC to the rescue? This would need some serious metrics.
+ At some point, the file tree resolution will also act as a bottleneck. For 15K it's 2MBs in Redis and quite some memory pressure in controller's backend. Some solution will be needed here at some point.
---
 ...IpfsCidDeltaUpdateCommandHandlerImpl.scala | 18 +++++--
 ...haringFolderUpdateCommandHandlerImpl.scala |  5 +-
 .../devcon0/domain/codecs/IpfsCodecs.scala    |  6 ---
 .../service/ipfscid/FacadeServiceImpl.scala   |  2 +-
 ...FederationIpfsCidDeltaUpdateListener.scala | 23 +++++----
 ...onMemberChangelogUpdateEventListener.scala | 49 +++++++++++++++----
 ...tionMemberIpfsCidDeltaUpdateListener.scala | 47 +++++++++++++-----
 .../devcon0/input/redispubsub/Listener.scala  |  2 +-
 .../output/publisher/RedisPublisherImpl.scala | 14 ++++++
 .../output/repository/redisutils/Redis.scala  |  5 +-
 10 files changed, 122 insertions(+), 49 deletions(-)

diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala
index 04cd5d4..31ab197 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala
@@ -21,11 +21,23 @@ class IpfsCidDeltaUpdateCommandHandlerImpl(
 
   override def handle(cmd: IpfsCidDeltaUpdateCommand): IO[IpfsCidDeltaUpdateEvent[?]] = {
     for
-      _ <- ipfsCidFacadeService.deleteFlats(cmd.delta.removals)
-      _ <- ipfsCidCopiesService.decrement(cmd.delta.removals.toSet)
+      _ <- handleAdditions(cmd)
+      _ <- handleRemovals(cmd)
+    yield IpfsCidDeltaUpdateSuccessEvent()
+  }
+
+  private def handleAdditions(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = {
+    for
       _ <- ipfsCidCopiesService.increment(cmd.delta.additions.toSet)
       _ <- ipfsClusterService.addPins(cmd.delta.additions)
+    yield ()
+  }
+
+  private def handleRemovals(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = {
+    for
+      _ <- ipfsCidFacadeService.deleteFlats(cmd.delta.removals)
+      _ <- ipfsCidCopiesService.decrement(cmd.delta.removals.toSet)
       _ <- ipfsClusterService.removePins(cmd.delta.removals)
-    yield IpfsCidDeltaUpdateSuccessEvent()
+    yield ()
   }
 }
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala
index 8b2c9d6..971160b 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala
@@ -1,8 +1,7 @@
 package acab.devcon0.domain.adapters.federationmember
 
 import acab.devcon0.domain.dtos._
-import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus
-import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus.PINNED
+import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus._
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberSharingFolderUpdateMessage
 import acab.devcon0.domain.ports.input._
 import acab.devcon0.domain.ports.input.federationmember._
@@ -80,7 +79,7 @@ class SharingFolderUpdateCommandHandlerImpl(changelogService: ChangelogService[I
   private def buildFederationMemberChangelogItem(
       message: FederationMemberSharingFolderUpdateMessage
   ): IO[Option[FederationMemberChangelogItem]] = {
-    IO.pure(Some(FederationMemberChangelogItem(message.ipfsCid, message.timestamp, IpfsCidPinStatus.PINNED)))
+    IO.pure(Some(FederationMemberChangelogItem(message.ipfsCid, message.timestamp, PINNED)))
   }
 
   private def logFilterNotPassedCase(
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala
index 2a084f8..3c2719e 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/codecs/IpfsCodecs.scala
@@ -6,7 +6,6 @@ import io.circe._
 import io.circe.generic.auto._
 import io.circe.generic.semiauto.deriveEncoder
 import io.circe.jawn.decode
-import io.circe.syntax.EncoderOps
 
 object IpfsCodecs {
 
@@ -50,13 +49,8 @@ object IpfsCodecs {
   }
 
   object Encoders {
-    implicit val ipfsPubSubPongMessage: Encoder[IpfsPubSubPongMessage]             = deriveEncoder
     implicit val ipfsLsLinkResponse: Encoder[IpfsLsLinkResponse]                   = deriveEncoder
     implicit val ipfsLsResponse: Encoder[IpfsLsResponse]                           = deriveEncoder
     implicit val ipfsNamePublishHttpResponse: Encoder[IpfsNamePublishHttpResponse] = deriveEncoder
-
-    object IpfsPubSubPongMessage {
-      def apply(dto: IpfsPubSubPongMessage): IO[String] = IO(EncoderOps[IpfsPubSubPongMessage](dto).asJson.noSpaces)
-    }
   }
 }
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala
index b9ca796..ee11b8c 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/FacadeServiceImpl.scala
@@ -57,7 +57,7 @@ class FacadeServiceImpl(
     else
       flatRepository
         .deleteAll(ipfsCids)
-        .flatTap(_ => logger.info(s"deleteFlats for ipfsCids=$ipfsCids succeeded"))
+        .flatTap(_ => logger.debug(s"deleteFlats for ipfsCids=$ipfsCids succeeded"))
         .onError(logError)
   }
 
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala
index dde6e3f..acf92c1 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala
@@ -4,9 +4,10 @@ import acab.devcon0.domain.codecs.FederationCodecs
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage
 import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateCommand
 import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateCommandHandler
-import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateEvent
 import acab.devcon0.output.repository.redisutils.Redis
+import acab.devcon0.trile.utils.IORetry
 import cats.effect._
+import cats.effect.std.Supervisor
 import cats.effect.unsafe.IORuntime
 import dev.profunktor.redis4cats.data._
 import fs2.Pipe
@@ -18,7 +19,7 @@ class FederationIpfsCidDeltaUpdateListener(
     ipfsCidDeltaUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler
 ) {
 
-  private implicit val runtime: IORuntime   = cats.effect.unsafe.IORuntime.global
+  cats.effect.unsafe.IORuntime.global
   implicit val logger: Logger[IO]           = Slf4jLogger.getLogger[IO]
   private val channel: RedisChannel[String] = Redis.Channels.federationIpfsCidsDeltaUpdate
 
@@ -29,25 +30,23 @@ class FederationIpfsCidDeltaUpdateListener(
   private def processMessage(): Pipe[IO, String, Unit] = stream => {
     stream
       .evalMap(FederationCodecs.Decoders.IpfsCidsDeltaUpdateMessage(_))
-      .evalTap(logReceivedMessage)
       .evalTap(processMessageInner)
+      .evalTap(logReceivedMessage)
       .evalMap(* => IO.unit)
   }
 
   private def processMessageInner(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = {
-    (for _ <- triggerCommand(message)
-    yield ()).unsafeRunAndForget()
-    IO.unit
-  }
-
-  private def triggerCommand(message: FederationIpfsCidsDeltaUpdateMessage): IO[IpfsCidDeltaUpdateEvent[?]] = {
-    val cmd: IpfsCidDeltaUpdateCommand = IpfsCidDeltaUpdateCommand(delta = message.delta)
-    ipfsCidDeltaUpdateCommandHandler.handle(cmd)
+    Supervisor[IO](await = true)
+      .use(_ => {
+        val cmd: IpfsCidDeltaUpdateCommand = IpfsCidDeltaUpdateCommand(delta = message.delta)
+        IORetry.fibonacci(ipfsCidDeltaUpdateCommandHandler.handle(cmd))
+      })
+      .void
   }
 
   private def logReceivedMessage(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = {
     logger.info(
-      s"FederationIpfsCidDeltaUpdateListener received over Redis pub/sub. $message"
+      s"Message received & processed over Redis pub/sub. additions=${message.delta.additions.size} removals=${message.delta.removals.size}"
     )
   }
 }
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala
index acb3c2f..cf27491 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala
@@ -2,27 +2,31 @@ package acab.devcon0.input.redispubsub
 
 import acab.devcon0.domain.codecs.FederationMemberCodecs
 import acab.devcon0.domain.codecs.FederationMemberCodecs.Decoders.ChangelogUpdateMessage
+import acab.devcon0.domain.dtos.FederationMemberIpfsCidDelta
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberChangelogUpdateMessage
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdateMessage
 import acab.devcon0.domain.ports.input.federationmember._
 import acab.devcon0.domain.ports.output.publisher.RedisPublisher
 import acab.devcon0.output.repository.redisutils.Redis
 import cats.effect._
+import cats.effect.std.Supervisor
 import cats.effect.unsafe.IORuntime
+import cats.implicits.toTraverseOps
 import dev.profunktor.redis4cats.data._
 import fs2.Pipe
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 class FederationMemberChangelogUpdateEventListener(
-                                                    redisListener: Listener,
-                                                    ipfsCidUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler,
-                                                    redisPublisher: RedisPublisher[IO]
+    redisListener: Listener,
+    ipfsCidUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler,
+    redisPublisher: RedisPublisher[IO]
 ) {
 
   private implicit val runtime: IORuntime        = cats.effect.unsafe.IORuntime.global
   implicit val logger: Logger[IO]                = Slf4jLogger.getLogger[IO]
   private val redisChannel: RedisChannel[String] = Redis.Channels.federationMemberChangelogUpdate
+  private val chunkSize: Int                     = 500
 
   def run(): Unit = {
     redisListener.run(redisChannel, processMessage())
@@ -40,12 +44,41 @@ class FederationMemberChangelogUpdateEventListener(
     (for
       event <- triggerIpfsCidFileTreeUpdateCommand(message)
       _ <- event match
-        case IpfsCidDeltaUpdateSuccessEvent(delta) =>
-          redisPublisher.publish(FederationMemberIpfsCidsDeltaUpdateMessage(delta))
-        case IpfsCidDeltaUpdateErrorEvent() => IO.unit
+        case IpfsCidDeltaUpdateSuccessEvent(delta) => report(delta)
+        case IpfsCidDeltaUpdateErrorEvent()        => IO.unit
     yield ()).unsafeRunAndForget()
     IO.unit
   }
+
+  private def report(delta: FederationMemberIpfsCidDelta): IO[Unit] = {
+    for
+      _ <- reportAdditions(delta)
+      _ <- reportRemovals(delta)
+    yield ()
+  }
+
+  private def reportAdditions(delta: FederationMemberIpfsCidDelta): IO[Unit] = {
+    Supervisor[IO](await = true).use { supervisor =>
+      val messages: List[FederationMemberIpfsCidsDeltaUpdateMessage] = delta.additions
+        .grouped(chunkSize)
+        .map(cids => FederationMemberIpfsCidDelta(delta.federationMemberId, cids, List()))
+        .map(newDelta => FederationMemberIpfsCidsDeltaUpdateMessage(newDelta))
+        .toList
+      supervisor.supervise(messages.traverse(redisPublisher.publish))
+    }.void
+  }
+
+  private def reportRemovals(delta: FederationMemberIpfsCidDelta): IO[Unit] = {
+    Supervisor[IO](await = true).use { supervisor =>
+      val messages = delta.removals
+        .grouped(chunkSize)
+        .map(cids => FederationMemberIpfsCidDelta(delta.federationMemberId, List(), cids))
+        .map(newDelta => FederationMemberIpfsCidsDeltaUpdateMessage(newDelta))
+        .toList
+      supervisor.supervise(messages.traverse(redisPublisher.publish))
+    }.void
+  }
+
   private def triggerIpfsCidFileTreeUpdateCommand(
       message: FederationMemberChangelogUpdateMessage
   ): IO[IpfsCidDeltaUpdateEvent[?]] = {
@@ -57,8 +90,6 @@ class FederationMemberChangelogUpdateEventListener(
   }
 
   private def logReceivedMessage(message: FederationMemberChangelogUpdateMessage): IO[Unit] = {
-    logger.info(
-      s"FederationMemberChangelogUpdateEventListener received over Redis pub/sub. $message"
-    )
+    logger.info(s"Message received & processed over Redis pub/sub. message=$message")
   }
 }
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala
index 30fa047..3f7e67e 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala
@@ -9,7 +9,8 @@ import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaQueryHandler
 import acab.devcon0.domain.ports.output.publisher.RedisPublisher
 import acab.devcon0.output.repository.redisutils.Redis
 import cats.effect._
-import cats.effect.unsafe.IORuntime
+import cats.effect.std.Supervisor
+import cats.implicits.toTraverseOps
 import dev.profunktor.redis4cats.data._
 import fs2.Pipe
 import org.typelevel.log4cats.Logger
@@ -21,9 +22,10 @@ class FederationMemberIpfsCidDeltaUpdateListener(
     redisPublisher: RedisPublisher[IO]
 ) {
 
-  private implicit val runtime: IORuntime   = cats.effect.unsafe.IORuntime.global
+  cats.effect.unsafe.IORuntime.global
   implicit val logger: Logger[IO]           = Slf4jLogger.getLogger[IO]
   private val channel: RedisChannel[String] = Redis.Channels.federationMemberIpfsCidsDeltaUpdate
+  private val chunkSize                     = 500
 
   def run(): Unit = {
     redisListener.run(channel, processMessage())
@@ -33,17 +35,36 @@ class FederationMemberIpfsCidDeltaUpdateListener(
     stream
       .evalMap(FederationMemberCodecs.Decoders.IpfsCidsDeltaUpdateMessage(_))
       .evalTap(logReceivedMessage)
-      .evalTap(reportFederationIpfsCidDelta)
+      .evalTap(reportDelta)
       .evalMap(* => IO.unit)
   }
 
-  private def reportFederationIpfsCidDelta(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[Unit] = {
-    (for
-      federationIpfsCidDelta <- getFederationDelta(message)
-      message = FederationIpfsCidsDeltaUpdateMessage(delta = federationIpfsCidDelta)
-      _ <- redisPublisher.publish(message)
-    yield ()).unsafeRunAndForget()
-    IO.unit
+  private def reportDelta(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[Unit] = {
+    for
+      delta <- getFederationDelta(message)
+      _     <- reportAdditions(delta)
+      _     <- reportRemovals(delta)
+    yield ()
+  }
+
+  private def reportAdditions(delta: FederationIpfsCidDelta): IO[Unit] = {
+    Supervisor[IO](await = true).use { supervisor =>
+      val messages = delta.additions
+        .grouped(chunkSize)
+        .map(cids => FederationIpfsCidsDeltaUpdateMessage(FederationIpfsCidDelta(additions = cids, removals = List())))
+        .toList
+      supervisor.supervise(messages.traverse(redisPublisher.publish))
+    }.void
+  }
+
+  private def reportRemovals(delta: FederationIpfsCidDelta): IO[Unit] = {
+    Supervisor[IO](await = true).use { supervisor =>
+      val messages = delta.removals
+        .grouped(chunkSize)
+        .map(cids => FederationIpfsCidsDeltaUpdateMessage(FederationIpfsCidDelta(additions = List(), removals = cids)))
+        .toList
+      supervisor.supervise(messages.traverse(redisPublisher.publish))
+    }.void
   }
 
   private def getFederationDelta(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[FederationIpfsCidDelta] = {
@@ -54,8 +75,8 @@ class FederationMemberIpfsCidDeltaUpdateListener(
   }
 
   private def logReceivedMessage(message: FederationMemberIpfsCidsDeltaUpdateMessage): IO[Unit] = {
-    logger.info(
-      s"FederationMemberIpfsCidDeltaUpdateListener received over Redis pub/sub. $message"
-    )
+    val additions: Int = message.delta.additions.size
+    val removals: Int  = message.delta.removals.size
+    logger.info(s"Message received & processed over Redis pub/sub. additions=$additions removals=$removals")
   }
 }
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala
index fadc5a9..69fe4e5 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala
@@ -15,7 +15,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 class Listener(redisClient: Resource[IO, RedisClient]) {
 
   private implicit val runtime: IORuntime             = cats.effect.unsafe.IORuntime.global
-  implicit val logger: Logger[IO]                     = Slf4jLogger.getLogger[IO]
+  private implicit val logger: Logger[IO]             = Slf4jLogger.getLogger[IO]
   private val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8
 
   def run(channel: RedisChannel[String], pipe: Pipe[IO, String, Unit]): Unit = {
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala
index d3b8c1a..677f39e 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala
@@ -3,6 +3,8 @@ package acab.devcon0.output.publisher
 import acab.devcon0.domain.codecs.FederationCodecs
 import acab.devcon0.domain.codecs.FederationMemberCodecs
 import acab.devcon0.domain.codecs.FederationMemberCodecs.Encoders.RedisSharingFolderUpdateMessage
+import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidAdditionMessage
+import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidRemovalMessage
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberChangelogUpdateMessage
 import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdateMessage
@@ -49,6 +51,18 @@ class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPu
       .flatMap(publishInner(_, Redis.Channels.federationIpfsCidsDeltaUpdate))
   }
 
+  override def publish(message: FederationIpfsCidAdditionMessage): IO[Unit] = {
+    FederationCodecs.Encoders
+      .IpfsCidsAdditionMessage(message)
+      .flatMap(publishInner(_, Redis.Channels.federationIpfsCidAddition))
+  }
+
+  override def publish(message: FederationIpfsCidRemovalMessage): IO[Unit] = {
+    FederationCodecs.Encoders
+      .IpfsCidsRemovalMessage(message)
+      .flatMap(publishInner(_, Redis.Channels.federationIpfsCidRemoval))
+  }
+
   private def publishInner(event: String, redisChannel: RedisChannel[String]): IO[Unit] = {
     {
       for
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala
index 3079e23..35b8cee 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala
@@ -22,8 +22,11 @@ object Redis {
     val ipfsClusterNodeId: String = "IPFS_CLUSTER_NODE_ID"
   }
   object Channels {
+    val federationIpfsCidsDeltaUpdate: RedisChannel[String] = RedisChannel("FEDERATION_IPFS_DELTA_UPDATE")
+    val federationIpfsCidAddition: RedisChannel[String]     = RedisChannel("FEDERATION_IPFS_ADDITION")
+    val federationIpfsCidRemoval: RedisChannel[String]      = RedisChannel("FEDERATION_IPFS_REMOVAL")
+
     val federationMemberChangelogUpdate: RedisChannel[String] = RedisChannel("FEDERATION_MEMBER_CHANGELOG_UPDATE")
-    val federationIpfsCidsDeltaUpdate: RedisChannel[String]   = RedisChannel("FEDERATION_MEMBER_IPFS_DELTA_UPDATE")
     val federationMemberSharingFolderUpdate: RedisChannel[String] = RedisChannel(
       "FEDERATION_MEMBER_SHARING_FOLDER_UPDATE"
     )
-- 
GitLab