Skip to content

Commit 996d41b

Browse files
imsduSimon Dumas
andauthored
Rename some classes, split incoming/outgoing from queries from clients (#5314)
Co-authored-by: Simon Dumas <simon.dumas@senscience.ai>
1 parent 5a6bb4c commit 996d41b

File tree

20 files changed

+707
-954
lines changed

20 files changed

+707
-954
lines changed

delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClient
88
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
99
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator
1010
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, BlazegraphViewEvent}
11+
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.query.IncomingOutgoingLinks
12+
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.query.IncomingOutgoingLinks.Queries
1113
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphSupervisionRoutes, BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
12-
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
14+
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{SparqlSlowQueryDeleter, SparqlSlowQueryLogger, SparqlSlowQueryStore}
1315
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
1416
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
1517
import ch.epfl.bluebrain.nexus.delta.sdk._
@@ -40,11 +42,11 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
4042

4143
make[BlazegraphViewsConfig].from { BlazegraphViewsConfig.load(_) }
4244

43-
make[BlazegraphSlowQueryStore].from { (xas: Transactors) => BlazegraphSlowQueryStore(xas) }
45+
make[SparqlSlowQueryStore].from { (xas: Transactors) => SparqlSlowQueryStore(xas) }
4446

45-
make[BlazegraphSlowQueryDeleter].fromEffect {
46-
(supervisor: Supervisor, store: BlazegraphSlowQueryStore, cfg: BlazegraphViewsConfig, clock: Clock[IO]) =>
47-
BlazegraphSlowQueryDeleter.start(
47+
make[SparqlSlowQueryDeleter].fromEffect {
48+
(supervisor: Supervisor, store: SparqlSlowQueryStore, cfg: BlazegraphViewsConfig, clock: Clock[IO]) =>
49+
SparqlSlowQueryDeleter.start(
4850
supervisor,
4951
store,
5052
cfg.slowQueries.logTtl,
@@ -53,9 +55,8 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
5355
)
5456
}
5557

56-
make[BlazegraphSlowQueryLogger].from {
57-
(cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore, clock: Clock[IO]) =>
58-
BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold, clock)
58+
make[SparqlSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: SparqlSlowQueryStore, clock: Clock[IO]) =>
59+
SparqlSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold, clock)
5960
}
6061

6162
make[SparqlClient].named("sparql-indexing-client").from { (cfg: BlazegraphViewsConfig, as: ActorSystem) =>
@@ -123,13 +124,13 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
123124
)(baseUri)
124125
}
125126

126-
make[BlazegraphViewsQuery].fromEffect {
127+
make[BlazegraphViewsQuery].from {
127128
(
128129
aclCheck: AclCheck,
129130
fetchContext: FetchContext,
130131
views: BlazegraphViews,
131132
client: SparqlClient @Id("sparql-query-client"),
132-
slowQueryLogger: BlazegraphSlowQueryLogger,
133+
slowQueryLogger: SparqlSlowQueryLogger,
133134
cfg: BlazegraphViewsConfig,
134135
xas: Transactors
135136
) =>
@@ -144,12 +145,25 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
144145
)
145146
}
146147

148+
make[IncomingOutgoingLinks].fromEffect {
149+
(
150+
fetchContext: FetchContext,
151+
views: BlazegraphViews,
152+
client: SparqlClient @Id("sparql-query-client"),
153+
base: BaseUri
154+
) =>
155+
Queries.load.map { queries =>
156+
IncomingOutgoingLinks(fetchContext, views, client, queries)(base)
157+
}
158+
}
159+
147160
make[BlazegraphViewsRoutes].from {
148161
(
149162
identities: Identities,
150163
aclCheck: AclCheck,
151164
views: BlazegraphViews,
152165
viewsQuery: BlazegraphViewsQuery,
166+
incomingOutgoingLinks: IncomingOutgoingLinks,
153167
baseUri: BaseUri,
154168
cfg: BlazegraphViewsConfig,
155169
cr: RemoteContextResolution @Id("aggregate"),
@@ -159,6 +173,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
159173
new BlazegraphViewsRoutes(
160174
views,
161175
viewsQuery,
176+
incomingOutgoingLinks,
162177
identities,
163178
aclCheck
164179
)(
Lines changed: 58 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,30 @@
11
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph
22

33
import cats.effect.IO
4-
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax
54
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
6-
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
7-
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
5+
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax
86
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews.entityType
9-
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.{Aux, SparqlResultsJson}
7+
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.Aux
108
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client._
119
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._
1210
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue.{AggregateBlazegraphViewValue, IndexingBlazegraphViewValue}
13-
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.SparqlLink.{SparqlExternalLink, SparqlResourceLink}
1411
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._
15-
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.BlazegraphSlowQueryLogger
16-
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
12+
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.SparqlSlowQueryLogger
1713
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery
1814
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
1915
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Project => ProjectAcl}
2016
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed
2117
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
2218
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.ExpandIri
23-
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment.IriSegment
24-
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.ResultEntry.UnscoredResultEntry
25-
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults
26-
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults.UnscoredSearchResults
27-
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment}
19+
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
2820
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
2921
import ch.epfl.bluebrain.nexus.delta.sdk.views.View.{AggregateView, IndexingView}
30-
import ch.epfl.bluebrain.nexus.delta.sdk.views.{ViewRef, ViewsStore}
22+
import ch.epfl.bluebrain.nexus.delta.sdk.views.{View, ViewRef, ViewsStore}
3123
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
32-
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
3324
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
3425

35-
import java.util.regex.Pattern.quote
36-
3726
trait BlazegraphViewsQuery {
3827

39-
/**
40-
* List incoming links for a given resource.
41-
*
42-
* @param id
43-
* the resource identifier
44-
* @param projectRef
45-
* the project of the resource
46-
* @param pagination
47-
* the pagination config
48-
*/
49-
def incoming(
50-
id: IdSegment,
51-
projectRef: ProjectRef,
52-
pagination: FromPagination
53-
)(implicit caller: Caller, base: BaseUri): IO[SearchResults[SparqlLink]]
54-
55-
/**
56-
* List outgoing links for a given resource.
57-
*
58-
* @param id
59-
* the resource identifier
60-
* @param projectRef
61-
* the project of the resource
62-
* @param pagination
63-
* the pagination config
64-
* @param includeExternalLinks
65-
* whether to include links to resources not managed by Delta
66-
*/
67-
def outgoing(
68-
id: IdSegment,
69-
projectRef: ProjectRef,
70-
pagination: FromPagination,
71-
includeExternalLinks: Boolean
72-
)(implicit caller: Caller, base: BaseUri): IO[SearchResults[SparqlLink]]
73-
7428
/**
7529
* Queries the blazegraph namespace (or namespaces) managed by the view with the passed ''id''. We check for the
7630
* caller to have the necessary query permissions on the view before performing the query.
@@ -94,130 +48,78 @@ trait BlazegraphViewsQuery {
9448

9549
object BlazegraphViewsQuery {
9650

97-
private val loader = ClasspathResourceLoader.withContext(getClass)
98-
9951
implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(entityType.value)
10052

10153
final def apply(
10254
aclCheck: AclCheck,
10355
fetchContext: FetchContext,
10456
views: BlazegraphViews,
10557
client: SparqlQueryClient,
106-
logSlowQueries: BlazegraphSlowQueryLogger,
58+
logSlowQueries: SparqlSlowQueryLogger,
10759
prefix: String,
10860
xas: Transactors
109-
): IO[BlazegraphViewsQuery] = {
110-
for {
111-
incomingQuery <- loader.contentOf("blazegraph/incoming.txt")
112-
outgoingWithExternalQuery <- loader.contentOf("blazegraph/outgoing_include_external.txt")
113-
outgoingScopedQuery <- loader.contentOf("blazegraph/outgoing_scoped.txt")
114-
viewsStore = ViewsStore[BlazegraphViewRejection, BlazegraphViewState](
115-
BlazegraphViewState.serializer,
116-
views.fetchState(_, _),
117-
view =>
118-
IO.raiseWhen(view.deprecated)(ViewIsDeprecated(view.id))
119-
.as {
120-
view.value match {
121-
case _: AggregateBlazegraphViewValue =>
122-
Left(view.id)
123-
case i: IndexingBlazegraphViewValue =>
124-
Right(
125-
IndexingView(
126-
ViewRef(view.project, view.id),
127-
BlazegraphViews.namespace(view.uuid, view.indexingRev, prefix),
128-
i.permission
129-
)
130-
)
131-
}
132-
},
133-
xas
134-
)
135-
} yield new BlazegraphViewsQuery {
61+
): BlazegraphViewsQuery = {
62+
val viewsStore = ViewsStore[BlazegraphViewRejection, BlazegraphViewState](
63+
BlazegraphViewState.serializer,
64+
views.fetchState,
65+
view =>
66+
IO.raiseWhen(view.deprecated)(ViewIsDeprecated(view.id))
67+
.as {
68+
view.value match {
69+
case _: AggregateBlazegraphViewValue =>
70+
Left(view.id)
71+
case i: IndexingBlazegraphViewValue =>
72+
Right(
73+
IndexingView(
74+
ViewRef(view.project, view.id),
75+
BlazegraphViews.namespace(view.uuid, view.indexingRev, prefix),
76+
i.permission
77+
)
78+
)
79+
}
80+
},
81+
xas
82+
)
83+
new BlazegraphViewsQuery {
13684

13785
private val expandIri: ExpandIri[BlazegraphViewRejection] = new ExpandIri(InvalidResourceId.apply)
13886

139-
private def replace(query: String, id: Iri, pagination: FromPagination): String =
140-
query
141-
.replaceAll(quote("{id}"), id.toString)
142-
.replaceAll(quote("{offset}"), pagination.from.toString)
143-
.replaceAll(quote("{size}"), pagination.size.toString)
144-
145-
override def incoming(id: IdSegment, projectRef: ProjectRef, pagination: FromPagination)(implicit
146-
caller: Caller,
147-
base: BaseUri
148-
): IO[SearchResults[SparqlLink]] =
149-
for {
150-
p <- fetchContext.onRead(projectRef)
151-
iri <- expandIri(id, p)
152-
q = SparqlQuery(replace(incomingQuery, iri, pagination))
153-
bindings <- query(IriSegment(defaultViewId), projectRef, q, SparqlResultsJson)
154-
links = toSparqlLinks(bindings.value)
155-
} yield links
156-
157-
override def outgoing(
158-
id: IdSegment,
159-
projectRef: ProjectRef,
160-
pagination: FromPagination,
161-
includeExternalLinks: Boolean
162-
)(implicit caller: Caller, base: BaseUri): IO[SearchResults[SparqlLink]] =
163-
for {
164-
p <- fetchContext.onRead(projectRef)
165-
iri <- expandIri(id, p)
166-
queryTemplate = if (includeExternalLinks) outgoingWithExternalQuery else outgoingScopedQuery
167-
q = SparqlQuery(replace(queryTemplate, iri, pagination))
168-
bindings <- query(IriSegment(defaultViewId), projectRef, q, SparqlResultsJson)
169-
links = toSparqlLinks(bindings.value)
170-
} yield links
171-
17287
override def query[R <: SparqlQueryResponse](
17388
id: IdSegment,
17489
project: ProjectRef,
175-
query: SparqlQuery,
90+
sparqlQuery: SparqlQuery,
17691
responseType: Aux[R]
177-
)(implicit caller: Caller): IO[R] =
92+
)(implicit caller: Caller): IO[R] = {
17893
for {
179-
view <- viewsStore.fetch(id, project)
180-
p <- fetchContext.onRead(project)
181-
iri <- expandIri(id, p)
182-
indices <- view match {
183-
case i: IndexingView =>
184-
aclCheck
185-
.authorizeForOr(i.ref.project, i.permission)(
186-
AuthorizationFailed(i.ref.project, i.permission)
187-
)
188-
.as(Set(i.index))
189-
case a: AggregateView =>
190-
aclCheck
191-
.mapFilter[IndexingView, String](
192-
a.views,
193-
v => ProjectAcl(v.ref.project) -> v.permission,
194-
_.index
195-
)
196-
}
197-
qr <- logSlowQueries(
198-
BlazegraphQueryContext(ViewRef.apply(project, iri), query, caller.subject),
199-
client.query(indices, query, responseType).adaptError { case e: SparqlClientError =>
200-
WrappedBlazegraphClientError(e)
201-
}
202-
).span("blazegraphQuery")
94+
view <- viewsStore.fetch(id, project)
95+
p <- fetchContext.onRead(project)
96+
iri <- expandIri(id, p)
97+
namespaces <- viewToNamespaces(view)
98+
queryIO = client.query(namespaces, sparqlQuery, responseType).adaptError { case e: SparqlClientError =>
99+
WrappedBlazegraphClientError(e)
100+
}
101+
qr <- logSlowQueries(ViewRef(project, iri), sparqlQuery, caller.subject, queryIO)
203102
} yield qr
103+
}.span("blazegraphUserQuery")
104+
105+
// Translate a view to the set of underlying namespaces according to the current caller acls
106+
private def viewToNamespaces(view: View)(implicit caller: Caller) =
107+
view match {
108+
case i: IndexingView =>
109+
aclCheck
110+
.authorizeForOr(i.ref.project, i.permission)(
111+
AuthorizationFailed(i.ref.project, i.permission)
112+
)
113+
.as(Set(i.index))
114+
case a: AggregateView =>
115+
aclCheck
116+
.mapFilter[IndexingView, String](
117+
a.views,
118+
v => ProjectAcl(v.ref.project) -> v.permission,
119+
_.index
120+
)
121+
}
204122

205-
private def toSparqlLinks(sparqlResults: SparqlResults)(implicit
206-
base: BaseUri
207-
): SearchResults[SparqlLink] = {
208-
val (count, results) =
209-
sparqlResults.results.bindings
210-
.foldLeft((0L, List.empty[SparqlLink])) { case ((total, acc), bindings) =>
211-
val newTotal = bindings.get("total").flatMap(v => v.value.toLongOption).getOrElse(total)
212-
val res = (SparqlResourceLink(bindings) orElse SparqlExternalLink(bindings))
213-
.map(_ :: acc)
214-
.getOrElse(acc)
215-
(newTotal, res)
216-
}
217-
UnscoredSearchResults(count, results.map(UnscoredResultEntry(_)))
218-
}
219123
}
220124
}
221-
222-
final case class BlazegraphQueryContext(view: ViewRef, query: SparqlQuery, subject: Subject)
223125
}

0 commit comments

Comments
 (0)