Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/datasources/tag/CurrentQueryHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { DataFrameDTO, FieldType, TimeRange } from "@grafana/data";
import { QueryHandler, TagWithValue } from "./types";
import { convertTagValue } from "./utils";
import { Workspace } from "core/types";
import { Observable, of } from "rxjs";

export class CurrentQueryHandler extends QueryHandler {
handleQuery(tagsWithValues: TagWithValue[], result: DataFrameDTO, _workspaces: Workspace[], _range: TimeRange, _maxDataPoints: number | undefined, queryProperties: boolean): Promise<DataFrameDTO> {
return Promise.resolve(this.handleCurrentQuery(queryProperties, tagsWithValues, result));
handleQuery(tagsWithValues: TagWithValue[], result: DataFrameDTO, _workspaces: Workspace[], _range: TimeRange, _maxDataPoints: number | undefined, queryProperties: boolean): Observable<DataFrameDTO> {
return of(this.handleCurrentQuery(queryProperties, tagsWithValues, result));
}

private handleCurrentQuery(queryProperties: boolean, tagsWithValues: TagWithValue[], result: DataFrameDTO): DataFrameDTO {
Expand Down
101 changes: 55 additions & 46 deletions src/datasources/tag/HistoricalQueryHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,47 @@ import { PostFn, QueryHandler, TagHistoryResponse, TagWithValue, TimeAndTagTypeV
import { convertTagValue } from "./utils";
import { getWorkspaceName } from "core/utils";
import { Workspace } from "core/types";
import { map, merge, Observable, of, switchMap } from "rxjs";

export class HistoricalQueryHandler extends QueryHandler {
constructor(
private readonly post: PostFn,
private readonly post$: PostFn,
private readonly baseUrl?: string
) {
super();
}

handleQuery(tagsWithValues: TagWithValue[], result: DataFrameDTO, workspaces: Workspace[], range: TimeRange, maxDataPoints: number | undefined, _queryProperties: boolean): Promise<DataFrameDTO> {
return this.handleHistoricalQuery(tagsWithValues, workspaces, range, maxDataPoints, result);
handleQuery(tagsWithValues: TagWithValue[], result: DataFrameDTO, workspaces: Workspace[], range: TimeRange, maxDataPoints: number | undefined, _queryProperties: boolean): Observable<DataFrameDTO> {
return this.handleHistoricalQuery$(tagsWithValues, workspaces, range, maxDataPoints, result);
}

private async handleHistoricalQuery(
private handleHistoricalQuery$(
tagsWithValues: TagWithValue[],
workspaces: Workspace[],
range: TimeRange,
maxDataPoints: number | undefined,
result: DataFrameDTO,
): Promise<DataFrameDTO> {
): Observable<DataFrameDTO> {
const tagPathCount = this.countTagPaths(tagsWithValues);
const workspaceTagMap = this.groupTagsByWorkspace(tagsWithValues);
const tagPropertiesMap = this.buildTagPropertiesMap(tagsWithValues, tagPathCount, workspaces);

const tagsDecimatedHistory = await this.fetchTagsHistory(
return this.fetchTagsHistory$(
workspaceTagMap,
workspaces,
range,
maxDataPoints,
tagPathCount
);
).pipe(
map((tagsDecimatedHistory) => {
const mergedTagValuesWithType = this.mergeTagsHistoryValues(tagsDecimatedHistory);

const mergedTagValuesWithType = this.mergeTagsHistoryValues(tagsDecimatedHistory);
this.addTimeFieldToResult(result, mergedTagValuesWithType.timestamps);
this.addTagFieldsToResult(result, mergedTagValuesWithType.values, tagPropertiesMap);

this.addTimeFieldToResult(result, mergedTagValuesWithType.timestamps);
this.addTagFieldsToResult(result, mergedTagValuesWithType.values, tagPropertiesMap);

return result;
return result;
}
));
}

private countTagPaths(tagsWithValues: TagWithValue[]): Record<string, number> {
Expand Down Expand Up @@ -78,35 +81,35 @@ export class HistoricalQueryHandler extends QueryHandler {
return tagPropertiesMap;
}

private async fetchTagsHistory(
private fetchTagsHistory$(
workspaceTagMap: Record<string, TagWithValue[]>,
workspaces: Workspace[],
range: TimeRange,
maxDataPoints: number | undefined,
tagPathCount: Record<string, number>
): Promise<Record<string, TypeAndValues>> {
): Observable<Record<string, TypeAndValues>> {
const tagsDecimatedHistory: Record<string, TypeAndValues> = {};

for (const workspace in workspaceTagMap) {
const tagHistoryResponse = await this.getTagHistoryWithChunks(
workspaceTagMap[workspace],
workspace,
range,
maxDataPoints || 0,
const observables$ = Object.entries(workspaceTagMap).map(([workspace, tags]) => {
return this.getTagHistoryWithChunks$(tags, workspace, range, maxDataPoints || 0).pipe(
map(tagHistoryResponse => {
for (const path in tagHistoryResponse.results) {
const prefixedPath =
tagPathCount[path] > 1
? `${getWorkspaceName(workspaces, workspace)}.${path}`
: path;

tagsDecimatedHistory[prefixedPath] = tagHistoryResponse.results[path];
}
})
);
});

for (const path in tagHistoryResponse.results) {
const prefixedPath = tagPathCount[path] > 1
? `${getWorkspaceName(workspaces, workspace)}.${path}`
: path;
tagsDecimatedHistory[prefixedPath] = tagHistoryResponse.results[path];
}
}

return tagsDecimatedHistory;
return merge(...observables$)
.pipe(map(() => tagsDecimatedHistory));
}

private async getTagHistoryWithChunks(paths: TagWithValue[], workspace: string, range: TimeRange, intervals: number): Promise<TagHistoryResponse> {
private getTagHistoryWithChunks$(paths: TagWithValue[], workspace: string, range: TimeRange, intervals: number): Observable<TagHistoryResponse> {
const chunkSize = 10;
const pathChunks: TagWithValue[][] = [];
for (let i = 0; i < paths.length; i += chunkSize) {
Expand All @@ -115,26 +118,32 @@ export class HistoricalQueryHandler extends QueryHandler {

const aggregatedResults: TagHistoryResponse = { results: {} };

const chunkResults = await Promise.all(
pathChunks.map((chunk) => this.getTagHistoryValues(chunk.map(({ tag }) => tag.path), workspace, range, intervals))
);

for (const chunkResult of chunkResults) {
for (const [path, data] of Object.entries(chunkResult.results)) {
if (!aggregatedResults.results[path]) {
aggregatedResults.results[path] = data;
} else {
aggregatedResults.results[path].values.push(...data.values);
}
}
}

return aggregatedResults;
const chunkResults$ = pathChunks.map((chunk) => this.getTagHistoryValues(chunk.map(({ tag }) => tag.path), workspace, range, intervals))

return chunkResults$.reduce((acc$, chunk$) =>
acc$.pipe(
switchMap(aggregated =>
chunk$.pipe(
map(chunkResult => {
for (const [path, data] of Object.entries(chunkResult.results)) {
if (!aggregated.results[path]) {
aggregated.results[path] = data;
} else {
aggregated.results[path].values.push(...data.values);
}
}
return aggregated;
})
)
)
),
of(aggregatedResults));
}

private async getTagHistoryValues(paths: string[], workspace: string, range: TimeRange, intervals: number): Promise<TagHistoryResponse> {
private getTagHistoryValues(paths: string[], workspace: string, range: TimeRange, intervals: number): Observable<TagHistoryResponse> {
const tagHistoryUrl = this.baseUrl + '/nitaghistorian/v2/tags';
return await this.post<TagHistoryResponse>(`${tagHistoryUrl}/query-decimated-history`, {
return this.post$<TagHistoryResponse>(`${tagHistoryUrl}/query-decimated-history`, {
paths,
workspace,
startTime: range.from.toISOString(),
Expand Down
2 changes: 1 addition & 1 deletion src/datasources/tag/QueryHandlerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { HistoricalQueryHandler } from "./HistoricalQueryHandler";
import { PostFn, QueryHandler, TagQueryType } from "./types";

export class QueryHandlerFactory {
constructor(private post: PostFn, private baseUrl?: string) {}
constructor(private post: PostFn, private baseUrl?: string) { }

public createQueryHandler(queryType: TagQueryType): QueryHandler {
switch (queryType) {
Expand Down
39 changes: 26 additions & 13 deletions src/datasources/tag/TagDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import {
TagQuery,
TagQueryType,
TagsWithValues,
TagWithValue,
} from './types';
import { Throw } from 'core/utils';
import { expandMultipleValueVariable } from "./utils";
import { QueryHandlerFactory } from './QueryHandlerFactory';
import { forkJoin, map, Observable, switchMap } from 'rxjs';

export class TagDataSource extends DataSourceBase<TagQuery, TagDataSourceOptions> {
public defaultQuery: Omit<TagQuery, 'refId'> = {
Expand All @@ -25,7 +27,7 @@ export class TagDataSource extends DataSourceBase<TagQuery, TagDataSourceOptions
};

private readonly tagUrl = this.instanceSettings.url + '/nitag/v2';
private readonly queryHandlerFactory = new QueryHandlerFactory(this.post.bind(this), this.instanceSettings.url);
private readonly queryHandlerFactory = new QueryHandlerFactory(this.post$.bind(this), this.instanceSettings.url);

constructor(
readonly instanceSettings: DataSourceInstanceSettings<TagDataSourceOptions>,
Expand All @@ -35,16 +37,23 @@ export class TagDataSource extends DataSourceBase<TagQuery, TagDataSourceOptions
super(instanceSettings, backendSrv, templateSrv);
}

async runQuery(query: TagQuery, { range, maxDataPoints, scopedVars }: DataQueryRequest): Promise<DataFrameDTO> {
const tagsWithValues = await this.getMostRecentTagsByMultiplePaths(
runQuery(query: TagQuery, { range, maxDataPoints, scopedVars }: DataQueryRequest): Observable<DataFrameDTO> {
const tagsWithValues$ = this.getMostRecentTagsByMultiplePaths$(
this.generatePathsFromTemplate(query, scopedVars),
this.templateSrv.replace(query.workspace, scopedVars)
);
const workspacesPromise = this.getWorkspaces();

const workspaces = await this.getWorkspaces();
const result: DataFrameDTO = { refId: query.refId, fields: [] };
return forkJoin([
tagsWithValues$,
workspacesPromise
]).pipe(
switchMap(([tagsWithValues, workspaces]) => {
const result: DataFrameDTO = { refId: query.refId, fields: [] };

return this.queryHandlerFactory.createQueryHandler(query.type).handleQuery(tagsWithValues, result, workspaces, range, maxDataPoints, query.properties);
return this.queryHandlerFactory.createQueryHandler(query.type).handleQuery(tagsWithValues, result, workspaces, range, maxDataPoints, query.properties);
})
);
}

/**
Expand All @@ -54,27 +63,31 @@ export class TagDataSource extends DataSourceBase<TagQuery, TagDataSourceOptions
**/
private generatePathsFromTemplate(query: TagQuery, scopedVars: Record<string, any>): string[] {
let paths: string[] = [query.path];

const replacedPath = this.templateSrv.replace(
query.path,
scopedVars,
(v: string | string[]): string => `{${v}}`
);

paths = expandMultipleValueVariable(replacedPath);

return paths;
}

private async getMostRecentTagsByMultiplePaths(paths: string[], workspace: string) {
private getMostRecentTagsByMultiplePaths$(paths: string[], workspace: string): Observable<TagWithValue[]> {
const workspaceQuery = [workspace || "*"];
const response = await this.post<TagsWithValues>(`${this.tagUrl}/fetch-tags-with-values`, {
const response = this.post$<TagsWithValues>(`${this.tagUrl}/fetch-tags-with-values`, {
paths: paths,
workspaces: workspaceQuery,
take: 100,
});
}).pipe(
map((res: TagsWithValues) => {
return res.tagsWithValues.length ? res.tagsWithValues : Throw(`No tags matched the path '${paths}'`);
})
);

return response.tagsWithValues.length ? response.tagsWithValues : Throw(`No tags matched the path '${paths}'`)
return response;
}

shouldRunQuery(query: TagQuery): boolean {
Expand Down
Loading