From 102ad6c7b29ec8c681480248a4ac65e57f920bb2 Mon Sep 17 00:00:00 2001
From: trilero <trile@riseup.net>
Date: Thu, 18 Apr 2024 08:34:09 +0200
Subject: [PATCH] [+][FCB] Verify from-s are matching, detect malicious
 attempts.

+ Signature happens under the hood
---
 .../acab/devcon0/input/p2ppubsub/P2pListener.scala | 14 ++++++++++++--
 .../output/repository/ipfscid/FlatRepository.scala |  2 +-
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala
index 6827915..968e959 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/P2pListener.scala
@@ -16,6 +16,7 @@ import acab.devcon0.trile.domain.dtos.pubsub.P2p.Metadata
 import cats.effect._
 import cats.effect.std.Supervisor
 import cats.effect.unsafe.IORuntime
+import cats.implicits.catsSyntaxMonadError
 import io.circe.Decoder
 import io.libp2p.core.pubsub.MessageApi
 import io.libp2p.core.pubsub.PubsubSubscription
@@ -68,8 +69,7 @@ class P2pListener(p2pBackend: P2pBackend[IO]) {
   ): Unit = {
     Supervisor[IO](await = true)
       .use(supervisor => {
-        val messageAsString = getString(messageApi.getData)
-        (for message: Message[T] <- P2pCodecs.Decoders.Message[T](messageAsString)
+        (for message: Message[T] <- decode(messageApi)
         yield {
           lazy val matchingKey: Boolean          = isMatchingKey(listenerParams.messageTypes, message.meta)
           lazy val shouldConsumeMessage: Boolean = isShouldConsumeMessage(message.meta, listenerParams.mode)
@@ -79,6 +79,12 @@ class P2pListener(p2pBackend: P2pBackend[IO]) {
       .unsafeRunAndForget()(runtime)
   }
 
+  private def decode[T <: Data: Decoder](messageApi: MessageApi): IO[Message[T]] = {
+    P2pCodecs.Decoders
+      .Message[T](getString(messageApi.getData))
+      .ensure(new RuntimeException("Drift in from"))(message => message.meta.from.equals(getString(messageApi.getFrom)))
+  }
+
   private def isShouldConsumeMessage(metadata: Metadata, listenerMode: Mode): Boolean = {
     val maybeP2pPeerId: Option[P2pPeerId] = metadata.to
 
@@ -96,6 +102,10 @@ class P2pListener(p2pBackend: P2pBackend[IO]) {
     p2pMessageKeys.contains(metadata.messageType)
   }
 
+  private def getString(byteArray: Array[Byte]): String = {
+    new String(byteArray.array, StandardCharsets.UTF_8)
+  }
+
   private def getString(byteBuf: ByteBuf): String = {
     new String(byteBuf.array, StandardCharsets.UTF_8)
   }
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala
index 159f396..729d6d0 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/FlatRepository.scala
@@ -29,7 +29,7 @@ private object FlatRepository {
     def getAll(ipfsCids: Set[IpfsCid]): RedisCommands[IO, String, String] => IO[Set[String]] = { redis =>
       for
         keys      <- IO(ipfsCids.map(key))
-        resultMap <- keys.toList.traverse(redis.hmGet(_, "json"))
+        resultMap <- keys.toSeq.traverse(redis.hmGet(_, "json"))
       yield {
         resultMap.flatMap(_.values).toSet
       }
-- 
GitLab