diff --git a/federation-controller-backend/build.sbt b/federation-controller-backend/build.sbt index 4d5ffd82d46a72479f8bf921dacf1a7c27a2ba8e..9f5b758284f968a2b79d0d2f4f4f4984c8d94d6e 100644 --- a/federation-controller-backend/build.sbt +++ b/federation-controller-backend/build.sbt @@ -22,7 +22,6 @@ resolvers += ProjectResolvers.jitpack resolvers += ProjectResolvers.libP2pRepo libraryDependencies ++= Seq( - "org.apache.commons" % "commons-lang3" % "3.14.0", Dependencies.circeFs2, Dependencies.http4sEmber, Dependencies.http4sDsl, diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala index ed514de56a02ef5296bed5bc031addb8e74647eb..c2469fb40503b43eb3e256f4bfbfdde921dbafc3 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala @@ -68,10 +68,6 @@ object CommonsIoC { redisClient = CommonsIoC.InputOutput.redisClientFactory() ) - val redisBinaryListener: redispubsub.ByteListener = new ByteListener( - redisClient = CommonsIoC.InputOutput.redisClientFactory() - ) - val nodeRoute: FederationMemberRoute = FederationMemberRoute(informationQueryHandler = FederationMemberIoC.Domain.Port.informationQueryHandler) val ipfsCidRoute: IpfsCidRoute = IpfsCidRoute( diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala index 3a1611e976a7efe6124eeab2b145d81390393ef8..e8cf665a0291963a2c5bdac0f58a11804216d875 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala @@ -28,7 +28,7 @@ object FederationIoC { } val ipfsCidDeltaUpdateRedisListener: FederationIpfsCidDeltaUpdateListener = { new FederationIpfsCidDeltaUpdateListener( - redisListener = ioc.CommonsIoC.Input.redisBinaryListener, + redisListener = ioc.CommonsIoC.Input.redisListener, ipfsCidDeltaUpdateCommandHandler = ioc.FederationIoC.Domain.Port.ipfsCidUpdateCommandHandler ) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala index 87f30d248ef4c2561dd261a7dba65bd0ace4a148..acf92c14c97bb0a1efa99999b6e0fb9d573772fc 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationIpfsCidDeltaUpdateListener.scala @@ -1,5 +1,4 @@ package acab.devcon0.input.redispubsub -import java.time.Instant import acab.devcon0.domain.codecs.FederationCodecs import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage @@ -12,12 +11,11 @@ import cats.effect.std.Supervisor import cats.effect.unsafe.IORuntime import dev.profunktor.redis4cats.data._ import fs2.Pipe -import org.apache.commons.lang3.SerializationUtils import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger class FederationIpfsCidDeltaUpdateListener( - redisListener: ByteListener, + redisListener: Listener, ipfsCidDeltaUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler ) { @@ -29,11 +27,9 @@ class FederationIpfsCidDeltaUpdateListener( redisListener.run(channel, processMessage()) } - private def processMessage(): Pipe[IO, Array[Byte], Unit] = stream => { + private def processMessage(): Pipe[IO, String, Unit] = stream => { stream - .evalTap(bytes => logTime()) - .evalMap(bytes => IO(SerializationUtils.deserialize[FederationIpfsCidsDeltaUpdateMessage](bytes))) - .evalTap(bytes => logTime()) + .evalMap(FederationCodecs.Decoders.IpfsCidsDeltaUpdateMessage(_)) .evalTap(processMessageInner) .evalTap(logReceivedMessage) .evalMap(* => IO.unit) @@ -48,10 +44,6 @@ class FederationIpfsCidDeltaUpdateListener( .void } - private def logTime(): IO[Unit] = { - logger.info(s"now=${Instant.now()}") - } - private def logReceivedMessage(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = { logger.info( s"Message received & processed over Redis pub/sub. additions=${message.delta.additions.size} removals=${message.delta.removals.size}" diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala index 565a26e461e7e5d6e7a069940b188bdd8121bfb4..69fe4e5ccaf2d3c674f5c788ee28f682dab9902e 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/Listener.scala @@ -9,8 +9,6 @@ import dev.profunktor.redis4cats.pubsub.PubSub import dev.profunktor.redis4cats.pubsub.SubscribeCommands import fs2.Pipe import fs2.Stream -import io.lettuce.core.codec.ByteArrayCodec -import io.lettuce.core.codec.StringCodec import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -42,34 +40,3 @@ class Listener(redisClient: Resource[IO, RedisClient]) { Stream.resource(PubSub.mkSubscriberConnection[IO, String, String](client, stringCodec)) } } - -class ByteListener(redisClient: Resource[IO, RedisClient]) { - - private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global - private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] - private val bytesCodec: RedisCodec[String, Array[Byte]] = RedisCodec( - io.lettuce.core.codec.RedisCodec.of(new StringCodec(), new ByteArrayCodec()) - ) - - def run(channel: RedisChannel[String], pipe: Pipe[IO, Array[Byte], Unit]): Unit = { - subscribeToRedisStream(channel) - .through(pipe) - .compile - .foldMonoid - .foreverM - .unsafeRunAndForget() - } - - private def subscribeToRedisStream(channel: RedisChannel[String]): Stream[IO, Array[Byte]] = { - Stream - .resource(redisClient) - .flatMap(getSubscriberConnectionResource) - .flatMap(_.subscribe(channel)) - } - - private def getSubscriberConnectionResource( - client: RedisClient - ): Stream[IO, SubscribeCommands[[String] =>> Stream[IO, String], String, Array[Byte]]] = { - Stream.resource(PubSub.mkSubscriberConnection[IO, String, Array[Byte]](client, bytesCodec)) - } -} diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala index f69b2b7f2ab2bb58b4403703a68979acdde02526..d3b8c1a177b2ea9a294d76c2fba8817ff8e7d5bd 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala @@ -1,5 +1,6 @@ package acab.devcon0.output.publisher +import acab.devcon0.domain.codecs.FederationCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs.Encoders.RedisSharingFolderUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage @@ -8,28 +9,20 @@ import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdate import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberSharingFolderUpdateMessage import acab.devcon0.domain.ports.output.publisher.RedisPublisher import acab.devcon0.output.repository.redisutils.Redis -import acab.devcon0.trile.utils.EffectsUtils import cats.effect.IO import cats.effect.kernel.Resource -import cats.implicits.catsSyntaxMonadError import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.data.RedisChannel import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.log4cats._ import dev.profunktor.redis4cats.pubsub.PubSub import fs2.Stream -import io.lettuce.core.codec.ByteArrayCodec -import io.lettuce.core.codec.StringCodec -import org.apache.commons.lang3.SerializationUtils import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPublisher[IO] { private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8 - private val bytesCodec: RedisCodec[String, Array[Byte]] = RedisCodec( - io.lettuce.core.codec.RedisCodec.of(new StringCodec(), new ByteArrayCodec()) - ) override def publish(message: FederationMemberSharingFolderUpdateMessage): IO[Unit] = { for @@ -51,20 +44,9 @@ class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPu } override def publish(message: FederationIpfsCidsDeltaUpdateMessage): IO[Unit] = { - IO(SerializationUtils.serialize(message)) - .flatMap(publishBinary(_, Redis.Channels.federationIpfsCidsDeltaUpdate)) - } - - private def publishBinary(event: Array[Byte], redisChannel: RedisChannel[String]): IO[Unit] = { - { - for - client <- Stream.resource(redisClient) - pubSub <- Stream.resource(PubSub.mkPublisherConnection[IO, String, Array[Byte]](client, bytesCodec)) - pub1 = pubSub.publish(redisChannel) - stream = Stream.eval(IO(event)).through(pub1) - _ <- stream.through({ stream => stream.evalTap(_ => IO(())) }) - yield stream - }.compile.drain.attemptTap(EffectsUtils.attemptTLog) + FederationCodecs.Encoders + .IpfsCidsDeltaUpdateMessage(message) + .flatMap(publishInner(_, Redis.Channels.federationIpfsCidsDeltaUpdate)) } private def publishInner(event: String, redisChannel: RedisChannel[String]): IO[Unit] = {