diff --git a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala index 971160b6c71168c38f176993abcb69f9c2b88014..500443cc8b6f85bef99ea62755bada5b347e4f93 100644 --- a/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala +++ b/federation-controller-backend/src/main/scala/acab/devcon0/domain/adapters/federationmember/SharingFolderUpdateCommandHandlerImpl.scala @@ -1,21 +1,21 @@ package acab.devcon0.domain.adapters.federationmember +import java.time.Instant + import acab.devcon0.domain.dtos._ -import acab.devcon0.domain.dtos.enums.IpfsCidPinStatus._ import acab.devcon0.domain.dtos.pubsub.Redis.FederationMemberSharingFolderUpdateMessage import acab.devcon0.domain.ports.input._ import acab.devcon0.domain.ports.input.federationmember._ import acab.devcon0.domain.service.federationmember.ChangelogService -import acab.devcon0.trile.domain.dtos.aliases.FederationMemberId import cats.effect.IO -import cats.implicits._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -class SharingFolderUpdateCommandHandlerImpl(changelogService: ChangelogService[IO]) - extends SharingFolderUpdateCommandHandler { +class SharingFolderUpdateCommandHandlerImpl( + changelogService: ChangelogService[IO] +) extends SharingFolderUpdateCommandHandler { - implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + private val logger: Logger[IO] = Slf4jLogger.getLogger[IO] override def handle(cmd: SharingFolderUpdateCommand): IO[SharingFolderEvent[?]] = handleInner(cmd.message, cmd.federationMemberInformation) @@ -28,104 +28,55 @@ class SharingFolderUpdateCommandHandlerImpl(changelogService: ChangelogService[I private def handleInner( message: FederationMemberSharingFolderUpdateMessage, - federationMemberInformation: FederationMemberInformation - ): IO[Option[FederationMemberChangelogItem]] = { - for - maybeFederationMemberChangelogItem <- processMessage(message, federationMemberInformation) - _ <- processOlderChangelogItems(maybeFederationMemberChangelogItem, federationMemberInformation) - yield maybeFederationMemberChangelogItem - } - - private def processMessage( - message: FederationMemberSharingFolderUpdateMessage, - federationMemberInformation: FederationMemberInformation + memberInformation: FederationMemberInformation ): IO[Option[FederationMemberChangelogItem]] = { - filterFederationMemberIpnsUpdate(message, federationMemberInformation) - .flatMap(maybeFederationMemberChangelogItem => - maybeFederationMemberChangelogItem match - case Some(changelogItem) => - saveFederationMemberChangelogItem(federationMemberInformation, changelogItem).map(Some(_)) - case None => IO.pure(None) - ) + buildChangelogItemConditionally(message, memberInformation) + .flatMap { + case Some(changelogItem) => saveChangelogItem(memberInformation, changelogItem).map(Some(_)) + case None => IO.pure(None) + } } - private def saveFederationMemberChangelogItem( - federationMemberInformation: FederationMemberInformation, - federationMemberChangelogItem: FederationMemberChangelogItem + private def saveChangelogItem( + memberInformation: FederationMemberInformation, + changelogItem: FederationMemberChangelogItem ): IO[FederationMemberChangelogItem] = { - changelogService.add(federationMemberInformation.id, federationMemberChangelogItem) + logger.info(s"saveChangelogItem cid=${changelogItem.ipfsCid}") >> + changelogService.add(memberInformation.id, changelogItem) } - private def filterFederationMemberIpnsUpdate( + private def buildChangelogItemConditionally( message: FederationMemberSharingFolderUpdateMessage, - federationMemberInformation: FederationMemberInformation + memberInformation: FederationMemberInformation ): IO[Option[FederationMemberChangelogItem]] = { - getFederationMemberNewestCid(federationMemberInformation) - .flatMap(maybeFederationMemberChangelogItem => - maybeFederationMemberChangelogItem match - case Some(changelogItem) if isMessageNewerAndDifferent(message, changelogItem) => - buildFederationMemberChangelogItem(message) - case Some(federationMemberChangelogItem) => logFilterNotPassedCase(message, federationMemberChangelogItem) - case _ => buildFederationMemberChangelogItem(message) - ) + getSharedFolderChangeLogItem(memberInformation) + .flatMap { + case Some(timestamp) if !isNewerMessage(timestamp, message) => logOutOfOrderCase(message, timestamp) + case _ => IO.pure(Some(FederationMemberChangelogItem(message.ipfsCid, message.timestamp))) + } } - private def getFederationMemberNewestCid( - federationMemberInformation: FederationMemberInformation - ): IO[Option[FederationMemberChangelogItem]] = { - changelogService.getNewest(federationMemberInformation.id) + private def getSharedFolderChangeLogItem(information: FederationMemberInformation): IO[Option[Instant]] = { + changelogService.getNewest(information.id).map(_.map(_.timestamp)) } - private def buildFederationMemberChangelogItem( - message: FederationMemberSharingFolderUpdateMessage - ): IO[Option[FederationMemberChangelogItem]] = { - IO.pure(Some(FederationMemberChangelogItem(message.ipfsCid, message.timestamp, PINNED))) + private def isNewerMessage(timestamp: Instant, message: FederationMemberSharingFolderUpdateMessage): Boolean = { + message.timestamp.isAfter(timestamp) + } + + private def logResult(cmd: SharingFolderUpdateCommand): IO[Unit] = { + val nickname = cmd.federationMemberInformation.nickname + logger.info(s"Federation member changelog added. nickname=$nickname ipfsCid=${cmd.message.ipfsCid}") } - private def logFilterNotPassedCase( + private def logOutOfOrderCase( message: FederationMemberSharingFolderUpdateMessage, - federationMemberChangelogItem: FederationMemberChangelogItem + timestamp: Instant ): IO[Option[FederationMemberChangelogItem]] = { logger.info( - s"Command request not processed. Possible duplicate or out of order message." + - s"existingIpfsCid=${federationMemberChangelogItem.ipfsCid} vs incomingIpfsCid=${message.ipfsCid}, " + - s"existingTimestamp=${federationMemberChangelogItem.timestamp} vs incomingTimestamp=${message.timestamp}" + s"Out of order message. Command request not processed. ipfsCid=${message.ipfsCid}, " + + s"latestTimestamp=$timestamp vs incomingTimestamp=${message.timestamp}" ) >> IO.pure(None) } - - private def isMessageNewerAndDifferent( - message: FederationMemberSharingFolderUpdateMessage, - federationMemberChangelogItem: FederationMemberChangelogItem - ): Boolean = { - message.timestamp.isAfter(federationMemberChangelogItem.timestamp) && - !message.ipfsCid.equals(federationMemberChangelogItem.ipfsCid) - } - - private def processOlderChangelogItems( - maybeFederationMemberChangelogItem: Option[FederationMemberChangelogItem], - federationMemberInformation: FederationMemberInformation - ): IO[Unit] = { - maybeFederationMemberChangelogItem match - case Some(changelogItem) => processOlderChangelogItems(changelogItem, federationMemberInformation) - case None => IO.unit - } - - private def processOlderChangelogItems( - changelogItem: FederationMemberChangelogItem, - federationMemberInformation: FederationMemberInformation - ): IO[Unit] = { - val id: FederationMemberId = federationMemberInformation.id - changelogService - .getPrevious(id, changelogItem) - .flatMap(toRemoveChangelogItems => toRemoveChangelogItems.traverse(changelogService.setToRemoved(id, _))) - .void - } - - private def logResult(cmd: SharingFolderUpdateCommand): IO[Unit] = { - logger.info( - s"Federation member changelog added. " + - s"federationMemberName=${cmd.federationMemberInformation.nickname} ipfsCid=${cmd.message.ipfsCid}" - ) - } }