Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions service/leaderfollower/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package leaderfollower

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
)

const defaultCoordPath = "/coord"
const defaultHTTPTimeout = 5 * time.Second

// Coordinator wires together an [Elector], [PeerDiscovery], and [Protocol].
type Coordinator struct {
	l         *zap.Logger
	elector   Elector       // decides which node leads; also supplies this node's identity
	discovery PeerDiscovery // resolves the current peer set for fan-out calls
	protocol  Protocol      // coordination strategy run while this node is leader
	isLeader  atomic.Bool   // true only while the leader callback in Start is running

	httpClient *http.Client // client for peer fan-out calls; override via WithHTTPClient
	coordPath  string       // request path appended to each peer address; override via WithCoordPath
}

// CoordinatorOption configures a [Coordinator] at construction time.
// Options are applied by [NewCoordinator] after defaults are set, so they
// may override any default.
type CoordinatorOption func(*Coordinator)

// WithHTTPClient replaces the default HTTP client used for fan-out calls.
func WithHTTPClient(c *http.Client) CoordinatorOption {
	return func(coord *Coordinator) {
		coord.httpClient = c
	}
}

// WithCoordPath sets the path used for peer coordination requests.
// Default: "/coord".
func WithCoordPath(path string) CoordinatorOption {
	return func(coord *Coordinator) {
		coord.coordPath = path
	}
}

// NewCoordinator creates a Coordinator from explicitly provided components.
func NewCoordinator(l *zap.Logger, elector Elector, discovery PeerDiscovery, protocol Protocol, opts ...CoordinatorOption) *Coordinator {
	coord := &Coordinator{
		l:          l,
		elector:    elector,
		discovery:  discovery,
		protocol:   protocol,
		coordPath:  defaultCoordPath,
		httpClient: &http.Client{Timeout: defaultHTTPTimeout},
	}

	// Options run after the defaults above so they can override any of them.
	for _, opt := range opts {
		opt(coord)
	}

	return coord
}

// Name implements the keel Service interface.
func (c *Coordinator) Name() string {
	return "leaderfollower-coordinator"
}

// Start runs the election loop until ctx is cancelled.
//
// Blocks for the lifetime of the service: it returns only when the elector's
// Run returns. Whenever this node wins the election, the callback flips the
// isLeader flag, runs the protocol's Lead, and clears the flag on exit.
// leaderCtx is presumably cancelled when leadership is lost — confirm against
// the Elector implementation.
func (c *Coordinator) Start(ctx context.Context) error {
	return c.elector.Run(ctx, func(leaderCtx context.Context) {
		c.isLeader.Store(true)
		c.l.Info("became leader", zap.String("identity", c.elector.Identity()))

		// Clear the flag no matter how the leader callback exits.
		defer func() {
			c.isLeader.Store(false)
			c.l.Info("lost leadership", zap.String("identity", c.elector.Identity()))
		}()

		// Lead errors are logged, not returned: a failed round must not tear
		// down the election loop itself.
		if err := c.protocol.Lead(leaderCtx, c.newFanOut()); err != nil {
			c.l.Error("protocol lead returned error", zap.Error(err))
		}
	})
}

// Close is a no-op; cancelling the context passed to Start is sufficient.
func (c *Coordinator) Close(_ context.Context) error {
	return nil
}

// IsLeader reports whether this node currently holds the leader lease.
func (c *Coordinator) IsLeader() bool {
	return c.isLeader.Load()
}

// newFanOut returns a FanOut implementation bound to this coordinator's
// peer discovery and HTTP client.
func (c *Coordinator) newFanOut() FanOut {
	fo := fanOutImpl{
		l:         c.l,
		discovery: c.discovery,
		client:    c.httpClient,
		path:      c.coordPath,
	}
	return &fo
}

// fanOutImpl is the concrete FanOut used by protocols.
type fanOutImpl struct {
	l         *zap.Logger
	discovery PeerDiscovery // source of the peer list for each fan-out round
	client    *http.Client  // shared client for all peer calls
	path      string        // coordination endpoint path appended to each Peer.Addr
}

// Peers exposes the underlying peer discovery so protocols can inspect
// cluster membership without performing a fan-out.
func (f *fanOutImpl) Peers(ctx context.Context) ([]Peer, error) {
	return f.discovery.Peers(ctx)
}

// All sends req to every discovered peer concurrently and collects one
// FanOutResult per peer. Results are returned in the same order as the
// discovered peer list. If discovery or request marshalling fails, a single
// peer-less result carrying the error is returned instead.
func (f *fanOutImpl) All(ctx context.Context, req FanOutRequest) []FanOutResult {
	peers, err := f.discovery.Peers(ctx)
	if err != nil {
		f.l.Error("fan-out: peer discovery failed", zap.Error(err))
		return []FanOutResult{{Err: fmt.Errorf("%w: %w", ErrDiscoveryFailed, err)}}
	}

	body, err := json.Marshal(req)
	if err != nil {
		f.l.Error("fan-out: failed to marshal request", zap.Error(err))
		return []FanOutResult{{Err: fmt.Errorf("marshal fan-out request: %w", err)}}
	}

	// Each goroutine writes into its own pre-allocated slot: no mutex needed,
	// and the output order deterministically matches the peer list rather than
	// the nondeterministic completion order.
	results := make([]FanOutResult, len(peers))

	var wg sync.WaitGroup
	for i, p := range peers {
		wg.Add(1)
		// Pass i and p explicitly so the code is also correct under pre-1.22
		// loop-variable semantics.
		go func(i int, p Peer) {
			defer wg.Done()
			results[i] = f.callPeer(ctx, p, body)
		}(i, p)
	}
	wg.Wait()

	return results
}

func (f *fanOutImpl) callPeer(ctx context.Context, p Peer, body []byte) FanOutResult {
url := p.Addr + f.path

httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return FanOutResult{Peer: p, Err: err}
}

httpReq.Header.Set("Content-Type", "application/json")

resp, err := f.client.Do(httpReq)

Check failure on line 160 in service/leaderfollower/coordinator.go

View workflow job for this annotation

GitHub Actions / test

G704: SSRF via taint analysis (gosec)
if err != nil {
return FanOutResult{Peer: p, Err: err}
}
defer resp.Body.Close()

respBody, err := io.ReadAll(io.LimitReader(resp.Body, 4096))
if err != nil {
return FanOutResult{Peer: p, Status: resp.StatusCode, Err: fmt.Errorf("read response: %w", err)}
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return FanOutResult{Peer: p, Status: resp.StatusCode,
Err: fmt.Errorf("%w: HTTP %d from %s", ErrPeerCallFailed, resp.StatusCode, p.ID), Body: respBody}
}

return FanOutResult{Peer: p, Status: resp.StatusCode, Body: respBody}
}
84 changes: 84 additions & 0 deletions service/leaderfollower/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package leaderfollower

import (
"context"
"fmt"
"net"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// Peer represents a reachable node in the cluster.
type Peer struct {
	// ID is the stable identity of the peer. Must match [Elector.Identity] for
	// the same node so the coordinator can identify itself in the peer list.
	ID string

	// Addr is the base HTTP address of the peer's coordination server,
	// e.g. "http://10.0.0.5:8090" as built by [PodDiscovery]; coordination
	// paths are appended to it, so it must carry no trailing slash or path.
	Addr string
}

// PeerDiscovery resolves the current set of peers (including self).
// Implementations are called before each coordination round and must be
// safe for concurrent use.
//
// An empty (or nil) slice with a nil error means no peers are currently
// addressable.
type PeerDiscovery interface {
	Peers(ctx context.Context) ([]Peer, error)
}

// --- Kubernetes Pod implementation ---

// PodDiscoveryConfig holds configuration for Kubernetes pod-based peer discovery.
type PodDiscoveryConfig struct {
	Client        kubernetes.Interface // Kubernetes API client used to list pods
	Namespace     string               // namespace the peer pods run in
	LabelSelector string               // e.g. "app.kubernetes.io/instance=myservice"
	CoordPort     int                  // port the coord server listens on (e.g. 8090)
}

// PodDiscovery implements [PeerDiscovery] by listing running Kubernetes pods
// matching a label selector. It is stateless apart from its config and is
// therefore safe for concurrent use, as the interface requires.
type PodDiscovery struct {
	cfg PodDiscoveryConfig
}

// NewPodDiscovery creates a PodDiscovery.
func NewPodDiscovery(cfg PodDiscoveryConfig) *PodDiscovery {
	d := PodDiscovery{cfg: cfg}
	return &d
}

// Peers lists all running, non-terminating pods matching the label selector.
// The pod name is used as Peer.ID and the pod IP + CoordPort as Peer.Addr.
func (d *PodDiscovery) Peers(ctx context.Context) ([]Peer, error) {
	opts := metav1.ListOptions{LabelSelector: d.cfg.LabelSelector}

	list, err := d.cfg.Client.CoreV1().Pods(d.cfg.Namespace).List(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("list pods: %w", err)
	}

	// The port is the same for every peer; convert it once.
	port := strconv.Itoa(d.cfg.CoordPort)

	var peers []Peer
	for i := range list.Items {
		pod := &list.Items[i]

		// Skip pods that are not running, are being deleted, or have no IP yet.
		if pod.Status.Phase != corev1.PodRunning ||
			pod.DeletionTimestamp != nil ||
			pod.Status.PodIP == "" {
			continue
		}

		peers = append(peers, Peer{
			ID:   pod.Name,
			Addr: "http://" + net.JoinHostPort(pod.Status.PodIP, port),
		})
	}

	return peers, nil
}
35 changes: 35 additions & 0 deletions service/leaderfollower/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Package leaderfollower provides a generic leader/follower coordination
// framework for distributed Go services running in Kubernetes.
//
// # Interfaces
//
// The framework is built from three pluggable interfaces:
// - [Elector]: determines which node is the leader
// - [PeerDiscovery]: resolves the current set of addressable peers
// - [Protocol]: the coordination strategy executed by the leader
//
// Kubernetes implementations are included in this package:
// - [LeaseElector]: leader election via Kubernetes Lease resource
// - [PodDiscovery]: peer discovery via pod label selector
//
// For the batteries-included Kubernetes setup with three-phase commit,
// use [threephase.New] in the threephase sub-package, which also provides
// [threephase.ConfigMapCoordStore] for coordination state persistence.
//
// For manual wiring without the K8s defaults, use [NewCoordinator].
//
// # Protocol
//
// The threephase sub-package provides a three-phase commit protocol:
// - Phase 1 (CanCommit): peers validate whether they can accept the proposed state
// - Phase 2 (PreCommit): peers stage the state (default: no-op, degrades to 2PC)
// - Phase 3 (DoCommit): peers apply the state atomically
//
// # ConfigMap layout
//
// The coordination ConfigMap managed by [threephase.ConfigMapCoordStore] uses the following keys:
// - committed: JSON-encoded currently active state
// - previous: JSON-encoded previous state (for rollback after restart)
// - proposed: JSON-encoded next proposed state (set by external proposers)
// - round: JSON-encoded Round[S] for crash recovery (cleared after each round)
package leaderfollower
Loading
Loading