Skip to content

Commit a3032e5

Browse files
committed
WIP: swarmd: migrate to moby module
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
1 parent b0ffe7f commit a3032e5

12 files changed

Lines changed: 264 additions & 258 deletions

swarmd/cmd/swarmd/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
"os"
1111
"os/signal"
1212

13-
engineapi "github.com/docker/docker/client"
1413
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
14+
engineapi "github.com/moby/moby/client"
1515
"github.com/moby/swarmkit/swarmd/dockerexec"
1616
"github.com/moby/swarmkit/swarmd/internal/defaults"
1717
"github.com/moby/swarmkit/v2/api"
@@ -171,7 +171,7 @@ var (
171171
return err
172172
}
173173

174-
client, err := engineapi.NewClientWithOpts(
174+
client, err := engineapi.New(
175175
engineapi.WithHost(engineAddr),
176176
)
177177
if err != nil {

swarmd/dockerexec/adapter.go

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
"strings"
99
"time"
1010

11-
"github.com/docker/docker/api/types"
12-
"github.com/docker/docker/api/types/container"
13-
"github.com/docker/docker/api/types/events"
14-
engineapi "github.com/docker/docker/client"
1511
gogotypes "github.com/gogo/protobuf/types"
12+
"github.com/moby/moby/api/types/container"
13+
"github.com/moby/moby/api/types/events"
14+
engineapi "github.com/moby/moby/client"
1615
"github.com/moby/swarmkit/v2/agent/exec"
1716
"github.com/moby/swarmkit/v2/api"
1817
"github.com/moby/swarmkit/v2/log"
@@ -42,16 +41,16 @@ func newContainerAdapter(client engineapi.APIClient, nodeDescription *api.NodeDe
4241
}, nil
4342
}
4443

45-
func noopPrivilegeFn() (string, error) { return "", nil }
44+
func noopPrivilegeFn(context.Context) (string, error) { return "", nil }
4645

47-
func (c *containerConfig) imagePullOptions() types.ImagePullOptions {
46+
func (c *containerConfig) imagePullOptions() engineapi.ImagePullOptions {
4847
var registryAuth string
4948

5049
if c.spec().PullOptions != nil {
5150
registryAuth = c.spec().PullOptions.RegistryAuth
5251
}
5352

54-
return types.ImagePullOptions{
53+
return engineapi.ImagePullOptions{
5554
// if the image needs to be pulled, the auth config will be retrieved and updated
5655
RegistryAuth: registryAuth,
5756
PrivilegeFunc: noopPrivilegeFn,
@@ -130,7 +129,7 @@ func (c *containerAdapter) createNetworks(ctx context.Context) error {
130129

131130
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
132131
for _, nid := range c.container.networks() {
133-
if err := c.client.NetworkRemove(ctx, nid); err != nil {
132+
if _, err := c.client.NetworkRemove(ctx, nid, engineapi.NetworkRemoveOptions{}); err != nil {
134133
if isActiveEndpointError(err) {
135134
continue
136135
}
@@ -144,32 +143,36 @@ func (c *containerAdapter) removeNetworks(ctx context.Context) error {
144143
}
145144

146145
func (c *containerAdapter) create(ctx context.Context) error {
147-
_, err := c.client.ContainerCreate(ctx,
148-
c.container.config(),
149-
c.container.hostConfig(),
150-
c.container.networkingConfig(),
151-
nil,
152-
c.container.name(),
153-
)
146+
_, err := c.client.ContainerCreate(ctx, engineapi.ContainerCreateOptions{
147+
Config: c.container.config(),
148+
HostConfig: c.container.hostConfig(),
149+
NetworkingConfig: c.container.networkingConfig(),
150+
Name: c.container.name(),
151+
})
154152

155153
return err
156154
}
157155

158156
func (c *containerAdapter) start(ctx context.Context) error {
159157
// TODO(nishanttotla): Consider adding checkpoint handling later
160-
return c.client.ContainerStart(ctx, c.container.name(), types.ContainerStartOptions{})
158+
_, err := c.client.ContainerStart(ctx, c.container.name(), engineapi.ContainerStartOptions{})
159+
return err
161160
}
162161

163-
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
164-
return c.client.ContainerInspect(ctx, c.container.name())
162+
func (c *containerAdapter) inspect(ctx context.Context) (container.InspectResponse, error) {
163+
res, err := c.client.ContainerInspect(ctx, c.container.name(), engineapi.ContainerInspectOptions{})
164+
if err != nil {
165+
return container.InspectResponse{}, err
166+
}
167+
return res.Container, nil
165168
}
166169

167170
// events issues a call to the events API and returns a channel with all
168171
// events. The stream of events can be shutdown by cancelling the context.
169172
//
170173
// A chan struct{} is returned that will be closed if the event processing
171174
// fails and needs to be restarted.
172-
func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <-chan struct{}, error) {
175+
func (c *containerAdapter) events(ctx context.Context) engineapi.EventsResult {
173176
// TODO(stevvooe): Move this to a single, global event dispatch. For
174177
// now, we create a connection per container.
175178
var (
@@ -180,7 +183,7 @@ func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <
180183
log.G(ctx).Debugf("waiting on events")
181184
// TODO(stevvooe): For long running tasks, it is likely that we will have
182185
// to restart this under failure.
183-
eventCh, errCh := c.client.Events(ctx, types.EventsOptions{
186+
res := c.client.Events(ctx, engineapi.EventsListOptions{
184187
Since: "0",
185188
Filters: c.container.eventFilter(),
186189
})
@@ -190,13 +193,13 @@ func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <
190193

191194
for {
192195
select {
193-
case msg := <-eventCh:
196+
case msg := <-res.Messages:
194197
select {
195198
case eventsq <- msg:
196199
case <-ctx.Done():
197200
return
198201
}
199-
case err := <-errCh:
202+
case err := <-res.Err:
200203
log.G(ctx).WithError(err).Error("error from events stream")
201204
return
202205
case <-ctx.Done():
@@ -206,7 +209,10 @@ func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <
206209
}
207210
}()
208211

209-
return eventsq, closed, nil
212+
return engineapi.EventsResult{
213+
Messages: eventsq,
214+
Err: nil,
215+
}
210216
}
211217

212218
func (c *containerAdapter) shutdown(ctx context.Context) error {
@@ -220,18 +226,21 @@ func (c *containerAdapter) shutdown(ctx context.Context) error {
220226
stopgraceFromProto, _ := gogotypes.DurationFromProto(spec.StopGracePeriod)
221227
stopgraceSeconds = int(stopgraceFromProto.Seconds())
222228
}
223-
return c.client.ContainerStop(ctx, c.container.name(), container.StopOptions{Timeout: &stopgraceSeconds})
229+
_, err := c.client.ContainerStop(ctx, c.container.name(), engineapi.ContainerStopOptions{Timeout: &stopgraceSeconds})
230+
return err
224231
}
225232

226233
func (c *containerAdapter) terminate(ctx context.Context) error {
227-
return c.client.ContainerKill(ctx, c.container.name(), "")
234+
_, err := c.client.ContainerKill(ctx, c.container.name(), engineapi.ContainerKillOptions{})
235+
return err
228236
}
229237

230238
func (c *containerAdapter) remove(ctx context.Context) error {
231-
return c.client.ContainerRemove(ctx, c.container.name(), types.ContainerRemoveOptions{
239+
_, err := c.client.ContainerRemove(ctx, c.container.name(), engineapi.ContainerRemoveOptions{
232240
RemoveVolumes: true,
233241
Force: true,
234242
})
243+
return err
235244
}
236245

237246
func (c *containerAdapter) createVolumes(ctx context.Context) error {
@@ -268,7 +277,7 @@ func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscription
268277
return nil, errors.New("logs not supported on services with TTY")
269278
}
270279

271-
apiOptions := types.ContainerLogsOptions{
280+
apiOptions := engineapi.ContainerLogsOptions{
272281
Follow: options.Follow,
273282
Timestamps: true,
274283
Details: false,

swarmd/dockerexec/container.go

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,18 @@ import (
44
"errors"
55
"fmt"
66
"net"
7+
"net/netip"
78
"strconv"
89
"strings"
910
"time"
1011

11-
"github.com/docker/docker/api/types"
12-
enginecontainer "github.com/docker/docker/api/types/container"
13-
"github.com/docker/docker/api/types/events"
14-
"github.com/docker/docker/api/types/filters"
15-
enginemount "github.com/docker/docker/api/types/mount"
16-
"github.com/docker/docker/api/types/network"
17-
"github.com/docker/docker/api/types/volume"
18-
"github.com/docker/go-connections/nat"
1912
"github.com/docker/go-units"
2013
gogotypes "github.com/gogo/protobuf/types"
14+
enginecontainer "github.com/moby/moby/api/types/container"
15+
"github.com/moby/moby/api/types/events"
16+
enginemount "github.com/moby/moby/api/types/mount"
17+
"github.com/moby/moby/api/types/network"
18+
engineapi "github.com/moby/moby/client"
2119
"github.com/moby/swarmkit/v2/agent/exec"
2220
"github.com/moby/swarmkit/v2/api"
2321
"github.com/moby/swarmkit/v2/api/genericresource"
@@ -93,12 +91,13 @@ func (c *containerConfig) image() string {
9391
return c.spec().Image
9492
}
9593

96-
func portSpec(port uint32, protocol api.PortConfig_Protocol) nat.Port {
97-
return nat.Port(fmt.Sprintf("%d/%s", port, strings.ToLower(protocol.String())))
94+
func portSpec(port uint32, protocol api.PortConfig_Protocol) network.Port {
95+
p, _ := network.ParsePort(fmt.Sprintf("%d/%s", port, strings.ToLower(protocol.String())))
96+
return p
9897
}
9998

100-
func (c *containerConfig) portBindings() nat.PortMap {
101-
portBindings := nat.PortMap{}
99+
func (c *containerConfig) portBindings() network.PortMap {
100+
portBindings := network.PortMap{}
102101
if c.task.Endpoint == nil {
103102
return portBindings
104103
}
@@ -109,7 +108,7 @@ func (c *containerConfig) portBindings() nat.PortMap {
109108
}
110109

111110
port := portSpec(portConfig.TargetPort, portConfig.Protocol)
112-
binding := []nat.PortBinding{
111+
binding := []network.PortBinding{
113112
{},
114113
}
115114

@@ -125,17 +124,18 @@ func (c *containerConfig) portBindings() nat.PortMap {
125124
func (c *containerConfig) isolation() enginecontainer.Isolation {
126125
switch c.spec().Isolation {
127126
case api.ContainerIsolationDefault:
128-
return enginecontainer.Isolation("default")
127+
return "default"
129128
case api.ContainerIsolationHyperV:
130-
return enginecontainer.Isolation("hyperv")
129+
return "hyperv"
131130
case api.ContainerIsolationProcess:
132-
return enginecontainer.Isolation("process")
131+
return "process"
132+
default:
133+
return ""
133134
}
134-
return enginecontainer.Isolation("")
135135
}
136136

137-
func (c *containerConfig) exposedPorts() map[nat.Port]struct{} {
138-
exposedPorts := make(map[nat.Port]struct{})
137+
func (c *containerConfig) exposedPorts() network.PortSet {
138+
exposedPorts := make(network.PortSet)
139139
if c.task.Endpoint == nil {
140140
return exposedPorts
141141
}
@@ -427,7 +427,7 @@ func getMountMask(m *api.Mount) string {
427427
}
428428

429429
// This handles the case of volumes that are defined inside a service Mount
430-
func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *volume.CreateOptions {
430+
func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *engineapi.VolumeCreateOptions {
431431
var (
432432
driverName string
433433
driverOpts map[string]string
@@ -441,7 +441,7 @@ func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *volume.CreateOp
441441
}
442442

443443
// FIXME: do we need the ClusterVolumeSpec here?
444-
return &volume.CreateOptions{
444+
return &engineapi.VolumeCreateOptions{
445445
Name: mount.Source,
446446
Driver: driverName,
447447
DriverOpts: driverOpts,
@@ -513,20 +513,20 @@ func (c *containerConfig) virtualIP(networkID string) string {
513513
func (c *containerConfig) networkingConfig() *network.NetworkingConfig {
514514
epConfig := make(map[string]*network.EndpointSettings)
515515
for _, na := range c.task.Networks {
516-
var ipv4, ipv6 string
516+
var ipv4, ipv6 netip.Addr
517517
for _, addr := range na.Addresses {
518-
ip, _, err := net.ParseCIDR(addr)
518+
prefix, err := netip.ParsePrefix(addr)
519519
if err != nil {
520520
continue
521521
}
522522

523-
if ip.To4() != nil {
524-
ipv4 = ip.String()
523+
ip := prefix.Addr()
524+
if ip.Is4() {
525+
ipv4 = ip
525526
continue
526527
}
527-
528-
if ip.To16() != nil {
529-
ipv6 = ip.String()
528+
if ip.Is6() {
529+
ipv6 = ip
530530
}
531531
}
532532

@@ -556,39 +556,48 @@ func (c *containerConfig) networks() []string {
556556
return networks
557557
}
558558

559-
func (c *containerConfig) networkCreateOptions(name string) (types.NetworkCreate, error) {
559+
func (c *containerConfig) networkCreateOptions(name string) (engineapi.NetworkCreateOptions, error) {
560560
na, ok := c.networksAttachments[name]
561561
if !ok {
562-
return types.NetworkCreate{}, errors.New("container: unknown network referenced")
562+
return engineapi.NetworkCreateOptions{}, errors.New("container: unknown network referenced")
563563
}
564564

565-
options := types.NetworkCreate{
565+
options := engineapi.NetworkCreateOptions{
566566
Driver: na.Network.DriverState.Name,
567567
IPAM: &network.IPAM{
568568
Driver: na.Network.IPAM.Driver.Name,
569569
},
570-
Options: na.Network.DriverState.Options,
571-
CheckDuplicate: true,
570+
Options: na.Network.DriverState.Options,
572571
}
573572

574573
for _, ic := range na.Network.IPAM.Configs {
575-
c := network.IPAMConfig{
576-
Subnet: ic.Subnet,
577-
IPRange: ic.Range,
578-
Gateway: ic.Gateway,
574+
sn, err := netip.ParsePrefix(ic.Subnet)
575+
if err != nil {
576+
continue
577+
}
578+
r, err := netip.ParsePrefix(ic.Range)
579+
if err != nil {
580+
continue
581+
}
582+
gw, err := netip.ParseAddr(ic.Gateway)
583+
if err != nil {
584+
continue
579585
}
580-
options.IPAM.Config = append(options.IPAM.Config, c)
586+
options.IPAM.Config = append(options.IPAM.Config, network.IPAMConfig{
587+
Subnet: sn,
588+
IPRange: r,
589+
Gateway: gw,
590+
})
581591
}
582592

583593
return options, nil
584594
}
585595

586-
func (c containerConfig) eventFilter() filters.Args {
587-
filter := filters.NewArgs()
588-
filter.Add("type", string(events.ContainerEventType))
589-
filter.Add("name", c.name())
590-
filter.Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
591-
return filter
596+
func (c containerConfig) eventFilter() engineapi.Filters {
597+
return make(engineapi.Filters).
598+
Add("type", string(events.ContainerEventType)).
599+
Add("name", c.name()).
600+
Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
592601
}
593602

594603
func (c *containerConfig) init() *bool {

swarmd/dockerexec/container_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import (
55
"testing"
66
"time"
77

8-
enginecontainer "github.com/docker/docker/api/types/container"
9-
enginemount "github.com/docker/docker/api/types/mount"
10-
"github.com/docker/docker/api/types/strslice"
118
"github.com/docker/go-units"
129
gogotypes "github.com/gogo/protobuf/types"
10+
enginecontainer "github.com/moby/moby/api/types/container"
11+
enginemount "github.com/moby/moby/api/types/mount"
12+
"github.com/moby/moby/api/types/strslice"
1313
"github.com/moby/swarmkit/v2/api"
1414
)
1515

0 commit comments

Comments
 (0)