Please enable Javascript to view the contents

源码分析 Kubernetes 对 Pod IP 的管理

 ·  ☕ 9 分钟

1. kube-controller-manager 对网段的管理

在 kube-controller-manager 有众多控制器,与 Pod IP 相关的是 NodeIpamController。

NodeIpamController 控制器主要是管理节点的 podcidr,当有新节点加入集群时,分配一个子网段给节点;当节点删除时,回收子网段。

每个节点的子网段不会重叠,每个节点都能够独立地完成 Pod IP 的分配。

下面看一个 kube-controller-manager 的运行示例:

1
kubectl -n kube-system get pod kube-controller-manager -o yaml

其中关于网段配置的部分为:

1
2
3
4
5
6
7
8
spec:
  containers:
    - command:
        - kube-controller-manager
        - --allocate-node-cidrs=true
        - --cluster-cidr=10.234.0.0/16
        - --node-cidr-mask-size=24
        - --service-cluster-ip-range=10.96.0.0/16

cluster-cidr 指定了 Pod IP 的范围,掩码位数 16,如果不考虑保留 IP,意味着集群最多可以容纳 2^16 = 65536 个 pod。

这些 Pod 分布在若干个节点上,接着看 node-cidr-mask-size 为 24,每个节点只剩下 32-24=8 位留给 pod,每个节点最多能创建 2^8=256 个 pod。

相应的,这个集群能够容纳的节点数量为 2^(32-16-8)=256 个节点。

在规划集群时,需要根据集群的规模来调整这两个参数。

开启 allocate-node-cidrs、设置 cluster-cidr 之后,kube-controller-manager 会给每个节点分配子网段,将结果写入 spec.podCIDR 字段。

1
2
3
4
spec:
  podCIDR: 10.234.58.0/24
  podCIDRs:
    - 10.234.58.0/24

下面我们从源码分析一下这一过程。

1. 启动 NodeIpamController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	// 如果 allocate-node-cidrs 没有开启会立即返回
	if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
		return nil, false, nil
	}

	// 获取 clusterCIDR, serviceCIDR 启动 NodeIpamController
	nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
		ctx,
		controllerContext.InformerFactory.Core().V1().Nodes(),
		clusterCIDRInformer,
		controllerContext.Cloud,
		controllerContext.ClientBuilder.ClientOrDie("node-controller"),
		clusterCIDRs,
		serviceCIDR,
		secondaryServiceCIDR,
		nodeCIDRMaskSizes,
		ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
	)
	go nodeIpamController.RunWithMetrics(ctx, controllerContext.ControllerManagerMetrics)
	return nil, true, nil
}

RunWithMetrics 只是提供了一些监控指标,真正的启动逻辑在 Run 方法中。

1
2
3
4
5
func (nc *Controller) RunWithMetrics(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
	controllerManagerMetrics.ControllerStarted("nodeipam")
	defer controllerManagerMetrics.ControllerStopped("nodeipam")
	nc.Run(ctx)
}
1
2
3
4
5
6
7
8
9
func (nc *Controller) Run(ctx context.Context) {
	if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
		go nc.legacyIPAM.Run(ctx)
	} else {
		go nc.cidrAllocator.Run(ctx)
	}

	<-ctx.Done()
}

1.2 监听节点变化

在查找 cidrAllocator 接口实现的时候,我发现了三种 CIDR 分配器,分别是 RangeAllocator 适用单网段分配、MultiCIDRRangeAllocator 适用于多 CIDR、CloudCIDRAllocator 适用于对接云厂。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func New(ctx context.Context, kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, clusterCIDRInformer networkinginformers.ClusterCIDRInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
	switch allocatorType {
	case RangeAllocatorType:
		return NewCIDRRangeAllocator(logger, kubeClient, nodeInformer, allocatorParams, nodeList)
	case MultiCIDRRangeAllocatorType:
		if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
			return nil, fmt.Errorf("invalid CIDR allocator type: %v, feature gate %v must be enabled", allocatorType, features.MultiCIDRRangeAllocator)
		}
		return NewMultiCIDRRangeAllocator(ctx, kubeClient, nodeInformer, clusterCIDRInformer, allocatorParams, nodeList, nil)

	case CloudAllocatorType:
		return NewCloudCIDRAllocator(logger, kubeClient, cloud, nodeInformer)
	default:
		return nil, fmt.Errorf("invalid CIDR allocator type: %v", allocatorType)
	}
}

这里看看 RangeAllocator 的实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func NewCIDRRangeAllocator(logger klog.Logger, client clientset.Interface, nodeInformer informers.NodeInformer, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) {
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
			return ra.AllocateOrOccupyCIDR(logger, node)
		}),
		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
			if len(newNode.Spec.PodCIDRs) == 0 {
				return ra.AllocateOrOccupyCIDR(logger, newNode)
			}
			return nil
		}),
		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
			return ra.ReleaseCIDR(logger, node)
		}),
	})

	return ra, nil
}

其实 RangeAllocator 分配器的实现与写 Operator 时的控制器类似,都是通过 informer 来监听资源的变化,然后调用相应的方法。

1.3 更新节点的 podCIDR

这里比较特殊的是,控制器并不是直接操作资源,而是将变更放到了一个 channel 中,然后通过 goroutine 处理状态更新。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
	allocated := nodeReservedCIDRs{
		nodeName:       node.Name,
		allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
	}

	for idx := range r.cidrSets {
		podCIDR, err := r.cidrSets[idx].AllocateNext()
		allocated.allocatedCIDRs[idx] = podCIDR
	}
	// 将更新的内容放入 channel 中
	r.nodeCIDRUpdateChannel <- allocated
	return nil
}

nodeCIDRUpdateChannel 的长度是 5000。

1
2
	cidrUpdateQueueSize = 5000
	nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),

而更新 Node Spec 的逻辑是通过 30 个 goroutine 来处理。

1
2
3
4
	const cidrUpdateWorkers untyped int = 30
	for i := 0; i < cidrUpdateWorkers; i++ {
		go r.worker(ctx)
	}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (r *rangeAllocator) worker(ctx context.Context) {
	logger := klog.FromContext(ctx)
	for {
		select {
		case workItem, ok := <-r.nodeCIDRUpdateChannel:
			if !ok {
				logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
				return
			}
			if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
				// Requeue the failed node for update again.
				r.nodeCIDRUpdateChannel <- workItem
			}
		case <-ctx.Done():
			return
		}
	}
}

cidrUpdateRetries = 3 这里会重试 3 次更新,如果一直更新失败,会将节点重新放入 channel 中,等待下次更新。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
	for i := 0; i < cidrUpdateRetries; i++ {
		if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
			logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
			return nil
		}
	}
	// 放回 pool 中
	controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
}

使用 Patch 方法更新节点对象的 Spec 字段。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error {
	// set the Pod cidrs list and set the old Pod cidr field
	patch := nodeForCIDRMergePatch{
		Spec: nodeSpecForMergePatch{
			PodCIDR:  cidrs[0],
			PodCIDRs: cidrs,
		},
	}

	patchBytes, err := json.Marshal(&patch)
	if err != nil {
		return fmt.Errorf("failed to json.Marshal CIDR: %v", err)
	}
	if _, err := c.CoreV1().Nodes().Patch(context.TODO(), string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
		return fmt.Errorf("failed to patch node CIDR: %v", err)
	}
	return nil
}

2. kubelet 对网络的配置

上图是 Kubelet 创建 Pod 的过程,这里截取其中对网络配置的部分进行分析:

  1. Pod 调度到某个节点上
  2. kubelet 通过 cri 调用 container runtime 创建 sandbox
  3. container runtime 创建 sandbox
  4. container runtime 调用 cni 创建 Pod 网络
  5. IPAM 对 Pod IP 的管理

下面从源码实现的角度来看看这个过程。

2.1 Pod 调度到某个节点上

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: demo
    pod-template-hash: 7b9b5cf76b
  name: demo-7b9b5cf76b-5lpmj
  namespace: default
spec:
  containers:
    - image: hubimage/demo-ubuntu
  nodeName: node1

Kubernetes 中调度的过程是 kube-scheduler 根据 Pod 的资源需求和节点的资源情况,将 Pod 调度到某个节点上,并将调度结果写入 pod.spec.nodeName 字段。

这部分不是网络的重点,之前我也在生产环境下定制过调度器,感兴趣的话可以看看 Tekton 优化之定制集群调度器

2.2 kubelet 调用 cri 创建 sandbox

SyncPod 是 kubelet 中的核心方法,它会根据 Pod 的状态,调用 cri 创建或删除 pod。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// SyncPod syncs the running Pod into the desired Pod by executing following steps:
//
// 1.计算沙箱和容器变化。
// 2. 必要时关闭 Pod 沙箱。
// 3. 关闭任何不应运行的容器。
// 4.必要时创建沙箱。
// 5.创建 ephemeral 容器。
// 6. 创建 init 容器。
// 7. 调整运行容器的大小(如果 InPlacePodVerticalScaling==true)
// 8. 创建正常容器
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, Pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
		// Step 4: Create a sandbox for the Pod if necessary.
		podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
}

调用 RuntimeService 接口的 RunPodSandbox 方法创建 sandbox。

1
2
3
// createPodSandbox creates a Pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, Pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)

经过 runtimeService、instrumentedRuntimeService 接口的封装,最终会调用 remoteRuntimeService 的 RunPodSandbox 方法。

1
2
3
4
5
6
7
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	})

这里的 runtimeClient 是一个 rpc client,通过 rpc 调用 container runtime 创建 sandbox。

2.3 container runtime 创建 sandbox

以 containerd 为例,创建 sandbox:

1
2
3
4
5
6
7
func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
	if err := in.checkInitialized(); err != nil {
		return nil, err
	}
	res, err = in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r)
	return res, errdefs.ToGRPC(err)
}

调用 CNI 创建网络,创建 sandbox。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
	// 生成 sandbox id
	id := util.GenerateID()
	metadata := config.GetMetadata()
	name := makeSandboxName(metadata)

	// 获取 sandbox 的 oci 运行时
	ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())
	sandboxInfo.Runtime.Name = ociRuntime.Type
	sandboxInfo.Sandboxer = ociRuntime.Sandboxer

	// 创建 sandbox 对象
	sandbox := sandboxstore.NewSandbox(
		sandboxstore.Metadata{
			ID:             id,
			Name:           name,
			Config:         config,
			RuntimeHandler: r.GetRuntimeHandler(),
		},
		sandboxstore.Status{
			State: sandboxstore.StateUnknown,
		},
	)

	// 调用 CNI 插件,创建 sandbox 的网络
	if !hostNetwork(config) && !userNsEnabled {
		var netnsMountDir = "/var/run/netns"
		sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
		// Save sandbox metadata to store
		if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
			return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
		}
	}

	// 创建 sandbox
	err = c.nri.RunPodSandbox(ctx, &sandbox)
}

2.4 container runtime 调用 cni 创建 Pod 网络

在上一步骤中,调用 RunPodSandbox 创建 sandbox 之前,会先调用 setupPodNetwork 配置网络。这里展开看一下 setupPodNetwork 的实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (c *criService) setupPodNetwork(ctx context.Context, sandbox *sandboxstore.Sandbox) error {
	var (
		id        = sandbox.ID
		config    = sandbox.Config
		path      = sandbox.NetNSPath
		netPlugin = c.getNetworkPlugin(sandbox.RuntimeHandler)
		err       error
		result    *cni.Result
	)
	if c.config.CniConfig.NetworkPluginSetupSerially {
		result, err = netPlugin.SetupSerially(ctx, id, path, opts...)
	} else {
		result, err = netPlugin.Setup(ctx, id, path, opts...)
	}
}

libcni 实现了 netPlugin 接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// containerd/go-cni/cni.go
func (c *libcni) Setup(ctx context.Context, id string, path string, opts ...NamespaceOpts) (*Result, error) {
	if err := c.Status(); err != nil {
		return nil, err
	}
	// 建一个新的网络命名空间
	ns, err := newNamespace(id, path, opts...)
	if err != nil {
		return nil, err
	}
	// 调用 CNI 插件
	result, err := c.attachNetworks(ctx, ns)
	if err != nil {
		return nil, err
	}
	return c.createResult(result)
}

attachNetworks 起了很多协程,每个协程调用 asynchAttach 方法,asynchAttach 方法调用 Attach 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (c *libcni) attachNetworks(ctx context.Context, ns *Namespace) ([]*types100.Result, error) {
	var wg sync.WaitGroup
	var firstError error
	results := make([]*types100.Result, len(c.Networks()))
	rc := make(chan asynchAttachResult)

	for i, network := range c.Networks() {
		wg.Add(1)
		go asynchAttach(ctx, i, network, ns, &wg, rc)
	}

	for range c.Networks() {
		rs := <-rc
		if rs.err != nil && firstError == nil {
			firstError = rs.err
		}
		results[rs.index] = rs.res
	}
	wg.Wait()

	return results, firstError
}

运行了很多协程调用 CNI,但 rc channel 的长度为 1,处理结果时却一个一个的。

1
2
3
4
5
func asynchAttach(ctx context.Context, index int, n *Network, ns *Namespace, wg *sync.WaitGroup, rc chan asynchAttachResult) {
	defer wg.Done()
	r, err := n.Attach(ctx, ns)
	rc <- asynchAttachResult{index: index, res: r, err: err}
}

Attach 方法中才真正开始调用 CNI 插件。

1
2
3
4
5
6
7
func (n *Network) Attach(ctx context.Context, ns *Namespace) (*types100.Result, error) {
	r, err := n.cni.AddNetworkList(ctx, n.config, ns.config(n.ifName))
	if err != nil {
		return nil, err
	}
	return types100.NewResultFromResult(r)
}

https://github.com/containernetworking/cni/blob/main/libcni/api.go 中 CNI 接口定义了很多方法,其中最重要的是 AddNetwork 和 DelNetwork 方法,带 List 的方法是批量操作。

1
2
3
4
5
6
type CNI interface {
	AddNetworkList(ctx context.Context, net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
	AddNetwork(ctx context.Context, net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
	DelNetworkList(ctx context.Context, net *NetworkConfigList, rt *RuntimeConf) error
	DelNetwork(ctx context.Context, net *NetworkConfig, rt *RuntimeConf) error
}

AddNetwork 用于为容器添加网络接口,在主机上创建 veth 网卡绑定到容器的 ech0 网卡上。DelNetwork 用于在容器删除时,清理容器相关的网络配置。

CNI 调用插件的核心是 Exec 接口,直接调用二进制程序。

1
2
3
4
5
type Exec interface {
	ExecPlugin(ctx context.Context, pluginPath string, stdinData []byte, environ []string) ([]byte, error)
	FindInPath(plugin string, paths []string) (string, error)
	Decode(jsonBytes []byte) (version.PluginInfo, error)
}

CRI 以标准输入、环境变量的形式将网络配置信息传递给 CNI 插件。CNI 插件处理完成之后,将网络配置信息写入到标准输出中,CRI 将标准输出中的网络配置信息解析出来,写入到容器的网络配置文件中。

再回到 container runtime 的实现 containerd:

1
2
3
4
5
/usr/bin/containerd config  dump |grep cni

    [plugins."io.containerd.grpc.v1.cri".cni]
      bin_dir = "/opt/cni/bin"
      conf_dir = "/etc/cni/net.d"

这里的 /etc/cni/net.d 是 CNI 网络配置文件的默认存放路径,/opt/cni/bin 是 CNI 网络插件的默认搜索路径。

1
2
3
4
ls /opt/cni/bin

bandwidth  calico       cilium-cni  firewall  host-device  install  loopback  portmap  sbr     tuning  vrf
bridge     calico-IPAM  dhcp        flannel   host-local   ipvlan   macvlan   ptp      static  vlan
1
2
3
4
5
6
7
cat /etc/cni/net.d/05-cilium.conf
{
  "cniVersion": "0.3.1",
  "name": "cilium",
  "type": "cilium-cni",
  "enable-debug": false
}

这些配置用来初始化 CRI 获取 CNI 插件的 netPlugin map[string]cni.CNI 结构。

2.5 IPAM 对 Pod IP 的管理

IPAM 是 IP Address Management 的缩写,负责为容器分配 ip 地址。IPAM 组件通常是一个独立的二进制文件,也可以直接由 CNI 插件实现。在 https://github.com/containernetworking/plugins/tree/main/plugins/ipam 中,目前有三种实现 host-local、dhcp、static。 这里以 host-local 为例:

  • 查看 CNI 的配置文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
cat /etc/cni/net.d/10-cni.conflist

{
  "name": "networks",
  "type": "cni",
  "ipam": {
    "type": "host-local",
    "subnet": "10.234.58.0/24",
    "routes": [{ "dst": "0.0.0.0/0" }]
  }
}

指定了 CNI 插件的类型为 host-local,指定了 Pod IP 的网段为 “10.234.58.0/24” 。

  • 查看 CNI 插件的存储目录
1
2
3
ls /var/lib/cni/networks

10.234.58.76  10.234.58.87  last_reserved_ip.0  lock
1
2
3
cat 10.234.58.76

b3b668af977bbeca6853122514044865793c056e81cccebf115dacffd25a8bcc

这里有一组以 ip 命名的文件,而文件里面又是一串字符串。那么这些到底是什么呢?

  • 以 ip 命名的文件是如何生成的

申请一个 Pod IP 时,先获取一个可用 ip

1
2
3
4
5
func cmdAdd(args *skel.CmdArgs) error {
	for idx, rangeset := range ipamConf.Ranges {
		ipConf, err := allocator.Get(args.ContainerID, args.IfName, requestedIP)
	}
}

获取到可用 ip 之后,先尝试着存储到本地目录文件中

1
2
3
4
5
6
func (a *IPAllocator) Get(id string, ifname string, requestedIP net.IP) (*current.IPConfig, error) {
	for {
		reservedIP, gw = iter.Next()
		reserved, err := a.store.Reserve(id, ifname, reservedIP.IP, a.rangeID)
	}
}

直接写本地文件目录

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (s *Store) Reserve(id string, ifname string, ip net.IP, rangeID string) (bool, error) {
	fname := GetEscapedPath(s.dataDir, ip.String())

	f, err := os.OpenFile(fname, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0o600)
	if os.IsExist(err) {
		return false, nil
	}
	if _, err := f.WriteString(strings.TrimSpace(id) + LineBreak + ifname); err != nil {
		f.Close()
		os.Remove(f.Name())
		return false, err
	}
}

写入的内容为 strings.TrimSpace(id) + LineBreak + ifname,这里的 id 其实是容器的 id,ifname 是网卡名称,LineBreak 是换行符。

通过 id 在主机上可以找到对应的容器:

1
2
3
docker ps |grep b3b668

b3b668af977b   k8s.gcr.io/pause:3.5                      "/pause"                 6 weeks ago    Up 6 weeks              k8s_POD_xxx-5b795fd7dd-82hrh_kube-system_b127b65c-f0ca-48a7-9020-ada60dfa535a_0
  • last_reserved_ip.0 文件的用途
1
2
3
cat last_reserved_ip.0

10.234.58.87

在获取可用 IP 时,IPAM 会创建一个迭代器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (a *IPAllocator) Get(id string, ifname string, requestedIP net.IP) (*current.IPConfig, error) {
    iter, err := a.GetIter()
    if err != nil {
        return nil, err
    }
    for {
        reservedIP, gw = iter.Next()
        if reservedIP == nil {
            break
        }
    }

而迭代器需要依靠 last_reserved_ip.0 找到上一次分配的 IP,然后从这个 IP 之后开始分配。

1
2
3
4
5
6
7
func (a *IPAllocator) GetIter() (*RangeIter, error) {
	lastReservedIP, err := a.store.LastReservedIP(a.rangeID)
	if err != nil && !os.IsNotExist(err) {
		log.Printf("Error retrieving last reserved ip: %v", err)
	} else if lastReservedIP != nil {
		startFromLastReservedIP = a.rangeset.Contains(lastReservedIP)
	}

这里的 lastIPFilePrefix = “last_reserved_ip.”

1
2
3
4
5
6
7
8
func (s *Store) LastReservedIP(rangeID string) (net.IP, error) {
	ipfile := GetEscapedPath(s.dataDir, lastIPFilePrefix+rangeID)
	data, err := os.ReadFile(ipfile)
	if err != nil {
		return nil, err
	}
	return net.ParseIP(string(data)), nil
}

host-local 分配 ip 时是按照轮询的方式,递增分配,如果分配到最后一个 IP,就又从头开始分配。

  • lock 文件
1
2
3
4
type Store struct {
	*FileLock
	dataDir string
}

每次存储操作都会进行加锁,IP 分配不会并发进行,确保唯一性。

1
2
a.store.Lock()
defer a.store.Unlock()

3. 总结

本篇主要是从 Pod IP 管理的角度,梳理了一下从 kube-controller-manager 到 kubelet 的 Pod IP 管理过程。主要内容如下:

  • kube-controller-manager 通过 NodeIpamController 控制器为每个节点分配 Pod IP 网段,在集群规划时需要根据集群规模调整 cluster-cidr、node-cidr-mask-size 参数
  • kubelet 通过 cri 调用 container runtime 创建 sandbox
  • container runtime 调用 cni 创建 Pod 网络
  • IPAM 对 Pod IP 的管理

在工作中很多熟悉的路径,可能仅仅只是知道大概的流程,不知道具体的实现。通过源码分析,可以更加深入地了解相关的细节,也能学习到新的知识。

比如,在源码中,我看到了 InPlacePodVerticalScaling 这个参数,发现是 Kubernetes 1.27 的一个 alpha feature,可以在不重启 Pod 的情况下,调整 Pod 的资源配置;在写 Operator 更新 CR 状态时,在合适的场景下,可以学习 nodeCIDRUpdateChannel 的实现,将更新的状态放入 channel 中,然后通过 goroutine 处理状态更新。


微信公众号
作者
微信公众号