Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions console/packages/console-frontend/src/api/state/state.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { afterEach, describe, expect, it, vi } from 'vitest'
import { fetchStateItems } from './state'

function mockStateItems(payload: unknown): void {
vi.stubGlobal(
'fetch',
vi.fn(async () => ({
ok: true,
json: async () => payload,
})),
)
}

describe('fetchStateItems', () => {
afterEach(() => {
vi.unstubAllGlobals()
})

it('uses explicit item keys', async () => {
mockStateItems({ items: [{ key: 'worker', value: { status: 'ready' } }] })

const { items } = await fetchStateItems('runtime')

expect(items[0]).toMatchObject({
groupId: 'runtime',
key: 'worker',
value: { status: 'ready' },
type: 'object',
})
})

it('uses map keys for object-shaped item responses', async () => {
mockStateItems({ items: { 'files/worker/config.yaml': { raw: true } } })

const { items } = await fetchStateItems('runtime')

expect(items[0].key).toBe('files/worker/config.yaml')
expect(items[0].value).toEqual({ raw: true })
})

it('uses explicit item keys before generated object entry keys', async () => {
mockStateItems({ items: { 'item-0': { key: 'worker', value: { status: 'ready' } } } })

const { items } = await fetchStateItems('runtime')

expect(items[0].key).toBe('worker')
expect(items[0].value).toEqual({ status: 'ready' })
})

it('uses embedded state_key before falling back', async () => {
mockStateItems({ items: [{ state_key: 'last_test_suite', status: 'PASS' }] })

const { items } = await fetchStateItems('runtime')

expect(items[0].key).toBe('last_test_suite')
expect(items[0].value).toEqual({ state_key: 'last_test_suite', status: 'PASS' })
})

it('does not invent item-N labels when keys are missing', async () => {
mockStateItems({ items: [{ status: 'unknown' }] })

const { items } = await fetchStateItems('runtime')

expect(items[0].key).toBe('(missing key 0)')
})
})
82 changes: 71 additions & 11 deletions console/packages/console-frontend/src/api/state/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,74 @@ export interface StateGroup {
// State Functions (used functions only)
// ============================================================================

function isRecord(value: unknown): value is Record<string, unknown> {
return !!value && typeof value === 'object' && !Array.isArray(value)
}

function nonEmptyString(value: unknown): string | null {
return typeof value === 'string' && value.length > 0 ? value : null
}

function stateItemValue(item: Record<string, unknown>): unknown {
if ('value' in item && (nonEmptyString(item.key) || nonEmptyString(item.state_key))) {
return item.value
Comment on lines +33 to +35

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Unwrap value consistently when id is used as the key.

Line 63 accepts item.id as a key source, but Line 34 only unwraps item.value for key/state_key. { id: 'worker', value: {...} } would render the wrapper object instead of the actual state value.

Proposed fix
 function stateItemValue(item: Record<string, unknown>): unknown {
-  if ('value' in item && (nonEmptyString(item.key) || nonEmptyString(item.state_key))) {
+  if (
+    'value' in item &&
+    (nonEmptyString(item.key) || nonEmptyString(item.state_key) || nonEmptyString(item.id))
+  ) {
     return item.value
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function stateItemValue(item: Record<string, unknown>): unknown {
if ('value' in item && (nonEmptyString(item.key) || nonEmptyString(item.state_key))) {
return item.value
function stateItemValue(item: Record<string, unknown>): unknown {
if (
'value' in item &&
(nonEmptyString(item.key) || nonEmptyString(item.state_key) || nonEmptyString(item.id))
) {
return item.value
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@console/packages/console-frontend/src/api/state/state.ts` around lines 33 -
35, The stateItemValue function only unwraps item.value when item.key or
item.state_key are non-empty strings, but the code elsewhere accepts item.id as
a valid key source. Update the condition in the stateItemValue function to also
check nonEmptyString(item.id) alongside the existing checks for item.key and
item.state_key, so that the value is properly unwrapped regardless of which key
source is used.

}

return item
}

function makeStateItem(groupId: string, key: string, value: unknown): StateItem {
return {
groupId,
key,
value,
type: typeof value === 'object' ? 'object' : typeof value,
timestamp: Date.now(),
}
}

function fallbackKey(index: number): string {
return `(missing key ${index})`
}

function isGeneratedItemKey(key: string): boolean {
return /^item-\d+$/.test(key)
}

function normalizeStateItem(groupId: string, item: unknown, index: number): StateItem {
if (Array.isArray(item) && item.length >= 2) {
const key = nonEmptyString(item[0])
if (key) return makeStateItem(groupId, key, item[1])
}

if (isRecord(item)) {
const key =
nonEmptyString(item.key) ?? nonEmptyString(item.state_key) ?? nonEmptyString(item.id)
return makeStateItem(groupId, key ?? fallbackKey(index), stateItemValue(item))
}

return makeStateItem(groupId, fallbackKey(index), item)
}

function normalizeStateItems(groupId: string, rawItems: unknown): StateItem[] {
if (Array.isArray(rawItems)) {
return rawItems.map((item, index) => normalizeStateItem(groupId, item, index))
}

if (isRecord(rawItems)) {
return Object.entries(rawItems).map(([key, value], index) => {
const embeddedKey = isRecord(value)
? (nonEmptyString(value.key) ?? nonEmptyString(value.state_key) ?? nonEmptyString(value.id))
: null
const entryKey = key && !isGeneratedItemKey(key) ? key : null
const itemValue = isRecord(value) ? stateItemValue(value) : value
return makeStateItem(groupId, embeddedKey ?? entryKey ?? fallbackKey(index), itemValue)
})
}

return []
}

export async function fetchStateItems(
groupId: string,
): Promise<{ items: StateItem[]; count: number }> {
Expand All @@ -31,17 +99,9 @@ export async function fetchStateItems(
body: JSON.stringify({ scope: groupId }),
})
if (!res.ok) throw new Error('Failed to fetch state items')
const data = await unwrapResponse<{ items: unknown[] }>(res)
const items: StateItem[] = (data.items || []).map((item: unknown, index: number) => {
const typedItem = item as Record<string, unknown>
return {
groupId,
key: (typedItem.id as string) || `item-${index}`,
value: item,
type: typeof item === 'object' ? 'object' : typeof item,
timestamp: Date.now(),
}
})
const data = await unwrapResponse<{ items?: unknown } | Record<string, unknown> | unknown[]>(res)
const rawItems = Array.isArray(data) || !isRecord(data) || !('items' in data) ? data : data.items
const items = normalizeStateItems(groupId, rawItems)
return { items, count: items.length }
}

Expand Down
10 changes: 10 additions & 0 deletions engine/src/builtins/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,16 @@ impl BuiltinKvStore {
.map_or(vec![], |topic| topic.values().cloned().collect())
}

pub async fn list_entries(&self, index: String) -> Vec<(String, Value)> {
let store = self.store.read().await;
store.get(&index).map_or(vec![], |topic| {
topic
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
})
}

pub async fn list_groups(&self) -> Vec<String> {
let store = self.store.read().await;
store.keys().cloned().collect()
Expand Down
78 changes: 74 additions & 4 deletions engine/src/workers/state/adapters/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
registry::{StateAdapterFuture, StateAdapterRegistration},
structs::{
StateDeleteInput, StateGetGroupInput, StateGetInput, StateListGroupsInput,
StateSetInput, StateUpdateInput,
StateListItem, StateSetInput, StateUpdateInput,
},
},
};
Expand All @@ -38,6 +38,77 @@ impl BridgeAdapter {
}
}

fn decode_list_item(item: Value, index: usize) -> StateListItem {
match item {
Value::Array(mut tuple) if tuple.len() >= 2 => {
if let Some(key) = tuple.first().and_then(Value::as_str).map(str::to_string) {
let value = tuple.swap_remove(1);
return StateListItem { key, value };
}
StateListItem {
key: format!("(missing key {index})"),
value: Value::Array(tuple),
}
}
Value::Object(mut object) => {
let key = object
.get("key")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
object
.get("state_key")
.and_then(Value::as_str)
.map(str::to_string)
});
match key {
Some(key) => {
let value = object
.remove("value")
.unwrap_or_else(|| Value::Object(object));
StateListItem { key, value }
}
None => StateListItem {
key: format!("(missing key {index})"),
value: Value::Object(object),
},
}
}
value => StateListItem {
key: format!("(missing key {index})"),
value,
},
}
}

fn decode_list_result(result: Value) -> anyhow::Result<Vec<StateListItem>> {
if let Ok(items) = serde_json::from_value::<Vec<StateListItem>>(result.clone()) {
return Ok(items);
}

let result = result.get("items").cloned().unwrap_or(result);

if let Some(object) = result.as_object() {
return Ok(object
.iter()
.map(|(key, value)| StateListItem {
key: key.clone(),
value: value.clone(),
})
.collect());
Comment on lines +89 to +98

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Handle top-level "items" without shadowing a real state key.

Line 89 unwraps result["items"] before checking whether the whole payload is already a plain key/value map. That drops or relabels legitimate state entries whose actual key is "items", which breaks the “preserve semantic keys” contract for arbitrary scopes. Please only treat items as an envelope when the response shape is unambiguously wrapped.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/state/adapters/bridge.rs` around lines 89 - 98, In the
state list conversion logic in bridge.rs, the current unwrapping of
result["items"] in the bridge adapter can incorrectly treat a real state entry
named "items" as an envelope. Update the result handling in the state listing
path (around the object-to-StateListItem mapping) so that "items" is only
unwrapped when the payload is clearly wrapped, and otherwise preserve the
original top-level key/value map unchanged. Keep the existing StateListItem
conversion behavior, but make the envelope detection explicit before accessing
the items field.

}

if let Value::Array(items) = result {
return Ok(items
.into_iter()
.enumerate()
.map(|(index, item)| decode_list_item(item, index))
.collect());
}

anyhow::bail!("invalid state::list response: expected array, object, or items field")
}

#[async_trait]
impl StateAdapter for BridgeAdapter {
async fn update(
Expand Down Expand Up @@ -130,7 +201,7 @@ impl StateAdapter for BridgeAdapter {
Ok(())
}

async fn list(&self, scope: &str) -> anyhow::Result<Vec<Value>> {
async fn list(&self, scope: &str) -> anyhow::Result<Vec<StateListItem>> {
let data = StateGetGroupInput {
scope: scope.to_string(),
};
Expand All @@ -146,8 +217,7 @@ impl StateAdapter for BridgeAdapter {
.await
.map_err(|e| anyhow::anyhow!("Failed to list values via bridge: {}", e))?;

serde_json::from_value::<Vec<Value>>(result)
.map_err(|e| anyhow::anyhow!("Failed to deserialize list result: {}", e))
decode_list_result(result)
}

async fn list_groups(&self) -> anyhow::Result<Vec<String>> {
Expand Down
21 changes: 17 additions & 4 deletions engine/src/workers/state/adapters/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
workers::state::{
adapters::StateAdapter,
registry::{StateAdapterFuture, StateAdapterRegistration},
structs::StateListItem,
},
};

Expand Down Expand Up @@ -70,8 +71,14 @@ impl StateAdapter for BuiltinKvStoreAdapter {
.await)
}

async fn list(&self, scope: &str) -> anyhow::Result<Vec<Value>> {
Ok(self.storage.list(scope.to_string()).await)
async fn list(&self, scope: &str) -> anyhow::Result<Vec<StateListItem>> {
Ok(self
.storage
.list_entries(scope.to_string())
.await
.into_iter()
.map(|(key, value)| StateListItem { key, value })
.collect())
}

async fn list_groups(&self) -> anyhow::Result<Vec<String>> {
Expand Down Expand Up @@ -163,8 +170,14 @@ mod tests {
.await
.expect("List should succeed");
assert_eq!(list.len(), 2);
assert!(list.contains(&data1));
assert!(list.contains(&data2));
assert!(list.contains(&StateListItem {
key: item1_id.to_string(),
value: data1
}));
assert!(list.contains(&StateListItem {
key: item2_id.to_string(),
value: data2
}));
}

#[tokio::test]
Expand Down
4 changes: 3 additions & 1 deletion engine/src/workers/state/adapters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use async_trait::async_trait;
use iii_sdk::{UpdateOp, UpdateResult, types::SetResult};
use serde_json::Value;

use super::structs::StateListItem;

#[async_trait]
pub trait StateAdapter: Send + Sync {
async fn set(&self, scope: &str, key: &str, value: Value) -> anyhow::Result<SetResult>;
Expand All @@ -23,7 +25,7 @@ pub trait StateAdapter: Send + Sync {
key: &str,
ops: Vec<UpdateOp>,
) -> anyhow::Result<UpdateResult>;
async fn list(&self, scope: &str) -> anyhow::Result<Vec<Value>>;
async fn list(&self, scope: &str) -> anyhow::Result<Vec<StateListItem>>;
async fn list_groups(&self) -> anyhow::Result<Vec<String>>;
async fn destroy(&self) -> anyhow::Result<()>;

Expand Down
12 changes: 6 additions & 6 deletions engine/src/workers/state/adapters/redis_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
state::{
adapters::StateAdapter,
registry::{StateAdapterFuture, StateAdapterRegistration},
structs::StateListItem,
},
},
};
Expand Down Expand Up @@ -187,7 +188,7 @@ impl StateAdapter for StateRedisAdapter {
Ok(())
}

async fn list(&self, scope: &str) -> anyhow::Result<Vec<Value>> {
async fn list(&self, scope: &str) -> anyhow::Result<Vec<StateListItem>> {
let scope_key = format!("state:{}", scope);
let mut conn = self.publisher.lock().await;

Expand All @@ -197,11 +198,10 @@ impl StateAdapter for StateRedisAdapter {
.map_err(|e| anyhow::anyhow!("Failed to get group from Redis: {}", e))?;

let mut result = Vec::new();
for v in values.into_values() {
result.push(
serde_json::from_str(&v)
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?,
);
for (key, v) in values {
let value = serde_json::from_str(&v)
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?;
result.push(StateListItem { key, value });
}
Ok(result)
}
Expand Down
Loading
Loading