File tree Expand file tree Collapse file tree 2 files changed +30
-4
lines changed
main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query
test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query Expand file tree Collapse file tree 2 files changed +30
-4
lines changed Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import fs2.Stream
1313import fs2 .concurrent .SignallingRef
1414
1515import concurrent .duration ._
16+ import scala .util .Try
1617
1718/**
1819 * Computes the outcome to apply when all elements are consumed by a projection
@@ -75,9 +76,20 @@ object RefreshOrStop {
7576 private def passivate (project : ProjectRef , signal : SignallingRef [IO , Boolean ]) =
7677 for {
7778 _ <- logger.info(s " Project ' $project' is inactive, pausing until some activity is seen again. " )
78- durationOpt <- Stream .awakeEvery[IO ](1 .second).scanMonoid.interruptWhen(signal).compile.last
79- minutes = durationOpt.getOrElse(0 .minute).toMinutes
80- _ <- logger.info(s " Project ' $project' is active again after ` $minutes minutes`, querying will resume. " )
79+ durationOpt <- Stream
80+ .awakeEvery[IO ](1 .second)
81+ .fold(Option (Duration .Zero )) { case (accOpt, duration) =>
82+ accOpt.flatMap(safeAdd(_, duration))
83+ }
84+ .interruptWhen(signal)
85+ .compile
86+ .last
87+ .map(_.flatten)
88+ hours = durationOpt.getOrElse(0 .hour).toHours
89+ _ <- logger.info(s " Project ' $project' is active again after ` $hours hours`, querying will resume. " )
8190 } yield Passivated
8291
92+ private [query] def safeAdd (d1 : FiniteDuration , d2 : FiniteDuration ) =
93+ Try { d1 + d2 }.toOption
94+
8395}
Original file line number Diff line number Diff line change @@ -9,7 +9,8 @@ import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
99import ch .epfl .bluebrain .nexus .testkit .mu .ce .PatienceConfig
1010import fs2 .concurrent .SignallingRef
1111
12- import scala .concurrent .duration .DurationInt
12+ import java .util .concurrent .TimeUnit
13+ import scala .concurrent .duration .{DurationInt , FiniteDuration }
1314
1415class RefreshOrStopSuite extends NexusSuite {
1516
@@ -82,4 +83,17 @@ class RefreshOrStopSuite extends NexusSuite {
8283 } yield ()
8384 }
8485
86+ test(" Safely add two durations and return the result" ) {
87+ val d1 = FiniteDuration (42L , TimeUnit .NANOSECONDS )
88+ val d2 = FiniteDuration (8L , TimeUnit .NANOSECONDS )
89+ val expected = FiniteDuration (50L , TimeUnit .NANOSECONDS )
90+ assertEquals(RefreshOrStop .safeAdd(d1, d2), Some (expected))
91+ }
92+
93+ test(" Trigger an overflow and handle the error" ) {
94+ val d1 = FiniteDuration (Long .MaxValue - 5L , TimeUnit .NANOSECONDS )
95+ val d2 = FiniteDuration (8L , TimeUnit .NANOSECONDS )
96+ assertEquals(RefreshOrStop .safeAdd(d1, d2), None )
97+ }
98+
8599}
You can’t perform that action at this time.
0 commit comments