diff --git a/config.json b/config.json index f44f32c..5948fe7 100644 --- a/config.json +++ b/config.json @@ -29,11 +29,20 @@ "options": [ "bind" ] + }, + { + "source": "/var/run/docker/netns", + "destination": "/run/docker/netns", + "type": "bind", + "options": [ + "bind" + ] } ], "linux": { "capabilities": [ - "CAP_NET_ADMIN" + "CAP_NET_ADMIN", + "CAP_SYS_ADMIN" ] } } diff --git a/pkg/plugin/dhcp_manager.go b/pkg/plugin/dhcp_manager.go new file mode 100644 index 0000000..d08134a --- /dev/null +++ b/pkg/plugin/dhcp_manager.go @@ -0,0 +1,292 @@ +package plugin + +import ( + "context" + "fmt" + "net" + "time" + + dTypes "github.com/docker/docker/api/types" + docker "github.com/docker/docker/client" + log "github.com/sirupsen/logrus" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" + "golang.org/x/sys/unix" + + "github.com/devplayer0/docker-net-dhcp/pkg/udhcpc" + "github.com/devplayer0/docker-net-dhcp/pkg/util" +) + +type dhcpManager struct { + docker *docker.Client + joinReq JoinRequest + opts DHCPNetworkOptions + + LastIP *netlink.Addr + LastIPv6 *netlink.Addr + + hostname string + nsHandle netns.NsHandle + netHandle *netlink.Handle + ctrLink netlink.Link + + stopChan chan struct{} + errChan chan error + errChanV6 chan error +} + +func newDHCPManager(docker *docker.Client, r JoinRequest, opts DHCPNetworkOptions) *dhcpManager { + return &dhcpManager{ + docker: docker, + joinReq: r, + opts: opts, + + stopChan: make(chan struct{}), + } +} + +func (m *dhcpManager) logFields(v6 bool) log.Fields { + return log.Fields{ + "network": m.joinReq.NetworkID[:12], + "endpoint": m.joinReq.EndpointID[:12], + "sandbox": m.joinReq.SandboxKey, + "is_ipv6": v6, + } +} + +func (m *dhcpManager) renew(v6 bool, info udhcpc.Info) error { + lastIP := m.LastIP + if v6 { + lastIP = m.LastIPv6 + } + + ip, err := netlink.ParseAddr(info.IP) + if err != nil { + return fmt.Errorf("failed to parse IP address: %w", err) + } + + if !ip.Equal(*lastIP) { + // TODO: We can't deal with a different renewed IP for the same reason as described for `bound` + log. + WithFields(m.logFields(v6)). + WithField("old_ip", lastIP). + WithField("new_ip", ip). + Warn("udhcpc renew with changed IP") + } + + if !v6 && info.Gateway != "" { + newGateway := net.ParseIP(info.Gateway) + + routes, err := m.netHandle.RouteListFiltered(unix.AF_INET, &netlink.Route{ + LinkIndex: m.ctrLink.Attrs().Index, + Dst: nil, + }, netlink.RT_FILTER_OIF|netlink.RT_FILTER_DST) + if err != nil { + return fmt.Errorf("failed to list routes: %w", err) + } + + if len(routes) == 0 { + log. + WithFields(m.logFields(v6)). + WithField("gateway", newGateway). + Info("udhcpc renew adding default route") + + if err := m.netHandle.RouteAdd(&netlink.Route{ + LinkIndex: m.ctrLink.Attrs().Index, + Gw: newGateway, + }); err != nil { + return fmt.Errorf("failed to add default route: %w", err) + } + } else if !newGateway.Equal(routes[0].Gw) { + log. + WithFields(m.logFields(v6)). + WithField("old_gateway", routes[0].Gw). + WithField("new_gateway", newGateway). + Info("udhcpc renew replacing default route") + + routes[0].Gw = newGateway + if err := m.netHandle.RouteReplace(&routes[0]); err != nil { + return fmt.Errorf("failed to replace default route: %w", err) + } + } + } + + return nil +} + +func (m *dhcpManager) setupClient(v6 bool) (chan error, error) { + v6Str := "" + if v6 { + v6Str = "v6" + } + + client, err := udhcpc.NewDHCPClient(m.ctrLink.Attrs().Name, &udhcpc.DHCPClientOptions{ + Hostname: m.hostname, + V6: v6, + Namespace: m.joinReq.SandboxKey, + }) + if err != nil { + return nil, fmt.Errorf("failed to create DHCP%v client: %w", v6Str, err) + } + + events, err := client.Start() + if err != nil { + return nil, fmt.Errorf("failed to start DHCP%v client: %w", v6Str, err) + } + + errChan := make(chan error) + go func() { + for { + select { + case event := <-events: + switch event.Type { + // TODO: We can't really allow the IP in the container to be deleted, it'll delete some of our previously + // copied routes. Should this be handled somehow? + //case "deconfig": + // ip := m.LastIP + // if v6 { + // ip = m.LastIPv6 + // } + + // log. + // WithFields(m.logFields(v6)). + // WithField("ip", ip). + // Info("udhcpc deconfiguring, deleting previously acquired IP") + // if err := m.netHandle.AddrDel(m.ctrLink, ip); err != nil { + // log. + // WithError(err). + // WithFields(m.logFields(v6)). + // WithField("ip", ip). + // Error("Failed to delete existing udhcpc address") + // } + // We're `bound` from the beginning + //case "bound": + case "renew": + log. + WithFields(m.logFields(v6)). + Debug("udhcpc renew") + + if err := m.renew(v6, event.Data); err != nil { + log. + WithError(err). + WithFields(m.logFields(v6)). + WithField("gateway", event.Data.Gateway). + WithField("new_ip", event.Data.IP). + Error("Failed to execute IP renewal") + } + case "leasefail": + log.WithFields(m.logFields(v6)).Warn("udhcpc failed to get a lease") + case "nak": + log.WithFields(m.logFields(v6)).Warn("udhcpc client received NAK") + } + + case <-m.stopChan: + log.WithFields(m.logFields(v6)).Info("Shutting down persistent DHCP client") + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + errChan <- client.Finish(ctx) + return + } + } + }() + + return errChan, nil +} + +func (m *dhcpManager) Start(ctx context.Context) error { + var err error + m.nsHandle, err = util.AwaitNetNS(ctx, m.joinReq.SandboxKey, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to get sandbox network namespace: %w", err) + } + + m.netHandle, err = netlink.NewHandleAt(m.nsHandle) + if err != nil { + m.nsHandle.Close() + return fmt.Errorf("failed to open netlink handle in sandbox namespace: %w", err) + } + + if err := func() error { + hostName, _ := vethPairNames(m.joinReq.EndpointID) + hostLink, err := netlink.LinkByName(hostName) + if err != nil { + return fmt.Errorf("failed to find host side of veth pair: %w", err) + } + hostVeth, ok := hostLink.(*netlink.Veth) + if !ok { + return util.ErrNotVEth + } + + ctrIndex, err := netlink.VethPeerIndex(hostVeth) + if err != nil { + return fmt.Errorf("failed to get container side of veth's index: %w", err) + } + + m.ctrLink, err = util.AwaitLinkByIndex(ctx, m.netHandle, ctrIndex, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("failed to get link for container side of veth pair: %w", err) + } + + dockerNet, err := m.docker.NetworkInspect(ctx, m.joinReq.NetworkID, dTypes.NetworkInspectOptions{}) + if err != nil { + return fmt.Errorf("failed to get Docker network info: %w", err) + } + + var ctrID string + for id, info := range dockerNet.Containers { + if info.EndpointID == m.joinReq.EndpointID { + ctrID = id + break + } + } + if ctrID == "" { + return util.ErrNoContainer + } + + ctr, err := m.docker.ContainerInspect(ctx, ctrID) + if err != nil { + return fmt.Errorf("failed to get Docker container info: %w", err) + } + m.hostname = ctr.Config.Hostname + + if m.errChan, err = m.setupClient(false); err != nil { + close(m.stopChan) + return err + } + + if m.opts.IPv6 { + if m.errChanV6, err = m.setupClient(true); err != nil { + close(m.stopChan) + return err + } + } + + return nil + }(); err != nil { + m.netHandle.Delete() + m.nsHandle.Close() + return err + } + + return nil +} + +func (m *dhcpManager) Stop() error { + defer m.nsHandle.Close() + defer m.netHandle.Delete() + + close(m.stopChan) + + if err := <-m.errChan; err != nil { + return fmt.Errorf("failed shut down DHCP client: %w", err) + } + if m.opts.IPv6 { + if err := <-m.errChanV6; err != nil { + return fmt.Errorf("failed shut down DHCPv6 client: %w", err) + } + } + + return nil +} diff --git a/pkg/plugin/network.go b/pkg/plugin/network.go index 09e1dbd..43d274e 100644 --- a/pkg/plugin/network.go +++ b/pkg/plugin/network.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "time" dTypes "github.com/docker/docker/api/types" "github.com/mitchellh/mapstructure" @@ -212,7 +213,7 @@ func (p *Plugin) CreateEndpoint(ctx context.Context, r CreateEndpointRequest) (C if err != nil { return fmt.Errorf("failed to get initial IP%v address via DHCP%v: %w", v6str, v6str, err) } - ip, _, err := net.ParseCIDR(info.IP) + ip, err := netlink.ParseAddr(info.IP) if err != nil { return fmt.Errorf("failed to parse initial IP%v address: %w", v6str, err) } @@ -400,8 +401,8 @@ func (p *Plugin) Join(ctx context.Context, r JoinRequest) (JoinResponse, error) } if route.Protocol == unix.RTPROT_KERNEL || - (family == unix.AF_INET && route.Dst.Contains(hint.IPv4)) || - (family == unix.AF_INET6 && route.Dst.Contains(hint.IPv6)) { + (family == unix.AF_INET && route.Dst.Contains(hint.IPv4.IP)) || + (family == unix.AF_INET6 && route.Dst.Contains(hint.IPv6.IP)) { // Make sure to leave out the default on-link route created automatically for the IP(s) acquired by DHCP continue } @@ -446,7 +447,26 @@ func (p *Plugin) Join(ctx context.Context, r JoinRequest) (JoinResponse, error) } } - // TODO: Start a persistent DHCP client + go func() { + // TODO: Make timeout configurable? + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + m := newDHCPManager(p.docker, r, opts) + m.LastIP = hint.IPv4 + m.LastIPv6 = hint.IPv6 + + if err := m.Start(ctx); err != nil { + log.WithError(err).WithFields(log.Fields{ + "network": r.NetworkID[:12], + "endpoint": r.EndpointID[:12], + "sandbox": r.SandboxKey, + }).Error("Failed to start persistent DHCP client") + return + } + + p.persistentDHCP[r.EndpointID] = m + }() log.WithFields(log.Fields{ "network": r.NetworkID[:12], @@ -459,7 +479,15 @@ func (p *Plugin) Join(ctx context.Context, r JoinRequest) (JoinResponse, error) // Leave stops the persistent DHCP client for an endpoint func (p *Plugin) Leave(ctx context.Context, r LeaveRequest) error { - // TODO: Actually stop the DHCP client + manager, ok := p.persistentDHCP[r.EndpointID] + if !ok { + return util.ErrNoSandbox + } + delete(p.persistentDHCP, r.EndpointID) + + if err := manager.Stop(); err != nil { + return err + } log.WithFields(log.Fields{ "network": r.NetworkID[:12], diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 462c54a..0bf26e2 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -9,6 +9,7 @@ import ( docker "github.com/docker/docker/client" "github.com/gorilla/handlers" "github.com/mitchellh/mapstructure" + "github.com/vishvananda/netlink" "github.com/devplayer0/docker-net-dhcp/pkg/util" ) @@ -47,8 +48,8 @@ func decodeOpts(input interface{}) (DHCPNetworkOptions, error) { } type joinHint struct { - IPv4 net.IP - IPv6 net.IP + IPv4 *netlink.Addr + IPv6 *netlink.Addr Gateway string } @@ -57,7 +58,8 @@ type Plugin struct { docker *docker.Client server http.Server - joinHints map[string]joinHint + joinHints map[string]joinHint + persistentDHCP map[string]*dhcpManager } // NewPlugin creates a new Plugin @@ -70,7 +72,8 @@ func NewPlugin() (*Plugin, error) { p := Plugin{ docker: client, - joinHints: make(map[string]joinHint), + joinHints: make(map[string]joinHint), + persistentDHCP: make(map[string]*dhcpManager), } mux := http.NewServeMux() diff --git a/pkg/util/errors.go b/pkg/util/errors.go index 0fd9632..43df8c3 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -22,6 +22,10 @@ var ( ErrNoHint = errors.New("missing CreateEndpoint hints") // ErrNotVEth indicates a host link was unexpectedly not a veth interface ErrNotVEth = errors.New("host link is not a veth interface") + // ErrNoContainer indicates a container was unexpectedly not found + ErrNoContainer = errors.New("couldn't find container by endpoint on the network") + // ErrNoSandbox indicates missing state from the Join stage + ErrNoSandbox = errors.New("missing joined endpoint state") ) func ErrToStatus(err error) int { diff --git a/pkg/util/netlink.go b/pkg/util/netlink.go new file mode 100644 index 0000000..7e3cbed --- /dev/null +++ b/pkg/util/netlink.go @@ -0,0 +1,66 @@ +package util + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" +) + +func AwaitNetNS(ctx context.Context, path string, interval time.Duration) (netns.NsHandle, error) { + var err error + nsChan := make(chan netns.NsHandle) + go func() { + for { + var ns netns.NsHandle + ns, err = netns.GetFromPath(path) + if err == nil { + nsChan <- ns + return + } + + time.Sleep(interval) + } + }() + + var dummy netns.NsHandle + select { + case ns := <-nsChan: + return ns, nil + case <-ctx.Done(): + if err != nil { + logrus.WithError(err).WithField("path", path).Error("Failed to await network namespace") + } + return dummy, ctx.Err() + } +} + +func AwaitLinkByIndex(ctx context.Context, handle *netlink.Handle, index int, interval time.Duration) (netlink.Link, error) { + var err error + linkChan := make(chan netlink.Link) + go func() { + for { + var link netlink.Link + link, err = handle.LinkByIndex(index) + if err == nil { + linkChan <- link + return + } + + time.Sleep(interval) + } + }() + + var dummy netlink.Link + select { + case link := <-linkChan: + return link, nil + case <-ctx.Done(): + if err != nil { + logrus.WithError(err).WithField("index", index).Error("Failed to await link by index") + } + return dummy, ctx.Err() + } +} diff --git a/test_env.sh b/test_env.sh index 02b4356..ebbb22d 100755 --- a/test_env.sh +++ b/test_env.sh @@ -3,11 +3,11 @@ BRIDGE=net-dhcp BRIDGE_IP="10.123.0.1" DUMMY_IP="10.123.0.3" MASK="24" -DHCP_RANGE="10.123.0.5,10.123.0.254" +DHCP_RANGE="10.123.0.5,10.123.0.254,10s" BRIDGE_IP6="fd69::1" DUMMY_IP6="fd69::3" MASK6="64" -DHCP6_RANGE="fd69::5,fd69::1000,64" +DHCP6_RANGE="fd69::5,fd69::1000,64,10s" DOMAIN=cool-dhcp quit() { @@ -29,7 +29,7 @@ ip route add fd42::0/64 dev "$BRIDGE" # router advertisement stuff is done :/ #ip route add fd43::0/64 via "$DUMMY_IP6" -dnsmasq --no-daemon --conf-file=/dev/null --dhcp-leasefile=/dev/null \ +dnsmasq --no-daemon --conf-file=/dev/null --dhcp-leasefile=/tmp/docker-net-dhcp.leases \ --port=0 --interface="$BRIDGE" --bind-interfaces \ --domain="$DOMAIN" \ --dhcp-range="$DHCP_RANGE" \