Add DHCP client in container

This commit is contained in:
Jack O'Sullivan 2019-08-25 17:49:07 +01:00
parent 02d5109ced
commit aa269d25c2
3 changed files with 116 additions and 48 deletions

View File

@ -19,19 +19,13 @@
"options": [ "options": [
"bind" "bind"
] ]
},
{
"source": "/var/run/docker/netns",
"destination": "/var/run/docker/netns",
"type": "bind",
"options": [
"bind"
]
} }
], ],
"pidhost": true,
"linux": { "linux": {
"capabilities": [ "capabilities": [
"CAP_NET_ADMIN" "CAP_NET_ADMIN",
"CAP_SYS_ADMIN"
] ]
}, },
"env": [ "env": [

View File

@ -3,6 +3,7 @@ import ipaddress
import logging import logging
import atexit import atexit
import socket import socket
import threading
import pyroute2 import pyroute2
from pyroute2.netlink.rtnl import rtypes from pyroute2.netlink.rtnl import rtypes
@ -27,13 +28,10 @@ client = docker.from_env()
def close_docker(): def close_docker():
client.close() client.close()
host_dhcp_clients = {} gateway_hints = {}
container_dhcp_clients = {} container_dhcp_clients = {}
@atexit.register @atexit.register
def cleanup_dhcp(): def cleanup_dhcp():
for endpoint, dhcp in host_dhcp_clients.items():
logger.warning('cleaning up orphaned host DHCP client (endpoint "%s")', endpoint)
dhcp.finish(timeout=1)
for endpoint, dhcp in container_dhcp_clients.items(): for endpoint, dhcp in container_dhcp_clients.items():
logger.warning('cleaning up orphaned container DHCP client (endpoint "%s")', endpoint) logger.warning('cleaning up orphaned container DHCP client (endpoint "%s")', endpoint)
dhcp.finish(timeout=1) dhcp.finish(timeout=1)
@ -58,6 +56,17 @@ def net_bridge(n):
return ndb.interfaces[client.networks.get(n).attrs['Options'][OPT_BRIDGE]] return ndb.interfaces[client.networks.get(n).attrs['Options'][OPT_BRIDGE]]
def ipv6_enabled(n): def ipv6_enabled(n):
return client.networks.get(n).attrs['EnableIPv6'] return client.networks.get(n).attrs['EnableIPv6']
def endpoint_container_iface(n, e):
for cid, info in client.networks.get(n).attrs['Containers'].items():
if info['EndpointID'] == e:
container = client.containers.get(cid)
netns = f'/proc/{container.attrs["State"]["Pid"]}/ns/net'
ndb.sources.add(netns=netns)
for i in ndb.interfaces:
if i['address'] == info['MacAddress']:
return i
break
return None
@app.route('/NetworkDriver.GetCapabilities', methods=['POST']) @app.route('/NetworkDriver.GetCapabilities', methods=['POST'])
def net_get_capabilities(): def net_get_capabilities():
@ -87,12 +96,14 @@ def delete_net():
@app.route('/NetworkDriver.CreateEndpoint', methods=['POST']) @app.route('/NetworkDriver.CreateEndpoint', methods=['POST'])
def create_endpoint(): def create_endpoint():
req = request.get_json(force=True) req = request.get_json(force=True)
network_id = req['NetworkID']
endpoint_id = req['EndpointID']
req_iface = req['Interface'] req_iface = req['Interface']
bridge = net_bridge(req['NetworkID']) bridge = net_bridge(network_id)
bridge_addrs = iface_addrs(bridge) bridge_addrs = iface_addrs(bridge)
if_host, if_container = veth_pair(req['EndpointID']) if_host, if_container = veth_pair(endpoint_id)
logger.info('creating veth pair %s <=> %s', if_host, if_container) logger.info('creating veth pair %s <=> %s', if_host, if_container)
if_host = (ndb.interfaces.create(ifname=if_host, kind='veth', peer=if_container) if_host = (ndb.interfaces.create(ifname=if_host, kind='veth', peer=if_container)
.set('state', 'up') .set('state', 'up')
@ -125,16 +136,16 @@ def create_endpoint():
if addr.ip == bridge_addr.ip: if addr.ip == bridge_addr.ip:
raise NetDhcpError(400, f'Address {addr} is already in use on bridge {bridge["ifname"]}') raise NetDhcpError(400, f'Address {addr} is already in use on bridge {bridge["ifname"]}')
elif type_ == 'v4': elif type_ == 'v4':
dhcp = udhcpc.DHCPClient(if_container['ifname']) dhcp = udhcpc.DHCPClient(if_container, once=True)
addr = dhcp.await_ip(timeout=10) addr = dhcp.finish()
res_iface['Address'] = str(addr) res_iface['Address'] = str(addr)
host_dhcp_clients[req['EndpointID']] = dhcp gateway_hints[endpoint_id] = dhcp.gateway
else: else:
raise NetDhcpError(400, f'DHCPv6 is currently unsupported') raise NetDhcpError(400, f'DHCPv6 is currently unsupported')
logger.info('Adding address %s to %s', addr, if_container['ifname']) logger.info('Adding address %s to %s', addr, if_container['ifname'])
try_addr('v4') try_addr('v4')
if ipv6_enabled(req['NetworkID']): if ipv6_enabled(network_id):
try_addr('v6') try_addr('v6')
res = jsonify({ res = jsonify({
@ -203,12 +214,11 @@ def join():
}, },
'StaticRoutes': [] 'StaticRoutes': []
} }
if endpoint in host_dhcp_clients: if endpoint in gateway_hints:
dhcp = host_dhcp_clients[endpoint] gateway = gateway_hints[endpoint]
logger.info('Setting IPv4 gateway from DHCP (%s)', dhcp.gateway) logger.info('Setting IPv4 gateway from DHCP (%s)', gateway)
res['Gateway'] = str(dhcp.gateway) res['Gateway'] = str(gateway)
dhcp.finish(timeout=1) del gateway_hints[endpoint]
del host_dhcp_clients[endpoint]
ipv6 = ipv6_enabled(req['NetworkID']) ipv6 = ipv6_enabled(req['NetworkID'])
for route in bridge.routes: for route in bridge.routes:
@ -236,3 +246,37 @@ def join():
@app.route('/NetworkDriver.Leave', methods=['POST']) @app.route('/NetworkDriver.Leave', methods=['POST'])
def leave(): def leave():
return jsonify({}) return jsonify({})
# ProgramExternalActivity is supposed to be used for port forwarding etc.,
# but we can use it to start the DHCP client in the container's network namespace
# since the interface will have been moved inside at this point. Trying to grab
# the contaienr's attributes (to get the network namespace) will deadlock, so
# we must defer starting the DHCP client
@app.route('/NetworkDriver.ProgramExternalConnectivity', methods=['POST'])
def start_container_dhcp():
req = request.get_json(force=True)
endpoint = req['EndpointID']
def _deferred():
iface = endpoint_container_iface(req['NetworkID'], endpoint)
dhcp = udhcpc.DHCPClient(iface)
container_dhcp_clients[endpoint] = dhcp
logger.info('Starting DHCP client on %s in container namespace %s', iface['ifname'], dhcp.netns)
threading.Thread(target=_deferred).start()
return jsonify({})
@app.route('/NetworkDriver.RevokeExternalConnectivity', methods=['POST'])
def stop_container_dhcp():
req = request.get_json(force=True)
endpoint = req['EndpointID']
if endpoint in container_dhcp_clients:
dhcp = container_dhcp_clients[endpoint]
logger.info('Shutting down DHCP client on %s in container namespace %s', dhcp.iface['ifname'], dhcp.netns)
dhcp.finish(timeout=1)
ndb.sources.remove(dhcp.netns)
del container_dhcp_clients[endpoint]
return jsonify({})

View File

@ -1,9 +1,12 @@
from enum import Enum from enum import Enum
import ipaddress import ipaddress
import os
from os import path from os import path
import fcntl
import time import time
import threading import threading
import subprocess import subprocess
import signal
import logging import logging
from pyroute2.netns.process.proxy import NSPopen from pyroute2.netns.process.proxy import NSPopen
@ -22,64 +25,91 @@ class DHCPClientError(Exception):
pass pass
def _nspopen_wrapper(netns): def _nspopen_wrapper(netns):
return lambda *args, **kwargs: NSPopen(netns, *args, **kwargs) def _wrapper(*args, **kwargs):
# We have to set O_NONBLOCK on stdout since NSPopen uses a global lock
# on the object (e.g. deadlock if we try to readline() and terminate())
proc = NSPopen(netns, *args, **kwargs)
proc.stdout.fcntl(fcntl.F_SETFL, os.O_NONBLOCK)
return proc
return _wrapper
class DHCPClient: class DHCPClient:
def __init__(self, iface, netns=None, once=False, event_listener=lambda t, ip, gw, dom: None): def __init__(self, iface, once=False, event_listener=None):
self.netns = netns
self.iface = iface self.iface = iface
self.once = once self.once = once
self.event_listener = event_listener self.event_listeners = [DHCPClient._attr_listener]
if event_listener:
self.event_listeners.append(event_listener)
Popen = _nspopen_wrapper(netns) if netns else subprocess.Popen self.netns = None
cmdline = ['/sbin/udhcpc', '-s', HANDLER_SCRIPT, '-i', iface, '-f'] if iface['target'] and iface['target'] != 'localhost':
self.netns = iface['target']
logger.debug('udhcpc using netns %s', self.netns)
Popen = _nspopen_wrapper(self.netns) if self.netns else subprocess.Popen
cmdline = ['/sbin/udhcpc', '-s', HANDLER_SCRIPT, '-i', iface['ifname'], '-f']
cmdline.append('-q' if once else '-R') cmdline.append('-q' if once else '-R')
self.proc = Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8') self.proc = Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8')
self.ip = None self.ip = None
self.gateway = None self.gateway = None
self.domain = None self.domain = None
self._running = True
self._event_thread = threading.Thread(target=self._read_events) self._event_thread = threading.Thread(target=self._read_events)
self._event_thread.start() self._event_thread.start()
def _attr_listener(self, event_type, args):
if event_type not in (EventType.BOUND, EventType.RENEW):
return
self.ip = ipaddress.ip_interface(args[0])
self.gateway = ipaddress.ip_address(args[1])
self.domain = args[2]
def _read_events(self): def _read_events(self):
while True: while self._running:
line = self.proc.stdout.readline() line = self.proc.stdout.readline().strip()
if not line: if not line:
break # stdout will be O_NONBLOCK if udhcpc is in a netns
if self.netns and self._running:
time.sleep(0.1)
continue
if not line.startswith(INFO_PREFIX): if not line.startswith(INFO_PREFIX):
logger.debug('[udhcpc] %s', line) logger.debug('[udhcpc#%d] %s', self.proc.pid, line)
continue continue
args = line.split(' ')[1:] args = line.split(' ')[1:]
try: try:
event_type = EventType(args[0]) event_type = EventType(args[0])
except ValueError: except ValueError:
logger.warning('udhcpc unknown event "%s"', ' '.join(args)) logger.warning('udhcpc#%d unknown event "%s"', self.proc.pid, args)
continue continue
self.ip = ipaddress.ip_interface(args[1]) logger.debug('[udhcp#%d event] %s %s', self.proc.pid, event_type, args[1:])
self.gateway = ipaddress.ip_address(args[2]) for listener in self.event_listeners:
self.domain = args[3] listener(self, event_type, args[1:])
logger.debug('[udhcp event] %s %s %s %s', event_type, self.ip, self.gateway, self.domain)
self.event_listener(event_type, self.ip, self.gateway, self.domain)
def await_ip(self, timeout=5): def await_ip(self, timeout=5):
# TODO: this bad # TODO: this bad
waited = 0 start = time.time()
while not self.ip: while not self.ip:
if waited >= timeout: if time.time() - start > timeout:
raise DHCPClientError('Timed out waiting for dhcp lease') raise DHCPClientError('Timed out waiting for dhcp lease')
time.sleep(AWAIT_INTERVAL) time.sleep(AWAIT_INTERVAL)
waited += AWAIT_INTERVAL
return self.ip return self.ip
def finish(self, timeout=5): def finish(self, timeout=5):
if not self.once: if self.once:
self.await_ip()
else:
self.proc.terminate() self.proc.terminate()
if self.proc.wait(timeout=timeout) != 0: if self.proc.wait(timeout=timeout) != 0:
raise DHCPClientError(f'udhcpc exited with non-zero exit code {self.proc.returncode}') raise DHCPClientError(f'udhcpc exited with non-zero exit code {self.proc.returncode}')
if self.netns:
self.proc.release()
self._running = False
self._event_thread.join() self._event_thread.join()
if self.once and not self.ip: return self.ip
raise DHCPClientError(f'Timed out waiting for dhcp lease')