Skip to content

Commit f042ca2

Browse files
imsduSimon Dumas
andauthored
Use type alias for streaming elems (#5338)
Co-authored-by: Simon Dumas <simon.dumas@senscience.ai>
1 parent f381eac commit f042ca2

File tree

56 files changed

+125
-141
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+125
-141
lines changed

delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.DependsOn
3232
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
3333
import ch.epfl.bluebrain.nexus.delta.sourcing.model.*
3434
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
35-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
35+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, SuccessElemStream}
3636
import io.circe.Json
3737

3838
import java.util.UUID

delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/SparqlIndexingAction.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{IndexingViewDe
99
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
1010
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
1111
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
12-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag}
12+
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
1313
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
1414
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.*
1515
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink

delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef
1111
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
1212
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
1313
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
14-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream
1514
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
1615
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
1716
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.*
18-
import fs2.Stream
1917

2018
sealed trait BlazegraphCoordinator
2119

@@ -51,7 +49,7 @@ object BlazegraphCoordinator {
5149
deleteNamespace: ActiveViewDef => IO[Unit]
5250
) extends BlazegraphCoordinator {
5351

54-
def run(offset: Offset): Stream[IO, Elem[Unit]] = {
52+
def run(offset: Offset): ElemStream[Unit] = {
5553
fetchViews(offset).evalMap { elem =>
5654
elem
5755
.traverse { v =>

delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDef.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews
88
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewState
99
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
1010
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
11-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
1211
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
1312
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
1413
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
15-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
1614
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.*
15+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
1716

1817
/**
1918
* Definition of a Blazegraph view to build a projection

delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/SparqlSink.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.graph.NTriples
1515
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.*
1616
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
1717
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
18-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
1918
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
20-
import fs2.Chunk
19+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, ElemChunk}
2120
import org.http4s.Uri
2221
import shapeless.Typeable
2322

@@ -52,7 +51,7 @@ final class SparqlSink(
5251
implicit private val kamonComponent: KamonMetricComponent =
5352
KamonMetricComponent(BlazegraphViews.entityType.value)
5453

55-
override def apply(elements: Chunk[Elem[NTriples]]): IO[Chunk[Elem[Unit]]] = {
54+
override def apply(elements: ElemChunk[NTriples]): IO[ElemChunk[Unit]] = {
5655
val bulk = elements.foldLeft(SparqlBulk.empty(endpoint)) {
5756
case (acc, Elem.SuccessElem(_, id, _, _, _, triples, _)) =>
5857
acc.replace(id, triples)
@@ -76,7 +75,7 @@ final class SparqlSink(
7675
IO.pure(markInvalidIdsAsFailed(elements, bulk.invalidIds))
7776
}.span("sparqlSink")
7877

79-
private def markInvalidIdsAsFailed(elements: Chunk[Elem[NTriples]], invalidIds: Set[Iri]) =
78+
private def markInvalidIdsAsFailed(elements: ElemChunk[NTriples], invalidIds: Set[Iri]) =
8079
elements.map { e =>
8180
if (invalidIds.contains(e.id))
8281
e.failed(InvalidIri)

delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/SparqlIndexingActionSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
1313
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState
1414
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive
1515
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
16+
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
1617
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
17-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
1818
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
1919
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
2020
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
2121
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
22-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef}
22+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef, SuccessElemStream}
2323
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
2424
import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig
2525
import fs2.Stream

delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
99
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
1010
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
1111
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
12-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
12+
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
1313
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
1414
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
15+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.*
1516
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
1617
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
1718
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply
18-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.*
1919
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
2020
import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig
2121
import fs2.Stream

delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.syntax.*
2323
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
2424
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
2525
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
26-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
26+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, ElemChunk}
2727
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
2828
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
2929
import fs2.Chunk
@@ -55,7 +55,7 @@ trait CompositeSink extends Sink
5555
final class Single[SinkFormat](
5656
queryGraph: SingleQueryGraph,
5757
transform: GraphResource => IO[Option[SinkFormat]],
58-
sink: Chunk[Elem[SinkFormat]] => IO[Chunk[Elem[Unit]]],
58+
sink: ElemChunk[SinkFormat] => IO[ElemChunk[Unit]],
5959
override val chunkSize: Int,
6060
override val maxWindow: FiniteDuration,
6161
retryStrategy: RetryStrategy[Throwable]
@@ -70,7 +70,7 @@ final class Single[SinkFormat](
7070
transformed <- graph.flatTraverse(transform)
7171
} yield transformed
7272

73-
override def apply(elements: Chunk[Elem[GraphResource]]): IO[Chunk[Elem[Unit]]] =
73+
override def apply(elements: ElemChunk[GraphResource]): IO[ElemChunk[Unit]] =
7474
elements
7575
.traverse {
7676
case e: SuccessElem[GraphResource] => e.evalMapFilter(queryTransform)
@@ -100,7 +100,7 @@ final class Single[SinkFormat](
100100
final class Batch[SinkFormat](
101101
queryGraph: BatchQueryGraph,
102102
transform: GraphResource => IO[Option[SinkFormat]],
103-
sink: Chunk[Elem[SinkFormat]] => IO[Chunk[Elem[Unit]]],
103+
sink: ElemChunk[SinkFormat] => IO[ElemChunk[Unit]],
104104
override val chunkSize: Int,
105105
override val maxWindow: FiniteDuration,
106106
retryStrategy: RetryStrategy[Throwable]
@@ -115,7 +115,7 @@ final class Batch[SinkFormat](
115115
override def inType: Typeable[GraphResource] = Typeable[GraphResource]
116116

117117
/** Performs the sparql query only using [[SuccessElem]]s from the chunk */
118-
private def query(elements: Chunk[Elem[GraphResource]]): IO[Option[Graph]] =
118+
private def query(elements: ElemChunk[GraphResource]): IO[Option[Graph]] =
119119
elements.mapFilter(elem => elem.map(_.id).toOption) match {
120120
case ids if ids.nonEmpty => queryGraph(ids).retry(retryStrategy)
121121
case _ => IO.none
@@ -233,7 +233,7 @@ object CompositeSink {
233233
common: String,
234234
query: SparqlConstructQuery,
235235
transform: GraphResource => IO[Option[SinkFormat]],
236-
sink: Chunk[Elem[SinkFormat]] => IO[Chunk[Elem[Unit]]],
236+
sink: ElemChunk[SinkFormat] => IO[ElemChunk[Unit]],
237237
batchConfig: BatchConfig,
238238
sinkConfig: SinkConfig,
239239
retryStrategyConfig: RetryStrategyConfig

delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.DependsOn
3030
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
3131
import ch.epfl.bluebrain.nexus.delta.sourcing.model.*
3232
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
33-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
33+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, SuccessElemStream}
3434
import io.circe.Json
3535

3636
import scala.concurrent.duration.FiniteDuration

delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.auth.{AuthTokenProvider, Credentials}
1313
import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError
1414
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectStatistics
1515
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag}
16-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, IriFilter, Tag}
16+
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{IriFilter, Tag}
1717
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
18-
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems}
18+
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, ElemStream, RemainingElems}
1919
import io.circe.Json
2020
import io.circe.parser.decode
2121
import org.http4s.Method.{GET, HEAD}

0 commit comments

Comments
 (0)