From f5c10f07b20ad60611182a8e45aa95c25799a696 Mon Sep 17 00:00:00 2001
From: trilero <trile@riseup.net>
Date: Wed, 17 Apr 2024 08:31:16 +0200
Subject: [PATCH] [+][FCB] Network I/O may take place in the blocking thread
pool model
This enables raising the chunk size
---
.../project/Dependencies.scala | 4 +-
.../project/DependencyVersions.scala | 4 +-
.../acab/devcon0/boot/ioc/CommonsIoC.scala | 7 --
.../scala/acab/devcon0/boot/ioc/IpfsIoC.scala | 11 ++-
...onMemberChangelogUpdateEventListener.scala | 2 +-
...tionMemberIpfsCidDeltaUpdateListener.scala | 2 +-
...ionMemberSharingFolderUpdateListener.scala | 4 +-
.../output/client/IpfsClientImpl.scala | 33 +++-----
.../output/client/IpfsClusterClientImpl.scala | 79 ++++++-------------
9 files changed, 52 insertions(+), 94 deletions(-)
diff --git a/federation-controller-backend/project/Dependencies.scala b/federation-controller-backend/project/Dependencies.scala
index f4ae534..b578f3c 100644
--- a/federation-controller-backend/project/Dependencies.scala
+++ b/federation-controller-backend/project/Dependencies.scala
@@ -12,8 +12,8 @@ object Dependencies {
val redis4CatsLogs = "dev.profunktor" %% "redis4cats-log4cats" % DependencyVersions.redis4Cats
val redis4CatsStreams = "dev.profunktor" %% "redis4cats-streams" % DependencyVersions.redis4Cats
val redis4CatsEffects = "dev.profunktor" %% "redis4cats-effects" % DependencyVersions.redis4Cats
- val sttp = "com.softwaremill.sttp.client3" %% "core" % DependencyVersions.sttp
- val sttpFs2Backend = "com.softwaremill.sttp.client3" %% "async-http-client-backend-fs2" % DependencyVersions.sttp
+ val sttp = "com.softwaremill.sttp.client4" %% "core" % DependencyVersions.sttp
+ val sttpFs2Backend = "com.softwaremill.sttp.client4" %% "async-http-client-backend-fs2" % DependencyVersions.sttp
val trileBackendCommons = "acab.devcon0" %% "trile-backend-commons" % "0.1.0-SNAPSHOT"
val typeSafeScalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % DependencyVersions.typeSafeScalaLogging
}
diff --git a/federation-controller-backend/project/DependencyVersions.scala b/federation-controller-backend/project/DependencyVersions.scala
index 7fe543f..686594d 100644
--- a/federation-controller-backend/project/DependencyVersions.scala
+++ b/federation-controller-backend/project/DependencyVersions.scala
@@ -2,10 +2,10 @@ object DependencyVersions {
val circe = "0.14.6"
val circeFs2 = "0.14.1"
val http4s = "0.23.16"
- val libp2p = "1.1.0-RELEASE"
+ val libP2p = "1.1.0-RELEASE"
val logback = "1.4.14"
val redis4Cats = "1.5.2"
- val sttp = "3.9.2"
+ val sttp = "4.0.0-M11"
val typeSafeConfig = "1.4.3"
val typeSafeScalaLogging = "3.9.5"
}
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala
index c2469fb..b28ae88 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/CommonsIoC.scala
@@ -25,10 +25,6 @@ import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.log4cats._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
-import sttp.capabilities.WebSockets
-import sttp.capabilities.fs2.Fs2Streams
-import sttp.client3.SttpBackend
-import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
object CommonsIoC {
@@ -36,9 +32,6 @@ object CommonsIoC {
val p2pBackend: P2pBackend[IO] = new P2pBackendImpl(p2pConfiguration = Configuration.configuration.p2p)
- val sttpBackendResource: Resource[IO, SttpBackend[IO, Fs2Streams[IO] & WebSockets]] =
- AsyncHttpClientFs2Backend.resource[IO]()
-
implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
private val stringCodec: RedisCodec[String, String] = RedisCodec.Utf8
private val redisConfiguration: RedisConfiguration = Configuration.configuration.redis
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala
index 907a883..0919f1f 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala
@@ -18,7 +18,9 @@ import acab.devcon0.output.client.IpfsClientImpl
import acab.devcon0.output.client.IpfsClusterClientImpl
import acab.devcon0.output.repository.ipfscid._
import cats.effect.IO
-import sttp.client3.HttpURLConnectionBackend
+import cats.effect.Resource
+import sttp.client4.SyncBackend
+import sttp.client4.httpurlconnection.HttpURLConnectionBackend
object IpfsIoC {
@@ -81,13 +83,16 @@ object IpfsIoC {
object Output {
+ private val backend: SyncBackend = HttpURLConnectionBackend()
+ private val backendResource: Resource[IO, SyncBackend] = Resource.make(IO(backend))(res => IO(res.close()))
+
val ipfsClusterClient: IpfsClusterClient = new IpfsClusterClientImpl(
apiUrl = Configuration.configuration.ipfsCluster.apiUrl,
- sttpBackendResource = CommonsIoC.InputOutput.sttpBackendResource
+ backendResource = backendResource
)
val ipfsClient: IpfsClient = new IpfsClientImpl(
- sttpBackend = HttpURLConnectionBackend(),
+ backendResource = backendResource,
ipfsConfiguration = Configuration.configuration.ipfs
)
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala
index 8900912..d6328fa 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala
@@ -31,7 +31,7 @@ class FederationMemberChangelogUpdateEventListener(
cats.effect.unsafe.IORuntime.global
private val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
private val redisChannel: RedisChannel[String] = Redis.Channels.federationMemberChangelogUpdate
- private val chunkSize: Int = 500
+ private val chunkSize: Int = 5000
def run(): Unit = {
redisListener.run(redisChannel, processMessage())
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala
index 3f7e67e..060e7f3 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberIpfsCidDeltaUpdateListener.scala
@@ -25,7 +25,7 @@ class FederationMemberIpfsCidDeltaUpdateListener(
cats.effect.unsafe.IORuntime.global
implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
private val channel: RedisChannel[String] = Redis.Channels.federationMemberIpfsCidsDeltaUpdate
- private val chunkSize = 500
+ private val chunkSize = 5000
def run(): Unit = {
redisListener.run(channel, processMessage())
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala
index 5204304..75650f5 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala
@@ -105,6 +105,8 @@ class FederationMemberSharingFolderUpdateListener(
}
private def logReceivedMessage(message: FederationMemberSharingFolderUpdateMessage): IO[Unit] = {
- logger.debug(s"Redis pub/sub. p2pPeerId=${message.federationMemberId} ipfsCid=${message.ipfsCid}}")
+ logger.info(
+ s"Message received & processed over Redis pub/sub. p2pPeerId=${message.federationMemberId} ipfsCid=${message.ipfsCid}}"
+ )
}
}
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala
index 76773ec..f570e7d 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClientImpl.scala
@@ -2,35 +2,28 @@ package acab.devcon0.output.client
import acab.devcon0.configuration.IpfsConfiguration
import acab.devcon0.domain.codecs.IpfsCodecs
-import acab.devcon0.domain.dtos._
+import acab.devcon0.domain.dtos.*
import acab.devcon0.domain.ports.output.client.IpfsClient
import acab.devcon0.trile.domain.dtos.aliases.IpfsCid
-import cats.effect.IO
+import acab.devcon0.trile.utils.EffectsUtils
+import cats.effect.{IO, Resource}
+import cats.implicits.catsSyntaxMonadError
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
-import sttp.client3._
+import sttp.client4.*
import sttp.model.Uri
-class IpfsClientImpl(sttpBackend: SttpBackend[Identity, Any], ipfsConfiguration: IpfsConfiguration) extends IpfsClient {
+class IpfsClientImpl(backendResource: Resource[IO, SyncBackend], ipfsConfiguration: IpfsConfiguration)
+ extends IpfsClient {
- private val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
- private val apiUrl: String = ipfsConfiguration.apiUrl
+ private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
+ private val apiUrl: String = ipfsConfiguration.apiUrl
override def ls(ipfsCID: IpfsCid): IO[IpfsLsResponse] = {
- ipfsLsRequest(ipfsCID)
- .flatTap(logResponse)
- .flatMap(response => IpfsCodecs.Decoders.LsResponse(response.body.getOrElse("")))
- .onError(throwable => logger.error(s"throwable=$throwable"))
- }
-
- private def ipfsLsRequest(ipfsCID: IpfsCid): IO[Identity[Response[Either[String, String]]]] = IO {
val uri: Uri = uri"$apiUrl/ls?arg=$ipfsCID"
- basicRequest
- .post(uri = uri)
- .send(sttpBackend)
- }
-
- private def logResponse(response: Response[Either[String, String]]): IO[Unit] = {
- logger.debug(s"code=${response.code} response=$response")
+ backendResource
+ .use(backend => { IO.blocking(basicRequest.post(uri = uri).send(backend)) })
+ .attemptTap(EffectsUtils.attemptTLog)
+ .flatMap(response => IpfsCodecs.Decoders.LsResponse(response.body.getOrElse("")))
}
}
diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala
index b5c7b68..0ec574f 100644
--- a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala
+++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala
@@ -1,91 +1,56 @@
package acab.devcon0.output.client
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
import acab.devcon0.domain.ports.output.client.IpfsClusterClient
import acab.devcon0.trile.domain.codecs.IpfsClusterCodecs
import acab.devcon0.trile.domain.dtos._
import acab.devcon0.trile.domain.dtos.aliases.IpfsCid
+import acab.devcon0.trile.utils.EffectsUtils
import cats.effect.IO
-import cats.effect.kernel.Resource
+import cats.effect.Resource
+import cats.implicits.catsSyntaxMonadError
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
-import sttp.capabilities.fs2.Fs2Streams
-import sttp.client3._
+import sttp.client4._
import sttp.model.Uri
class IpfsClusterClientImpl(
apiUrl: String,
- sttpBackendResource: Resource[IO, SttpBackend[IO, Fs2Streams[IO]]]
+ backendResource: Resource[IO, SyncBackend]
) extends IpfsClusterClient {
implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
override def getPeers: IO[IpfsClusterPeers] = {
- ipfsClusterPeersRequest()
- .flatTap(logResponse(_, "getPeers"))
+ val uri: Uri = uri"$apiUrl/peers"
+ backendResource
+ .use(backend => { IO.blocking(basicRequest.get(uri = uri).send(backend)) })
+ .attemptTap(EffectsUtils.attemptTLog)
.flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeers(response.body.getOrElse("")))
- .onError(throwable => logger.error(s"throwable=$throwable"))
}
override def getPeer: IO[IpfsClusterPeer] = {
- ipfsClusterPeerRequest()
- .flatTap(logResponse(_, "getPeer"))
- .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeer(response.body.getOrElse("")))
- .onError(throwable => logger.error(s"throwable=$throwable"))
- }
-
- private def ipfsClusterPeersRequest(): IO[Response[Either[String, String]]] = {
- val uri: Uri = uri"$apiUrl/peers"
- sttpBackendResource
- .use { sttpBackend =>
- basicRequest
- .get(uri = uri)
- .send(sttpBackend)
- }
- }
-
- private def ipfsClusterPeerRequest(): IO[Response[Either[String, String]]] = {
val uri: Uri = uri"$apiUrl/id"
- sttpBackendResource
- .use { sttpBackend =>
- basicRequest
- .get(uri = uri)
- .send(sttpBackend)
- }
+ backendResource
+ .use(backend => { IO.blocking(basicRequest.get(uri = uri).send(backend)) })
+ .attemptTap(EffectsUtils.attemptTLog)
+ .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeer(response.body.getOrElse("")))
}
override def addPin(ipfsCid: IpfsCid): IO[Unit] = {
val uri: Uri = uri"$apiUrl/pins/$ipfsCid"
- sttpBackendResource
- .use { sttpBackend =>
- basicRequest
- .post(uri)
- .readTimeout(Duration.Inf)
- .send(sttpBackend)
- }
- .flatMap(logResponse(_, "add"))
- .onError(logError)
+ backendResource
+ .use(backend => { IO.blocking(basicRequest.post(uri = uri).readTimeout(1.minutes).send(backend)) })
+ .attemptTap(EffectsUtils.attemptTLog)
+ .void
}
override def removePin(ipfsCid: IpfsCid): IO[Unit] = {
val uri: Uri = uri"$apiUrl/pins/$ipfsCid"
- sttpBackendResource
- .use { sttpBackend =>
- basicRequest
- .delete(uri)
- .readTimeout(Duration.Inf)
- .send(sttpBackend)
- }
- .flatMap(logResponse(_, "remove"))
- .onError(logError)
- }
-
- private def logError(throwable: Throwable): IO[Unit] = {
- logger.error(s"throwable=$throwable")
- }
-
- private def logResponse(response: Response[Either[String, String]], label: String): IO[Unit] = {
- logger.debug(s"label=$label code=${response.code} response=${response.toString.substring(0, 64)}")
+ backendResource
+ .use(backend => { IO.blocking(basicRequest.delete(uri = uri).readTimeout(1.minutes).send(backend)) })
+ .attemptTap(EffectsUtils.attemptTLog)
+ .void
}
}
--
GitLab