This document provides a comprehensive reference for the Pocket API. For auto-generated API documentation from source code, see the embedded section below.
Pocket provides a clean, idiomatic Go API for building decision graphs and workflows. The API is designed around interfaces for maximum flexibility and composability.
import "github.com/agentstation/pocket"
Package pocket provides a minimalist framework for building LLM workflows using composable nodes in a directed graph structure.
Key features:
- Small, composable interfaces
- Type-safe operations with generics
- Built-in concurrency patterns
- Functional options for configuration
- Zero external dependencies in core
Basic usage:
// Create a simple processor
greet := pocket.ProcessorFunc(func(ctx context.Context, input any) (any, error) {
name := input.(string)
return fmt.Sprintf("Hello, %s!", name), nil
})
// Create a node
node := pocket.NewNode[any, any]("greeter", greet)
// Create and run a graph
store := pocket.NewStore()
graph := pocket.NewGraph(node, store)
result, err := graph.Run(context.Background(), "World")
Building complex graphs:
// Use the builder API
builder := pocket.NewBuilder(store).
Add(pocket.NewNode[any, any]("fetch", fetchData)).
Add(pocket.NewNode[any, any]("process", processData)).
Add(pocket.NewNode[any, any]("save", saveData)).
Connect("fetch", "success", "process").
Connect("process", "success", "save").
Start("fetch")
graph, err := builder.Build()
Concurrent patterns:
// Fan-out processing
results, err := pocket.FanOut(ctx, processNode, store, items)
// Pipeline
result, err := pocket.Pipeline(ctx, nodes, store, input)
// Concurrent execution
results, err := pocket.RunConcurrent(ctx, nodes, store, inputs)
Type-safe operations:
// Create a typed store
userStore := pocket.NewTypedStore[User](store)
// Type-safe get/set
user, exists, err := userStore.Get(ctx, "user:123")
err = userStore.Set(ctx, "user:123", newUser)
Package pocket provides a minimalist framework for building LLM workflows using composable nodes in a directed graph structure with Prep/Exec/Post lifecycle.
Type Safety: The framework provides three levels of type safety for workflow validation:
- Compile-time: Generic node creation with NewNode[In, Out] enforces type consistency within nodes. The Go compiler checks function signatures when the generic options (WithExec, WithPrep, etc.) are used with typed nodes.
- Initialization-time: ValidateGraph checks type compatibility across the entire workflow graph before execution begins. This catches type mismatches between connected nodes.
- Runtime: Type assertions in lifecycle functions ensure data integrity during execution. These are minimized when using typed nodes.
The goal is to verify type safety of the workflow graph as early as possible, catching errors before any workflow execution begins.
Example (Lifecycle)
Example_lifecycle demonstrates the full Prep/Exec/Post lifecycle.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
// Create a node that uses all three steps
processor := pocket.NewNode[any, any]("processor",
pocket.Steps{
Prep: func(ctx context.Context, store pocket.StoreReader, input any) (any, error) {
// Prepare: validate and transform input
data := input.(map[string]int)
if len(data) == 0 {
return nil, fmt.Errorf("empty data")
}
return data, nil
},
Exec: func(ctx context.Context, data any) (any, error) {
// Execute: calculate sum
m := data.(map[string]int)
sum := 0
for _, v := range m {
sum += v
}
return sum, nil
},
Post: func(ctx context.Context, store pocket.StoreWriter, input, data, sum any) (any, string, error) {
// Post: decide routing based on result
total := sum.(int)
if total > 100 {
return fmt.Sprintf("High total: %d", total), "high", nil
}
return fmt.Sprintf("Low total: %d", total), "low", nil
},
},
)
store := pocket.NewStore()
graph := pocket.NewGraph(processor, store)
result, err := graph.Run(context.Background(), map[string]int{
"a": 10,
"b": 20,
"c": 30,
})
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
}
Output:
Low total: 60
- Variables
- func FanOut[T any](ctx context.Context, node Node, store Store, items []T) ([]any, error)
- func Pipeline(ctx context.Context, nodes []Node, store Store, input any) (any, error)
- func ResetDefaults()
- func RunConcurrent(ctx context.Context, nodes []Node, store Store, inputs []any) ([]any, error)
- func SetDefaultExec(fn ExecFunc)
- func SetDefaultPost(fn PostFunc)
- func SetDefaultPrep(fn PrepFunc)
- func SetDefaults(opts ...Option)
- func ValidateGraph(start Node) error
- type Builder
- type ExecFunc
- type FallbackFunc
- type FanIn
- type Graph
- type GraphOption
- type Logger
- type Node
- type Option
- func WithErrorHandler(handler func(error)) Option
- func WithExec[In, Out any](fn func(ctx context.Context, input In) (Out, error)) Option
- func WithOnComplete(fn func(ctx context.Context, store StoreWriter)) Option
- func WithOnFailure(fn func(ctx context.Context, store StoreWriter, err error)) Option
- func WithOnSuccess[Out any](fn func(ctx context.Context, store StoreWriter, output Out)) Option
- func WithPost[In, Out any](fn func(ctx context.Context, store StoreWriter, input In, prepResult any, execResult Out) (Out, string, error)) Option
- func WithPrep[In any](fn func(ctx context.Context, store StoreReader, input In) (any, error)) Option
- func WithRetry(maxRetries int, delay time.Duration) Option
- func WithTimeout(timeout time.Duration) Option
- type PostFunc
- type PrepFunc
- type Steps
- type Store
- type StoreOption
- type StoreReader
- type StoreWriter
- type Tracer
- type TypedStore
var (
// ErrNoStartNode is returned when a graph has no start node defined.
ErrNoStartNode = errors.New("pocket: no start node defined")
// ErrNodeNotFound is returned when a referenced node doesn't exist.
ErrNodeNotFound = errors.New("pocket: node not found")
// ErrInvalidInput is returned when input type doesn't match expected type.
ErrInvalidInput = errors.New("pocket: invalid input type")
)
func FanOut
func FanOut[T any](ctx context.Context, node Node, store Store, items []T) ([]any, error)
FanOut executes a node for each input item concurrently.
Example
ExampleFanOut demonstrates parallel processing of items.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
// Create a processor that simulates work
processor := pocket.NewNode[any, any]("process",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
num := input.(int)
return num * num, nil
},
},
)
store := pocket.NewStore()
items := []int{1, 2, 3, 4, 5}
// Process items concurrently
results, err := pocket.FanOut(context.Background(), processor, store, items)
if err != nil {
log.Fatal(err)
}
// Results maintain order
for i, result := range results {
fmt.Printf("%d -> %v\n", items[i], result)
}
}
Output:
1 -> 1
2 -> 4
3 -> 9
4 -> 16
5 -> 25
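The example notes that results maintain input order even though items run concurrently. One common way to get that property is to give each goroutine its own slot in a pre-sized slice. The sketch below uses only the standard library and is not Pocket's actual implementation; `fanOut` and its `work` parameter are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical helper: it applies work to every item concurrently
// and returns the results in input order. On failure it returns the error of
// the lowest-indexed failing item.
func fanOut[T any](items []T, work func(T) (any, error)) ([]any, error) {
	results := make([]any, len(items))
	errs := make([]error, len(items))

	var wg sync.WaitGroup
	for i, item := range items {
		wg.Add(1)
		go func(i int, item T) {
			defer wg.Done()
			// Each goroutine writes only to its own index, so no mutex is needed.
			results[i], errs[i] = work(item)
		}(i, item)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	squares, err := fanOut([]int{1, 2, 3}, func(n int) (any, error) {
		return n * n, nil
	})
	fmt.Println(squares, err) // [1 4 9] <nil>
}
```

Writing by index avoids both a mutex and a post-hoc sort, at the cost of waiting for every item before any error is reported.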
func Pipeline
func Pipeline(ctx context.Context, nodes []Node, store Store, input any) (any, error)
Pipeline executes nodes sequentially, passing each node's output to the next node as input.
Example
ExamplePipeline demonstrates sequential processing.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
store := pocket.NewStore()
// Create a pipeline of transformations
double := pocket.NewNode[any, any]("double",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
return input.(int) * 2, nil
},
},
)
addTen := pocket.NewNode[any, any]("addTen",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
return input.(int) + 10, nil
},
},
)
toString := pocket.NewNode[any, any]("toString",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
return fmt.Sprintf("Result: %d", input.(int)), nil
},
},
)
nodes := []pocket.Node{double, addTen, toString}
result, err := pocket.Pipeline(context.Background(), nodes, store, 5)
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
}
Output:
Result: 20
func ResetDefaults
func ResetDefaults()
ResetDefaults resets all global defaults to their initial values.
func RunConcurrent
func RunConcurrent(ctx context.Context, nodes []Node, store Store, inputs []any) ([]any, error)
RunConcurrent executes multiple nodes concurrently.
func SetDefaultExec
func SetDefaultExec(fn ExecFunc)
SetDefaultExec sets the global default exec function.
func SetDefaultPost
func SetDefaultPost(fn PostFunc)
SetDefaultPost sets the global default post function.
func SetDefaultPrep
func SetDefaultPrep(fn PrepFunc)
SetDefaultPrep sets the global default prep function.
func SetDefaults
func SetDefaults(opts ...Option)
SetDefaults configures global defaults for all nodes.
func ValidateGraph
func ValidateGraph(start Node) error
ValidateGraph provides initialization-time type safety by validating the entire workflow graph.
Type validation process:
- Traverses the graph starting from the given node using depth-first search
- For each connection between nodes, verifies type compatibility:
  - Source node's OutputType must be assignable to target node's InputType
  - Interface satisfaction is checked (e.g., concrete type implements interface)
  - Untyped nodes (InputType/OutputType = nil) are skipped but successors are validated
- Returns detailed error messages identifying the exact type mismatch location
This is a critical part of the type safety system, catching errors before any workflow execution begins. It complements compile-time checks by validating the connections between nodes.
Type compatibility rules:
- Exact type match: string -> string ✓
- Interface satisfaction: ConcreteType -> Interface (if implements) ✓
- Any type: any -> ConcreteType ✓ (but loses compile-time safety)
- Assignability: Uses Go's reflect.Type.AssignableTo for compatibility
Example:
// Build your workflow
validator := NewNode[User, ValidationResult]("validator", ...)
processor := NewNode[ValidationResult, Response]("processor", ...)
validator.Connect("valid", processor)
// Validate before execution - catches type mismatches early
if err := ValidateGraph(validator); err != nil {
// Error: "type mismatch: node 'validator' outputs ValidationResult
// but node 'wrongNode' expects User (via action 'valid')"
log.Fatal(err)
}
// Safe to execute - types are verified
graph := NewGraph(validator, store)
result, err := graph.Run(ctx, user)
type Builder
Builder provides a fluent API for constructing graphs.
type Builder struct {
// contains filtered or unexported fields
}
Example
ExampleBuilder demonstrates the fluent builder API.
store := pocket.NewStore()
// Define nodes with lifecycle
validate := pocket.NewNode[any, any]("validate",
pocket.Steps{
Prep: func(ctx context.Context, store pocket.StoreReader, input any) (any, error) {
email, ok := input.(string)
if !ok {
return nil, fmt.Errorf("expected string")
}
return email, nil
},
Exec: func(ctx context.Context, email any) (any, error) {
if !strings.Contains(email.(string), "@") {
return nil, fmt.Errorf("invalid email")
}
return email, nil
},
Post: func(ctx context.Context, store pocket.StoreWriter, input, prep, result any) (any, string, error) {
return result, "default", nil
},
},
)
normalize := pocket.NewNode[any, any]("normalize",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
email := input.(string)
return strings.ToLower(strings.TrimSpace(email)), nil
},
},
)
// Build the graph
graph, err := pocket.NewBuilder(store).
Add(validate).
Add(normalize).
Connect("validate", "default", "normalize").
Start("validate").
Build()
if err != nil {
log.Fatal(err)
}
result, err := graph.Run(context.Background(), " USER@EXAMPLE.COM ")
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
// Output: user@example.com
func NewBuilder
func NewBuilder(store Store) *Builder
NewBuilder creates a new graph builder.
func (*Builder) Add
func (b *Builder) Add(node Node) *Builder
Add registers a node in the graph.
func (*Builder) Build
func (b *Builder) Build() (*Graph, error)
Build creates the graph.
func (*Builder) Connect
func (b *Builder) Connect(from, action, to string) *Builder
Connect creates a connection between nodes.
func (*Builder) Start
func (b *Builder) Start(name string) *Builder
Start sets the starting node.
func (*Builder) WithOptions
func (b *Builder) WithOptions(opts ...GraphOption) *Builder
WithOptions adds graph options.
type ExecFunc
ExecFunc performs the main processing logic without store access.
type ExecFunc func(ctx context.Context, prepResult any) (execResult any, err error)
type FallbackFunc
FallbackFunc handles errors from the Exec step using the prepared data. It receives the prepResult (like Exec) and the error from the failed Exec. Like ExecFunc, it has no store access to maintain purity.
type FallbackFunc func(ctx context.Context, prepResult any, execErr error) (fallbackResult any, err error)
type FanIn
FanIn collects results from multiple sources.
type FanIn struct {
// contains filtered or unexported fields
}
func NewFanIn
func NewFanIn(combine func([]any) (any, error), sources ...Node) *FanIn
NewFanIn creates a fan-in pattern.
func (*FanIn) Run
func (f *FanIn) Run(ctx context.Context, store Store) (any, error)
Run executes the fan-in pattern.
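The reference gives no example for the fan-in pattern, so here is a minimal standard-library sketch of the idea: run several sources, collect their results, and hand them to a combine function with the same shape as the `combine` parameter above. It is not Pocket's implementation; `source`, `fanIn`, and `sum` are hypothetical names, and running the sources concurrently is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// source is a hypothetical stand-in for a node that produces one value.
type source func() (any, error)

// fanIn runs all sources concurrently, then passes the collected results
// (in source order) to combine.
func fanIn(combine func([]any) (any, error), sources ...source) (any, error) {
	results := make([]any, len(sources))
	errs := make([]error, len(sources))

	var wg sync.WaitGroup
	for i, src := range sources {
		wg.Add(1)
		go func(i int, src source) {
			defer wg.Done()
			results[i], errs[i] = src()
		}(i, src)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return combine(results)
}

// sum is an example combine function that adds integer results.
func sum(values []any) (any, error) {
	total := 0
	for _, v := range values {
		n, ok := v.(int)
		if !ok {
			return nil, fmt.Errorf("expected int, got %T", v)
		}
		total += n
	}
	return total, nil
}

func main() {
	one := func() (any, error) { return 1, nil }
	two := func() (any, error) { return 2, nil }
	fmt.Println(fanIn(sum, one, two)) // 3 <nil>
}
```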
type Graph
Graph is the public handle to a graph for backward compatibility.
type Graph struct {
// contains filtered or unexported fields
}
func NewGraph
func NewGraph(start Node, store Store, opts ...GraphOption) *Graph
NewGraph creates a new graph starting from the given node.
func (*Graph) AsNode
func (g *Graph) AsNode(name string) Node
AsNode returns the graph as a Node interface. Since graph already implements Node, we just return it. This method exists for backward compatibility.
func (*Graph) Run
func (g *Graph) Run(ctx context.Context, input any) (output any, err error)
Run executes the graph with the given input.
type GraphOption
GraphOption configures a Graph.
type GraphOption func(*graphOptions)
func WithLogger
func WithLogger(logger Logger) GraphOption
WithLogger adds logging to the graph.
func WithTracer
func WithTracer(tracer Tracer) GraphOption
WithTracer adds distributed tracing.
type Logger
Logger provides structured logging.
type Logger interface {
Debug(ctx context.Context, msg string, keysAndValues ...any)
Info(ctx context.Context, msg string, keysAndValues ...any)
Error(ctx context.Context, msg string, keysAndValues ...any)
}
type Node
Node is the core interface for all execution units in a workflow. Both simple nodes and graphs implement this interface.
type Node interface {
// Name returns the node's identifier.
Name() string
// Lifecycle methods for the Prep/Exec/Post pattern.
Prep(ctx context.Context, store StoreReader, input any) (prepResult any, err error)
Exec(ctx context.Context, prepResult any) (execResult any, err error)
Post(ctx context.Context, store StoreWriter, input, prepResult, execResult any) (output any, next string, err error)
// Connect adds a successor node for the given action.
Connect(action string, next Node) Node
// Successors returns all connected nodes.
Successors() map[string]Node
// Type information for validation (optional).
InputType() reflect.Type
OutputType() reflect.Type
}
Example
ExampleNode demonstrates using the Prep/Exec/Post lifecycle.
// Create a node with lifecycle steps
uppercase := pocket.NewNode[any, any]("uppercase",
pocket.Steps{
Prep: func(ctx context.Context, store pocket.StoreReader, input any) (any, error) {
// Validate input is a string
text, ok := input.(string)
if !ok {
return nil, fmt.Errorf("expected string, got %T", input)
}
return text, nil
},
Exec: func(ctx context.Context, text any) (any, error) {
// Transform to uppercase
return strings.ToUpper(text.(string)), nil
},
Post: func(ctx context.Context, store pocket.StoreWriter, input, text, result any) (any, string, error) {
// Return result and routing
return result, "done", nil
},
},
)
store := pocket.NewStore()
graph := pocket.NewGraph(uppercase, store)
result, err := graph.Run(context.Background(), "hello world")
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
// Output: HELLO WORLD
Example (Routing)
ExampleNode_routing demonstrates conditional routing between nodes.
package main
import (
"context"
"fmt"
"github.com/agentstation/pocket"
)
func main() {
store := pocket.NewStore()
// Router node that checks input
router := pocket.NewNode[any, any]("router",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
return input, nil
},
Post: func(ctx context.Context, store pocket.StoreWriter, input, prep, result any) (any, string, error) {
value := result.(int)
if value > 100 {
return result, "large", nil
}
return result, "small", nil
},
},
)
// Handler nodes
largeHandler := pocket.NewNode[any, any]("large",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
return fmt.Sprintf("Large number: %v", input), nil
},
},
)
smallHandler := pocket.NewNode[any, any]("small",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
return fmt.Sprintf("Small number: %v", input), nil
},
},
)
// Connect nodes
router.Connect("large", largeHandler)
router.Connect("small", smallHandler)
// Run with different inputs
graph := pocket.NewGraph(router, store)
result1, _ := graph.Run(context.Background(), 50)
result2, _ := graph.Run(context.Background(), 150)
fmt.Println(result1)
fmt.Println(result2)
}
Output:
Small number: 50
Large number: 150
func Default
func Default(n, next Node) Node
Default is a helper function to connect to the default next node.
func NewNode
func NewNode[In, Out any](name string, steps Steps, opts ...Option) Node
NewNode creates a new node with optional compile-time type safety.
Type parameters:
- In: The expected input type for this node (use 'any' for dynamic typing)
- Out: The output type this node produces (use 'any' for dynamic typing)
Parameters:
- name: The node's identifier
- steps: The lifecycle functions (Prep, Exec, Post) - all fields are optional
- opts: Additional options like retry, timeout, error handlers, etc.
Type safety mechanism:
- Compile-time: When In/Out are not 'any', the node stores type information for validation. Using generic options like WithExec, WithPrep ensures function signatures match the declared types at compile time.
- Initialization-time: Call ValidateGraph on your start node to verify the entire workflow graph has compatible types between connected nodes.
- Runtime: When using regular options (WithExec, WithPrep) with typed nodes, the framework automatically wraps functions to ensure type safety.
Examples:
// Typed node - enables full type checking across the workflow
validator := NewNode[User, ValidationResult]("validator",
Steps{
Exec: func(ctx context.Context, user any) (any, error) {
// Type assertions handled by the framework
return ValidationResult{Valid: true}, nil
},
},
WithRetry(3, time.Second),
)
// Untyped node - no compile-time checks (explicit [any, any] encourages adding types)
processor := NewNode[any, any]("processor",
Steps{
Prep: prepFunc,
Exec: execFunc,
Post: postFunc,
},
)
type Option
Option configures a Node.
type Option func(*nodeOptions)
func WithErrorHandler
func WithErrorHandler(handler func(error)) Option
WithErrorHandler sets a custom error handler.
func WithExec
func WithExec[In, Out any](fn func(ctx context.Context, input In) (Out, error)) Option
WithExec sets the execution function with type safety. The types In and Out should match the node's types when used with NewNode[In, Out]. For dynamic typing, use WithExec[any, any]. Exec functions do not have store access to enforce pure business logic.
func WithOnComplete
func WithOnComplete(fn func(ctx context.Context, store StoreWriter)) Option
WithOnComplete sets a cleanup hook that always runs after execution. The store parameter provides full read-write access for cleanup operations.
func WithOnFailure
func WithOnFailure(fn func(ctx context.Context, store StoreWriter, err error)) Option
WithOnFailure sets a cleanup hook that runs after failed execution. The store parameter provides full read-write access for cleanup operations.
func WithOnSuccess
func WithOnSuccess[Out any](fn func(ctx context.Context, store StoreWriter, output Out)) Option
WithOnSuccess sets a cleanup hook that runs after successful execution. The type Out should match the node's output type when used with NewNode[In, Out]. For dynamic typing, use WithOnSuccess[any]. The store parameter provides full read-write access for cleanup operations.
func WithPost
func WithPost[In, Out any](fn func(ctx context.Context, store StoreWriter, input In, prepResult any, execResult Out) (Out, string, error)) Option
WithPost sets the post-processing function with type safety. The types In and Out should match the node's types when used with NewNode[In, Out]. Post functions have access to all step results and determine routing. For dynamic typing, use WithPost[any, any]. The store parameter provides full read-write access for state mutations.
func WithPrep
func WithPrep[In any](fn func(ctx context.Context, store StoreReader, input In) (any, error)) Option
WithPrep sets the preparation function with type safety. The input type In should match the node's input type when used with NewNode[In, Out]. For dynamic typing, use WithPrep[any]. The store parameter provides read-only access to enforce Prep step semantics.
func WithRetry
func WithRetry(maxRetries int, delay time.Duration) Option
WithRetry configures retry behavior.
Example
ExampleWithRetry demonstrates retry configuration.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/agentstation/pocket"
)
func main() {
attempts := 0
// Create a node that fails twice before succeeding
flaky := pocket.NewNode[any, any]("flaky",
pocket.Steps{
Exec: func(ctx context.Context, input any) (any, error) {
attempts++
if attempts < 3 {
return nil, fmt.Errorf("temporary failure %d", attempts)
}
return "success", nil
},
},
pocket.WithRetry(2, 10*time.Millisecond), // Retry up to 2 times
)
store := pocket.NewStore()
graph := pocket.NewGraph(flaky, store)
result, err := graph.Run(context.Background(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Result after %d attempts: %v\n", attempts, result)
}
Output:
Result after 3 attempts: success
func WithTimeout
func WithTimeout(timeout time.Duration) Option
WithTimeout sets execution timeout.
type PostFunc
PostFunc processes results and determines routing with full store access.
type PostFunc func(ctx context.Context, store StoreWriter, input, prepResult, execResult any) (output any, next string, err error)
type PrepFunc
PrepFunc prepares data before execution with read-only store access.
type PrepFunc func(ctx context.Context, store StoreReader, input any) (prepResult any, err error)
type Steps
Steps groups the lifecycle functions for a node. All fields are optional - if not provided, default implementations will be used.
type Steps struct {
// Prep prepares data before execution with read-only store access.
Prep PrepFunc
// Exec performs the main processing logic without store access.
Exec ExecFunc
// Fallback handles Exec errors with the prepared data.
// Like Exec, it receives prepResult and has no store access.
Fallback FallbackFunc
// Post processes results and determines routing with full store access.
Post PostFunc
}
type Store
Store provides thread-safe storage for shared state.
type Store interface {
// Get retrieves a value by key.
Get(ctx context.Context, key string) (value any, exists bool)
// Set stores a value with the given key.
Set(ctx context.Context, key string, value any) error
// Delete removes a key from the store.
Delete(ctx context.Context, key string) error
// Scope returns a new store with the given prefix.
Scope(prefix string) Store
}
func NewStore
func NewStore(opts ...StoreOption) Store
NewStore creates a new thread-safe store with optional configuration.
type StoreOption
StoreOption configures a store.
type StoreOption func(*storeConfig)
func WithEvictionCallback
func WithEvictionCallback(fn func(key string, value any)) StoreOption
WithEvictionCallback sets a callback for when entries are evicted.
func WithMaxEntries
func WithMaxEntries(maxEntries int) StoreOption
WithMaxEntries sets the maximum number of entries in the store. When exceeded, the least recently used entry is evicted.
func WithTTL
func WithTTL(ttl time.Duration) StoreOption
WithTTL sets the time-to-live for entries. Entries older than the TTL are automatically removed.
type StoreReader
StoreReader provides read-only access to the store. Used in the Prep step to enforce read-only semantics.
type StoreReader interface {
// Get retrieves a value by key.
Get(ctx context.Context, key string) (value any, exists bool)
// Scope returns a new store with the given prefix.
Scope(prefix string) Store
}
type StoreWriter
StoreWriter provides full read-write access to the store. Used in the Post step for state mutations.
type StoreWriter interface {
Store
}
type Tracer
Tracer provides distributed tracing capabilities.
type Tracer interface {
StartSpan(ctx context.Context, name string) (context.Context, func())
}
type TypedStore
TypedStore provides type-safe storage operations.
type TypedStore[T any] interface {
Get(ctx context.Context, key string) (T, bool, error)
Set(ctx context.Context, key string, value T) error
Delete(ctx context.Context, key string) error
}
Example
ExampleTypedStore demonstrates type-safe storage.
package main
import (
"context"
"fmt"
"log"
"github.com/agentstation/pocket"
)
func main() {
type User struct {
ID string
Name string
}
// Create a typed store
store := pocket.NewStore()
userStore := pocket.NewTypedStore[User](store)
ctx := context.Background()
// Store a user
user := User{ID: "123", Name: "Alice"}
err := userStore.Set(ctx, "user:123", user)
if err != nil {
log.Fatal(err)
}
// Retrieve with type safety
retrieved, exists, err := userStore.Get(ctx, "user:123")
if err != nil {
log.Fatal(err)
}
if exists {
fmt.Printf("Found user: %+v\n", retrieved)
}
}
Output:
Found user: {ID:123 Name:Alice}
func NewTypedStore
func NewTypedStore[T any](store Store) TypedStore[T]
NewTypedStore creates a type-safe wrapper around a Store.
Generated by gomarkdoc
// Create a simple processor node
node := pocket.NewNode[string, string]("uppercase",
pocket.WithExec(func(ctx context.Context, input string) (string, error) {
return strings.ToUpper(input), nil
}),
)
// Create a graph and run
graph := pocket.NewGraph(node, pocket.NewStore())
result, err := graph.Run(context.Background(), "hello")
// result: "HELLO"
// Node with full lifecycle
node := pocket.NewNode[Request, Response]("api-processor",
pocket.WithPrep(func(ctx context.Context, store pocket.StoreReader, req Request) (any, error) {
// Load configuration
config, _ := store.Get(ctx, "api:config")
return map[string]any{
"request": req,
"config": config,
}, nil
}),
pocket.WithExec(func(ctx context.Context, prepData any) (Response, error) {
// Process with prepared data
data := prepData.(map[string]any)
return callAPI(data["request"], data["config"])
}),
pocket.WithPost(func(ctx context.Context, store pocket.StoreWriter,
req Request, prep any, resp Response) (Response, string, error) {
// Save result and route
store.Set(ctx, "last:response", resp)
if resp.Success {
return resp, "success", nil
}
return resp, "retry", nil
}),
)
node := pocket.NewNode[Request, Response]("resilient",
pocket.Steps{
Exec: func(ctx context.Context, prepResult any) (any, error) {
// Primary logic that might fail
return riskyOperation(prepResult)
},
Fallback: func(ctx context.Context, prepResult any, err error) (any, error) {
// Fallback receives prepResult (not original input) and the error
log.Printf("Primary failed: %v, using fallback", err)
return safeDefault(prepResult), nil
},
},
pocket.WithRetry(3, time.Second),
)
// Create store with bounds
store := pocket.NewStore(
pocket.WithMaxEntries(1000),
pocket.WithTTL(5 * time.Minute),
)
// Step 1: Validate
validate := pocket.NewNode[Order, Order]("validate",
pocket.WithExec(func(ctx context.Context, order Order) (Order, error) {
if err := order.Validate(); err != nil {
return Order{}, err
}
return order, nil
}),
)
// Step 2: Process payment
payment := pocket.NewNode[Order, PaymentResult]("payment",
pocket.WithExec(func(ctx context.Context, order Order) (PaymentResult, error) {
return processPayment(order)
}),
pocket.WithPost(func(ctx context.Context, store pocket.StoreWriter,
order Order, prep any, result PaymentResult) (PaymentResult, string, error) {
if result.Success {
return result, "ship", nil
}
return result, "refund", nil
}),
)
// Step 3: Ship order
ship := pocket.NewNode[PaymentResult, ShipmentInfo]("ship",
pocket.WithExec(func(ctx context.Context, payment PaymentResult) (ShipmentInfo, error) {
return createShipment(payment.OrderID)
}),
)
// Connect workflow
validate.Connect("default", payment)
payment.Connect("ship", ship)
// Build and run
graph := pocket.NewGraph(validate, store)
result, err := graph.Run(context.Background(), Order{ID: "123"})
// Process items in parallel
processor := pocket.NewNode[Item, Result]("processor",
pocket.WithExec(func(ctx context.Context, item Item) (Result, error) {
return processItem(item)
}),
)
// Use FanOut for parallel processing
// FanOut takes no options in the signature documented above
results, err := pocket.FanOut(ctx, processor, store, items)
// Node that maintains state
stateful := pocket.NewNode[Event, Response]("stateful",
pocket.WithPrep(func(ctx context.Context, store pocket.StoreReader, event Event) (any, error) {
// Load current state
state, exists := store.Get(ctx, "process:state")
if !exists {
state = NewState()
}
return map[string]any{
"event": event,
"state": state,
}, nil
}),
pocket.WithExec(func(ctx context.Context, prepData any) (Response, error) {
data := prepData.(map[string]any)
state := data["state"].(State)
event := data["event"].(Event)
// Process with state
return state.Process(event)
}),
pocket.WithPost(func(ctx context.Context, store pocket.StoreWriter,
event Event, prep any, response Response) (Response, string, error) {
// Update state
data := prep.(map[string]any)
state := data["state"].(State)
state.Update(response)
store.Set(ctx, "process:state", state)
return response, "next", nil
}),
)
node := pocket.NewNode[Input, Output]("retry-example",
pocket.WithExec(unreliableOperation),
pocket.WithRetryConfig(pocket.RetryConfig{
MaxAttempts: 5,
InitialDelay: 100 * time.Millisecond,
MaxDelay: 10 * time.Second,
Multiplier: 2.0,
Jitter: 0.1,
}),
)
// Use fallback as circuit breaker
breaker := pocket.NewNode[Request, Response]("circuit-breaker",
pocket.Steps{
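Prep: func(ctx context.Context, store pocket.StoreReader, input any) (any, error) {
// Steps.Prep is a PrepFunc, so the input arrives as any.
// A missing or non-int counter is treated as zero failures.
failures, _ := store.Get(ctx, "circuit:failures")
if n, ok := failures.(int); ok && n > 5 {
return nil, errors.New("circuit open")
}
return input, nil
},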
Exec: func(ctx context.Context, prepResult any) (any, error) {
return callService(prepResult.(Request))
},
Fallback: func(ctx context.Context, prepResult any, err error) (any, error) {
// Return cached or default response
return getCachedResponse(), nil
},
},
pocket.WithPost(func(ctx context.Context, store pocket.StoreWriter,
req Request, prep any, result any) (any, string, error) {
// Track failures
if _, ok := result.(error); ok {
failures, _ := store.Get(ctx, "circuit:failures")
n, _ := failures.(int) // zero when the counter is missing
store.Set(ctx, "circuit:failures", n+1)
} else {
store.Set(ctx, "circuit:failures", 0)
}
return result, "next", nil
}),
)
router := pocket.NewNode[Message, Message]("router",
pocket.WithPost(func(ctx context.Context, store pocket.StoreWriter,
msg Message, prep, exec any) (Message, string, error) {
switch msg.Type {
case "email":
return msg, "email-handler", nil
case "sms":
return msg, "sms-handler", nil
case "push":
return msg, "push-handler", nil
default:
return msg, "default-handler", nil
}
}),
)
// Connect handlers
router.Connect("email-handler", emailNode)
router.Connect("sms-handler", smsNode)
router.Connect("push-handler", pushNode)
router.Connect("default-handler", defaultNode)
conditional := pocket.NewNode[Data, Result]("conditional",
pocket.WithPrep(func(ctx context.Context, store pocket.StoreReader, data Data) (any, error) {
// Check condition in prep
enabled, _ := store.Get(ctx, "feature:enabled")
on, _ := enabled.(bool) // treat a missing or non-bool value as disabled
return map[string]any{
"data": data,
"enabled": on,
}, nil
}),
pocket.WithExec(func(ctx context.Context, prepData any) (Result, error) {
p := prepData.(map[string]any)
if !p["enabled"].(bool) {
return Result{Skipped: true}, nil
}
// Process only if enabled
return processData(p["data"].(Data))
}),
)
// Create a sub-workflow
subWorkflow := pocket.NewGraph(subStart, subStore)
// Use directly as a node (Graph implements Node)
mainWorkflow := pocket.NewNode[Input, Output]("main",
pocket.WithPost(func(ctx context.Context, store pocket.StoreWriter,
input Input, prep, result any) (Output, string, error) {
if needsSubWorkflow(result) {
return result.(Output), "sub-workflow", nil
}
return result.(Output), "continue", nil
}),
)
// Connect sub-graph directly
mainWorkflow.Connect("sub-workflow", subWorkflow)
func TestNode(t *testing.T) {
store := pocket.NewStore()
ctx := context.Background()
node := pocket.NewNode[string, string]("test",
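// strings.ToUpper has no context or error, so wrap it in an exec-shaped function
pocket.WithExec(func(ctx context.Context, s string) (string, error) {
return strings.ToUpper(s), nil
}),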
)
// Test via graph
graph := pocket.NewGraph(node, store)
result, err := graph.Run(ctx, "hello")
assert.NoError(t, err)
assert.Equal(t, "HELLO", result)
}
func TestLifecycle(t *testing.T) {
store := pocket.NewStore()
ctx := context.Background()
// Test prep phase (node and input are assumed to come from the test setup)
prepResult, err := node.Prep(ctx, store, input)
assert.NoError(t, err)
// Test exec phase
execResult, err := node.Exec(ctx, prepResult)
assert.NoError(t, err)
// Test post phase
// The output value is not asserted here, so discard it to avoid an unused variable
_, next, err := node.Post(ctx, store, input, prepResult, execResult)
assert.NoError(t, err)
assert.Equal(t, "next-node", next)
}
type MockStore struct {
data map[string]any
}
func (m *MockStore) Get(ctx context.Context, key string) (any, bool) {
val, ok := m.data[key]
return val, ok
}
func (m *MockStore) Set(ctx context.Context, key string, value any) error {
m.data[key] = value
return nil
}
func TestWithMockStore(t *testing.T) {
mockStore := &MockStore{data: make(map[string]any)}
mockStore.data["config"] = Config{Enabled: true}
node := createNode()
result, err := node.Prep(context.Background(), mockStore, input)
assert.NoError(t, err)
assert.NotNil(t, result)
}
// Configure store for high throughput
store := pocket.NewStore(
pocket.WithMaxEntries(100000), // Large capacity
pocket.WithTTL(30 * time.Second), // Short TTL
pocket.WithEvictionCallback(func(key string, value any) {
// Clean up resources
if closer, ok := value.(io.Closer); ok {
closer.Close()
}
}),
)
// Process in batches for efficiency
batchProcessor := pocket.NewNode[[]Item, []Result]("batch",
pocket.WithExec(func(ctx context.Context, items []Item) ([]Result, error) {
// Process batch together (e.g., bulk API call)
bulkResults, err := processBatch(items)
if err != nil {
return nil, err
}
return bulkResults, nil
}),
)
// Use scoped stores to isolate data
processNode := pocket.NewNode[Task, Result]("processor",
pocket.WithPrep(func(ctx context.Context, store pocket.StoreReader, task Task) (any, error) {
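// Create scoped store for this task; the checked assertion avoids a panic
// if the reader passed to Prep does not implement the full Store interface
fullStore, ok := store.(pocket.Store)
if !ok {
return nil, errors.New("store does not support scoping")
}
taskStore := fullStore.Scope(fmt.Sprintf("task:%s", task.ID))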
return map[string]any{
"task": task,
"store": taskStore,
}, nil
}),
pocket.WithExec(func(ctx context.Context, prepData any) (Result, error) {
data := prepData.(map[string]any)
task := data["task"].(Task)
taskStore := data["store"].(pocket.Store)
// Use scoped store for task-specific data
return processWithStore(task, taskStore)
}),
)
import "github.com/agentstation/pocket/middleware"
// Add logging to node
node = middleware.WithLogging(logger)(node)
// Add metrics collection
node = middleware.WithMetrics(metricsCollector)(node)
// Add tracing
node = middleware.WithTracing(tracer)(node)
Process multiple items concurrently.
func FanOut(ctx context.Context, processor Node, store Store, items []any) ([]any, error)
Parameters:
- processor: Node to process each item
- store: Store for the operation
- items: Items to process
Returns:
- Results in the same order as inputs
- Error if any processing fails
Aggregate results from multiple sources.
type FanIn struct {
// Internal fields
}
func NewFanIn(aggregator Node, sources ...Node) *FanIn
func (f *FanIn) Run(ctx context.Context, store Store) (any, error)
Execute nodes sequentially, passing output to next input.
func Pipeline(ctx context.Context, nodes []Node, store Store, input any) (any, error)
Execute multiple nodes concurrently.
func RunConcurrent(ctx context.Context, nodes []Node, store Store) ([]any, error)
var (
// No start node defined in graph
ErrNoStartNode = errors.New("pocket: no start node defined")
// Referenced node doesn't exist
ErrNodeNotFound = errors.New("pocket: node not found")
// Input type doesn't match expected type
ErrInvalidInput = errors.New("pocket: invalid input type")
)
// Prep phase function
type PrepFunc func(ctx context.Context, store StoreReader, input any) (prepResult any, err error)
// Exec phase function
type ExecFunc func(ctx context.Context, prepResult any) (execResult any, err error)
// Post phase function
type PostFunc func(ctx context.Context, store StoreWriter, input, prepResult, execResult any) (output any, next string, err error)
// Check if types are compatible
func IsTypeCompatible(outputType, inputType reflect.Type) bool
// Add timeout to context
func WithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
// Add cancellation to context
func WithCancel(parent context.Context) (context.Context, context.CancelFunc)
See Middleware Documentation for detailed middleware API.
// Common middleware factories
func WithLogging(logger Logger) Middleware
func WithMetrics(collector MetricsCollector) Middleware
func WithRetry(attempts int, delay time.Duration) Middleware
func WithTimeout(timeout time.Duration) Middleware
See Batch Processing Documentation for detailed batch API.
// Map-reduce pattern
func MapReduce[T, R, A any](
extract func(context.Context, Store) ([]T, error),
mapper func(context.Context, T) (R, error),
reducer func(context.Context, []R) (A, error),
opts ...BatchOption,
) Node
// Process items in parallel
func ForEach[T any](
extract func(context.Context, Store) ([]T, error),
process func(context.Context, T) error,
opts ...BatchOption,
) Node
See YAML Integration Documentation for detailed YAML API.
// Load workflow from YAML
type Loader struct {
// Internal fields
}
func NewLoader() *Loader
func (l *Loader) LoadFile(path string, store Store) (*Graph, error)
func (l *Loader) Load(data []byte, store Store) (*Graph, error)
func (l *Loader) RegisterHandler(name string, handler any) error