diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala index 682791542c13f0f2602e2083f4f98c92a0e5d48f..968e95915e27e70cb29a7ec8084fe19fc002e4dc 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala @@ -16,6 +16,7 @@ import acab.devcon0.trile.domain.dtos.pubsub.P2p.Metadata import cats.effect._ import cats.effect.std.Supervisor import cats.effect.unsafe.IORuntime +import cats.implicits.catsSyntaxMonadError import io.circe.Decoder import io.libp2p.core.pubsub.MessageApi import io.libp2p.core.pubsub.PubsubSubscription @@ -68,8 +69,7 @@ class P2pListener(p2pBackend: P2pBackend[IO]) { ): Unit = { Supervisor[IO](await = true) .use(supervisor => { - val messageAsString = getString(messageApi.getData) - (for message: Message[T] <- P2pCodecs.Decoders.Message[T](messageAsString) + (for message: Message[T] <- decode(messageApi) yield { lazy val matchingKey: Boolean = isMatchingKey(listenerParams.messageTypes, message.meta) lazy val shouldConsumeMessage: Boolean = isShouldConsumeMessage(message.meta, listenerParams.mode) @@ -79,6 +79,12 @@ class P2pListener(p2pBackend: P2pBackend[IO]) { .unsafeRunAndForget()(runtime) } + private def decode[T <: Data: Decoder](messageApi: MessageApi): IO[Message[T]] = { + P2pCodecs.Decoders + .Message[T](getString(messageApi.getData)) + .ensure(new RuntimeException("Drift in from"))(message => message.meta.from.equals(getString(messageApi.getFrom))) + } + private def isShouldConsumeMessage(metadata: Metadata, listenerMode: Mode): Boolean = { val maybeP2pPeerId: Option[P2pPeerId] = metadata.to @@ -96,6 +102,10 @@ class P2pListener(p2pBackend: P2pBackend[IO]) { p2pMessageKeys.contains(metadata.messageType) } + private def getString(byteArray: Array[Byte]): String = { + new String(byteArray.array, StandardCharsets.UTF_8) + } + private def getString(byteBuf: ByteBuf): String = { new String(byteBuf.array, StandardCharsets.UTF_8) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala index 159f396221576e10181b02d7e67b6e58d7ef9137..729d6d084965b5c3df4b99dc970a1704609065d3 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala @@ -29,7 +29,7 @@ private object FlatRepository { def getAll(ipfsCids: Set[IpfsCid]): RedisCommands[IO, String, String] => IO[Set[String]] = { redis => for keys <- IO(ipfsCids.map(key)) - resultMap <- keys.toList.traverse(redis.hmGet(_, "json")) + resultMap <- keys.toSeq.traverse(redis.hmGet(_, "json")) yield { resultMap.flatMap(_.values).toSet }