Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions csp_gateway/server/config/gateway/omnibus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ modules:
str_basket: 20
architectures:
basket: server
indexes:
example: ["x", "y"]
server_views:
server_view:
table: example
group_by: ["z"]
mount_rest_routes:
_target_: csp_gateway.MountRestRoutes
force_mount_all: True
Expand All @@ -38,7 +44,6 @@ gateway:
PORT: ${port}
UI: True
modules:
- /modules/logfire
- /modules/example_module
- /modules/example_module_feedback
- /modules/example_custom_table
Expand All @@ -48,7 +53,7 @@ gateway:
- /modules/mount_perspective_tables
- /modules/mount_rest_routes
- /modules/mount_websocket_routes
- /modules/mount_api_key_middleware
# - /modules/mount_api_key_middleware
channels:
_target_: csp_gateway.server.demo.ExampleGatewayChannels

Expand Down
13 changes: 13 additions & 0 deletions csp_gateway/server/demo/omnibus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from datetime import date, datetime, timedelta
from logging import INFO, basicConfig
from pathlib import Path
from random import choice
from string import ascii_lowercase
from typing import Annotated, Dict, List

import csp
Expand Down Expand Up @@ -72,6 +74,7 @@ def nonnegative_check(v):
class ExampleDataBase(csp.Struct):
x: Annotated[int, AfterValidator(nonnegative_check)]
y: str = ""
z: str = ""
internal_csp_struct: ExampleCspStruct = ExampleCspStruct()
data: Numpy1DArray[float] = np.array([])
mapping: Dict[str, int] = {}
Expand Down Expand Up @@ -152,6 +155,7 @@ def subscribe(
return ExampleData(
x=last_x,
y=str(last_x) * multiplier,
z=choice(ascii_lowercase),
data=np.random.random((SCALE,)),
mapping={str(last_x): last_x},
)
Expand Down Expand Up @@ -305,6 +309,15 @@ def push_to_perspective( # type: ignore[no-untyped-def]
layouts={
"Server Defined Layout": '{"sizes":[1],"detail":{"main":{"type":"split-area","orientation":"vertical","children":[{"type":"split-area","orientation":"horizontal","children":[{"type":"tab-area","widgets":["EXAMPLE_LIST_GENERATED_4"],"currentIndex":0},{"type":"tab-area","widgets":["PERSPECTIVE_GENERATED_ID_1"],"currentIndex":0}],"sizes":[0.3,0.7]},{"type":"split-area","orientation":"horizontal","children":[{"type":"tab-area","widgets":["EXAMPLE_GENERATED_3"],"currentIndex":0},{"type":"tab-area","widgets":["PERSPECTIVE_GENERATED_ID_0"],"currentIndex":0}],"sizes":[0.3,0.7]}],"sizes":[0.5,0.5]}},"viewers":{"EXAMPLE_LIST_GENERATED_4":{"version":"3.3.4","plugin":"Datagrid","plugin_config":{"columns":{},"edit_mode":"READ_ONLY","scroll_lock":false},"columns_config":{},"title":"example_list","group_by":[],"split_by":[],"columns":["timestamp","x","y","data","mapping","dt","d","internal_csp_struct.z"],"filter":[],"sort":[["timestamp","desc"]],"expressions":{},"aggregates":{},"table":"example_list","settings":false},"PERSPECTIVE_GENERATED_ID_1":{"version":"3.3.4","plugin":"X Bar","plugin_config":{},"columns_config":{},"title":"example_list (*)","group_by":["x"],"split_by":[],"columns":["y"],"filter":[],"sort":[["x","asc"]],"expressions":{},"aggregates":{"y":"median"},"table":"example_list","settings":false},"EXAMPLE_GENERATED_3":{"version":"3.3.4","plugin":"Datagrid","plugin_config":{"columns":{},"edit_mode":"READ_ONLY","scroll_lock":false},"columns_config":{},"title":"example","group_by":[],"split_by":[],"columns":["timestamp","x","y","data","mapping","dt","d","internal_csp_struct.z"],"filter":[],"sort":[["timestamp","desc"]],"expressions":{},"aggregates":{},"table":"example","settings":false},"PERSPECTIVE_GENERATED_ID_0":{"version":"3.3.4","plugin":"Treemap","plugin_config":{},"columns_config":{},"title":"example (*)","group_by":["x"],"split_by":[],"columns":["y","x",null],"filter":[],"sort":[["timestamp","desc"]],"expressions":{},"aggregates":{},"table":"example","settings":false}}}' # noqa: E501
},
limits={"str_basket": 20},
architectures={"basket": "server"},
indexes={"example": ["x", "y"]},
server_views={
"server_view": {
"table": "example",
"group_by": ["z"],
},
},
update_interval=timedelta(seconds=1),
),
MountRestRoutes(force_mount_all=True),
Expand Down
85 changes: 76 additions & 9 deletions csp_gateway/server/modules/web/perspective.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Literal,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
Expand All @@ -21,7 +22,7 @@
from fastapi import APIRouter, WebSocket
from perspective import Client, Server, Table
from perspective.handlers.starlette import PerspectiveStarletteHandler
from pydantic import Field, PrivateAttr
from pydantic import Field, PrivateAttr, field_validator
from starlette.websockets import WebSocketDisconnect
from typing_extensions import TypeAliasType

Expand Down Expand Up @@ -66,14 +67,19 @@ def perspective_thread(client: Client) -> None:
psp_loop.run_forever()


def create_pyarrow_table(key_name, data, arrow_schema, date_conversion_set):
def create_pyarrow_table(key_name, data, computed_index, arrow_schema, date_conversion_set):
new_data = []
for item in data:
flattened_items = item.psp_flatten()
if key_name:
for f in flattened_items:
# TODO better
f["basket-key"] = key_name
if computed_index:
index_name, index_fields = computed_index
for f in flattened_items:
computed_index_value = "-".join(str(f[field]) for field in index_fields)
f[index_name] = computed_index_value
new_data.extend(flattened_items)

# convert to arrow
Expand All @@ -93,13 +99,20 @@ def create_pyarrow_table(key_name, data, arrow_schema, date_conversion_set):
return table


def pull_data_thread(queue: PickleableQueue, table_insts, arrow_schema_insts, arrow_schema_date_conversions):
def pull_data_thread(
queue: PickleableQueue,
table_insts,
computed_indexes,
arrow_schema_insts,
arrow_schema_date_conversions,
):
while True:
try:
for (table_name, key_name), timeserieses in queue.get().items():
table = create_pyarrow_table(
key_name,
timeserieses,
computed_indexes.get(table_name),
arrow_schema_insts[table_name],
arrow_schema_date_conversions[table_name],
)
Expand All @@ -116,23 +129,40 @@ def pull_data_thread(queue: PickleableQueue, table_insts, arrow_schema_insts, ar

ExcludedColumns = TypeAliasType("ExcludedColumns", "Union[Set[str], Dict[str, Union[bool, ExcludedColumns]]]")

ViewConfig = Dict[
Literal["table", "group_by", "split_by", "aggregates", "columns", "sort", "filter", "expressions"],
Union[
str, # table
List[str], # group_by, split_by, columns, expressions
Dict[str, str], # aggregates
List[Dict[str, Union[str, Literal["asc", "desc"]]]], # sort
List[Dict[str, Union[str, List[Union[str, int, float]]]]], # filter
],
]


class MountPerspectiveTables(GatewayModule):
requires: Optional[ChannelSelection] = []

tables: ChannelSelection = Field(default_factory=ChannelSelection)
_unused_tables: Optional[List[str]] = PrivateAttr(default_factory=list)

server_views: Optional[Dict[str, ViewConfig]] = Field(
default_factory=dict, description="Optional dict mapping new table name to a dict with table and view information"
)

limits: Dict[str, int] = Field(
description="Dict mapping table name to [perspective limit](https://perspective-dev.github.io/guide/explanation/table/options.html)",
default={},
)
default_limit: Optional[int] = Field(None, description="Default limit for all tables, i.e. 1000")
indexes: Dict[str, Optional[str]] = Field(
description="Dict mapping table name to [perspective index](https://perspective-dev.github.io/guide/explanation/table/options.html)",
indexes: Dict[str, Optional[Union[str, List[str]]]] = Field(
description="Dict mapping table name to [perspective index](https://perspective-dev.github.io/guide/explanation/table/options.html). . If a multi-index is provided, will create a new computed index field.",
default={},
)
default_index: Optional[str] = Field(None, description="Default index field for all tables, i.e. 'id'")
default_index: Optional[Union[str, List[str]]] = Field(
None, description="Default index field for all tables, i.e. 'id'. If a multi-index is provided, will create a new computed index field."
)
architectures: Dict[str, Literal["server", "client-server"]] = Field(
description="Dict mapping table name to [perspective data architecture](https://perspective-dev.github.io/guide/explanation/architecture.html), default is client-server",
default={},
Expand Down Expand Up @@ -178,10 +208,25 @@ class MountPerspectiveTables(GatewayModule):
_arrow_schema_insts: Dict[str, Dict] = PrivateAttr(default_factory=dict)
_arrow_schema_date_conversions: Dict[str, Set[str]] = PrivateAttr(default_factory=dict)
_table_insts: Dict[str, Table] = PrivateAttr(default={})
# Mapping from table name to (computed index field name, list of fields used to compute index)
_computed_indexes: Dict[str, Tuple[str, List[str]]] = PrivateAttr(default={})

_queue: PickleableQueue = PrivateAttr(default_factory=PickleableQueue)

@field_validator("server_views", mode="after")
def _validate_server_views(cls, v: Dict[str, ViewConfig]) -> Dict[str, ViewConfig]:
for new_table_name, view_config in v.items():
# Must specify table
if "table" not in view_config:
raise ValueError(f"View config for {new_table_name} must specify a base 'table' to create the view from.")
# Must specify at least one view operation
if len(view_config.keys()) == 1:
raise ValueError(f"View config for {new_table_name} must specify at least one view operation in addition to the base 'table'.")
return v

def _connect_all_tables(self, channels: GatewayChannels) -> None:
for field in self.tables.select_from(channels):
selected_tables = set(self.tables.select_from(channels)) | set(_["table"] for _ in self.server_views.values())
for field in selected_tables:
edge = channels.get_channel(field)
excluded_columns = self.excluded_table_columns.get(field, None)
if isinstance(edge, dict):
Expand Down Expand Up @@ -235,6 +280,12 @@ def _connect_all_tables(self, channels: GatewayChannels) -> None:
if v is date:
self._arrow_schema_date_conversions[field].add(k)

# Add server-defined views
for new_table_name, view_config in self.server_views.items():
base_table = self._client.open_table(view_config.pop("table"))
view = base_table.view(**view_config)
self._table_insts[new_table_name] = self._client.table(view, name=new_table_name)

def get_schema_from_field(self, channels: GatewayChannels, field: str):
edge = channels.get_channel(field)

Expand All @@ -257,6 +308,12 @@ def get_schema_from_field(self, channels: GatewayChannels, field: str):
return struct_type.psp_schema(excluded_columns)

def add_table(self, field: str, schema, limit: int = None, index: str = None):
if isinstance(index, list):
# create a new computed index field
index_fields = index
index = "index" if "index" not in schema else "-".join(index)
schema[index] = str
self._computed_indexes[field] = (index, index_fields)
self._table_insts[field] = self._client.table(schema, limit, index, name=field)

def connect(self, channels: GatewayChannels) -> None:
Expand Down Expand Up @@ -376,7 +433,16 @@ async def get_perspective_layouts():
@api_router.get(
"{}/{}".format(self._route, "meta"),
responses=get_default_responses(),
response_model=Dict[str, Union[None, int, str, Dict[str, Union[str, int]], Dict[str, Dict[str, str]], List[str]]],
response_model=Dict[
str,
Union[
None,
int, # limit
str, # index, architecture
List[str], # index, unused tables
Dict[str, Union[str, int, List[str], Dict[str, str]]],
],
],
tags=["Utility"],
)
async def get_perspective_meta():
Expand All @@ -387,7 +453,7 @@ async def get_perspective_meta():
return {
"limits": self.limits,
"default_limit": self.default_limit,
"indexes": self.indexes,
"indexes": {**self.indexes, **{k: v[0] for k, v in self._computed_indexes.items()}},
"default_index": self.default_index,
"architectures": self.architectures,
"default_architecture": self.default_architecture,
Expand All @@ -405,6 +471,7 @@ def run_perspective(self):
args=(
self._queue,
self._table_insts,
self._computed_indexes,
self._arrow_schema_insts,
self._arrow_schema_date_conversions,
),
Expand Down
6 changes: 3 additions & 3 deletions csp_gateway/tests/server/modules/web/test_perspective.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,13 @@ def test_pyarrow_conversion():
datetime_key_set.add("timestamp")

o = MyPyArrowStruct(timestamp=now, d=now_date)
table = create_pyarrow_table(None, [o], arrow_schema, date_key_set)
table = create_pyarrow_table(None, [o], None, arrow_schema, date_key_set)
df = pl.from_arrow(table)
assert df.head()["d"][0] == now_date

# Check nones are handled correctly
o = MyPyArrowStruct(timestamp=None, d=None)
table = create_pyarrow_table(None, [o], arrow_schema, date_key_set)
table = create_pyarrow_table(None, [o], None, arrow_schema, date_key_set)
df = pl.from_arrow(table)
assert df.head()["d"][0] is None
assert df.head()["timestamp"][0] is None
Expand Down Expand Up @@ -565,4 +565,4 @@ def connect(self, channels):
gateway._build_web(ui=True, timeout=1, _in_test=True)

# Assert that tables were pruned
assert psp_module._unused_tables == ["b", "d", "f"]
assert sorted(psp_module._unused_tables) == sorted(["b", "d", "f"])
14 changes: 13 additions & 1 deletion js/build.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ import { BuildCss } from "@prospective.co/procss/target/cjs/procss.js";
import cpy from "cpy";
import fs from "fs";
import { createRequire } from "node:module";
import path from "node:path";

const require = createRequire(import.meta.url);

// Force all react imports to resolve to the same copy to avoid
// duplicate-React errors when dependencies (e.g. @perspective-dev/react)
// list a different React version.
const REACT_ALIAS = {
react: path.dirname(require.resolve("react/package.json")),
"react-dom": path.dirname(require.resolve("react-dom/package.json")),
"react/jsx-runtime": require.resolve("react/jsx-runtime"),
};

const BUILD = [
{
Expand Down Expand Up @@ -31,6 +43,7 @@ const BUILD = [
bundle: true,
plugins: [],
format: "esm",
alias: REACT_ALIAS,
loader: {
".css": "text",
".html": "text",
Expand All @@ -44,7 +57,6 @@ const BUILD = [
},
];

const require = createRequire(import.meta.url);
function add(builder, path, path2) {
builder.add(path, fs.readFileSync(require.resolve(path2 || path)).toString());
}
Expand Down
21 changes: 11 additions & 10 deletions js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@
"prepack": "pnpm run build"
},
"dependencies": {
"@perspective-dev/client": "~4.0.1",
"@perspective-dev/server": "~4.0.1",
"@perspective-dev/viewer": "~4.0.1",
"@perspective-dev/viewer-d3fc": "~4.0.1",
"@perspective-dev/viewer-datagrid": "~4.0.1",
"@perspective-dev/workspace": "~4.0.1",
"@perspective-dev/client": "~4.2.0",
"@perspective-dev/react": "~4.2.0",
"@perspective-dev/server": "~4.2.0",
"@perspective-dev/viewer": "~4.2.0",
"@perspective-dev/viewer-d3fc": "~4.2.0",
"@perspective-dev/viewer-datagrid": "~4.2.0",
"@perspective-dev/workspace": "~4.2.0",
"perspective-summary": "~0.1.0",
"react": "^19.2.3",
"react-dom": "^19.2.3",
"react": "^19.2.4",
"react-dom": "^19.2.4",
"react-icons": "^5.5.0",
"react-modern-drawer": "^1.4.0"
},
"devDependencies": {
"@finos/perspective-esbuild-plugin": "3.2.1",
"@prospective.co/procss": "^0.1.17",
"cpy": "^12.1.0",
"cpy": "^13.0.0",
"esbuild": "^0.27.2",
"npm-run-all": "^4.1.5",
"prettier": "^3.7.4"
"prettier": "^3.8.1"
}
}
Loading