diff --git a/federation-controller-backend/build.sbt b/federation-controller-backend/build.sbt
index 9f5b758284f968a2b79d0d2f4f4f4984c8d94d6e..4d5ffd82d46a72479f8bf921dacf1a7c27a2ba8e 100644
--- a/federation-controller-backend/build.sbt
+++ b/federation-controller-backend/build.sbt
@@ -22,6 +22,7 @@ resolvers += ProjectResolvers.jitpack
resolvers += ProjectResolvers.libP2pRepo
libraryDependencies ++= Seq(
+ "org.apache.commons" % "commons-lang3" % "3.14.0",
Dependencies.circeFs2,
Dependencies.http4sEmber,
Dependencies.http4sDsl,
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala
index c2469fb40503b43eb3e256f4bfbfdde921dbafc3..ed514de56a02ef5296bed5bc031addb8e74647eb 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala
@@ -68,6 +68,10 @@ object CommonsIoC {
redisClient = CommonsIoC.InputOutput.redisClientFactory()
)
+ val redisBinaryListener: redispubsub.ByteListener = new ByteListener(
+ redisClient = CommonsIoC.InputOutput.redisClientFactory()
+ )
+
val nodeRoute: FederationMemberRoute =
FederationMemberRoute(informationQueryHandler = FederationMemberIoC.Domain.Port.informationQueryHandler)
val ipfsCidRoute: IpfsCidRoute = IpfsCidRoute(
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala
index e8cf665a0291963a2c5bdac0f58a11804216d875..3a1611e976a7efe6124eeab2b145d81390393ef8 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala
@@ -28,7 +28,7 @@ object FederationIoC {
}
val ipfsCidDeltaUpdateRedisListener: FederationIpfsCidDeltaUpdateListener = {
new FederationIpfsCidDeltaUpdateListener(
- redisListener = ioc.CommonsIoC.Input.redisListener,
+ redisListener = ioc.CommonsIoC.Input.redisBinaryListener,
ipfsCidDeltaUpdateCommandHandler = ioc.FederationIoC.Domain.Port.ipfsCidUpdateCommandHandler
)
}
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala
index acf92c14c97bb0a1efa99999b6e0fb9d573772fc..87f30d248ef4c2561dd261a7dba65bd0ace4a148 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala
@@ -1,4 +1,5 @@
package acab.devcon0.input.redispubsub
+import java.time.Instant
import acab.devcon0.domain.codecs.FederationCodecs
import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage
@@ -11,11 +12,12 @@ import cats.effect.std.Supervisor
import cats.effect.unsafe.IORuntime
import dev.profunktor.redis4cats.data._
import fs2.Pipe
+import org.apache.commons.lang3.SerializationUtils
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
class FederationIpfsCidDeltaUpdateListener(
- redisListener: Listener,
+ redisListener: ByteListener,
ipfsCidDeltaUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler
) {
@@ -27,9 +29,11 @@ class FederationIpfsCidDeltaUpdateListener(
redisListener.run(channel, processMessage())
}
- private def processMessage(): Pipe[IO, String, Unit] = stream => {
+ private def processMessage(): Pipe[IO, Array[Byte], Unit] = stream => {
stream
- .evalMap(FederationCodecs.Decoders.IpfsCidsDeltaUpdateMessage(_))
+ .evalTap(bytes => logTime())
+ .evalMap(bytes => IO(SerializationUtils.deserialize[FederationIpfsCidsDeltaUpdateMessage](bytes)))
+ .evalTap(bytes => logTime())
.evalTap(processMessageInner)
.evalTap(logReceivedMessage)
.evalMap(* => IO.unit)
@@ -44,6 +48,10 @@ class FederationIpfsCidDeltaUpdateListener(
.void
}
+ private def logTime(): IO[Unit] = {
+ logger.info(s"now=${Instant.now()}")
+ }
+
private def logReceivedMessage(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = {
logger.info(
s"Message received & processed over Redis pub/sub. additions=${message.delta.additions.size} removals=${message.delta.removals.size}"
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala
index 69fe4e5ccaf2d3c674f5c788ee28f682dab9902e..565a26e461e7e5d6e7a069940b188bdd8121bfb4 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala
@@ -9,6 +9,8 @@ import dev.profunktor.redis4cats.pubsub.PubSub
import dev.profunktor.redis4cats.pubsub.SubscribeCommands
import fs2.Pipe
import fs2.Stream
+import io.lettuce.core.codec.ByteArrayCodec
+import io.lettuce.core.codec.StringCodec
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -40,3 +42,34 @@ class Listener(redisClient: Resource[IO, RedisClient]) {
Stream.resource(PubSub.mkSubscriberConnection[IO, String, String](client, stringCodec))
}
}
+
+class ByteListener(redisClient: Resource[IO, RedisClient]) {
+
+ private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global
+ private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
+ private val bytesCodec: RedisCodec[String, Array[Byte]] = RedisCodec(
+ io.lettuce.core.codec.RedisCodec.of(new StringCodec(), new ByteArrayCodec())
+ )
+
+ def run(channel: RedisChannel[String], pipe: Pipe[IO, Array[Byte], Unit]): Unit = {
+ subscribeToRedisStream(channel)
+ .through(pipe)
+ .compile
+ .foldMonoid
+ .foreverM
+ .unsafeRunAndForget()
+ }
+
+ private def subscribeToRedisStream(channel: RedisChannel[String]): Stream[IO, Array[Byte]] = {
+ Stream
+ .resource(redisClient)
+ .flatMap(getSubscriberConnectionResource)
+ .flatMap(_.subscribe(channel))
+ }
+
+ private def getSubscriberConnectionResource(
+ client: RedisClient
+ ): Stream[IO, SubscribeCommands[[String] =>> Stream[IO, String], String, Array[Byte]]] = {
+ Stream.resource(PubSub.mkSubscriberConnection[IO, String, Array[Byte]](client, bytesCodec))
+ }
+}
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala
index d3b8c1a177b2ea9a294d76c2fba8817ff8e7d5bd..f69b2b7f2ab2bb58b4403703a68979acdde02526 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala
@@ -1,6 +1,5 @@
package acab.devcon0.output.publisher
-import acab.devcon0.domain.codecs.FederationCodecs
import acab.devcon0.domain.codecs.FederationMemberCodecs
import acab.devcon0.domain.codecs.FederationMemberCodecs.Encoders.RedisSharingFolderUpdateMessage
import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage
@@ -9,20 +8,28 @@ import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdate
import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberSharingFolderUpdateMessage
import acab.devcon0.domain.ports.output.publisher.RedisPublisher
import acab.devcon0.output.repository.redisutils.Redis
+import acab.devcon0.trile.utils.EffectsUtils
import cats.effect.IO
import cats.effect.kernel.Resource
+import cats.implicits.catsSyntaxMonadError
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.log4cats._
import dev.profunktor.redis4cats.pubsub.PubSub
import fs2.Stream
+import io.lettuce.core.codec.ByteArrayCodec
+import io.lettuce.core.codec.StringCodec
+import org.apache.commons.lang3.SerializationUtils
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPublisher[IO] {
private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
private val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8
+ private val bytesCodec: RedisCodec[String, Array[Byte]] = RedisCodec(
+ io.lettuce.core.codec.RedisCodec.of(new StringCodec(), new ByteArrayCodec())
+ )
override def publish(message: FederationMemberSharingFolderUpdateMessage): IO[Unit] = {
for
@@ -44,9 +51,20 @@ class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPu
}
override def publish(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = {
- FederationCodecs.Encoders
- .IpfsCidsDeltaUpdateMessage(message)
- .flatMap(publishInner(_, Redis.Channels.federationIpfsCidsDeltaUpdate))
+ IO(SerializationUtils.serialize(message))
+ .flatMap(publishBinary(_, Redis.Channels.federationIpfsCidsDeltaUpdate))
+ }
+
+ private def publishBinary(event: Array[Byte], redisChannel: RedisChannel[String]): IO[Unit] = {
+ {
+ for
+ client <- Stream.resource(redisClient)
+ pubSub <- Stream.resource(PubSub.mkPublisherConnection[IO, String, Array[Byte]](client, bytesCodec))
+ pub1 = pubSub.publish(redisChannel)
+ stream = Stream.eval(IO(event)).through(pub1)
+ _ <- stream.through({ stream => stream.evalTap(_ => IO(())) })
+ yield stream
+ }.compile.drain.attemptTap(EffectsUtils.attemptTLog)
}
private def publishInner(event: String, redisChannel: RedisChannel[String]): IO[Unit] = {