diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala index 9795aac310d31297dedcba36292001ceeb5a15cf..e8cf665a0291963a2c5bdac0f58a11804216d875 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationIoC.scala @@ -52,7 +52,7 @@ object FederationIoC { val ipfsCidDeltaQueryHandler: IpfsCidDeltaQueryHandler = new IpfsCidDeltaQueryHandlerImpl( federationMemberCidsService = FederationMemberIoC.Domain.Service.cidsService, - federationMemberChangelogService = FederationMemberIoC.Domain.Service.changelogService + federationMemberInformationService = FederationMemberIoC.Domain.Service.informationService ) } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationMemberIoC.scala b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationMemberIoC.scala index 50edc91f81ff66a2fee91170636b29adbc321a2e..52fa95c1a7da86f714e55c2cdb26789c668c5be5 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationMemberIoC.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/boot/ioc/FederationMemberIoC.scala @@ -60,7 +60,6 @@ object FederationMemberIoC { new redispubsub.FederationMemberSharingFolderUpdateListener( redisListener = CommonsIoC.Input.redisListener, sharingFolderUpdateCommandHandler = Domain.Port.updateChangelogCommandHandler, - updateInformationRefreshCommandHandler = Domain.Port.updateInformationCommandHandler, informationQueryHandler = Domain.Port.informationQueryHandler, redisPublisher = CommonsIoC.Output.redisPubSubPublisher, syncCommandHandler = Domain.Port.syncCommandHandler @@ -71,7 +70,9 @@ object FederationMemberIoC { new FederationMemberChangelogUpdateEventListener( redisListener = CommonsIoC.Input.redisListener, ipfsCidUpdateCommandHandler = Domain.Port.ipfsCidUpdateCommandHandler, - redisPublisher = CommonsIoC.Output.redisPubSubPublisher + updateInformationRefreshCommandHandler = Domain.Port.updateInformationCommandHandler, + redisPublisher = CommonsIoC.Output.redisPubSubPublisher, + syncCommandHandler = Domain.Port.syncCommandHandler ) } } @@ -81,7 +82,8 @@ object FederationMemberIoC { object Port { val syncCommandHandler: SyncCommandHandler = new SyncCommandHandlerImpl( - p2PService = CommonsIoC.Domain.Service.p2pService + p2PService = CommonsIoC.Domain.Service.p2pService, + informationService = Service.informationService ) val checkInAckCommandHandler: CheckInAckCommandHandler = new CheckInAckCommandHandlerImpl( diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/IpfsCidDeltaUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/IpfsCidDeltaUpdateCommandHandlerImpl.scala index 6c8397b8355aff43bcac460bba6f13f62cbe2a36..7c5268c3d825341c77da293b188c5a31e7bbca80 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/IpfsCidDeltaUpdateCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/IpfsCidDeltaUpdateCommandHandlerImpl.scala @@ -60,7 +60,6 @@ class IpfsCidDeltaUpdateCommandHandlerImpl( ): IO[Option[IpfsCid]] = { changelogService .getPrevious(federationMemberId, federationMemberChangelogItem) - .map(_.lastOption) .map(_.map(_.ipfsCid)) } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SyncCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SyncCommandHandlerImpl.scala index 0cc1b452eeec1bd490c7d1c1c6d9382c68e94d48..dbccabf286fe0a97ca332e7fd494131d3f4e7a78 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SyncCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SyncCommandHandlerImpl.scala @@ -1,30 +1,58 @@ package acab.devcon0.domain.adapters.federationmember +import acab.devcon0.domain.dtos.FederationMemberInformation import acab.devcon0.domain.ports.input._ import acab.devcon0.domain.ports.input.federationmember._ import acab.devcon0.domain.service.P2pService +import acab.devcon0.domain.service.federationmember.InformationService +import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId import acab.devcon0.trile.domain.dtos.pubsub.P2p.FederationMemberSyncAck import acab.devcon0.trile.domain.dtos.pubsub.P2p.FederationMemberSyncNack +import acab.devcon0.trile.utils.EffectsUtils +import acab.devcon0.trile.utils.IORetry import cats.effect.IO +import cats.implicits.catsSyntaxMonadError import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -class SyncCommandHandlerImpl(p2PService: P2pService[IO]) extends SyncCommandHandler { +class SyncCommandHandlerImpl(p2PService: P2pService[IO], informationService: InformationService[IO]) + extends SyncCommandHandler { - implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] override def handle(cmd: SyncResponseCommand): IO[SyncResponseEvent[?]] = { + IORetry + .fibonacci(for + federationMemberInformation <- getInformation(cmd) + message <- buildMessage(cmd, federationMemberInformation) + _ <- publishMessage(message) + yield SyncResponseSuccessEvent()) + .attemptTap(EffectsUtils.attemptTLog) + .onError(throwable => IO(SyncResponseErrorEvent(throwable = throwable))) + } + + private def publishMessage(message: Product): IO[Unit] = { + message match + case msg: FederationMemberSyncAck => p2PService.publish(msg) + case msg: FederationMemberSyncNack => p2PService.publish(msg) + } + + private def buildMessage(cmd: SyncResponseCommand, information: FederationMemberInformation): IO[Product] = { + IO(cmd match + case SyncAckCommand(federationMemberId) => FederationMemberSyncAck(information.p2pPeerId) + case SyncNackCommand(federationMemberId) => FederationMemberSyncNack(information.p2pPeerId) + ) + } + + private def getInformation(cmd: SyncResponseCommand): IO[FederationMemberInformation] = { cmd match - case SyncAckCommand(p2pPeerId) => - p2PService - .publish(FederationMemberSyncAck(p2pPeerId)) - .map(_ => SyncResponseSuccessEvent()) - .onError(throwable => IO(SyncResponseErrorEvent(throwable = throwable))) - - case SyncNackCommand(p2pPeerId) => - p2PService - .publish(FederationMemberSyncNack(p2pPeerId)) - .map(_ => SyncResponseSuccessEvent()) - .onError(throwable => IO(SyncResponseErrorEvent(throwable = throwable))) + case SyncAckCommand(federationMemberId) => getInformation(federationMemberId) + case SyncNackCommand(federationMemberId) => getInformation(federationMemberId) + } + + private def getInformation(federationMemberId: FederationMemberId): IO[FederationMemberInformation] = { + informationService + .getById(federationMemberId) + .flatMap(EffectsUtils.forceOptionExistence()) } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala index bf0cb566d5900e0737c0c020f6bdef7c2b88cc21..bc8001e77ae0fb963b1a4c7bf5f0d0c5bc302b6b 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/UpdateInformationCommandHandlerImpl.scala @@ -37,16 +37,16 @@ class UpdateInformationCommandHandlerImpl( } private def update(command: InformationRefreshCommand): IO[FederationMemberInformation] = { - refreshInner(command.ipfsPeerRootCid, command.id) + refreshInner(command.sharedFolderCid, command.id) } private def refreshInner( - ipfsPeerRootCid: IpfsCid, + sharedFolderCid: IpfsCid, federationMemberId: FederationMemberId ): IO[FederationMemberInformation] = { for federationMemberInformation <- getFederationMemberInformation(federationMemberId) - newInformation = federationMemberInformation.copy(ipfsPeerRootCid = ipfsPeerRootCid, lastSeen = now) + newInformation = federationMemberInformation.copy(sharedFolderCid = sharedFolderCid, lastSeen = now) _ <- informationService.save(newInformation) _ <- logResult(newInformation) yield newInformation @@ -88,7 +88,7 @@ class UpdateInformationCommandHandlerImpl( ipfsClusterPeerId = ipfsClusterPeer.id, ipfsClusterNodeName = ipfsClusterPeer.peername, ipfsPeerId = ipfsClusterPeer.ipfs.id, - ipfsPeerRootCid = message.ipfsPeerRootCid, + sharedFolderCid = message.sharedFolderCid, ipfsClusterAddresses = ipfsClusterPeer.addresses, ipfsIpfsAddresses = ipfsClusterPeer.ipfs.addresses, lastSeen = now diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/dtos/FederationMember.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/dtos/FederationMember.scala index a89db083fb8825701ae65338ee23bd6ef92f1662..87741646d629b06d127af4ee44111fdaa9af80de 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/dtos/FederationMember.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/dtos/FederationMember.scala @@ -2,7 +2,6 @@ package acab.devcon0.domain.dtos import java.time.Instant -import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus import acab.devcon0.trile.domain.dtos.aliases._ final case class FederationMemberInformation( @@ -12,13 +11,13 @@ final case class FederationMemberInformation( ipfsClusterPeerId: IpfsClusterPeerId, ipfsClusterNodeName: IpfsClusterNodeName, ipfsPeerId: IpfsPeerId, - ipfsPeerRootCid: IpfsCid, + sharedFolderCid: IpfsCid, ipfsClusterAddresses: List[String], ipfsIpfsAddresses: List[String], lastSeen: Instant ) -final case class FederationMemberChangelogItem(ipfsCid: IpfsCid, timestamp: Instant, status: IpfsCidPinStatus) +final case class FederationMemberChangelogItem(ipfsCid: IpfsCid, timestamp: Instant) final case class FederationMemberIpfsCidDelta( federationMemberId: FederationMemberId, diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/SyncCommand.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/SyncCommand.scala index 7cc26253b3779304bcd719029f95031519cf0a07..5a92e6026f08f6e515189e8c4dde183cebe7421a 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/SyncCommand.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/SyncCommand.scala @@ -3,14 +3,15 @@ package acab.devcon0.domain.ports.input.federationmember import acab.devcon0.commons.Command import acab.devcon0.commons.CommandHandler import acab.devcon0.commons.Event +import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId import acab.devcon0.trile.domain.dtos.aliases.P2pPeerId import cats.effect.IO import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -sealed trait SyncResponseCommand extends Command[String] -final case class SyncAckCommand(p2pPeerId: P2pPeerId) extends SyncResponseCommand -final case class SyncNackCommand(p2pPeerId: P2pPeerId) extends SyncResponseCommand +sealed trait SyncResponseCommand extends Command[String] +final case class SyncAckCommand(federationMemberId: FederationMemberId) extends SyncResponseCommand +final case class SyncNackCommand(federationMemberId: P2pPeerId) extends SyncResponseCommand sealed abstract class SyncResponseEvent[T] extends Event[T] final case class SyncResponseErrorEvent(throwable: Throwable) extends SyncResponseEvent[Unit] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/UpdateInformationCommand.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/UpdateInformationCommand.scala index 070dae933fbcd42341dcc123eefb40e026174a82..8f6f4d881b30cb120cba400af110b01db5777de7 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/UpdateInformationCommand.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/input/federationmember/UpdateInformationCommand.scala @@ -12,7 +12,7 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger sealed trait InformationCommand extends Command[String] -final case class InformationRefreshCommand(id: FederationMemberId, ipfsPeerRootCid: IpfsCid) extends InformationCommand +final case class InformationRefreshCommand(id: FederationMemberId, sharedFolderCid: IpfsCid) extends InformationCommand final case class CheckInInformationCommand(p2pCheckInMessage: FederationMemberCheckInMessage) extends InformationCommand sealed abstract class InformationRefreshEvent[T] extends Event[T] diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/federationmember/ChangelogRepository.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/federationmember/ChangelogRepository.scala index aa2fe1ef56cc3e445eab0e66975ed9a73627fc9a..e94803bc5bb35fd2e8070ff6d72ec2e4150afaf6 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/federationmember/ChangelogRepository.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/ports/output/repository/federationmember/ChangelogRepository.scala @@ -7,7 +7,6 @@ import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId trait ChangelogRepository[F[_]] { def getNewest(id: FederationMemberId): F[Option[FederationMemberChangelogItem]] - def getNewest: F[List[(FederationMemberId, FederationMemberChangelogItem)]] def getPrevious( id: FederationMemberId, timestamp: Instant diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala index 345dd437636e40d818af3b87069b70fe04487b20..410157599eef00b099677411cabf9ed681834255 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/IpfsClusterServiceImpl.scala @@ -33,10 +33,13 @@ class IpfsClusterServiceImpl(client: IpfsClusterClient) extends IpfsClusterServi } override def addPins(ipfsCids: List[IpfsCid]): IO[Unit] = { - ipfsCids.traverse(client.addPin).void + ipfsCids.traverse(addPin).void } override def removePins(ipfsCids: List[IpfsCid]): IO[Unit] = { ipfsCids.traverse(client.removePin).void } + private def addPin(ipfsCid: IpfsCid): IO[Unit] = { + IORetry.fibonacci(client.addPin(ipfsCid)) + } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogService.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogService.scala index 980eb0169b02c9226a6d537572c81c500095bab8..43baa532e71f541ffc00ff54d95fcd1112e6a26a 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogService.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogService.scala @@ -5,17 +5,12 @@ import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId trait ChangelogService[F[_]] { def getNewest(id: FederationMemberId): F[Option[FederationMemberChangelogItem]] - def getNewest: F[List[(FederationMemberId, FederationMemberChangelogItem)]] def getPrevious( id: FederationMemberId, federationMemberChangelogItem: FederationMemberChangelogItem - ): F[List[FederationMemberChangelogItem]] + ): F[Option[FederationMemberChangelogItem]] def add( id: FederationMemberId, federationMemberChangelogItem: FederationMemberChangelogItem ): F[FederationMemberChangelogItem] - def setToRemoved( - id: FederationMemberId, - federationMemberChangelogItem: FederationMemberChangelogItem - ): F[FederationMemberChangelogItem] } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogServiceImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogServiceImpl.scala index e7ad4316710c6a039cea7ccb581d26af31772f81..d43366653f54b4256081d8395fc7f854cdf4b01f 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogServiceImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/service/federationmember/ChangelogServiceImpl.scala @@ -1,7 +1,6 @@ package acab.devcon0.domain.service.federationmember import acab.devcon0.domain.dtos._ -import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus.REMOVED import acab.devcon0.domain.ports.output.repository.federationmember.ChangelogRepository import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId import cats.effect.IO @@ -12,19 +11,14 @@ class ChangelogServiceImpl(val repository: ChangelogRepository[IO]) extends Chan repository.getNewest(id) } - override def getNewest: IO[List[(FederationMemberId, FederationMemberChangelogItem)]] = { - repository.getNewest - } - override def getPrevious( id: FederationMemberId, federationMemberChangelogItem: FederationMemberChangelogItem - ): IO[List[FederationMemberChangelogItem]] = { + ): IO[Option[FederationMemberChangelogItem]] = { repository - .getPrevious(id, federationMemberChangelogItem.timestamp) - .map(list => { - list.filter(!_.ipfsCid.equals(federationMemberChangelogItem.ipfsCid)) - }) + .getPrevious(id, federationMemberChangelogItem.timestamp.minusMillis(1)) + .map(_.filter(!_.ipfsCid.equals(federationMemberChangelogItem.ipfsCid))) + .map(_.lastOption) } override def add( @@ -33,15 +27,4 @@ class ChangelogServiceImpl(val repository: ChangelogRepository[IO]) extends Chan ): IO[FederationMemberChangelogItem] = { repository.add(id, federationMemberChangelogItem) } - - override def setToRemoved( - id: FederationMemberId, - federationMemberChangelogItem: FederationMemberChangelogItem - ): IO[FederationMemberChangelogItem] = { - repository.update( - id, - federationMemberChangelogItem, - federationMemberChangelogItem.copy(status = REMOVED) - ) - } } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberCheckInListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberCheckInListener.scala index b22e9ec398a56e17e9e3519d9f7726b54d1d09d1..0722ffa4881cfe7e0e35a5668cf6b34011e900bb 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberCheckInListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberCheckInListener.scala @@ -57,7 +57,7 @@ class FederationMemberCheckInListener( ): IO[Unit] = { val ipfsPeerIpnsUpdateMessage = FederationMemberSharingFolderUpdateMessage( federationMemberInformation.id, - p2pCheckInMessage.ipfsPeerRootCid, + p2pCheckInMessage.sharedFolderCid, p2pCheckInMessage.timestamp ) val command: BounceSharingFolderUpdateCommand = BounceSharingFolderUpdateCommand(ipfsPeerIpnsUpdateMessage) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberHeartbeatListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberHeartbeatListener.scala index 8427b17bc17f90c81d15e6a7b26713772e6098e6..315f7972d73696ead18e240b3d7c7e5f543fc721 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberHeartbeatListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberHeartbeatListener.scala @@ -40,7 +40,7 @@ class FederationMemberHeartbeatListener( _ <- logger.debug(s"Hearbeat in message received over P2P pub/sub. from=${p2pResponse.p2pPeerId}}") heartbeat <- P2pCodecs.Decoders.MemberHeartbeat(p2pResponse.data) information <- informationQueryHandler.handle(InformationByP2pPeerIdQuery(p2pResponse.p2pPeerId)) - command = InformationRefreshCommand(id = information.id, heartbeat.ipfsPeerRootCid) + command = InformationRefreshCommand(id = information.id, heartbeat.sharedFolderCid) _ <- updateInformationCommandHandler.handle(command) yield ()).unsafeRunAndForget() } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSharingFolderUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSharingFolderUpdateListener.scala index 459dfcb72880dbc41f0bf015b9d97a8ad348a251..9020ce17b42dfa5c80622493747c3502bcbe5a3f 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSharingFolderUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSharingFolderUpdateListener.scala @@ -46,7 +46,7 @@ class FederationMemberSharingFolderUpdateListener( pubSubMessage <- P2pCodecs.Decoders.SharingFolderUpdateMessage(jsonAsString) ipfsPeerIpnsUpdateMessage = pubsub.Redis.FederationMemberSharingFolderUpdateMessage( federationMemberInformation.id, - pubSubMessage.ipfsPeerRootCid, + pubSubMessage.sharedFolderCid, pubSubMessage.timestamp ) command = BounceSharingFolderUpdateCommand(ipfsPeerIpnsUpdateMessage) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSyncListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSyncListener.scala index 26ca2c8237cb31f54dc9b202a63ee067b68fbe30..0316db99f77a9ddbd1ba512b03e085c4a3548afc 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSyncListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/p2ppubsub/FederationMemberSyncListener.scala @@ -59,7 +59,7 @@ class FederationMemberSyncListener( ): IO[Unit] = { val ipfsPeerIpnsUpdateMessage = pubsub.Redis.FederationMemberSharingFolderUpdateMessage( federationMemberInformation.id, - syncMessage.ipfsPeerRootCid, + syncMessage.sharedFolderCid, syncMessage.timestamp ) val command = BounceSharingFolderUpdateCommand(ipfsPeerIpnsUpdateMessage) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala index cf2749180a6db1376a9a7012e03da4781643c119..8900912c9933f5574c655931ea720228c66498a4 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberChangelogUpdateEventListener.scala @@ -5,9 +5,12 @@ import acab.devcon0.domain.codecs.FederationMemberCodecs.Decoders.ChangelogUpdat import acab.devcon0.domain.dtos.FederationMemberIpfsCidDelta import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberChangelogUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdateMessage +import acab.devcon0.domain.ports.input.federationmember.SyncCommandImplicits.SyncAckCommandEventFlattenOps import acab.devcon0.domain.ports.input.federationmember._ import acab.devcon0.domain.ports.output.publisher.RedisPublisher import acab.devcon0.output.repository.redisutils.Redis +import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId +import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import cats.effect._ import cats.effect.std.Supervisor import cats.effect.unsafe.IORuntime @@ -20,11 +23,13 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger class FederationMemberChangelogUpdateEventListener( redisListener: Listener, ipfsCidUpdateCommandHandler: IpfsCidDeltaUpdateCommandHandler, - redisPublisher: RedisPublisher[IO] + updateInformationRefreshCommandHandler: UpdateInformationCommandHandler, + redisPublisher: RedisPublisher[IO], + syncCommandHandler: SyncCommandHandler ) { - private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global - implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + cats.effect.unsafe.IORuntime.global + private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val redisChannel: RedisChannel[String] = Redis.Channels.federationMemberChangelogUpdate private val chunkSize: Int = 500 @@ -41,13 +46,24 @@ class FederationMemberChangelogUpdateEventListener( } private def processInner(message: FederationMemberChangelogUpdateMessage): IO[Unit] = { - (for - event <- triggerIpfsCidFileTreeUpdateCommand(message) - _ <- event match - case IpfsCidDeltaUpdateSuccessEvent(delta) => report(delta) - case IpfsCidDeltaUpdateErrorEvent() => IO.unit - yield ()).unsafeRunAndForget() - IO.unit + Supervisor[IO](await = true).use { supervisor => + for + event <- triggerIpfsCidFileTreeUpdateCommand(message) + _ <- triggerUpdateInformation(message.federationMemberId, message.federationMemberChangelogItem.ipfsCid) + _ <- reportSyncAckToMember(message) + _ <- event match + case IpfsCidDeltaUpdateSuccessEvent(delta) => report(delta) + case IpfsCidDeltaUpdateErrorEvent() => reportSyncNackToMember(message) + yield () + } + } + + private def reportSyncAckToMember(message: FederationMemberChangelogUpdateMessage): IO[Unit] = { + syncCommandHandler.handle(SyncAckCommand(message.federationMemberId)).flattenEvents + } + + private def reportSyncNackToMember(message: FederationMemberChangelogUpdateMessage): IO[Unit] = { + syncCommandHandler.handle(SyncNackCommand(message.federationMemberId)).flattenEvents } private def report(delta: FederationMemberIpfsCidDelta): IO[Unit] = { @@ -79,6 +95,12 @@ class FederationMemberChangelogUpdateEventListener( }.void } + private def triggerUpdateInformation(federationMemberId: FederationMemberId, ipfsCid: IpfsCid): IO[Unit] = { + val command = InformationRefreshCommand(federationMemberId, ipfsCid) + logger.info(s"triggerUpdateInformation command=$command") >> + updateInformationRefreshCommandHandler.handle(command).void + } + private def triggerIpfsCidFileTreeUpdateCommand( message: FederationMemberChangelogUpdateMessage ): IO[IpfsCidDeltaUpdateEvent[?]] = { diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala index 3afd5bb6fbaad945e5153fb7eef08ab21a9de905..52043043964b414181f3637a361196613f8f31d6 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/input/redispubsub/FederationMemberSharingFolderUpdateListener.scala @@ -9,8 +9,8 @@ import acab.devcon0.domain.ports.input.federationmember._ import acab.devcon0.domain.ports.output.publisher.RedisPublisher import acab.devcon0.output.repository.redisutils.Redis import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId -import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import cats.effect._ +import cats.effect.std.Supervisor import cats.effect.unsafe.IORuntime import dev.profunktor.redis4cats.data._ import fs2.Pipe @@ -20,13 +20,12 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger class FederationMemberSharingFolderUpdateListener( redisListener: Listener, sharingFolderUpdateCommandHandler: SharingFolderUpdateCommandHandler, - updateInformationRefreshCommandHandler: UpdateInformationCommandHandler, informationQueryHandler: InformationQueryHandler, redisPublisher: RedisPublisher[IO], syncCommandHandler: SyncCommandHandler ) { - private implicit val runtime: IORuntime = cats.effect.unsafe.IORuntime.global + cats.effect.unsafe.IORuntime.global implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] private val channel: RedisChannel[String] = Redis.Channels.federationMemberSharingFolderUpdate @@ -45,17 +44,19 @@ class FederationMemberSharingFolderUpdateListener( private def triggerCommandsInBackground( ipfsPeerIpnsUpdateMessage: FederationMemberSharingFolderUpdateMessage ): IO[Unit] = { - val federationMemberId: FederationMemberId = ipfsPeerIpnsUpdateMessage.federationMemberId - val newIpfsPeerRootCid: IpfsCid = ipfsPeerIpnsUpdateMessage.ipfsCid - (for - federationMemberInformation <- queryFederationMemberInformation(federationMemberId) - updateChangelogEvent <- triggerUpdateChangelogItem(ipfsPeerIpnsUpdateMessage, federationMemberInformation) - isUpdateChangelogProcessed <- isUpdateChangelogProcessed(updateChangelogEvent) - _ <- triggerUpdateInformation(isUpdateChangelogProcessed, federationMemberId, newIpfsPeerRootCid) - _ <- publishMessage(federationMemberId, updateChangelogEvent, isUpdateChangelogProcessed) - _ <- syncCommandHandler.handle(SyncAckCommand(federationMemberInformation.p2pPeerId)).flattenEvents - yield ()).unsafeRunAndForget() - IO.unit + Supervisor[IO](await = true).use { supervisor => + val federationMemberId: FederationMemberId = ipfsPeerIpnsUpdateMessage.federationMemberId + (for + federationMemberInformation <- queryFederationMemberInformation(federationMemberId) + updateChangelogEvent <- triggerUpdateChangelogItem(ipfsPeerIpnsUpdateMessage, federationMemberInformation) + isUpdateChangelogProcessed <- isUpdateChangelogProcessed(updateChangelogEvent) + _ <- publishMessage(federationMemberId, updateChangelogEvent, isUpdateChangelogProcessed) + yield ()).onError(_ => reportSyncNackToMember(federationMemberId)) + } + } + + private def reportSyncNackToMember(federationMemberId: FederationMemberId): IO[Unit] = { + syncCommandHandler.handle(SyncNackCommand(federationMemberId)).flattenEvents } private def publishMessage( @@ -103,17 +104,6 @@ class FederationMemberSharingFolderUpdateListener( informationQueryHandler.handle(InformationByIdQuery(federationMemberId)) } - private def triggerUpdateInformation( - isUpdateChangelogProcessed: Boolean, - federationMemberId: FederationMemberId, - ipfsCid: IpfsCid - ): IO[Unit] = { - if isUpdateChangelogProcessed then - val command = InformationRefreshCommand(federationMemberId, ipfsCid) - updateInformationRefreshCommandHandler.handle(command).void - else IO.unit - } - private def logReceivedMessage(message: FederationMemberSharingFolderUpdateMessage): IO[Unit] = { logger.debug(s"Redis pub/sub. p2pPeerId=${message.federationMemberId} ipfsCid=${message.ipfsCid}}") } diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala index 677f39ec4b48f6bea507cd2f1d3960515019f04d..d3b8c1a177b2ea9a294d76c2fba8817ff8e7d5bd 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/publisher/RedisPublisherImpl.scala @@ -3,8 +3,6 @@ package acab.devcon0.output.publisher import acab.devcon0.domain.codecs.FederationCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs import acab.devcon0.domain.codecs.FederationMemberCodecs.Encoders.RedisSharingFolderUpdateMessage -import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidAdditionMessage -import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidRemovalMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationIpfsCidsDeltaUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberChangelogUpdateMessage import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberIpfsCidsDeltaUpdateMessage @@ -51,18 +49,6 @@ class RedisPublisherImpl(redisClient: Resource[IO, RedisClient]) extends RedisPu .flatMap(publishInner(_, Redis.Channels.federationIpfsCidsDeltaUpdate)) } - override def publish(message: FederationIpfsCidAdditionMessage): IO[Unit] = { - FederationCodecs.Encoders - .IpfsCidsAdditionMessage(message) - .flatMap(publishInner(_, Redis.Channels.federationIpfsCidAddition)) - } - - override def publish(message: FederationIpfsCidRemovalMessage): IO[Unit] = { - FederationCodecs.Encoders - .IpfsCidsRemovalMessage(message) - .flatMap(publishInner(_, Redis.Channels.federationIpfsCidRemoval)) - } - private def publishInner(event: String, redisChannel: RedisChannel[String]): IO[Unit] = { { for diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/ChangelogRepositoryImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/ChangelogRepositoryImpl.scala index e69d4697db2c456851ba94cddf0a9eb0d48233d9..de7ecc1c4c3767a72e25b1bcc7d6ba137c69e28b 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/ChangelogRepositoryImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/ChangelogRepositoryImpl.scala @@ -1,7 +1,6 @@ package acab.devcon0.output.repository.federationmember import java.time.Instant -import java.time.ZoneOffset import scala.collection.immutable.List @@ -10,7 +9,6 @@ import acab.devcon0.domain.dtos._ import acab.devcon0.domain.ports.output.repository.federationmember.ChangelogRepository import acab.devcon0.output.repository.redisutils import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId -import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import cats.effect.IO import cats.effect.kernel.Resource import cats.implicits._ @@ -24,49 +22,19 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger object FederationMemberChangelog { - def key(id: FederationMemberId): String = { + private def key(id: FederationMemberId): String = { String.join(":", redisutils.Redis.SetPrefixes.federationMemberChangelog, id) } - object Index { - def key(id: FederationMemberId): String = { - String.join(":", redisutils.Redis.SetPrefixes.federationMemberChangelogIndex, id) - } - - object Operations { - def exists( - id: FederationMemberId, - ipfsCid: IpfsCid - ): RedisCommands[IO, String, String] => IO[Boolean] = { redis => - redis.hExists(key(id), ipfsCid) - } - - def get( - id: FederationMemberId, - ipfsCid: IpfsCid - ): RedisCommands[IO, String, String] => IO[Option[String]] = { redis => - redis.hGet(key(id), ipfsCid) - } - - def save( - id: FederationMemberId, - federationMemberChangelogItem: FederationMemberChangelogItem - ): RedisCommands[IO, String, String] => IO[Unit] = { redis => - val instant: Instant = federationMemberChangelogItem.timestamp.atZone(ZoneOffset.UTC).toInstant - redis.hSet(key(id), federationMemberChangelogItem.ipfsCid, instant.toEpochMilli.toString).void - } - } - } - object Operations { def add( id: FederationMemberId, timestamp: Instant, rawJson: String ): RedisCommands[IO, String, String] => IO[Unit] = { redis => - val instant: Instant = timestamp.atZone(ZoneOffset.UTC).toInstant - val score: Score = Score(instant.toEpochMilli.toDouble) - redis.zAdd(key(id), args = None, ScoreWithValue(score, rawJson)).void + val score: Score = Score(timestamp.toEpochMilli.toDouble) + val scoreWithValue: ScoreWithValue[String] = ScoreWithValue(score, rawJson) + redis.zAdd(key(id), args = None, scoreWithValue).void } def remove( @@ -83,6 +51,15 @@ object FederationMemberChangelog { getNewestByKey(key)(redis) } + def getOlderThan( + id: FederationMemberId, + timestamp: Instant + ): RedisCommands[IO, String, String] => IO[List[String]] = { redis => + val key: String = FederationMemberChangelog.key(id) + val instant: Instant = timestamp + redis.zRangeByScore(key, ZRange(Long.MinValue, instant.toEpochMilli), limit = None) + } + private def getNewestByKey( key: String ): RedisCommands[IO, String, String] => IO[Option[String]] = { redis => @@ -90,31 +67,6 @@ object FederationMemberChangelog { .zRevRangeWithScores(key, 0, 0) .map(value => value.headOption.map(_.value)) } - - def getNewest: RedisCommands[IO, String, String] => IO[List[(FederationMemberId, String)]] = { redis => - val prefix = redisutils.Redis.SetPrefixes.federationMemberChangelog - for - keys <- redis.keys(prefix ++ ":*") - newestValuesOptional <- keys.traverse(key => - getNewestByKey(key)(redis).map(option => (key.stripPrefix(prefix ++ ":"), option)) - ) - newestValues <- IO(newestValuesOptional.filter(_._2.isDefined).map(tuple => (tuple._1, tuple._2.get))) - yield { - newestValues - } - } - - def getOlderThan( - id: FederationMemberId, - timestamp: Instant - ): RedisCommands[IO, String, String] => IO[List[String]] = { redis => - { - val key: String = FederationMemberChangelog.key(id) - val instant: Instant = timestamp.atZone(ZoneOffset.UTC).toInstant - redis - .zRangeByScore(key, ZRange(Double.MinValue, instant.toEpochMilli.toDouble), limit = None) - } - } } } @@ -130,29 +82,6 @@ class ChangelogRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, St } - private def decode(maybeRawJson: Option[String]): IO[Option[FederationMemberChangelogItem]] = { - maybeRawJson match - case Some(rawJson) => FederationMemberCodecs.Decoders.ChangelogItem(rawJson).map(Some(_)) - case None => IO.pure(None) - } - - override def getNewest: IO[List[(FederationMemberId, FederationMemberChangelogItem)]] = { - commandsApi - .use(FederationMemberChangelog.Operations.getNewest(_)) - .flatMap(decodeTuple) - - } - - private def decodeTuple( - rawJsonsTuple: List[(FederationMemberId, String)] - ): IO[List[(FederationMemberId, FederationMemberChangelogItem)]] = { - rawJsonsTuple.traverse(tuples => - FederationMemberCodecs.Decoders - .ChangelogItem(tuples._2) - .map((tuples._1, _)) - ) - } - override def getPrevious( id: FederationMemberId, timestamp: Instant @@ -163,10 +92,6 @@ class ChangelogRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, St } - private def decode(rawJsons: List[String]): IO[List[FederationMemberChangelogItem]] = { - rawJsons.traverse(FederationMemberCodecs.Decoders.ChangelogItem(_)) - } - override def add( id: FederationMemberId, federationMemberChangelogItem: FederationMemberChangelogItem @@ -178,43 +103,6 @@ class ChangelogRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, St }) } - private def addInner( - id: FederationMemberId, - federationMemberChangelogItem: FederationMemberChangelogItem, - rawJson: String, - redis: RedisCommands[IO, String, String] - ): IO[FederationMemberChangelogItem] = { - val operations: TxStore[IO, String, String] => List[IO[Unit]] = { _ => - getAddOperations(id, federationMemberChangelogItem, rawJson, redis) - } - runTransaction(redis, operations) - .map(_ => federationMemberChangelogItem) - } - - private def getAddOperations( - id: FederationMemberId, - changelogItem: FederationMemberChangelogItem, - rawJson: String, - redis: RedisCommands[IO, String, String] - ): List[IO[Unit]] = { - List( - FederationMemberChangelog.Index.Operations - .exists(id, changelogItem.ipfsCid)(redis) - .flatMap(exists => { - if exists then { - logger.error(s"Duplicate situation changelogItem=$changelogItem") - IO.unit - } else { - val timestamp = changelogItem.timestamp - for - _ <- FederationMemberChangelog.Operations.add(id, timestamp, rawJson)(redis) - _ <- FederationMemberChangelog.Index.Operations.save(id, changelogItem)(redis) - yield () - } - }) - ) - } - override def update( id: FederationMemberId, federationMemberChangelogItemOld: FederationMemberChangelogItem, @@ -229,6 +117,30 @@ class ChangelogRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, St yield federationMemberChangelogItemNew } + private def decode(rawJsons: List[String]): IO[List[FederationMemberChangelogItem]] = { + rawJsons.traverse(FederationMemberCodecs.Decoders.ChangelogItem(_)) + } + + private def decode(maybeRawJson: Option[String]): IO[Option[FederationMemberChangelogItem]] = { + maybeRawJson match + case Some(rawJson) => FederationMemberCodecs.Decoders.ChangelogItem(rawJson).map(Some(_)) + case None => IO.pure(None) + } + + private def addInner( + id: FederationMemberId, + federationMemberChangelogItem: FederationMemberChangelogItem, + rawJson: String, + redis: RedisCommands[IO, String, String] + ): IO[FederationMemberChangelogItem] = { + val operations: TxStore[IO, String, String] => List[IO[Unit]] = { _ => + val timestamp = federationMemberChangelogItem.timestamp + List(FederationMemberChangelog.Operations.add(id, timestamp, rawJson)(redis)) + } + runTransaction(redis, operations) + .map(_ => federationMemberChangelogItem) + } + private def updateInner( id: FederationMemberId, federationMemberChangelogItem: FederationMemberChangelogItem, diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/CidsRepositoryImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/CidsRepositoryImpl.scala index d2f4d03360eb86f0177c3495b093cd2dceef1b37..574acc39fa74f501de339f0deb2b82ba5f82a4ef 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/CidsRepositoryImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/federationmember/CidsRepositoryImpl.scala @@ -8,6 +8,7 @@ import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId import acab.devcon0.trile.domain.dtos.aliases.IpfsCid import cats.effect.IO import cats.effect.kernel.Resource +import cats.implicits.toTraverseOps import dev.profunktor.redis4cats._ import dev.profunktor.redis4cats.tx.TxStore import org.typelevel.log4cats.Logger @@ -16,7 +17,11 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger object CidsRepository { object Index { - def key(id: FederationMemberId, ipfsCid: IpfsCid): String = { + private def memberPrefix(id: FederationMemberId): String = { + String.join(":", redisutils.Redis.SetPrefixes.federationMemberCidsIndex, id, "*") + } + + private def key(id: FederationMemberId, ipfsCid: IpfsCid): String = { String.join(":", redisutils.Redis.SetPrefixes.federationMemberCidsIndex, id, ipfsCid) } @@ -52,8 +57,10 @@ object CidsRepository { id: FederationMemberId, rootIpfsCid: IpfsCid, ipfsCids: List[IpfsCid] - ): RedisCommands[IO, String, String] => IO[Long] = { redis => - redis.sAdd(key(id, rootIpfsCid), ipfsCids*) + ): RedisCommands[IO, String, String] => IO[Unit] = { redis => + redis.sAdd(key(id, rootIpfsCid), ipfsCids*).void + } + } def exists( @@ -106,7 +113,7 @@ class CidsRepositoryImpl(val commandsApi: Resource[IO, RedisCommands[IO, String, commandsApi .use(redis => { val operations: TxStore[IO, String, String] => List[IO[Unit]] = { _ => - List(CidsRepository.Index.Operations.add(id, rootIpfsCid, ipfsCids)(redis).void) + List(CidsRepository.Index.Operations.add(id, rootIpfsCid, ipfsCids)(redis)) } redis.transact(operations).void.onError(logError) }) diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala index 35b8cee1f8e8767a94b610046050bc8fc3af280e..d6edaba542c132e9a63f0c79c54588db81d62f52 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/output/repository/redisutils/Redis.scala @@ -13,7 +13,6 @@ object Redis { val federationMemberInformation: String = "FEDERATION:MEMBER:INFORMATION" val federationMemberInformationIndex: String = "FEDERATION:MEMBER:INFORMATION/INDEX" val federationMemberChangelog: String = "FEDERATION:MEMBER:CHANGELOG" - val federationMemberChangelogIndex: String = "FEDERATION:MEMBER:CHANGELOG/INDEX" val federationMemberCidsIndex: String = "FEDERATION:MEMBER:CIDS/INDEX" }