K8S CRD及控制器实战
发布于 3 年前 作者 jie47 3982 次浏览 来自 分享

https://www.baidu.com

前言

最近往 k8s 环境搬迁服务时,经常遇到这么个现象:服务间存在依赖启动顺序。因此考虑用 CRD 实现一个带前置服务检测功能的 deployment

实现之前需串联了解相关知识:


  1. 使用 CRD 扩展 K8S API
  2. 了解声明式设计模式
  3. 使用 Controller 管理资源行为

使用 CRD 扩展 K8S API

定义CRD

定义CRD实例

了解声明式设计模式

声明式设计,是一种编程范式,与命令式相对立。它描述目标的性质,让计算机明白目标,而非流程


  • 命令式: 直接发出系统需要执行的命令,基于 R&D 机制

声明式 R&D机制


  • 声明式: 不直接发出需要执行的命令, 而是描述所需对象的状态, 系统会不断向该状态驱动

  • 声明式设计的特点

  • 一次能处理多个写操作
  • 可对“实际状态”和“期望状态”的调谐(Reconcile)
  • 不用担心命令式中发生漏执行的情况

解决的问题


  • 命令式发生漏执行重复执行可能是致命的
  • 需要人为的去控制依赖
  • 命令式无法处理多写情况
  • 命令式难以迁移, 易出错



使用 Controller 管理资源行为

控制器

在 K8S 中,控制器会持续监控集群的状态,并当前状态与期望状态不一致时,对集群做适当变动以向期望状态驱动

控制器原理

Informer

list&watch

Informer 使用 Reflector 包来监听 API 对象的变化, Reflector 使用的是一种 ListAndWatch 的方法

list 和 watch 是两个接口, 通过 HTTP 访问, list 返回对象的 List, watch 一般是一个长连接, 监听对象相关事件, 当有事件时, 返回一个 WatchEvent

list 返回全量, watch 返回增量

Controller Loop

控制器looper原理

实现一个带前置服务检测功能的 deployment

控制器实现流程

准备代码框架,定义基础文件

定义 waitdeployment crd 数据结构

通过 k8s code generator 工具自动生成基础代码

编写 controller 业务逻辑代码

部署 controller 服务到 k8s 集群

// WaitDeployment customize deployment resource definition
type Waitdeployment struct {
 metav1.TypeMeta   `json:",inline"`
 metav1.ObjectMeta `json:"metadata,omitempty"`
 Spec              appv1.DeploymentSpec   `json:"spec,omitempty"`
 Status            appv1.DeploymentStatus `json:"status,omitempty"`
 WaitProbe         WaitProbe              `json:"waitProbe,omitempty"`
}

type WaitProbe struct {
 Address string        `json:"address,omitempty"`
 Timeout time.Duration `json:"timeout,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type WaitdeploymentList struct {
 metav1.TypeMeta `json:",inline"`
 metav1.ListMeta `json:"metadata,omitempty"`

 Items []Waitdeployment `json:"items"`
}

  1. 准备代码框架,定义基础文件

代码框架

2.定义 waitdeployment crd 数据结构

type.go:自定义资源的具体数据结构定义

type Client struct {
 ctx context.Context

 kubeClient         kubernetes.Interface
 customClient       clientSet.Interface
 kubeFactory        kubeInformers.SharedInformerFactory
 customFactory      externalInformer.SharedInformerFactory
 deploymentInformer informerAppsV1.DeploymentInformer
 customInformer     qboxInformer.WaitdeploymentInformer

 deploymentsLister     appsListers.DeploymentLister
 deploymentsSynced     cache.InformerSynced
 waitDeploymentsLister qboxListers.WaitdeploymentLister
 waitDeploymentsSynced cache.InformerSynced

 // workqueue is a rate limited work queue. This is used to queue work to be
 // processed instead of performing it as soon as a change happens. This
 // means we can ensure we only process a fixed amount of resources at a
 // time, and makes it easy to ensure we are never processing the same item
 // simultaneously in two different workers.
 workqueue workqueue.RateLimitingInterface
 // recorder is an event recorder for recording Event resources to the
 // Kubernetes API.
 recorder record.EventRecorder
}

3.通过 code generator 工具自动生成基础代码

基础代码

zz_generated. deepcopy.go:一系列「deepcopy」方法,实现 CRD 的复制功能

clientset:是使用 CRD 的 SDK ,对外暴露接口

listen:从集群监听 CRD 对象变化事件

informers:观察 CRD 变化并提供 Handler 方法入口

4.编写 controller 业务逻辑代码

核心逻辑:首先查找目标资源有没有,没有就判断配置的前置服务地址是否可以建立 TCP 链接创建。可以则创建 deployment,否则返回失败等待下次执行

func (c *Client) checkAndStartDeployment(waitDeployment *v1alpha1.Waitdeployment) (*appsV1.Deployment, error) {
 err := doTCPProbe(waitDeployment.WaitProbe.Address, waitDeployment.WaitProbe.Timeout)
 if err != nil {
  return nil, err
 }
 deployment, err := c.deploymentsLister.Deployments(waitDeployment.Namespace).Get(waitDeployment.Name)
 if errors.IsNotFound(err) {
  klog.Infof("Waitdeployment not exist, create a new deployment %s in namespace %s", waitDeployment.Name, waitDeployment.Namespace)
  deployment, err = c.kubeClient.AppsV1().Deployments(waitDeployment.Namespace).Create(c.ctx, newDeployment(waitDeployment), metav1.CreateOptions{})
 }
 if err != nil {
  return nil, err
 }
 return deployment, nil
}

// tcp 连接检测代码
func doTCPProbe(addr string, timeout time.Duration) error {
 conn, err := net.DialTimeout("tcp", addr, timeout)
 if err != nil {
  return err
 }
 err = conn.Close()
 if err != nil {
  klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
 }
 return nil
}

5.部署 controller 服务到 k8s 集群

调试时,可以直接在配置好 kubeconfig 后在 IDE 运行

效果演示

发布二进制后可以直接运行

二进制放入容器,通过 deployment 运行或者直接以静态 pod 形式启动

参考

CRD 官方文档

Controller 官方文档

从零编写一个 CRD

K8S 控制器模式

Controller 实践

求贤若渴 Join us

我们团队正在招聘

测试开发工程师,详情点击:

https://jobs.qiniu.com/#/job/0e512719-8494-4dc0-b0e3-d8b6cd8b1914

工程效率研发工程师,详情点击:

https://jobs.qiniu.com/#/job/1c0af0dc-0937-47bf-b640-fd6f6feefd7e

欢迎大家发简历至 [email protected],标题推荐:【内推-姓名-岗位】

回到顶部