Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cmd/gateway_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ func wireExtras(
contextFileInterceptor = tools.NewContextFileInterceptor(stores.Agents, workspace, agentCtxCache, userCtxCache)
}

// 1c. Persistent media storage for cross-turn image/document access
mediaStore, err := media.NewStore(filepath.Join(workspace, ".media"))
// 1c. Persistent media storage for cross-turn image/document access.
// Default is the historical filesystem-backed store under
// {workspace}/.media. Set GOCLAW_MEDIA_BACKEND=s3 (plus the matching
// _S3_* env vars) to switch to S3 without touching call sites — the
// Store façade hides the backend choice from the rest of the code.
mediaCfg := media.LoadConfigFromEnv(filepath.Join(workspace, ".media"))
mediaStore, err := media.NewStoreFromConfig(context.Background(), mediaCfg)
if err != nil {
slog.Warn("media store creation failed, images will not persist across turns", "error", err)
slog.Warn("media store creation failed, images will not persist across turns",
"backend", mediaCfg.Backend, "error", err)
}

// Wire media cleanup on session delete.
Expand Down
38 changes: 38 additions & 0 deletions internal/media/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package media

import (
"context"
"errors"
"io"
)

// Backend persists session-scoped media files. Implementations decide
// where the bytes live — local filesystem, S3, etc. — but must keep the
// same key shape: {sessionHash}/{id}.{ext}.
//
// LocalPath is intentionally part of the contract because most current
// callers want a filesystem path they can hand to ffmpeg, pypdf, or a
// container bind mount. Remote-only backends are expected to cache the
// object locally on first request and return the cache path.
//
// Every method takes a context.Context as its first parameter so that
// implementations which perform blocking work can receive one; whether
// cancellation is honoured is up to the implementation.
type Backend interface {
// Save persists the file at srcPath under sessionKey and returns the
// generated media ID plus the extension that was applied. The source
// file SHOULD be removed by the backend on success, so callers must
// not rely on srcPath still existing afterwards. mime is a hint for
// choosing the stored extension.
Save(ctx context.Context, sessionKey, srcPath, mime string) (id string, ext string, err error)

// Open returns a reader for the bytes of a previously-saved media ID.
// The caller must close the reader.
Open(ctx context.Context, id string) (io.ReadCloser, error)

// LocalPath returns a filesystem path the caller can read directly.
// Remote backends MAY block while fetching the object into a local
// cache. Returns ErrNotFound if the ID is unknown; match it with
// errors.Is because implementations may wrap it with extra context.
LocalPath(ctx context.Context, id string) (string, error)

// Delete removes every media file persisted under sessionKey.
Delete(ctx context.Context, sessionKey string) error
}

// ErrNotFound is returned by Backend implementations when a media ID has
// no corresponding object. Implementations may wrap it with additional
// context (for example the offending ID), so callers should match it with
// errors.Is rather than comparing with ==.
var ErrNotFound = errors.New("media: not found")
68 changes: 68 additions & 0 deletions internal/media/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package media

import (
"context"
"fmt"
"os"
"strings"
)

// Config selects and parameterises the media Backend. Populate it
// programmatically or via LoadConfigFromEnv.
//
// NewBackend switches on the Backend field; the remaining fields are only
// meaningful for the backend they are grouped under.
type Config struct {
// Backend is "fs" (default) or "s3". NewBackend treats the empty
// string as "fs" and rejects any other value.
Backend string

// FS-only. BaseDir is the root directory handed to NewFSBackend.
BaseDir string

// S3-only. The whole Config is passed to NewS3Backend, which defines
// how each of these fields is interpreted.
Bucket string
Prefix string
Region string
Endpoint string // for S3-compatible services (MinIO, R2, DO Spaces)
CacheDir string // NOTE(review): presumably the local cache directory for fetched objects — confirm against NewS3Backend
}

// LoadConfigFromEnv builds a Config from the GOCLAW_MEDIA_* environment
// variables. Every value is whitespace-trimmed, and the backend name is
// additionally lower-cased.
//
// Defaults: a blank or unset GOCLAW_MEDIA_BACKEND selects "fs", and a blank
// or unset GOCLAW_MEDIA_BASEDIR falls back to defaultBase. The S3 fields
// are left empty when their variables are not set.
func LoadConfigFromEnv(defaultBase string) Config {
	// env reads one variable and strips surrounding whitespace.
	env := func(name string) string {
		return strings.TrimSpace(os.Getenv(name))
	}

	backend := strings.ToLower(env("GOCLAW_MEDIA_BACKEND"))
	if backend == "" {
		backend = "fs"
	}

	baseDir := env("GOCLAW_MEDIA_BASEDIR")
	if baseDir == "" {
		baseDir = defaultBase
	}

	return Config{
		Backend:  backend,
		BaseDir:  baseDir,
		Bucket:   env("GOCLAW_MEDIA_S3_BUCKET"),
		Prefix:   env("GOCLAW_MEDIA_S3_PREFIX"),
		Region:   env("GOCLAW_MEDIA_S3_REGION"),
		Endpoint: env("GOCLAW_MEDIA_S3_ENDPOINT"),
		CacheDir: env("GOCLAW_MEDIA_S3_CACHE_DIR"),
	}
}

// NewBackend builds the Backend selected by cfg.Backend.
//
// An empty Backend or "fs" yields a filesystem backend rooted at
// cfg.BaseDir; "s3" yields an S3 backend configured from cfg. Any other
// value is rejected with an error naming the unknown backend.
//
// On failure the returned Backend is a true nil interface. The constructor
// results are deliberately not forwarded with a bare `return NewXxx(...)`:
// doing so would wrap a nil concrete pointer in a non-nil Backend value,
// which fools callers that test the interface against nil.
func NewBackend(ctx context.Context, cfg Config) (Backend, error) {
	switch cfg.Backend {
	case "", "fs":
		fsBackend, err := NewFSBackend(cfg.BaseDir)
		if err != nil {
			return nil, err
		}
		return fsBackend, nil
	case "s3":
		s3Backend, err := NewS3Backend(ctx, cfg)
		if err != nil {
			return nil, err
		}
		return s3Backend, nil
	default:
		return nil, fmt.Errorf("media: unknown backend %q (want fs|s3)", cfg.Backend)
	}
}

// NewStoreFromConfig constructs the Backend described by cfg and wraps it
// in a *Store, the historical façade. It is a shortcut for callers that
// do not need the bare Backend. Any error from NewBackend is returned
// unchanged together with a nil *Store.
func NewStoreFromConfig(ctx context.Context, cfg Config) (*Store, error) {
	backend, err := NewBackend(ctx, cfg)
	if err != nil {
		return nil, err
	}
	store := NewStoreWithBackend(backend)
	return store, nil
}
115 changes: 115 additions & 0 deletions internal/media/fs_backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package media

import (
"context"
"crypto/sha256"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"

"github.com/google/uuid"
)

// FSBackend stores media files on the local filesystem under
// {baseDir}/{sessionHash}/{id}.{ext}. It is the historical and default
// implementation of Backend; behaviour is preserved exactly from the
// pre-refactor media.Store so existing deployments see no change.
//
// Construct it with NewFSBackend, which also creates the root directory.
type FSBackend struct {
// baseDir is the root directory; each session gets one hashed
// sub-directory beneath it.
baseDir string
}

// NewFSBackend returns an FSBackend rooted at baseDir. The directory and
// any missing parents are created with mode 0755; an already existing
// directory is accepted as-is. A creation failure is returned wrapped,
// together with a nil backend.
func NewFSBackend(baseDir string) (*FSBackend, error) {
	err := os.MkdirAll(baseDir, 0o755)
	if err != nil {
		return nil, fmt.Errorf("media fs: create base dir: %w", err)
	}
	backend := &FSBackend{baseDir: baseDir}
	return backend, nil
}

// Save moves the file at srcPath into the session directory derived from
// sessionKey and returns the freshly generated media ID together with the
// extension used for the stored file.
//
// The extension comes from ExtFromMime(mime); when that yields nothing the
// extension of srcPath is used instead, which may itself be empty.
//
// The file is moved with os.Rename when possible. If the rename fails the
// file is copied and the source is then removed. A failed copy removes any
// partially written destination so no truncated media file is left behind.
// Failing to remove the source after a successful copy is logged but does
// not fail the save, because the media is already safely stored.
//
// The context is currently unused.
func (b *FSBackend) Save(_ context.Context, sessionKey, srcPath, mime string) (string, string, error) {
	dir := b.sessionDir(sessionKey)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return "", "", fmt.Errorf("media fs: create session dir: %w", err)
	}

	mediaID := uuid.New().String()
	ext := ExtFromMime(mime)
	if ext == "" {
		ext = filepath.Ext(srcPath)
	}
	dstPath := filepath.Join(dir, mediaID+ext)

	// Same-filesystem rename is cheap; fall back to copy+remove across
	// devices (e.g. when the workspace volume differs from tmpfs).
	if err := os.Rename(srcPath, dstPath); err == nil {
		return mediaID, ext, nil
	}
	if err := copyFile(srcPath, dstPath); err != nil {
		// Best-effort cleanup of a partial destination; the copy error is
		// the one worth reporting, and the file may not exist at all.
		_ = os.Remove(dstPath)
		return "", "", fmt.Errorf("media fs: copy file: %w", err)
	}
	if err := os.Remove(srcPath); err != nil {
		slog.Warn("media fs: failed to remove source after copy", "path", srcPath, "error", err)
	}
	return mediaID, ext, nil
}

// Open resolves id to its on-disk file via LocalPath and opens it for
// reading. The caller must close the returned reader.
//
// Lookup errors from LocalPath are returned unchanged. If the file
// disappears between lookup and open (for example because its session was
// deleted concurrently) the error wraps ErrNotFound, so callers see the
// same sentinel as for an unknown ID. Other open failures are wrapped with
// the media ID.
//
// On every error path the returned reader is a true nil interface; the
// *os.File is never forwarded directly because a nil *os.File stored in an
// io.ReadCloser compares as non-nil.
func (b *FSBackend) Open(ctx context.Context, id string) (io.ReadCloser, error) {
	p, err := b.LocalPath(ctx, id)
	if err != nil {
		return nil, err
	}
	f, err := os.Open(p)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("%w: %s", ErrNotFound, id)
		}
		return nil, fmt.Errorf("media fs: open %s: %w", id, err)
	}
	return f, nil
}

// LocalPath returns the on-disk path of the media file stored under id, or
// an error wrapping ErrNotFound when no such file exists.
//
// Media files are stored as {sessionHash}/{id}{ext}. The session hash is
// not part of the public ID, so the lookup globs across all session
// directories — IDs come from uuid.New, so the chance of collision is
// negligible.
//
// The id is embedded in a glob pattern, so it must be a plain file-name
// stem. An empty id, or one containing a path separator or a glob
// metacharacter, is reported as ErrNotFound instead of being expanded;
// otherwise an id such as "*" would resolve to an arbitrary stored file.
//
// Files stored without an extension (Save produces these when neither the
// MIME type nor the source name supplies one) are found by a second,
// exact-name lookup.
//
// The context is currently unused.
func (b *FSBackend) LocalPath(_ context.Context, id string) (string, error) {
	plain := id != ""
	for _, r := range id {
		if r == '/' || r == '\\' || r == '*' || r == '?' || r == '[' {
			plain = false
			break
		}
	}
	if !plain {
		return "", fmt.Errorf("%w: %s", ErrNotFound, id)
	}

	// Prefer the usual {id}.{ext} form, then fall back to a bare {id}.
	for _, name := range []string{id + ".*", id} {
		matches, err := filepath.Glob(filepath.Join(b.baseDir, "*", name))
		if err != nil {
			return "", fmt.Errorf("media fs: glob for %s: %w", id, err)
		}
		if len(matches) > 0 {
			return matches[0], nil
		}
	}
	return "", fmt.Errorf("%w: %s", ErrNotFound, id)
}

// Delete removes the whole directory that holds the media saved under
// sessionKey, including everything inside it. A failure is logged at warn
// level and also returned to the caller. The context is currently unused.
func (b *FSBackend) Delete(_ context.Context, sessionKey string) error {
	target := b.sessionDir(sessionKey)
	err := os.RemoveAll(target)
	if err == nil {
		return nil
	}
	slog.Warn("media fs: failed to delete session dir", "dir", target, "error", err)
	return err
}

// sessionDir maps sessionKey to its directory beneath baseDir. The
// directory name is the first 6 bytes of the key's SHA-256 digest rendered
// as 12 lowercase hex characters, which keeps arbitrary session keys
// filesystem-safe.
func (b *FSBackend) sessionDir(sessionKey string) string {
	digest := sha256.Sum256([]byte(sessionKey))
	name := fmt.Sprintf("%x", digest[:6])
	return filepath.Join(b.baseDir, name)
}

// copyFile copies the contents of src into dst, creating dst or truncating
// it if it already exists.
//
// If src cannot be opened, dst is left untouched. If the copy or the final
// close of dst fails, dst is removed so that a truncated or partially
// written file is never left behind. The close error is checked because it
// can be the only place a deferred write failure is reported.
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}

	if _, err := io.Copy(out, in); err != nil {
		// Best-effort cleanup; the copy error is the one to report.
		_ = out.Close()
		_ = os.Remove(dst)
		return err
	}
	if err := out.Close(); err != nil {
		_ = os.Remove(dst)
		return err
	}
	return nil
}

// Compile-time assertion that *FSBackend implements Backend; the build
// breaks here if a method is missing or its signature drifts.
var _ Backend = (*FSBackend)(nil)
95 changes: 95 additions & 0 deletions internal/media/fs_backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package media

import (
"context"
"errors"
"io"
"os"
"path/filepath"
"strings"
"testing"
)

// TestFSBackend_SaveLoadDelete is the core round-trip: write a file,
// look it up by ID, read it back, then drop the session. Every I/O step is
// checked so a failure is reported at the step that caused it.
func TestFSBackend_SaveLoadDelete(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	base := t.TempDir()
	b, err := NewFSBackend(base)
	if err != nil {
		t.Fatalf("NewFSBackend: %v", err)
	}

	src := filepath.Join(t.TempDir(), "src.png")
	if err := os.WriteFile(src, []byte("PNG-bytes"), 0o644); err != nil {
		t.Fatalf("write src: %v", err)
	}

	id, ext, err := b.Save(ctx, "session-a", src, "image/png")
	if err != nil {
		t.Fatalf("Save: %v", err)
	}
	if id == "" || ext != ".png" {
		t.Fatalf("unexpected save result: id=%q ext=%q", id, ext)
	}
	// Save is expected to consume the source file.
	if _, err := os.Stat(src); !os.IsNotExist(err) {
		t.Fatalf("expected src to be consumed, got err=%v", err)
	}

	p, err := b.LocalPath(ctx, id)
	if err != nil {
		t.Fatalf("LocalPath: %v", err)
	}
	if !strings.HasPrefix(p, base) {
		t.Fatalf("LocalPath %q outside base %q", p, base)
	}

	rc, err := b.Open(ctx, id)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	body, readErr := io.ReadAll(rc)
	closeErr := rc.Close()
	if readErr != nil {
		t.Fatalf("read opened media: %v", readErr)
	}
	if closeErr != nil {
		t.Fatalf("close opened media: %v", closeErr)
	}
	if string(body) != "PNG-bytes" {
		t.Fatalf("Open returned %q, want PNG-bytes", body)
	}

	if err := b.Delete(ctx, "session-a"); err != nil {
		t.Fatalf("Delete: %v", err)
	}
	if _, err := b.LocalPath(ctx, id); !errors.Is(err, ErrNotFound) {
		t.Fatalf("after Delete LocalPath err = %v, want ErrNotFound", err)
	}
}

// TestFSBackend_MimeOverridesExtension confirms the stored extension is
// derived from the MIME type — important because some upstream callers
// hand us a tempfile with no extension at all, which is the case exercised
// here. Setup errors are checked so a broken fixture fails loudly instead
// of surfacing as a nil-pointer panic or a misleading Save error.
func TestFSBackend_MimeOverridesExtension(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	b, err := NewFSBackend(t.TempDir())
	if err != nil {
		t.Fatalf("NewFSBackend: %v", err)
	}

	src := filepath.Join(t.TempDir(), "no-ext-here")
	if err := os.WriteFile(src, []byte("x"), 0o644); err != nil {
		t.Fatalf("write src: %v", err)
	}

	_, ext, err := b.Save(ctx, "s", src, "audio/mpeg")
	if err != nil {
		t.Fatalf("Save: %v", err)
	}
	if ext != ".mp3" {
		t.Fatalf("ext = %q, want .mp3", ext)
	}
}

// TestFSBackend_LoadPathMissingReturnsErrNotFound pins the LocalPath
// contract for an unknown ID: the error must match ErrNotFound. This is
// what callers (media handlers, agent loop) match on when a stale ID slips
// through.
func TestFSBackend_LoadPathMissingReturnsErrNotFound(t *testing.T) {
	t.Parallel()
	b, err := NewFSBackend(t.TempDir())
	if err != nil {
		t.Fatalf("NewFSBackend: %v", err)
	}

	_, err = b.LocalPath(context.Background(), "00000000-0000-0000-0000-000000000000")
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("err = %v, want ErrNotFound", err)
	}
}
Loading