diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala index e8cf665a0291963a2c5bdac0f58a11804216d875..ebfcd7dc538e10a08dbf1f4c311b7356f3408a18 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala @@ -45,7 +45,8 @@ object FederationIoC { val ipfsCidUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler = { new IpfsCidDeltaUpdateCommandHandlerImpl( ipfsCidFacadeService = IpfsIoC.Domain.Service.facadeService, - ipfsCidCopiesService = IpfsIoC.Domain.Service.copiesService, + ipfsCidHardCopiesService = IpfsIoC.Domain.Service.hardCopiesService, + ipfsCidSoftCopiesService = IpfsIoC.Domain.Service.softCopiesService, ipfsClusterService = IpfsIoC.Domain.Service.ipfsClusterService ) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala index 6cb37e874ff9df34426a031fd9457b7c2fd701f1..f48aa8aecb5ba9068b256f27a626c978694c9201 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/IpfsIoC.scala @@ -6,8 +6,9 @@ import acab.devcon0.domain.adapters.ipfscid._ import acab.devcon0.domain.ports.input.ipfscid._ import acab.devcon0.domain.ports.output.client.IpfsClient import acab.devcon0.domain.ports.output.client.IpfsClusterClient -import acab.devcon0.domain.ports.output.repository.ipfscid.CopiesRepository +import acab.devcon0.domain.ports.output.repository.ipfscid.HardCopiesRepository import acab.devcon0.domain.ports.output.repository.ipfscid.SearchNameRepository +import acab.devcon0.domain.ports.output.repository.ipfscid.SoftCopiesRepository import acab.devcon0.domain.service.IpfsClusterService import acab.devcon0.domain.service.IpfsClusterServiceImpl import acab.devcon0.domain.service.IpfsService @@ -45,13 +46,15 @@ object IpfsIoC { val metaDataQueryHandler: MetadataQueryHandler = MetadataQueryHandlerImpl( facadeService = Service.facadeService, ipfsService = Domain.Service.ipfsService, - copiesService = Service.copiesService + hardCopiesService = Service.hardCopiesService, + softCopiesService = Service.softCopiesService ) val searchQueryHandler: SearchQueryHandler = SearchQueryHandlerImpl( facadeService = Service.facadeService, searchNameService = Service.searchNameService, - copiesService = Service.copiesService + hardCopiesService = Service.hardCopiesService, + softCopiesService = Service.softCopiesService ) } @@ -62,7 +65,7 @@ object IpfsIoC { flatRepository = Output.flatRepository ) - val ipfsClusterService: IpfsClusterService = new IpfsClusterServiceImpl( + val ipfsClusterService: IpfsClusterService[IO] = new IpfsClusterServiceImpl( client = Output.ipfsClusterClient ) @@ -70,8 +73,12 @@ object IpfsIoC { repository = Output.searchNameRepository ) - val copiesService: CopiesService[IO] = new CopiesServiceImpl( - repository = Output.copiesRepository + val hardCopiesService: HardCopiesService[IO] = new HardCopiesServiceImpl( + repository = Output.hardCopiesRepository + ) + + val softCopiesService: SoftCopiesService[IO] = new SoftCopiesServiceImpl( + repository = Output.softCopiesRepository ) val facadeService: FacadeService[IO] = new FacadeServiceImpl( @@ -87,7 +94,7 @@ object IpfsIoC { private val backend: SyncBackend = HttpURLConnectionBackend() private val backendResource: Resource[IO, SyncBackend] = Resource.make(IO(backend))(res => IO(res.close())) - val ipfsClusterClient: IpfsClusterClient = new IpfsClusterClientImpl( + val ipfsClusterClient: IpfsClusterClient[IO] = new IpfsClusterClientImpl( apiUrl = Configuration.configuration.ipfsCluster.apiUrl, backendResource = backendResource ) @@ -97,7 +104,11 @@ object IpfsIoC { ipfsConfiguration = Configuration.configuration.ipfs ) - val copiesRepository: CopiesRepository[IO] = new CopiesRepositoryImpl( + val hardCopiesRepository: HardCopiesRepository[IO] = new HardCopiesRepositoryImpl( + commandsApi = CommonsIoC.InputOutput.redisCommandsFactory() + ) + + val softCopiesRepository: SoftCopiesRepository[IO] = new SoftCopiesRepositoryImpl( commandsApi = CommonsIoC.InputOutput.redisCommandsFactory() ) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/HeartbeatCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/HeartbeatCommandHandlerImpl.scala index b90df0f7dd687a228bc598deaf5bfd0d596dc79a..2d1b958c1311f9542afea2df9f33dd2147d3e990 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/HeartbeatCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/HeartbeatCommandHandlerImpl.scala @@ -9,7 +9,7 @@ import cats.effect.IO import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -class HeartbeatCommandHandlerImpl(p2PService: P2pService[IO], ipfsClusterService: IpfsClusterService) +class HeartbeatCommandHandlerImpl(p2PService: P2pService[IO], ipfsClusterService: IpfsClusterService[IO]) extends HeartbeatCommandHandler { implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala index f55d9a2ebcd3791baf1b9f124c3d6fc245d03519..b6c77375024d495072b2ac0010cc54d18bb911e2 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federation/IpfsCidDeltaUpdateCommandHandlerImpl.scala @@ -1,18 +1,23 @@ package acab.devcon0.domain.adapters.federation -import acab.devcon0.domain.ports.input._ +import scala.concurrent.duration.DurationInt + import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateCommand import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateCommandHandler import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateEvent import acab.devcon0.domain.ports.input.federation.IpfsCidDeltaUpdateSuccessEvent import acab.devcon0.domain.service._ -import acab.devcon0.domain.service.ipfscid.CopiesService import acab.devcon0.domain.service.ipfscid.FacadeService +import acab.devcon0.domain.service.ipfscid.HardCopiesService +import acab.devcon0.domain.service.ipfscid.SoftCopiesService import cats.effect.IO +import cats.effect.std.Supervisor +import cats.implicits.toTraverseOps class IpfsCidDeltaUpdateCommandHandlerImpl( ipfsCidFacadeService: FacadeService[IO], - ipfsCidCopiesService: CopiesService[IO], - ipfsClusterService: IpfsClusterService + ipfsCidHardCopiesService: HardCopiesService[IO], + ipfsCidSoftCopiesService: SoftCopiesService[IO], + ipfsClusterService: IpfsClusterService[IO] ) extends IpfsCidDeltaUpdateCommandHandler { override def handle(cmd: IpfsCidDeltaUpdateCommand): IO[IpfsCidDeltaUpdateEvent[?]] = { @@ -24,16 +29,29 @@ class IpfsCidDeltaUpdateCommandHandlerImpl( private def handleAdditions(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = { for - _ <- ipfsCidCopiesService.increment(cmd.delta.additions) + _ <- ipfsCidHardCopiesService.increment(cmd.delta.additions) _ <- ipfsClusterService.addPins(cmd.delta.additions) + _ <- handleAdditionsSoftCopies(cmd) yield () } private def handleRemovals(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = { for - _ <- ipfsCidFacadeService.deleteFlats(cmd.delta.removals) - _ <- ipfsCidCopiesService.decrement(cmd.delta.removals) - _ <- ipfsClusterService.removePins(cmd.delta.removals) + _ <- ipfsCidFacadeService.deleteFlats(cmd.delta.removals) + _ <- ipfsCidHardCopiesService.decrement(cmd.delta.removals) + _ <- ipfsClusterService.removePins(cmd.delta.removals) + pinsAllocations <- cmd.delta.removals.toSeq.traverse(ipfsClusterService.getPinAllocations) + _ <- pinsAllocations.traverse(pin => ipfsCidSoftCopiesService.delete(pin.cid)) yield () } + + private def handleAdditionsSoftCopies(cmd: IpfsCidDeltaUpdateCommand): IO[Unit] = { + Supervisor[IO](await = false).use(supervisor => { + for + _ <- IO.sleep(10.seconds) + pinsAllocations <- cmd.delta.additions.toSeq.traverse(ipfsClusterService.getPinAllocations) + _ <- pinsAllocations.traverse(pin => ipfsCidSoftCopiesService.set(pin.cid, pin.allocations.size)) + yield () + }) + } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/CheckInAckCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/CheckInAckCommandHandlerImpl.scala index 91bd4f97009baa9ebf1d5b6556e7c1b20629ab9b..f1ab89624503e330dcdc5e7b27f3d4705357fd00 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/CheckInAckCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/CheckInAckCommandHandlerImpl.scala @@ -12,7 +12,7 @@ import cats.effect.IO import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -class CheckInAckCommandHandlerImpl(p2PService: P2pService[IO], ipfsClusterService: IpfsClusterService) +class CheckInAckCommandHandlerImpl(p2PService: P2pService[IO], ipfsClusterService: IpfsClusterService[IO]) extends CheckInAckCommandHandler { implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala index f0ada6ee1ff9b0903fb13f9fe3bcf70a1dd4864b..268af22698c3bf5cee52a4e0f39272ce0161d569 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala @@ -17,7 +17,7 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger class UpdateInformationCommandHandlerImpl( - ipfsClusterService: IpfsClusterService, + ipfsClusterService: IpfsClusterService[IO], informationService: InformationService[IO] ) extends UpdateInformationCommandHandler { diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/MetadataQueryHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/MetadataQueryHandlerImpl.scala index 5e400f265d87468f416e88d5b2c88e9e5b63e6eb..db1003bbf665d27c03737c664cc84a8750725671 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/MetadataQueryHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/MetadataQueryHandlerImpl.scala @@ -6,37 +6,40 @@ import acab.devcon0.domain.dtos.IpfsCidMetadataDto import acab.devcon0.domain.mappers.IpfsCidMapper import acab.devcon0.domain.ports.input.ipfscid._ import acab.devcon0.domain.service.IpfsService -import acab.devcon0.domain.service.ipfscid.CopiesService import acab.devcon0.domain.service.ipfscid.FacadeService +import acab.devcon0.domain.service.ipfscid.HardCopiesService +import acab.devcon0.domain.service.ipfscid.SoftCopiesService import cats.effect.IO +import cats.implicits.catsSyntaxTuple2Parallel import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger class MetadataQueryHandlerImpl( ipfsService: IpfsService, facadeService: FacadeService[IO], - copiesService: CopiesService[IO] + hardCopiesService: HardCopiesService[IO], + softCopiesService: SoftCopiesService[IO] ) extends MetadataQueryHandler { private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] override def handle(query: MetadataQuery): IO[Option[IpfsCidMetadataDto]] = { for - maybeCacheHit <- facadeService.getFlat(query.ipfsCid) - hardCopies <- copiesService.get(query.ipfsCid) + maybeCacheHit <- facadeService.getFlat(query.ipfsCid) + (hardCopies, softCopies) <- (hardCopiesService.get(query.ipfsCid), softCopiesService.get(query.ipfsCid)).parTupled result <- maybeCacheHit match - case Some(value) => IO(Some(buildIpfsCidMetadataDto(value, hardCopies))) + case Some(value) => IO(Some(buildIpfsCidMetadataDto(value, hardCopies, softCopies))) case None => ipfsService .lsFileTree(query.ipfsCid) .map(IpfsCidMapper.Flat.to) .flatTap(facadeService.saveFlat) - .map(dto => Some(buildIpfsCidMetadataDto(dto, hardCopies))) + .map(dto => Some(buildIpfsCidMetadataDto(dto, hardCopies, softCopies))) .onError(logError) yield result } - private def buildIpfsCidMetadataDto(dto: IpfsCidDto, hardCopies: Int): IpfsCidMetadataDto = { + private def buildIpfsCidMetadataDto(dto: IpfsCidDto, hardCopies: Int, softCopies: Int): IpfsCidMetadataDto = { IpfsCidMetadataDto( cid = dto.cid, size = dto.size, @@ -45,7 +48,7 @@ class MetadataQueryHandlerImpl( contents = dto.contents, timestamp = dto.timestamp, `type` = dto.`type`, - copies = IpfsCidMetadataCopiesDto(hard = hardCopies, soft = 0), + copies = IpfsCidMetadataCopiesDto(hard = hardCopies, soft = softCopies), names = List() ) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/SearchQueryHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/SearchQueryHandlerImpl.scala index 630d7237836196bd4e627d60dc6a70eebee24857..141a3c323aedc9f279c315e05829c4e2414fc53e 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/SearchQueryHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/ipfscid/SearchQueryHandlerImpl.scala @@ -2,21 +2,20 @@ package acab.devcon0.domain.adapters.ipfscid import acab.devcon0.domain.dtos._ import acab.devcon0.domain.ports.input.ipfscid._ -import acab.devcon0.domain.service.ipfscid.CopiesService import acab.devcon0.domain.service.ipfscid.FacadeService +import acab.devcon0.domain.service.ipfscid.HardCopiesService import acab.devcon0.domain.service.ipfscid.SearchNameService +import acab.devcon0.domain.service.ipfscid.SoftCopiesService import cats.effect.IO import cats.implicits._ -import org.typelevel.log4cats.slf4j.Slf4jLogger class SearchQueryHandlerImpl( facadeService: FacadeService[IO], searchNameService: SearchNameService[IO], - copiesService: CopiesService[IO] + hardCopiesService: HardCopiesService[IO], + softCopiesService: SoftCopiesService[IO] ) extends SearchQueryHandler { - Slf4jLogger.getLogger[IO] - override def handle(query: SearchQuery): IO[IpfsCidSearchResponse] = { for searchResultIpfsCids <- searchNameService.search(query.searchParameters) @@ -26,7 +25,7 @@ class SearchQueryHandlerImpl( } private def buildIpfsCidMetadataDto(dto: IpfsCidDto): IO[IpfsCidMetadataDto] = { - for hardCopies <- copiesService.get(dto.cid) + for (hardCopies, softCopies) <- (hardCopiesService.get(dto.cid), softCopiesService.get(dto.cid)).parTupled yield IpfsCidMetadataDto( cid = dto.cid, size = dto.size, @@ -35,7 +34,7 @@ class SearchQueryHandlerImpl( contents = dto.contents, timestamp = dto.timestamp, `type` = dto.`type`, - copies = IpfsCidMetadataCopiesDto(hardCopies, 0), + copies = IpfsCidMetadataCopiesDto(hardCopies, softCopies), names = List() ) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/client/IpfsClusterClient.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/client/IpfsClusterClient.scala index 00b9a62ba5acdccb61c2c4d033bacdd9192a2220..521ce7ee8602f89451898b1563674742388d48aa 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/client/IpfsClusterClient.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/client/IpfsClusterClient.scala @@ -2,11 +2,11 @@ package acab.devcon0.domain.ports.output.client import acab.devcon0.trile.domain.dtos._ import acab.devcon0.trile.domain.dtos.aliases.IpfsCid -import cats.effect.IO -trait IpfsClusterClient { - def getPeers: IO[IpfsClusterPeers] - def getPeer: IO[IpfsClusterPeer] - def removePin(pinCID: IpfsCid): IO[Unit] - def addPin(cid: IpfsCid): IO[Unit] +trait IpfsClusterClient[F[_]] { + def getPeers: F[IpfsClusterPeers] + def getPeer: F[IpfsClusterPeer] + def getPinAllocations(ipfsCid: IpfsCid): F[IpfsClusterPinAllocation] + def removePin(pinCID: IpfsCid): F[Unit] + def addPin(cid: IpfsCid): F[Unit] } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/CopiesRepository.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/HardCopiesRepository.scala similarity index 87% rename from federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/CopiesRepository.scala rename to federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/HardCopiesRepository.scala index 90b435c27130ccec3ea0747737a95b67b4d57ce0..7c14c494dcf629b4191266253ff79e65e0ac3e2e 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/CopiesRepository.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/HardCopiesRepository.scala @@ -1,7 +1,7 @@ package acab.devcon0.domain.ports.output.repository.ipfscid import acab.devcon0.trile.domain.dtos.aliases.IpfsCid -trait CopiesRepository[F[_]] { +trait HardCopiesRepository[F[_]] { def get(ipfsCid: IpfsCid): F[Int] def increment(ipfsCids: Set[IpfsCid]): F[Unit] def decrement(ipfsCids: Set[IpfsCid]): F[Unit] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/SoftCopiesRepository.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/SoftCopiesRepository.scala new file mode 100644 index 0000000000000000000000000000000000000000..7eed9dc94b93f7360f5e2a4840de51956bcc9525 --- /dev/null +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/ipfscid/SoftCopiesRepository.scala @@ -0,0 +1,11 @@ +package acab.devcon0.domain.ports.output.repository.ipfscid + +import acab.devcon0.trile.domain.dtos.aliases.IpfsCid + +trait SoftCopiesRepository[F[_]] { + def get(ipfsCid: IpfsCid): F[Int] + + def set(ipfsCid: IpfsCid, count: Int): F[Unit] + + def delete(ipfsCid: IpfsCid): F[Unit] +} diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterService.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterService.scala index b0729ea82fba0ae39e1a230d646fcde5002fb2a8..efd8b5a356a22801b31ec51e8e85edb986788908 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterService.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterService.scala @@ -2,14 +2,15 @@ package acab.devcon0.domain.service import acab.devcon0.trile.domain.dtos.IpfsClusterPeer import acab.devcon0.trile.domain.dtos.IpfsClusterPeers +import acab.devcon0.trile.domain.dtos.IpfsClusterPinAllocation import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import acab.devcon0.trile.domain.dtos.aliases.P2pPeerId -import cats.effect._ -trait IpfsClusterService { - def getPeers: IO[IpfsClusterPeers] - def getPeer(p2pPeerId: P2pPeerId): IO[IpfsClusterPeer] - def getPeer: IO[IpfsClusterPeer] - def addPins(ipfsCids: Set[IpfsCid]): IO[Unit] - def removePins(ipfsCids: Set[IpfsCid]): IO[Unit] +trait IpfsClusterService[F[_]] { + def getPeers: F[IpfsClusterPeers] + def getPeer(p2pPeerId: P2pPeerId): F[IpfsClusterPeer] + def getPeer: F[IpfsClusterPeer] + def addPins(ipfsCids: Set[IpfsCid]): F[Unit] + def getPinAllocations(ipfsCid: IpfsCid): F[IpfsClusterPinAllocation] + def removePins(ipfsCids: Set[IpfsCid]): F[Unit] } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala index a251f66faf642199d9e74018fec8f3212f7cdf01..e5270437cba7cb27ac23e86494bc0df89ef45008 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala @@ -3,6 +3,7 @@ package acab.devcon0.domain.service import acab.devcon0.domain.ports.output.client.IpfsClusterClient import acab.devcon0.trile.domain.dtos.IpfsClusterPeer import acab.devcon0.trile.domain.dtos.IpfsClusterPeers +import acab.devcon0.trile.domain.dtos.IpfsClusterPinAllocation import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import acab.devcon0.trile.domain.dtos.aliases.IpfsClusterPeerId import acab.devcon0.trile.utils.EffectsUtils @@ -12,7 +13,7 @@ import cats.implicits.toTraverseOps import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -class IpfsClusterServiceImpl(client: IpfsClusterClient) extends IpfsClusterService { +class IpfsClusterServiceImpl(client: IpfsClusterClient[IO]) extends IpfsClusterService[IO] { implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] @@ -32,6 +33,10 @@ class IpfsClusterServiceImpl(client: IpfsClusterClient) extends IpfsClusterServi IORetry.fibonacci(getPeerIO) } + override def getPinAllocations(ipfsCid: IpfsCid): IO[IpfsClusterPinAllocation] = { + client.getPinAllocations(ipfsCid) + } + override def addPins(ipfsCids: Set[IpfsCid]): IO[Unit] = { ipfsCids.toSeq.traverse(addPin).void } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsServiceImpl.scala index 0e4c459682f4a1604bef496616f695e17d4fd72e..bc790cdacec2746c758c4d689af0171475dc130d 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsServiceImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsServiceImpl.scala @@ -83,7 +83,7 @@ class IpfsServiceImpl(ipfsClient: IpfsClient, flatRepository: FlatRepository) ex case Some(value) => IO(value) case None => ipfsClientLs(ipfsCid).flatMap(buildIpfsCidDto(_, None)) }) - case Right(link) => { + case Right(link) => flatRepository .get(link.hash) .flatMap(maybeCacheHit => { @@ -91,7 +91,6 @@ class IpfsServiceImpl(ipfsClient: IpfsClient, flatRepository: FlatRepository) ex case Some(value) => IO(value) case None => ipfsClientLs(link.hash).flatMap(buildIpfsCidDto(_, Some(link))) }) - } } private def getIpfsCidDtos(links: List[IpfsLsLinkResponse]): IO[List[IpfsCidDto]] = { diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/CopiesService.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/HardCopiesService.scala similarity index 88% rename from federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/CopiesService.scala rename to federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/HardCopiesService.scala index a4f9f387bdc660e2a2d255c230b1639bd4277819..9dbedaf6a1c7b149f39634a6f983a35614f34a83 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/CopiesService.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/HardCopiesService.scala @@ -1,7 +1,7 @@ package acab.devcon0.domain.service.ipfscid import acab.devcon0.trile.domain.dtos.aliases.IpfsCid -trait CopiesService[F[_]] { +trait HardCopiesService[F[_]] { def get(ipfsCid: IpfsCid): F[Int] def increment(ipfsCids: Set[IpfsCid]): F[Unit] def decrement(ipfsCids: Set[IpfsCid]): F[Unit] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/CopiesServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/HardCopiesServiceImpl.scala similarity index 75% rename from federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/CopiesServiceImpl.scala rename to federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/HardCopiesServiceImpl.scala index 6d9522c48f1a0034b978afd94313ed60eec78e35..9529333c2f6324ed93b6ca811dd34dd30b7d1c2c 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/CopiesServiceImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/HardCopiesServiceImpl.scala @@ -1,11 +1,11 @@ package acab.devcon0.domain.service.ipfscid -import acab.devcon0.domain.ports.output.repository.ipfscid.CopiesRepository +import acab.devcon0.domain.ports.output.repository.ipfscid.HardCopiesRepository import acab.devcon0.domain.service.ipfscid import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import cats.effect.IO -class CopiesServiceImpl(repository: CopiesRepository[IO]) extends CopiesService[IO] { +class HardCopiesServiceImpl(repository: HardCopiesRepository[IO]) extends HardCopiesService[IO] { override def get(ipfsCid: IpfsCid): IO[Int] = { repository.get(ipfsCid) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/SoftCopiesService.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/SoftCopiesService.scala new file mode 100644 index 0000000000000000000000000000000000000000..42fe1629f7239beea4abfc94f85eb886f75a9591 --- /dev/null +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/SoftCopiesService.scala @@ -0,0 +1,9 @@ +package acab.devcon0.domain.service.ipfscid + +import acab.devcon0.trile.domain.dtos.aliases.IpfsCid + +trait SoftCopiesService[F[_]] { + def get(ipfsCid: IpfsCid): F[Int] + def set(ipfsCid: IpfsCid, count: Int): F[Unit] + def delete(ipfsCid: IpfsCid): F[Unit] +} diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/SoftCopiesServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/SoftCopiesServiceImpl.scala new file mode 100644 index 0000000000000000000000000000000000000000..d0f7ad6783cc43ca4ff955a25316138d8122f93b --- /dev/null +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/ipfscid/SoftCopiesServiceImpl.scala @@ -0,0 +1,20 @@ +package acab.devcon0.domain.service.ipfscid +import acab.devcon0.domain.ports.output.repository.ipfscid.SoftCopiesRepository +import acab.devcon0.domain.service.ipfscid +import acab.devcon0.trile.domain.dtos.aliases.IpfsCid +import cats.effect.IO + +class SoftCopiesServiceImpl(repository: SoftCopiesRepository[IO]) extends SoftCopiesService[IO] { + + override def get(ipfsCid: IpfsCid): IO[Int] = { + repository.get(ipfsCid) + } + + override def set(ipfsCid: IpfsCid, count: Int): IO[Unit] = { + repository.set(ipfsCid, count) + } + + override def delete(ipfsCid: IpfsCid): IO[Unit] = { + repository.delete(ipfsCid) + } +} diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala index dfcfd9911713822bce0d1292ed639a0fea432a47..4a0fc3f74f04eed194ae16f24cf5174b5a23f617 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala @@ -65,13 +65,12 @@ class FederationMemberSharingFolderUpdateListener( isUpdateChangelogProcessed: Boolean ): IO[Unit] = { (isUpdateChangelogProcessed, federationMemberUpdateChangelogEvent) match { - case (true, event: SharingFolderSuccessEvent) => { + case (true, event: SharingFolderSuccessEvent) => val changelogItem = event.federationMemberChangelogItem val message = FederationMemberChangelogUpdateMessage(federationMemberId, changelogItem) redisPublisher .publish(message) .onError(logError) - } case (_, _) => syncCommandHandler.handle(SyncAckCommand(federationMemberId)).flattenEvents >> logger.info(s"Not publishing isUpdateChangelogProcessed=$isUpdateChangelogProcessed") >> diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala index 0ec574f544cf8b106c47325e2b3fc96bcf607ffe..0279dd9963362229bbe14582d7edd1645dc68ef6 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/client/IpfsClusterClientImpl.scala @@ -18,9 +18,9 @@ import sttp.model.Uri class IpfsClusterClientImpl( apiUrl: String, backendResource: Resource[IO, SyncBackend] -) extends IpfsClusterClient { +) extends IpfsClusterClient[IO] { - implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] override def getPeers: IO[IpfsClusterPeers] = { val uri: Uri = uri"$apiUrl/peers" @@ -38,6 +38,14 @@ class IpfsClusterClientImpl( .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPeer(response.body.getOrElse(""))) } + override def getPinAllocations(ipfsCid: IpfsCid): IO[IpfsClusterPinAllocation] = { + val uri: Uri = uri"$apiUrl/allocations/$ipfsCid" + backendResource + .use(backend => { IO.blocking(basicRequest.get(uri = uri).readTimeout(1.minutes).send(backend)) }) + .attemptTap(EffectsUtils.attemptTLog) + .flatMap(response => IpfsClusterCodecs.Decoders.IpfsClusterPinAllocation(response.body.getOrElse(""))) + } + override def addPin(ipfsCid: IpfsCid): IO[Unit] = { val uri: Uri = uri"$apiUrl/pins/$ipfsCid" backendResource diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/CopiesRepositoryImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/HardCopiesRepository.scala similarity index 86% rename from federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/CopiesRepositoryImpl.scala rename to federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/HardCopiesRepository.scala index 1ad59387922e460d928beb81f8032d2790669571..6a8ecf3c17cb3f5803b1f46647399c5e278536d2 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/CopiesRepositoryImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/HardCopiesRepository.scala @@ -1,6 +1,6 @@ package acab.devcon0.output.repository.ipfscid -import acab.devcon0.domain.ports.output.repository.ipfscid.CopiesRepository +import acab.devcon0.domain.ports.output.repository.ipfscid.HardCopiesRepository import acab.devcon0.output.repository.redisutils import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import cats.effect.IO @@ -14,7 +14,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger private object CopiesRepository { def key(ipfsCid: IpfsCid): String = { - String.join(":", redisutils.Redis.SetPrefixes.ipfsCidFlatCopies, ipfsCid) + String.join(":", redisutils.Redis.SetPrefixes.ipfsCidFlatHardCopies, ipfsCid) } object Operations { @@ -24,11 +24,11 @@ private object CopiesRepository { } def increment(ipfsCids: Set[IpfsCid]): RedisCommands[IO, String, String] => IO[Unit] = { redis => - ipfsCids.toList.traverse(ipfsCid => redis.incr(key(ipfsCid))).void + ipfsCids.toSeq.traverse(ipfsCid => redis.incr(key(ipfsCid))).void } def decrement(ipfsCids: Set[IpfsCid]): RedisCommands[IO, String, String] => IO[Unit] = { redis => - ipfsCids.toList.traverse { ipfsCid => + ipfsCids.toSeq.traverse { ipfsCid => val redisKey: String = key(ipfsCid) for currentVal <- redis.get(redisKey).map(_.getOrElse("0").toInt) @@ -39,8 +39,8 @@ private object CopiesRepository { } } -class CopiesRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, String, String]]) - extends CopiesRepository[IO] { +class HardCopiesRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, String, String]]) + extends HardCopiesRepository[IO] { implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SearchNameRepositoryImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SearchNameRepository.scala similarity index 100% rename from federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SearchNameRepositoryImpl.scala rename to federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SearchNameRepository.scala diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SoftCopiesRepository.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SoftCopiesRepository.scala new file mode 100644 index 0000000000000000000000000000000000000000..58fc55c691b30ba98067546a05b9b122b4e98e8f --- /dev/null +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/ipfscid/SoftCopiesRepository.scala @@ -0,0 +1,80 @@ +package acab.devcon0.output.repository.ipfscid + +import acab.devcon0.domain.ports.output.repository.ipfscid.SoftCopiesRepository +import acab.devcon0.output.repository.redisutils +import acab.devcon0.trile.domain.dtos.aliases.IpfsCid +import acab.devcon0.trile.utils.EffectsUtils +import cats.effect.IO +import cats.effect.kernel.Resource +import cats.syntax.all._ +import dev.profunktor.redis4cats._ +import dev.profunktor.redis4cats.tx.TxStore +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +private object SoftCopiesRepository { + + def key(ipfsCid: IpfsCid): String = { + String.join(":", redisutils.Redis.SetPrefixes.ipfsCidFlatSoftCopies, ipfsCid) + } + + object Operations { + + def get(ipfsCid: IpfsCid): RedisCommands[IO, String, String] => IO[Int] = { redis => + redis.get(key(ipfsCid)).map(_.getOrElse("0").toInt) + } + + def set(ipfsCid: IpfsCid, count: Int): RedisCommands[IO, String, String] => IO[Unit] = { redis => + redis.set(key(ipfsCid), count.toString).void + } + + def delete(ipfsCid: IpfsCid): RedisCommands[IO, String, String] => IO[Unit] = { redis => + redis.del(key(ipfsCid)).void + } + } +} + +class SoftCopiesRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, String, String]]) + extends SoftCopiesRepository[IO] { + + private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + override def get(ipfsCid: IpfsCid): IO[Int] = { + commandsApi + .use(SoftCopiesRepository.Operations.get(ipfsCid)(_)) + } + + override def set(ipfsCid: IpfsCid, count: Int): IO[Unit] = { + commandsApi + .use(setInner(ipfsCid, count, _)) + } + + override def delete(ipfsCid: IpfsCid): IO[Unit] = { + commandsApi + .use(deleteInner(ipfsCid, _)) + } + + private def setInner(ipfsCid: IpfsCid, count: Int, redis: RedisCommands[IO, String, String]): IO[Unit] = { + val operations: TxStore[IO, String, String] => List[IO[Unit]] = { _ => + List( + SoftCopiesRepository.Operations.set(ipfsCid, count)(redis) + ) + } + redis + .transact(operations) + .void + .attemptTap(EffectsUtils.attemptTLog) + } + + private def deleteInner(ipfsCid: IpfsCid, redis: RedisCommands[IO, String, String]): IO[Unit] = { + val operations: TxStore[IO, String, String] => List[IO[Unit]] = { _ => + List( + SoftCopiesRepository.Operations.delete(ipfsCid)(redis) + ) + } + redis + .transact(operations) + .void + .attemptTap(EffectsUtils.attemptTLog) + } +} diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala index d6edaba542c132e9a63f0c79c54588db81d62f52..2b977d050c6c89c97f03dc346ff6af434f60e685 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala @@ -4,11 +4,12 @@ import dev.profunktor.redis4cats.data.RedisChannel object Redis { object SetPrefixes { - val ipfsCidFlat: String = "IPFS:CID:FLAT" - val ipfsCidFlatIndex: String = "IPFS:CID:FLAT/INDEX" - val ipfsCidFlatCopies: String = "IPFS:CID:FLAT/COPIES" - val ipfsCidNodeTree: String = "IPFS:CID:NODE_TREE" - val ipfsCidFileTree: String = "IPFS:CID:FILE_TREE" + val ipfsCidFlat: String = "IPFS:CID:FLAT" + val ipfsCidFlatIndex: String = "IPFS:CID:FLAT/INDEX" + val ipfsCidFlatHardCopies: String = "IPFS:CID:FLAT/HARD_COPIES" + val ipfsCidFlatSoftCopies: String = "IPFS:CID:FLAT/SOFT_COPIES" + val ipfsCidNodeTree: String = "IPFS:CID:NODE_TREE" + val ipfsCidFileTree: String = "IPFS:CID:FILE_TREE" val federationMemberInformation: String = "FEDERATION:MEMBER:INFORMATION" val federationMemberInformationIndex: String = "FEDERATION:MEMBER:INFORMATION/INDEX"