[Feature]: Request body-transform extension point (EPP-style plugin framework) for dispatch-time payload rewriting #255

Description

@shimib

Problem Statement

The worker assumes one outgoing body shape: the OpenAI-style JSON marshalled from ReqPayload(), sent as-is by HTTPInferenceClient.SendRequest. Any provider that needs a different dispatch-time body (multi-modal APIs that want multipart/form-data, providers needing field remapping, signed-URL rewriting, etc.) currently has no choice but to patch a provider-specific branch into pkg/asyncworker/http_client.go. That doesn't scale — every such provider becomes a core code change and a permanent special case.

We already have a config-driven extension precedent in the repo (pipeline.GateFactory.CreateGate(gateType, params), wired via WithGateFactory(...) and constructed in cmd/main.go), and EPP has a mature plugin framework we depend on. I'd like to introduce an analogous request body-transform extension point: a registry of config-loaded plugins the worker consults at dispatch time, defaulting to a no-op so the existing JSON path is byte-for-byte unchanged when nothing is configured.

This issue tracks the framework; concrete plugins (starting with the gcs_uri multipart transform) are separate.

Proposed Solution

Model the extension point on EPP's pkg/epp/plugins so it's familiar across the projects:

  • Plugin identity — embed an EPP-style Plugin interface exposing TypedName() TypedName (Type + Name).
  • Typed capability interface — the transform contract the worker invokes:
// RequestTransform is a dispatch-time body-transform plugin. The worker invokes
// the configured chain after marshalling and before SendRequest. A plugin that
// doesn't recognize a message returns handled=false, preserving the JSON path.
type RequestTransform interface {
    plugins.Plugin // TypedName() plugins.TypedName

    // Validate runs before dispatch. Return a fatal (non-retryable) error to
    // reject the message — e.g. a signed object URL that expires before reqDeadline.
    Validate(payload []byte, metadata map[string]string, reqDeadline int64) error

    // Transform optionally returns a new body and Content-Type for the request.
    Transform(payload []byte, metadata map[string]string) (body []byte, contentType string, handled bool, err error)
}
  • Factory + registry — mirror EPP exactly:
type FactoryFunc func(name string, parameters json.RawMessage, handle Handle) (RequestTransform, error)
func Register(pluginType string, factory FactoryFunc) // into a package-level Registry
  • Config loading & wiring — load enabled transforms from configuration into an ordered chain, threaded to the worker via a functional option in the same style as WithGateFactory(...), constructed in cmd/main.go. Empty config ⇒ no transforms ⇒ current behavior.
  • Host integration point — in pkg/asyncworker/worker.go:
    • run Validate(...) in/after validateAndMarshal(...), where ReqDeadline() is already available; a fatal ClientError{ErrorCategory: ErrCategoryInvalidReq} flows through the existing inferenceErr.Category().Fatal() branch so the broker won't retry a doomed request.
    • run the Transform(...) chain immediately before client.SendRequest(reqCtx, msg.RequestURL, msg.HttpHeaders, payloadBytes); on handled=true, replace payloadBytes and set the returned Content-Type into msg.HttpHeaders.

This keeps the public api.InferenceClient interface unchanged — SendRequest(ctx, url, headers, payload) has no metadata parameter, and we avoid a breaking change by doing the transform host-side in the worker.
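
To illustrate the host-side step, the sketch below computes the body and headers that the worker would then hand to the unchanged SendRequest. It rests on assumptions: msg.HttpHeaders is treated as a map[string]string, Transformer, TransformFunc, and applyTransforms are hypothetical names, and "first handled=true wins" is only one possible answer to the conflict question raised under Additional Context.

```go
package main

// Transformer is the Transform half of the proposed RequestTransform interface.
type Transformer interface {
	Transform(payload []byte, metadata map[string]string) (body []byte, contentType string, handled bool, err error)
}

// TransformFunc adapts a plain function to Transformer (hypothetical helper).
type TransformFunc func(payload []byte, metadata map[string]string) ([]byte, string, bool, error)

func (f TransformFunc) Transform(payload []byte, metadata map[string]string) ([]byte, string, bool, error) {
	return f(payload, metadata)
}

// applyTransforms runs the chain in order and stops at the first plugin that
// reports handled=true. It returns the body and headers to pass to SendRequest.
// With an empty chain, or when no plugin handles the message, the inputs are
// returned unchanged.
func applyTransforms(chain []Transformer, payload []byte, headers, metadata map[string]string) ([]byte, map[string]string, error) {
	for _, t := range chain {
		body, contentType, handled, err := t.Transform(payload, metadata)
		if err != nil {
			return nil, nil, err
		}
		if !handled {
			continue
		}
		// Copy the headers so the original message is not mutated.
		out := make(map[string]string, len(headers)+1)
		for k, v := range headers {
			out[k] = v
		}
		out["Content-Type"] = contentType
		return body, out, nil
	}
	return payload, headers, nil
}
```

Copying the header map is a deviation from writing directly into msg.HttpHeaders; it keeps a redelivered message from carrying a Content-Type set by an earlier attempt, at the cost of one small allocation per handled request.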

Contract note: api/api.go documents metadata as opaque pass-through ("the system does not read or write Metadata for its own routing or correlation"). Plugins keying off metadata (e.g. a provider key) intentionally relax this; because the registry is empty by default and opt-in via config, the default contract holds for anyone not enabling a transform.

Alternatives Considered

  • Single inline RequestTransform hook (no registry/plugins): simpler, but not config-loadable or composable, and diverges from EPP/GateFactory conventions.
  • Extend SendRequest to take metadata: breaking change to the public interface; pushes transform logic into the client.
  • Producer-side transform before enqueue: can't work — the body must reference a fresh signed URL validated against the message deadline at dispatch time.

Willingness to Contribute

Yes, I can submit a PR.

Additional Context

  • EPP reference: sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins (Plugin, TypedName, FactoryFunc, Register/Registry, Handle).
  • In-repo precedent: pipeline.GateFactory (pipeline/pipeline.go), WithGateFactory (pkg/redis, pkg/pubsub), cmd/main.go.
  • Open questions for discussion: package location for the registry (pipeline vs a new pkg/asyncworker/transform); whether Handle is needed in v1 or deferred; chain ordering / conflict semantics when multiple plugins return handled=true.

Labels

enhancement (New feature or request)