diff --git a/federation-controller-backend/project/Dependencies.scala b/federation-controller-backend/project/Dependencies.scala index f4ae534224d32683978f655eb0c866d99cb3722e..b578f3ccd1696e04ad74d6a3f051849f791c9bba 100644 --- a/federation-controller-backend/project/Dependencies.scala +++ b/federation-controller-backend/project/Dependencies.scala @@ -12,8 +12,8 @@ object Dependencies { val redis4CatsLogs = "dev.profunktor" %% "redis4cats-log4cats" % DependencyVersions.redis4Cats val redis4CatsStreams = "dev.profunktor" %% "redis4cats-streams" % DependencyVersions.redis4Cats val redis4CatsEffects = "dev.profunktor" %% "redis4cats-effects" % DependencyVersions.redis4Cats - val sttp = "com.softwaremill.sttp.client3" %% "core" % DependencyVersions.sttp - val sttpFs2Backend = "com.softwaremill.sttp.client3" %% "async-http-client-backend-fs2" % DependencyVersions.sttp + val sttp = "com.softwaremill.sttp.client4" %% "core" % DependencyVersions.sttp + val sttpFs2Backend = "com.softwaremill.sttp.client4" %% "async-http-client-backend-fs2" % DependencyVersions.sttp val trileBackendCommons = "acab.devcon0" %% "trile-backend-commons" % "0.1.0-SNAPSHOT" val typeSafeScalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % DependencyVersions.typeSafeScalaLogging } diff --git a/federation-controller-backend/project/DependencyVersions.scala b/federation-controller-backend/project/DependencyVersions.scala index 7fe543fd93c43bb210a5133007b3b4f2eaccb01f..686594d7da5ddf14e2684ab916b35056e4dabde2 100644 --- a/federation-controller-backend/project/DependencyVersions.scala +++ b/federation-controller-backend/project/DependencyVersions.scala @@ -2,10 +2,10 @@ object DependencyVersions { val circe = "0.14.6" val circeFs2 = "0.14.1" val http4s = "0.23.16" - val libp2p = "1.1.0-RELEASE" + val libP2p = "1.1.0-RELEASE" val logback = "1.4.14" val redis4Cats = "1.5.2" - val sttp = "3.9.2" + val sttp = "4.0.0-M11" val typeSafeConfig = "1.4.3" val typeSafeScalaLogging = "3.9.5" } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala index c2469fb40503b43eb3e256f4bfbfdde921dbafc3..b28ae887bb203ae0d8b7929412d72f929be342d9 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala @@ -25,10 +25,6 @@ import dev.profunktor.redis4cats.data._ import dev.profunktor.redis4cats.log4cats._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import sttp.capabilities.WebSockets -import sttp.capabilities.fs2.Fs2Streams -import sttp.client3.SttpBackend -import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend object CommonsIoC { @@ -36,9 +32,6 @@ object CommonsIoC { val p2pBackend: P2pBackend[IO] = new P2pBackendImpl(p2pConfiguration = Configuration.configuration.p2p) - val sttpBackendResource: Resource[IO, SttpBackend[IO, Fs2Streams[IO] & WebSockets]] = - AsyncHttpClientFs2Backend.resource[IO]() - implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8 private val redisConfiguration: RedisConfiguration = Configuration.configuration.redis diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala index 907a883dd9fec1713f5df351da35b322f01f3b12..0919f1fd300ccac74b2eb34d9508a67dfeca1de8 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala @@ -18,7 +18,9 @@ import acab.devcon0.output.client.IpfsClientImpl import acab.devcon0.output.client.IpfsClusterClientImpl import acab.devcon0.output.repository.ipfscid._ import cats.effect.IO -import sttp.client3.HttpURLConnectionBackend +import cats.effect.Resource +import sttp.client4.SyncBackend +import sttp.client4.httpurlconnection.HttpURLConnectionBackend object IpfsIoC { @@ -81,13 +83,16 @@ object IpfsIoC { object Output { + private val backend: SyncBackend = HttpURLConnectionBackend() + private val backendResource: Resource[IO, SyncBackend] = Resource.make(IO(backend))(res => IO(res.close())) + val ipfsClusterClient: IpfsClusterClient = new IpfsClusterClientImpl( apiUrl = Configuration.configuration.ipfsCluster.apiUrl, - sttpBackendResource = CommonsIoC.InputOutput.sttpBackendResource + backendResource = backendResource ) val ipfsClient: IpfsClient = new IpfsClientImpl( - sttpBackend = HttpURLConnectionBackend(), + backendResource = backendResource, ipfsConfiguration = Configuration.configuration.ipfs ) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala index 8900912c9933f5574c655931ea720228c66498a4..d6328fad59dc80aa7952a6c58464430fdefde9ab 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala @@ -31,7 +31,7 @@ class FederationMemberChangelogUpdateEventListener( cats.effect.unsafe.IORuntime.global private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val redisChannel: RedisChannel[String] = Redis.Channels.federationMemberChangelogUpdate - private val chunkSize: Int = 500 + private val chunkSize: Int = 5000 def run(): Unit = { redisListener.run(redisChannel, processMessage()) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala index 3f7e67e930b75f24104ad5a7702a8a1b2c30c607..060e7f3f50db76ff6591c0a2d81beec9175273f2 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala @@ -25,7 +25,7 @@ class FederationMemberIpfsCidDeltaUpdateListener( cats.effect.unsafe.IORuntime.global implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val channel: RedisChannel[String] = Redis.Channels.federationMemberIpfsCidsDeltaUpdate - private val chunkSize = 500 + private val chunkSize = 5000 def run(): Unit = { redisListener.run(channel, processMessage()) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala index 52043043964b414181f3637a361196613f8f31d6..75650f59c0fd1f5439c653cfe64c5fab77ca33d4 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala @@ -105,6 +105,8 @@ class FederationMemberSharingFolderUpdateListener( } private def logReceivedMessage(message: FederationMemberSharingFolderUpdateMessage): IO[Unit] = { - logger.debug(s"Redis pub/sub. p2pPeerId=${message.federationMemberId} ipfsCid=${message.ipfsCid}}") + logger.info( + s"Message received & processed over Redis pub/sub. p2pPeerId=${message.federationMemberId} ipfsCid=${message.ipfsCid}}" + ) } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala index 76773ecce3caaf743c2ffe3666075f9a711db987..f570e7d91906a23bb2819562bb9c3bace3f0e6d2 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala @@ -2,35 +2,28 @@ package acab.devcon0.output.client import acab.devcon0.configuration.IpfsConfiguration import acab.devcon0.domain.codecs.IpfsCodecs -import acab.devcon0.domain.dtos._ +import acab.devcon0.domain.dtos.* import acab.devcon0.domain.ports.output.client.IpfsClient import acab.devcon0.trile.domain.dtos.aliases.IpfsCid -import cats.effect.IO +import acab.devcon0.trile.utils.EffectsUtils +import cats.effect.{IO, Resource} +import cats.implicits.catsSyntaxMonadError import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import sttp.client3._ +import sttp.client4.* import sttp.model.Uri -class IpfsClientImpl(sttpBackend: SttpBackend[Identity, Any], ipfsConfiguration: IpfsConfiguration) extends IpfsClient { +class IpfsClientImpl(backendResource: Resource[IO, SyncBackend], ipfsConfiguration: IpfsConfiguration) + extends IpfsClient { - private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] - private val apiUrl: String = ipfsConfiguration.apiUrl + private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + private val apiUrl: String = ipfsConfiguration.apiUrl override def ls(ipfsCID: IpfsCid): IO[IpfsLsResponse] = { - ipfsLsRequest(ipfsCID) - .flatTap(logResponse) - .flatMap(response => IpfsCodecs.Decoders.LsResponse(response.body.getOrElse(""))) - .onError(throwable => logger.error(s"throwable=$throwable")) - } - - private def ipfsLsRequest(ipfsCID: IpfsCid): IO[Identity[Response[Either[String, String]]]] = IO { val uri: Uri = uri"$apiUrl/ls?arg=$ipfsCID" - basicRequest - .post(uri = uri) - .send(sttpBackend) - } - - private def logResponse(response: Response[Either[String, String]]): IO[Unit] = { - logger.debug(s"code=${response.code} response=$response") + backendResource + .use(backend => { IO.blocking(basicRequest.post(uri = uri).send(backend)) }) + .attemptTap(EffectsUtils.attemptTLog) + .flatMap(response => IpfsCodecs.Decoders.LsResponse(response.body.getOrElse(""))) } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala index b5c7b6887a15f063b3d7f2050dbcaaa75569e0a2..0ec574f544cf8b106c47325e2b3fc96bcf607ffe 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala @@ -1,91 +1,56 @@ package acab.devcon0.output.client -import scala.concurrent.duration.Duration +import scala.concurrent.duration.DurationInt import acab.devcon0.domain.ports.output.client.IpfsClusterClient import acab.devcon0.trile.domain.codecs.IpfsClusterCodecs import acab.devcon0.trile.domain.dtos._ import acab.devcon0.trile.domain.dtos.aliases.IpfsCid +import acab.devcon0.trile.utils.EffectsUtils import cats.effect.IO -import cats.effect.kernel.Resource +import cats.effect.Resource +import cats.implicits.catsSyntaxMonadError import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import sttp.capabilities.fs2.Fs2Streams -import sttp.client3._ +import sttp.client4._ import sttp.model.Uri class IpfsClusterClientImpl( apiUrl: String, - sttpBackendResource: Resource[IO, SttpBackend[IO, Fs2Streams[IO]]] + backendResource: Resource[IO, SyncBackend] ) extends IpfsClusterClient { implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] override def getPeers: IO[IpfsClusterPeers] = { - ipfsClusterPeersRequest() - .flatTap(logResponse(_, "getPeers")) + val uri: Uri = uri"$apiUrl/peers" + backendResource + .use(backend => { IO.blocking(basicRequest.get(uri = uri).send(backend)) }) + .attemptTap(EffectsUtils.attemptTLog) .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeers(response.body.getOrElse(""))) - .onError(throwable => logger.error(s"throwable=$throwable")) } override def getPeer: IO[IpfsClusterPeer] = { - ipfsClusterPeerRequest() - .flatTap(logResponse(_, "getPeer")) - .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeer(response.body.getOrElse(""))) - .onError(throwable => logger.error(s"throwable=$throwable")) - } - - private def ipfsClusterPeersRequest(): IO[Response[Either[String, String]]] = { - val uri: Uri = uri"$apiUrl/peers" - sttpBackendResource - .use { sttpBackend => - basicRequest - .get(uri = uri) - .send(sttpBackend) - } - } - - private def ipfsClusterPeerRequest(): IO[Response[Either[String, String]]] = { val uri: Uri = uri"$apiUrl/id" - sttpBackendResource - .use { sttpBackend => - basicRequest - .get(uri = uri) - .send(sttpBackend) - } + backendResource + .use(backend => { IO.blocking(basicRequest.get(uri = uri).send(backend)) }) + .attemptTap(EffectsUtils.attemptTLog) + .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeer(response.body.getOrElse(""))) } override def addPin(ipfsCid: IpfsCid): IO[Unit] = { val uri: Uri = uri"$apiUrl/pins/$ipfsCid" - sttpBackendResource - .use { sttpBackend => - basicRequest - .post(uri) - .readTimeout(Duration.Inf) - .send(sttpBackend) - } - .flatMap(logResponse(_, "add")) - .onError(logError) + backendResource + .use(backend => { IO.blocking(basicRequest.post(uri = uri).readTimeout(1.minutes).send(backend)) }) + .attemptTap(EffectsUtils.attemptTLog) + .void } override def removePin(ipfsCid: IpfsCid): IO[Unit] = { val uri: Uri = uri"$apiUrl/pins/$ipfsCid" - sttpBackendResource - .use { sttpBackend => - basicRequest - .delete(uri) - .readTimeout(Duration.Inf) - .send(sttpBackend) - } - .flatMap(logResponse(_, "remove")) - .onError(logError) - } - - private def logError(throwable: Throwable): IO[Unit] = { - logger.error(s"throwable=$throwable") - } - - private def logResponse(response: Response[Either[String, String]], label: String): IO[Unit] = { - logger.debug(s"label=$label code=${response.code} response=${response.toString.substring(0, 64)}") + backendResource + .use(backend => { IO.blocking(basicRequest.delete(uri = uri).readTimeout(1.minutes).send(backend)) }) + .attemptTap(EffectsUtils.attemptTLog) + .void } }