1

client-go gin的简单整合六-list-watch二(关于Rs与Pod以及Deployment的完善)

 1 year ago
source link: https://blog.51cto.com/saynaihe/5372369
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

client-go gin的简单整合六-list-watch二(关于Rs与Pod以及Deployment的完善)

原创

前面完成了 client-go gin的简单整合五-list-watch deployment应用,进一步把 Rs Pod也实现list-watch!
前面少写的:

/src/core/deployment_init.go

// Update replaces the cached Deployment that has the same namespace and name
// as dep with dep itself.
//
// It returns an error when the namespace has no cached list, or when the list
// holds no Deployment with that name.
func (depmap *DeploymentMap) Update(dep *v1.Deployment) error {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		deps := list.([]*v1.Deployment)
		for i, cached := range deps {
			if cached.Name == dep.Name {
				deps[i] = dep
				depmap.data.Store(dep.Namespace, deps)
				// Report success only when an entry was actually replaced.
				return nil
			}
		}
	}
	return fmt.Errorf("deployment-%s not found", dep.Name)
}

忘记了Store数据…depmap.data.Store(dep.Namespace, list)!

client-go gin的简单整合六-list-watch二(关于Rs与Pod)

关于Rs list-watch

/src/core/rs_init.go

package core

import (
	"errors"
	"fmt"
	v1 "k8s.io/api/apps/v1"
	"log"
	"sync"
)

// RSMapStruct is an in-memory cache of ReplicaSets.
// Data is keyed by namespace; each value is a []*v1.ReplicaSet.
type RSMapStruct struct {
	Data sync.Map
}

// Add appends rs to the cached list of its namespace, creating the list when
// the namespace has not been seen before.
func (rsmap *RSMapStruct) Add(rs *v1.ReplicaSet) {
	ns := rs.Namespace
	existing, found := rsmap.Data.Load(ns)
	if !found {
		rsmap.Data.Store(ns, []*v1.ReplicaSet{rs})
		return
	}
	rsmap.Data.Store(ns, append(existing.([]*v1.ReplicaSet), rs))
}
// Update swaps the cached ReplicaSet that shares rs's namespace and name for
// rs. It returns an error when no such ReplicaSet is cached.
func (rsmap *RSMapStruct) Update(rs *v1.ReplicaSet) error {
	ns := rs.Namespace
	stored, found := rsmap.Data.Load(ns)
	if found {
		items := stored.([]*v1.ReplicaSet)
		for i := range items {
			if items[i].Name != rs.Name {
				continue
			}
			items[i] = rs
			rsmap.Data.Store(ns, stored)
			return nil
		}
	}

	return fmt.Errorf("rs-%s not found", rs.Name)
}

// Delete removes the cached ReplicaSet that has the same namespace and name
// as rs. It is a no-op when nothing matches.
//
// The remaining entries are copied into a fresh slice instead of being shifted
// in place: ListByNS hands the stored slice to its callers, and an in-place
// shift would change the contents of a slice a caller is still iterating.
func (rsmap *RSMapStruct) Delete(rs *v1.ReplicaSet) {
	key := rs.Namespace
	value, ok := rsmap.Data.Load(key)
	if !ok {
		return
	}
	list := value.([]*v1.ReplicaSet)
	for index, r := range list {
		if r.Name == rs.Name {
			remaining := make([]*v1.ReplicaSet, 0, len(list)-1)
			remaining = append(remaining, list[:index]...)
			remaining = append(remaining, list[index+1:]...)
			rsmap.Data.Store(key, remaining)
			return
		}
	}
}

// ListByNS returns the ReplicaSets cached for namespace ns, or an error when
// the namespace has no cached entry.
func (rsmap *RSMapStruct) ListByNS(ns string) ([]*v1.ReplicaSet, error) {
	stored, found := rsmap.Data.Load(ns)
	if !found {
		return nil, errors.New("rs record not found")
	}
	return stored.([]*v1.ReplicaSet), nil
}

// GetRsLabelsByDeployment returns the label maps of every cached ReplicaSet in
// deploy's namespace that lists deploy as an owner.
//
// An owner reference matches only when its Kind is "Deployment" and its Name
// equals deploy.Name, so a ReplicaSet owned by a different kind of object that
// happens to share the name is not attributed to deploy. ReplicaSets of every
// revision are included; no revision-annotation filter is applied.
//
// It returns the error from ListByNS when the namespace has no cached
// ReplicaSets.
func (rsmap *RSMapStruct) GetRsLabelsByDeployment(deploy *v1.Deployment) ([]map[string]string, error) {
	rs, err := rsmap.ListByNS(deploy.Namespace)
	if err != nil {
		return nil, err
	}
	ret := make([]map[string]string, 0)
	for _, item := range rs {
		for _, ref := range item.OwnerReferences {
			if ref.Kind == "Deployment" && ref.Name == deploy.Name {
				ret = append(ret, item.Labels)
				// One matching owner is enough; add each ReplicaSet once.
				break
			}
		}
	}
	return ret, nil
}

// RSHandler forwards ReplicaSet add/update/delete events to the package-level
// RSMap cache.
type RSHandler struct {
}

// OnAdd stores a newly observed ReplicaSet in RSMap.
// obj must be a *v1.ReplicaSet; any other type panics.
func (h *RSHandler) OnAdd(obj interface{}) {
	added := obj.(*v1.ReplicaSet)
	RSMap.Add(added)
}
// OnUpdate replaces the cached ReplicaSet with newObj and logs the error when
// the ReplicaSet is not in the cache. oldObj is ignored.
func (h *RSHandler) OnUpdate(oldObj interface{}, newObj interface{}) {
	updated := newObj.(*v1.ReplicaSet)
	if err := RSMap.Update(updated); err != nil {
		log.Println(err)
	}
}
// OnDelete removes the deleted ReplicaSet from RSMap.
//
// An informer may deliver a tombstone (cache.DeletedFinalStateUnknown) instead
// of the object itself, so the type is checked rather than asserted blindly;
// an unexpected type is logged and skipped instead of panicking.
func (rsmap *RSHandler) OnDelete(obj interface{}) {
	rs, ok := obj.(*v1.ReplicaSet)
	if !ok {
		log.Printf("rs OnDelete: unexpected object type %T", obj)
		return
	}
	RSMap.Delete(rs)
}

// RSMap is the package-wide ReplicaSet cache used by RSHandler.
var RSMap *RSMapStruct

// init allocates the empty ReplicaSet cache at package load time.
func init() {
	RSMap = new(RSMapStruct)
}

关于Pod list-watch

先写一个util方法(就为了判断两个label是否相同!):
/src/core/Util.go

// IsValidLabel reports whether every key/value pair of m2 is present in m1,
// i.e. whether m2 is a subset of m1. An empty or nil m2 matches any m1.
//
// Presence is checked explicitly: a key that is absent from m1 never matches,
// even when the wanted value in m2 is the empty string.
func IsValidLabel(m1, m2 map[string]string) bool {
	for key, want := range m2 {
		got, ok := m1[key]
		if !ok || got != want {
			return false
		}
	}

	return true
}

基本copy自deployment_init.go:
/src/core/pod_init.go

package core

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"log"
	"sync"
)

// PodMapStruct is an in-memory cache of Pods.
// data is keyed by namespace; each value is a []*corev1.Pod.
type PodMapStruct struct {
	data sync.Map
}

// Add appends pod to the cached list of its namespace, creating the list when
// the namespace has not been seen before.
func (podmap *PodMapStruct) Add(pod *corev1.Pod) {
	ns := pod.Namespace
	existing, found := podmap.data.Load(ns)
	if !found {
		podmap.data.Store(ns, []*corev1.Pod{pod})
		return
	}
	podmap.data.Store(ns, append(existing.([]*corev1.Pod), pod))
}
// Update swaps the cached Pod that shares pod's namespace and name for pod.
// It returns an error when no such Pod is cached.
func (podmap *PodMapStruct) Update(pod *corev1.Pod) error {
	ns := pod.Namespace
	stored, found := podmap.data.Load(ns)
	if found {
		items := stored.([]*corev1.Pod)
		for i := range items {
			if items[i].Name != pod.Name {
				continue
			}
			items[i] = pod
			podmap.data.Store(ns, stored)
			return nil
		}
	}

	return fmt.Errorf("pod-%s not found", pod.Name)
}
// Delete removes the cached Pod that has the same namespace and name as pod.
// It is a no-op when nothing matches.
//
// The remaining entries are copied into a fresh slice instead of being shifted
// in place: ListByNS hands the stored slice to its callers, and an in-place
// shift would change the contents of a slice a caller is still iterating.
func (podmap *PodMapStruct) Delete(pod *corev1.Pod) {
	key := pod.Namespace
	value, ok := podmap.data.Load(key)
	if !ok {
		return
	}
	list := value.([]*corev1.Pod)
	for index, p := range list {
		if p.Name == pod.Name {
			remaining := make([]*corev1.Pod, 0, len(list)-1)
			remaining = append(remaining, list[:index]...)
			remaining = append(remaining, list[index+1:]...)
			podmap.data.Store(key, remaining)
			return
		}
	}
}
// ListByNS returns the Pods cached for namespace ns. It returns an error when
// ns is empty or when the namespace has no cached entry.
func (podmap *PodMapStruct) ListByNS(ns string) ([]*corev1.Pod, error) {
	if ns == "" {
		return nil, fmt.Errorf("pods not found")
	}
	stored, found := podmap.data.Load(ns)
	if !found {
		return nil, fmt.Errorf("pods not found")
	}
	return stored.([]*corev1.Pod), nil
}
// ListByLabels returns the Pods cached for namespace ns whose labels contain
// at least one of the given label sets (see IsValidLabel).
//
// Each Pod appears at most once in the result, even when it matches several
// of the label sets. It returns the error from ListByNS when the namespace has
// no cached Pods.
func (podmap *PodMapStruct) ListByLabels(ns string, labels []map[string]string) ([]*corev1.Pod, error) {
	pods, err := podmap.ListByNS(ns)
	if err != nil {
		return nil, err
	}
	ret := make([]*corev1.Pod, 0)
	for _, pod := range pods {
		for _, label := range labels {
			if IsValidLabel(pod.Labels, label) {
				ret = append(ret, pod)
				// Stop at the first matching set so the Pod is not duplicated.
				break
			}
		}
	}
	return ret, nil
}

// PodHandler forwards Pod add/update/delete events to the package-level
// PodMap cache.
type PodHandler struct {
}
// PodMap is the package-wide Pod cache used by PodHandler.
var PodMap *PodMapStruct

// init allocates the empty Pod cache at package load time.
func init() {
	PodMap = new(PodMapStruct)
}
// OnAdd stores a newly observed Pod in PodMap.
// obj must be a *corev1.Pod; any other type panics.
func (h *PodHandler) OnAdd(obj interface{}) {
	added := obj.(*corev1.Pod)
	PodMap.Add(added)
}
// OnUpdate replaces the cached Pod with newObj and logs the error when the
// Pod is not in the cache. oldObj is ignored.
func (h *PodHandler) OnUpdate(oldObj interface{}, newObj interface{}) {
	updated := newObj.(*corev1.Pod)
	if err := PodMap.Update(updated); err != nil {
		log.Println(err)
	}
}
// OnDelete removes the deleted Pod from PodMap.
//
// An informer may deliver a tombstone (cache.DeletedFinalStateUnknown) instead
// of the object itself, so the type is checked rather than asserted blindly;
// an unexpected type is logged and skipped instead of panicking.
func (podmap *PodHandler) OnDelete(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		log.Printf("pod OnDelete: unexpected object type %T", obj)
		return
	}
	PodMap.Delete(pod)
}

deployment informer AddEventHandler

/src/core/deployment_init.go

package core

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"log"
	"sync"
)

// DeploymentMap is an in-memory cache of Deployments.
// data is keyed by namespace; each value is a []*v1.Deployment.
type DeploymentMap struct {
	data sync.Map
}

// Add appends dep to the cached list of its namespace, creating the list when
// the namespace has not been seen before.
func (depmap *DeploymentMap) Add(dep *v1.Deployment) {
	existing, found := depmap.data.Load(dep.Namespace)
	if !found {
		depmap.data.Store(dep.Namespace, []*v1.Deployment{dep})
		return
	}
	depmap.data.Store(dep.Namespace, append(existing.([]*v1.Deployment), dep))
}
// Update replaces the cached Deployment that has the same namespace and name
// as dep with dep itself.
//
// It returns an error when the namespace has no cached list, or when the list
// holds no Deployment with that name.
func (depmap *DeploymentMap) Update(dep *v1.Deployment) error {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		deps := list.([]*v1.Deployment)
		for i, cached := range deps {
			if cached.Name == dep.Name {
				deps[i] = dep
				depmap.data.Store(dep.Namespace, deps)
				// Report success only when an entry was actually replaced.
				return nil
			}
		}
	}
	return fmt.Errorf("deployment-%s not found", dep.Name)
}

// Delete removes the cached Deployment that has the same namespace and name as
// dep. It is a no-op when nothing matches.
//
// The remaining entries are copied into a fresh slice instead of being shifted
// in place: ListByNS hands the stored slice to its callers, and an in-place
// shift would change the contents of a slice a caller is still iterating.
func (depmap *DeploymentMap) Delete(dep *v1.Deployment) {
	list, ok := depmap.data.Load(dep.Namespace)
	if !ok {
		return
	}
	deps := list.([]*v1.Deployment)
	for i, cached := range deps {
		if cached.Name == dep.Name {
			remaining := make([]*v1.Deployment, 0, len(deps)-1)
			remaining = append(remaining, deps[:i]...)
			remaining = append(remaining, deps[i+1:]...)
			depmap.data.Store(dep.Namespace, remaining)
			return
		}
	}
}
// ListByNS returns the Deployments cached for namespace ns, or an error when
// the namespace has no cached entry.
func (depmap *DeploymentMap) ListByNS(ns string) ([]*v1.Deployment, error) {
	stored, found := depmap.data.Load(ns)
	if !found {
		return nil, fmt.Errorf("record not found")
	}
	return stored.([]*v1.Deployment), nil
}

// DepMap is the package-wide Deployment cache used by DepHandler.
var DepMap *DeploymentMap

// init allocates the empty Deployment cache at package load time.
func init() {
	DepMap = new(DeploymentMap)
}

// DepHandler forwards Deployment add/update/delete events to the package-level
// DepMap cache.
type DepHandler struct {
}

// OnAdd stores a newly observed Deployment in DepMap.
// obj must be a *v1.Deployment; any other type panics.
func (d *DepHandler) OnAdd(obj interface{}) {
	added := obj.(*v1.Deployment)
	DepMap.Add(added)
}
// OnUpdate replaces the cached Deployment with newObj and logs the error
// returned by DepMap.Update, if any. oldObj is ignored.
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	updated := newObj.(*v1.Deployment)
	if err := DepMap.Update(updated); err != nil {
		log.Println(err)
	}
}
// OnDelete removes the deleted Deployment from DepMap. Objects of any other
// type are silently ignored.
func (d *DepHandler) OnDelete(obj interface{}) {
	removed, isDeployment := obj.(*v1.Deployment)
	if !isDeployment {
		return
	}
	DepMap.Delete(removed)
}

// InitDeployment builds a shared informer factory on lib.K8sClient (resync
// period 0), registers the Deployment, Pod and ReplicaSet handlers on their
// informers, and starts the factory with wait.NeverStop.
func InitDeployment() {
	sharedFactory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	apps := sharedFactory.Apps().V1()

	apps.Deployments().Informer().AddEventHandler(&DepHandler{})
	sharedFactory.Core().V1().Pods().Informer().AddEventHandler(&PodHandler{})
	apps.ReplicaSets().Informer().AddEventHandler(&RSHandler{})

	sharedFactory.Start(wait.NeverStop)
}

Pod.go

/src/service/Pod.go

package service

import (
	"context"
	"fmt"
	"github.com/gin-gonic/gin"
	"k8s-demo1/src/core"
	. "k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Pod is the JSON view of a Pod returned by the HTTP handlers in this package.
type Pod struct {
	Namespace  string
	Name       string
	Status     string            // string form of the Pod's status phase
	Images     string            // image of the Pod's first container
	NodeName   string
	CreateTime string            // creation timestamp formatted as "2006-01-02 15:04:05"
	Labels     map[string]string
}

// ListallPod is a gin handler that writes the Pods cached for the namespace
// given in the "ns" query parameter as a JSON array with status 200.
//
// A lookup error is attached to the gin context; the response is then an
// empty array. The data comes from core.PodMap rather than from a live API
// server call.
func ListallPod(g *gin.Context) {
	namespace := g.Query("ns")

	cached, err := core.PodMap.ListByNS(namespace)
	if err != nil {
		g.Error(err)
	}

	result := []*Pod{}
	for i := range cached {
		p := cached[i]
		view := &Pod{
			Namespace:  p.Namespace,
			Name:       p.Name,
			Status:     string(p.Status.Phase),
			Labels:     p.Labels,
			NodeName:   p.Spec.NodeName,
			Images:     p.Spec.Containers[0].Image,
			CreateTime: p.CreationTimestamp.Format("2006-01-02 15:04:05"),
		}
		result = append(result, view)
	}
	g.JSON(200, result)
}

deployment.go的修改

deployment.go也要修改一下(上次没有将GetPodsByDep等方法修改为list-watch!)
/src/service/deployment.go:

package service

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"k8s-demo1/src/core"
	v1 "k8s.io/api/apps/v1"
	"log"
)

// Deployment is the JSON view of a Deployment returned by ListDeployment.
type Deployment struct {
	Namespace           string
	Name                string
	Replicas            int32
	AvailableReplicas   int32
	UnavailableReplicas int32
	Images              string            // image of the first container in the pod template
	CreateTime          string            // formatted creation timestamp
	Labels              map[string]string
	Pods                []*Pod            // Pods resolved by GetPodsByDep
}

// ListDeployment is a gin handler that writes the Deployments cached for the
// namespace given in the "ns" query parameter as a JSON array with status 200.
//
// A lookup error is attached to the gin context; the response is then an
// empty array. Each entry also carries the Pods resolved by GetPodsByDep.
func ListDeployment(g *gin.Context) {
	ns := g.Query("ns")
	deplist, err := core.DepMap.ListByNS(ns)
	if err != nil {
		g.Error(err)
	}
	ret := make([]*Deployment, 0)
	for _, item := range deplist {
		ret = append(ret, &Deployment{
			Namespace:           item.Namespace,
			Name:                item.Name,
			Replicas:            item.Status.Replicas,
			AvailableReplicas:   item.Status.AvailableReplicas,
			UnavailableReplicas: item.Status.UnavailableReplicas,
			Images:              item.Spec.Template.Spec.Containers[0].Image,
			Labels:              item.GetLabels(),
			Pods:                GetPodsByDep(ns, *item),
			// Go's reference layout is 15:04:05 (hour:minute:second); the
			// previous "15:03:04" printed the 12-hour hour as the minute.
			CreateTime: item.CreationTimestamp.Format("2006-01-02 15:04:05"),
		})
	}
	g.JSON(200, ret)
}

// GetLabels renders m as comma-separated "key=value" pairs, for example
// "aa=xxx,xxx=xx". The pair order follows Go's map iteration order and is
// therefore not stable. An empty or nil map yields "".
func GetLabels(m map[string]string) string {
	var joined string
	first := true
	for key, val := range m {
		if !first {
			joined += ","
		}
		first = false
		joined += fmt.Sprintf("%s=%s", key, val)
	}
	return joined
}
// GetPodsByDep returns the cached Pods that belong to dep: the Pods in
// dep.Namespace whose labels contain the labels of one of dep's ReplicaSets.
//
// ns is unused; the lookup always uses dep.Namespace. When the ReplicaSet or
// Pod cache has no entry for the namespace, the error is logged and an empty
// list is returned. The handler must not call log.Fatal here, because that
// would terminate the whole server from inside a request.
func GetPodsByDep(ns string, dep v1.Deployment) []*Pod {
	ret := make([]*Pod, 0)

	rsLabelsMap, err := core.RSMap.GetRsLabelsByDeployment(&dep)
	if err != nil {
		log.Println(err)
		return ret
	}
	pods, err := core.PodMap.ListByLabels(dep.Namespace, rsLabelsMap)
	if err != nil {
		log.Println(err)
		return ret
	}
	for _, pod := range pods {
		ret = append(ret, &Pod{
			Name:       pod.Name,
			Namespace:  pod.Namespace,
			Images:     pod.Spec.Containers[0].Image,
			NodeName:   pod.Spec.NodeName,
			Labels:     pod.Labels,
			Status:     string(pod.Status.Phase),
			CreateTime: pod.CreationTimestamp.Format("2006-01-02 15:04:05"),
		})
	}
	return ret
}

运行main.go

运行main.go 看是否能访问pod列表?
 http://127.0.0.1:8080/pods?ns=default

client-go gin的简单整合六-list-watch二(关于Rs与Pod以及Deployment的完善)_gin

deployments下的坑…

运行main.go也顺便看了一眼deployment是否正常:
 http://127.0.0.1:8080/deployments?ns=default

client-go gin的简单整合六-list-watch二(关于Rs与Pod以及Deployment的完善)_云原生_02

what…deployment的pod串了!想一下也是的:GetRsLabelsByDeployment的时候并没有判断rs与pod的关系,只靠pod的label来判断还是不够精确。没有想到好的办法,偷了一个懒:
/src/core/rs_init.go

package core

import (
	"errors"
	"fmt"
	v1 "k8s.io/api/apps/v1"
	"log"
	"sync"
)

// RSMapStruct is an in-memory cache of ReplicaSets.
// Data is keyed by namespace; each value is a []*v1.ReplicaSet.
type RSMapStruct struct {
	Data sync.Map
}

// Add appends rs to the cached list of its namespace, creating the list when
// the namespace has not been seen before.
func (rsmap *RSMapStruct) Add(rs *v1.ReplicaSet) {
	ns := rs.Namespace
	existing, found := rsmap.Data.Load(ns)
	if !found {
		rsmap.Data.Store(ns, []*v1.ReplicaSet{rs})
		return
	}
	rsmap.Data.Store(ns, append(existing.([]*v1.ReplicaSet), rs))
}
// Update swaps the cached ReplicaSet that shares rs's namespace and name for
// rs. It returns an error when no such ReplicaSet is cached.
func (rsmap *RSMapStruct) Update(rs *v1.ReplicaSet) error {
	ns := rs.Namespace
	stored, found := rsmap.Data.Load(ns)
	if found {
		items := stored.([]*v1.ReplicaSet)
		for i := range items {
			if items[i].Name != rs.Name {
				continue
			}
			items[i] = rs
			rsmap.Data.Store(ns, stored)
			return nil
		}
	}

	return fmt.Errorf("rs-%s not found", rs.Name)
}

// Delete removes the cached ReplicaSet that has the same namespace and name
// as rs. It is a no-op when nothing matches.
//
// The remaining entries are copied into a fresh slice instead of being shifted
// in place: ListByNS hands the stored slice to its callers, and an in-place
// shift would change the contents of a slice a caller is still iterating.
func (rsmap *RSMapStruct) Delete(rs *v1.ReplicaSet) {
	key := rs.Namespace
	value, ok := rsmap.Data.Load(key)
	if !ok {
		return
	}
	list := value.([]*v1.ReplicaSet)
	for index, r := range list {
		if r.Name == rs.Name {
			remaining := make([]*v1.ReplicaSet, 0, len(list)-1)
			remaining = append(remaining, list[:index]...)
			remaining = append(remaining, list[index+1:]...)
			rsmap.Data.Store(key, remaining)
			return
		}
	}
}

// ListByNS returns the ReplicaSets cached for namespace ns, or an error when
// the namespace has no cached entry.
func (rsmap *RSMapStruct) ListByNS(ns string) ([]*v1.ReplicaSet, error) {
	stored, found := rsmap.Data.Load(ns)
	if !found {
		return nil, errors.New("rs record not found")
	}
	return stored.([]*v1.ReplicaSet), nil
}

// GetRsLabelsByDeployment returns the label maps of every cached ReplicaSet in
// deploy's namespace that lists deploy as an owner.
//
// An owner reference matches only when its Kind is "Deployment" and its Name
// equals deploy.Name, so a ReplicaSet owned by a different kind of object that
// happens to share the name is not attributed to deploy. ReplicaSets of every
// revision are included; no revision-annotation filter is applied.
//
// It returns the error from ListByNS when the namespace has no cached
// ReplicaSets.
func (rsmap *RSMapStruct) GetRsLabelsByDeployment(deploy *v1.Deployment) ([]map[string]string, error) {
	rs, err := rsmap.ListByNS(deploy.Namespace)
	if err != nil {
		return nil, err
	}
	ret := make([]map[string]string, 0)
	for _, item := range rs {
		for _, ref := range item.OwnerReferences {
			if ref.Kind == "Deployment" && ref.Name == deploy.Name {
				ret = append(ret, item.Labels)
				// One matching owner is enough; add each ReplicaSet once.
				break
			}
		}
	}
	return ret, nil
}
// GetRsLabelsByDeploymentname returns the name of the first cached ReplicaSet
// in deploy's namespace that lists deploy as an owner, or "" when there is
// none. Despite its name it returns a ReplicaSet name, not labels.
//
// An owner reference matches only when its Kind is "Deployment" and its Name
// equals deploy.Name. When the namespace has no cached ReplicaSets the lookup
// error is printed and "" is returned. ReplicaSets of every revision are
// considered; no revision-annotation filter is applied.
func (rsmap *RSMapStruct) GetRsLabelsByDeploymentname(deploy *v1.Deployment) string {
	rs, err := rsmap.ListByNS(deploy.Namespace)
	if err != nil {
		fmt.Println(err)
		return ""
	}
	for _, item := range rs {
		for _, ref := range item.OwnerReferences {
			if ref.Kind == "Deployment" && ref.Name == deploy.Name {
				return item.Name
			}
		}
	}
	return ""
}

// RSHandler forwards ReplicaSet add/update/delete events to the package-level
// RSMap cache.
type RSHandler struct {
}

// OnAdd stores a newly observed ReplicaSet in RSMap.
// obj must be a *v1.ReplicaSet; any other type panics.
func (h *RSHandler) OnAdd(obj interface{}) {
	added := obj.(*v1.ReplicaSet)
	RSMap.Add(added)
}
// OnUpdate replaces the cached ReplicaSet with newObj and logs the error when
// the ReplicaSet is not in the cache. oldObj is ignored.
func (h *RSHandler) OnUpdate(oldObj interface{}, newObj interface{}) {
	updated := newObj.(*v1.ReplicaSet)
	if err := RSMap.Update(updated); err != nil {
		log.Println(err)
	}
}
// OnDelete removes the deleted ReplicaSet from RSMap.
//
// An informer may deliver a tombstone (cache.DeletedFinalStateUnknown) instead
// of the object itself, so the type is checked rather than asserted blindly;
// an unexpected type is logged and skipped instead of panicking.
func (rsmap *RSHandler) OnDelete(obj interface{}) {
	rs, ok := obj.(*v1.ReplicaSet)
	if !ok {
		log.Printf("rs OnDelete: unexpected object type %T", obj)
		return
	}
	RSMap.Delete(rs)
}

// RSMap is the package-wide ReplicaSet cache used by RSHandler.
var RSMap *RSMapStruct

// init allocates the empty ReplicaSet cache at package load time.
func init() {
	RSMap = new(RSMapStruct)
}

模仿GetRsLabelsByDeployment写了一个 **GetRsLabelsByDeploymentname** 方法,返回rs的name,它等于pod的OwnerReferences字段中的name字段!

client-go gin的简单整合六-list-watch二(关于Rs与Pod以及Deployment的完善)_gin_03

/src/service/deployment.go

package service

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"k8s-demo1/src/core"
	v1 "k8s.io/api/apps/v1"
	"log"
)

// Deployment is the JSON view of a Deployment returned by ListDeployment.
type Deployment struct {
	Namespace           string
	Name                string
	Replicas            int32
	AvailableReplicas   int32
	UnavailableReplicas int32
	Images              string            // image of the first container in the pod template
	CreateTime          string            // formatted creation timestamp
	Labels              map[string]string
	Pods                []*Pod            // Pods resolved by GetPodsByDep
}

// ListDeployment is a gin handler that writes the Deployments cached for the
// namespace given in the "ns" query parameter as a JSON array with status 200.
// Each entry also carries the Pods resolved by GetPodsByDep.
func ListDeployment(g *gin.Context) {
	ns := g.Query("ns")
	// The lookup error is dropped on purpose: a namespace with no cached
	// Deployments yields a nil list and therefore an empty JSON array.
	deplist, _ := core.DepMap.ListByNS(ns)
	ret := make([]*Deployment, 0)
	for _, item := range deplist {
		ret = append(ret, &Deployment{
			Namespace:           item.Namespace,
			Name:                item.Name,
			Replicas:            item.Status.Replicas,
			AvailableReplicas:   item.Status.AvailableReplicas,
			UnavailableReplicas: item.Status.UnavailableReplicas,
			Images:              item.Spec.Template.Spec.Containers[0].Image,
			Labels:              item.GetLabels(),
			Pods:                GetPodsByDep(*item),
			// Go's reference layout is 15:04:05 (hour:minute:second); the
			// previous "15:03:04" printed the 12-hour hour as the minute.
			CreateTime: item.CreationTimestamp.Format("2006-01-02 15:04:05"),
		})
	}
	g.JSON(200, ret)
}

// GetLabels renders m as comma-separated "key=value" pairs, for example
// "aa=xxx,xxx=xx". The pair order follows Go's map iteration order and is
// therefore not stable. An empty or nil map yields "".
func GetLabels(m map[string]string) string {
	out := ""
	for name, value := range m {
		pair := fmt.Sprintf("%s=%s", name, value)
		if out == "" {
			out = pair
			continue
		}
		out = out + "," + pair
	}
	return out
}
// GetPodsByDep returns the cached Pods that belong to dep: Pods in
// dep.Namespace that match the labels of dep's ReplicaSets and whose first
// owner reference names the ReplicaSet reported by
// core.RSMap.GetRsLabelsByDeploymentname.
//
// When a cache lookup fails, the error is logged and an empty list is
// returned. The handler must not call log.Fatal here, because that would
// terminate the whole server from inside a request. Pods without any owner
// reference are skipped instead of causing an index-out-of-range panic.
func GetPodsByDep(dep v1.Deployment) []*Pod {
	ret := make([]*Pod, 0)

	rsLabelsMap, err := core.RSMap.GetRsLabelsByDeployment(&dep)
	if err != nil {
		log.Println(err)
		return ret
	}

	pods, err := core.PodMap.ListByRsLabels(dep.Namespace, rsLabelsMap)
	if err != nil {
		log.Println(err)
		return ret
	}

	// The owning ReplicaSet name does not depend on the Pod, so resolve it once.
	rsName := core.RSMap.GetRsLabelsByDeploymentname(&dep)

	for _, pod := range pods {
		if len(pod.OwnerReferences) == 0 || pod.OwnerReferences[0].Name != rsName {
			continue
		}
		ret = append(ret, &Pod{
			Name:       pod.Name,
			Namespace:  pod.Namespace,
			Images:     pod.Spec.Containers[0].Image,
			NodeName:   pod.Spec.NodeName,
			Labels:     pod.Labels,
			Status:     string(pod.Status.Phase),
			CreateTime: pod.CreationTimestamp.Format("2006-01-02 15:04:05"),
		})
	}
	return ret
}

运行main.go,访问http://127.0.0.1:8080/deployments?ns=default

client-go gin的简单整合六-list-watch二(关于Rs与Pod以及Deployment的完善)_go_04

算是精准匹配了,还不知道会不会有什么坑…

总结一下这过程学到的:

  1. deployment rs pod之间的关系
  2. OwnerReferences的匹配
  3. 下面还是继续crud吧…list-watch这个地方让我理解的时间有点长

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK