使用client go实现自定义控制器的方法

目录
  • 介绍
  • 具体实现
  • 测试

介绍

我们已经知道,Service对集群之外暴露服务的主要方式有两种:NodePort和LoadBalancer,但是这两种方式,都有一定的缺点:

  • NodePort方式的缺点是会占用很多集群机器的端口,那么当集群服务变多的时候,这个缺点就愈发明显。
  • LoadBalancer的缺点是每个Service都需要一个LB,浪费,麻烦,并且需要Kubernetes之外的设备的支持。

基于这种现状,Kubernetes提供了Ingress资源对象,Ingress只需要一个NodePort或者一个LB就可以满足暴露多个Service的需求。

客户端首先对 域名 执行 DNS 解析,得到 Ingress Controller 所在节点的 IP,然后客户端向 Ingress Controller 发送 HTTP 请求,然后根据 Ingress 对象里面的描述匹配域名,找到对应的 Service 对象,并获取关联的 Endpoints 列表,将客户端的请求转发给其中一个 Pod。

本文我们来使用client-go实现一个自定义控制器,通过判断serviceAnnotations属性是否包含ingress/http,如果包含则创建ingress,如果不包含则不创建。而且如果存在ingress则进行删除。

具体实现

首先我们创建项目。

$ mkdir ingress-manager && cd ingress-manager
$ go mod init ingress-manager
# 由于控制器部分的内容比较多,将它们单独放到pkg目录下
$ mkdir pkg
# 最终项目目录结构如下
.
├── go.mod
├── go.sum
├── main.go
└── pkg
    └── controller.go

接着我们来实现controller部分:

package pkg
import (
	"context"
	apiCoreV1 "k8s.io/api/core/v1"
	netV1 "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	informersCoreV1 "k8s.io/client-go/informers/core/v1"
	informersNetV1 "k8s.io/client-go/informers/networking/v1"
	"k8s.io/client-go/kubernetes"
	coreV1 "k8s.io/client-go/listers/core/v1"
	v1 "k8s.io/client-go/listers/networking/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"reflect"
	"time"
)
const (
	workNum  = 5  // 工作的节点数
	maxRetry = 10 // 最大重试次数
)
// 定义控制器
type Controller struct {
	client        kubernetes.Interface
	ingressLister v1.IngressLister
	serviceLister coreV1.ServiceLister
	queue         workqueue.RateLimitingInterface
}
// 初始化控制器
func NewController(client kubernetes.Interface, serviceInformer informersCoreV1.ServiceInformer, ingressInformer informersNetV1.IngressInformer) Controller {
	c := Controller{
		client:        client,
		ingressLister: ingressInformer.Lister(),
		serviceLister: serviceInformer.Lister(),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
	}
	// 添加事件处理函数
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addService,
		UpdateFunc: c.updateService,
	})
	ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: c.deleteIngress,
	})
	return c
}
// 入队
func (c *Controller) enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
	}
	c.queue.Add(key)
}
func (c *Controller) addService(obj interface{}) {
	c.enqueue(obj)
}
func (c *Controller) updateService(oldObj, newObj interface{}) {
	// todo 比较annotation
	// 这里只是比较了对象是否相同,如果相同,直接返回
	if reflect.DeepEqual(oldObj, newObj) {
		return
	}
	c.enqueue(newObj)
}
func (c *Controller) deleteIngress(obj interface{}) {
	ingress := obj.(*netV1.Ingress)
	ownerReference := metaV1.GetControllerOf(ingress)
	if ownerReference == nil {
		return
	}
	// 判断是否为真的service
	if ownerReference.Kind != "Service" {
		return
	}
	c.queue.Add(ingress.Namespace + "/" + ingress.Name)
}
// 启动控制器,可以看到开了五个协程,真正干活的是worker
func (c *Controller) Run(stopCh chan struct{}) {
	for i := 0; i < workNum; i++ {
		go wait.Until(c.worker, time.Minute, stopCh)
	}
	<-stopCh
}
func (c *Controller) worker() {
	for c.processNextItem() {
	}
}
// 业务真正处理的地方
func (c *Controller) processNextItem() bool {
	// 获取key
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)
  // 调用业务逻辑
	err := c.syncService(item.(string))
	if err != nil {
    // 对错误进行处理
		c.handlerError(item.(string), err)
		return false
	}
	return true
}

func (c *Controller) syncService(item string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(item)
	if err != nil {
		return err
	}
	// 获取service
	service, err := c.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	// 新增和删除
	_, ok := service.GetAnnotations()["ingress/http"]
	ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	if ok && errors.IsNotFound(err) {
		// 创建ingress
		ig := c.constructIngress(service)
		_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metaV1.CreateOptions{})
		if err != nil {
			return err
		}
	} else if !ok && ingress != nil {
		// 删除ingress
		err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metaV1.DeleteOptions{})
		if err != nil {
			return err
		}
	}
	return nil
}
func (c *Controller) handlerError(key string, err error) {
	// 如果出现错误,重新加入队列,最大处理10次
	if c.queue.NumRequeues(key) <= maxRetry {
		c.queue.AddRateLimited(key)
		return
	}
	runtime.HandleError(err)
	c.queue.Forget(key)
}
func (c *Controller) constructIngress(service *apiCoreV1.Service) *netV1.Ingress {
	// 构造ingress
	pathType := netV1.PathTypePrefix
	ingress := netV1.Ingress{}
	ingress.ObjectMeta.OwnerReferences = []metaV1.OwnerReference{
		*metaV1.NewControllerRef(service, apiCoreV1.SchemeGroupVersion.WithKind("Service")),
	}
	ingress.Namespace = service.Namespace
	ingress.Name = service.Name
	ingress.Spec = netV1.IngressSpec{
		Rules: []netV1.IngressRule{
			{
				Host: "example.com",
				IngressRuleValue: netV1.IngressRuleValue{
					HTTP: &netV1.HTTPIngressRuleValue{
						Paths: []netV1.HTTPIngressPath{
							{
								Path:     "/",
								PathType: &pathType,
								Backend: netV1.IngressBackend{
									Service: &netV1.IngressServiceBackend{
										Name: service.Name,
										Port: netV1.ServiceBackendPort{
											Number: 80,
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}
	return &ingress
}

接下来我们来实现main:

package main
import (
	"ingress-manager/pkg"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)
func main() {
	// 获取config
	// 先尝试从集群外部获取,获取不到则从集群内部获取
	var config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		clusterConfig, err := rest.InClusterConfig()
		if err != nil {
			panic(err)
		}
		config = clusterConfig
	}
	// 通过config创建 clientSet
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// 通过 client 创建 informer,添加事件处理函数
	factory := informers.NewSharedInformerFactory(clientSet, 0)
	serviceInformer := factory.Core().V1().Services()
	ingressInformer := factory.Networking().V1().Ingresses()
	newController := pkg.NewController(clientSet, serviceInformer, ingressInformer)
	// 启动 informer
	stopCh := make(chan struct{})
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	newController.Run(stopCh)
}

测试

首先创建deploy和service:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-nginx
spec:
  selector:
    matchLabels:
      app: my-nginx
  template:
    metadata:
      labels:
        app: my-nginx
    spec:
      containers:
        - name: my-nginx
          image: nginx:1.17.1
          ports:
            - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: my-nginx
  labels:
    app: my-nginx
spec:
  ports:
    - port: 80
      protocol: TCP
      name: http
  selector:
    app: my-nginx

创建完成后进行查看:

$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-nginx          1/1     1            1           7m
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    7m

上面的命令我分别获取deploy,service,ingress,但是只获取到了deployservice,这符合我们的预期。接着我们给service/m-nginx中的annotations添加ingress/http: nginx

$ kubectl edit service/my-nginx
apiVersion: v1
kind: Service
metadata:
  annotations:
    ingress/http: nginx
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"my-nginx"},"name":"my-nginx","namespace":"default"},"spec":{"ports":[{"name":"http","port":80,"protocol":"TCP"}],"selector":{"app":"my-nginx"}}}
      ......
service/my-nginx edited

重新进行查看:

$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/demo-deployment   1/1     1            1           41d
deployment.apps/my-nginx          1/1     1            1           11m
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    11m
NAME                                 CLASS    HOSTS         ADDRESS   PORTS   AGE
ingress.networking.k8s.io/my-nginx   <none>   example.com             80      19s

接着我们再来测试下,将ingress/http: nginx注释掉,看看ingress是否会自动删除:

$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/demo-deployment   1/1     1            1           41d
deployment.apps/my-nginx          1/1     1            1           19m
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    19m

我们发现和我们预期的效果一样。

如果service被删除了,ingress肯定也是不会存在的。这个这里就不多演示了。有兴趣可以自行测试下。

到此这篇关于使用client-go实现自定义控制器的文章就介绍到这了,更多相关client-go自定义控制器内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang在GRPC中设置client的超时时间

    超时 建立连接 主要就2函数Dail和DialContext. // Dial creates a client connection to the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...) } func DialContext(ctx context.Cont

  • Golang 使用http Client下载文件的实现方法

    之前使用beego的http库的时候,有的情况需要下载文件.beego是能实现,但就是有点问题:不支持回调,没法显示下载速度,这在日常开发中是不可忍受的. 看了下beego的实现主要是使用了io.copy函数,于是就深入的看了下实现原理,发现挺简单的,于是就根据io.copy原理实现的一个简单的下载器 //定义要下载的文件 var durl = "https://dl.google.com/go/go1.10.3.darwin-amd64.pkg"; //解析url uri, err

  • golang使用http client发起get和post请求示例

    golang要请求远程网页,可以使用net/http包中的client提供的方法实现.查看了官方网站有一些示例,没有太全面的例子,于是自己整理了一下: get请求 func httpGet() { resp, err := http.Get("http://www.01happy.com/demo/accept.php?id=1") if err != nil { // handle error } defer resp.Body.Close() body, err := ioutil

  • Go http client 连接池不复用的问题

    当 http client 返回值为不为空,只读取 response header,但不读 body 内容就执行 response.Body.Close(),那么连接会被主动关闭,得不到复用. 测试代码如下: // xiaorui.cc func HttpGet() { for { fmt.Println("new") resp, err := http.Get("http://www.baidu.com") if err != nil { fmt.Println(

  • 使用client-go工具调用kubernetes API接口的教程详解(v1.17版本)

    目录 说明 效果 实现 1.拉取工具源码 2.创建目录结构 查询代码实例 创建deployment资源 更新deployment类型服务 删除deployment类型服务 说明 可以调取k8s API 接口的工具有很多,这里我就介绍下client-go gitlab上client-go项目地址: https://github.com/kubernetes/client-go 这个工具是由kubernetes官方指定维护的,大家可以放心使用 效果 运行完成后,可以直接获取k8s集群信息等 实现 1

  • 使用client go实现自定义控制器的方法

    目录 介绍 具体实现 测试 介绍 我们已经知道,Service对集群之外暴露服务的主要方式有两种:NodePort和LoadBalancer,但是这两种方式,都有一定的缺点: NodePort方式的缺点是会占用很多集群机器的端口,那么当集群服务变多的时候,这个缺点就愈发明显. LoadBalancer的缺点是每个Service都需要一个LB,浪费,麻烦,并且需要Kubernetes之外的设备的支持. 基于这种现状,Kubernetes提供了Ingress资源对象,Ingress只需要一个Node

  • 从生成CRD到编写自定义控制器教程示例

    目录 介绍 CRD定义 生成客户端相关代码 编写控制器 测试 介绍 我们可以使用code-generator 以及controller-tools来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象,以及客户端访问的 ClientSet.Informer.Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器. CRD定义 首先初始化项目: $ mkdir operator-crd && cd operator-crd $ go mod init ope

  • IOS 开发之操作图库自定义控制器

    IOS 开发之操作图库自定义控制器 步骤如下: 新建此类的代理属性必须遵守的协议: 新建PhotoButtonDelegate.h如下: // // PhotoButtonDelegate.h // 作业整理 // // Created by apple on 15/9/16. // Copyright (c) 2015年 LiuXun. All rights reserved. // #import <Foundation/Foundation.h> @class ImageAndPhoto

  • AngularJS基于factory创建自定义服务的方法详解

    本文实例讲述了AngularJS基于factory创建自定义服务的方法.分享给大家供大家参考,具体如下: 为什么要创建自定义服务? 很简单,不想让控制器显得过于"臃肿",而且服务可复用.针对性强,每个服务对应不同的功能. 这里介绍如何使用factory创建自定义服务,并且使用他. 例子1: <!--HTML--> <div ng-controller="showTheName"> <h1 ng-bind="name"

  • AngularJS创建自定义指令的方法详解

    本文实例讲述了AngularJS创建自定义指令的方法.分享给大家供大家参考,具体如下: 这是一篇译文,来自angular开发者说明的指令.主要面向已经熟悉angular开发基础的开发者.这篇文档解释了什么情况下需要创建自己的指令,和如何去创建指令. 什么是指令 从一个高的层面来讲,指令是angular $compile服务的说明,当特定的标签(属性,元素名,或者注释) 出现在DOM中的时候,它让编译器附加指定的行为到DOM上. 这个过程是很简单的.angular内部有很用这样自带的指令,比如说n

  • CodeIgniter自定义控制器MY_Controller用法分析

    本文实例讲述了CodeIgniter自定义控制器MY_Controller用法.分享给大家供大家参考,具体如下: Codeigniter所有的控制器都必须继承CI_Controller类,但CI_Controller类位于system目录下,不太方便修改.为方便做一些公用的处理,通常情况下我们会在core下创建MY_Controller,用来继承CI_Controller,从而项目中所有的控制器继承MY_Controller. 那么,MY_Controller 通常会做些什么呢? 所有的控制器都

  • Android实现在ServiceManager中加入自定义服务的方法详解

    本文实例讲述了Android实现在ServiceManager中加入自定义服务的方法.分享给大家供大家参考,具体如下: 当我们要使用android的系统服务时,一般都是使用Context.getSystemService方法.例如我们要获取AudioManager,我们可以: AudioManager am = (AudioManager) getSystemService(Context.AUDIO_SERVICE); 获取的服务,其实是在ServiceManager中注册的Binder服务,

  • thinkPHP3.2实现分页自定义样式的方法

    本文实例讲述了thinkPHP3.2实现分页自定义样式的方法.分享给大家供大家参考,具体如下: 下面是一个Tp3.2的自定义分页,这个方法也是在看过一个网友的博客之后受到启发这么写的.经过了一些修改,大家在看到代码之后也可以进行修改自定义样式: 主要的样式控制文件就是page.css,框架底层的分页类可以直接进行粘贴复制使用: 1. 框架底层的page.class.php 路径( Engine\Library\Think) 其实这个文件不需要过多修改,也可以直接使用官方的就行:下面是我现在用的,

  • iOS如何自定义控制器转场动画push详解

    前言 最近有些空闲时间,整理了下最近做的项目,本文主要介绍了关于iOS自定义控制器转场动画push的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 效果图: iOS7 开始苹果推出了自定义转场的 API .从此,任何可以用 CoreAnimation 实现的动画,都可以出现在两个 ViewController 的切换之间.并且实现方式高度解耦,这也意味着在保证代码干净的同时想要替换其他动画方案时只需简单改一个类名就可以了,真正体会了一把高颜值代码带来的愉悦感. 其实网

  • iOS实现容器视图控制器的方法

    一直以来想写一个抽屉效果,看了一些文章后发现并不是那么简单,网上的一些抽屉效果不是很严谨.看了下MMDrawerController的源码,等于定制了一个Container View Controller.(类似于系统的UINavigationController以及UITabbarController); 比如下面几个方法就是MMDrawerController实现的: 下面的描述是官方文档帮助理解什么是容器控制器,文档中以UINavigationController和UISplitViewC

随机推荐