Categories
程式開發

淺談 Kubernetes Scheduling-Framework 插件的實現


最近幾個月一直在研究 Kubernetes 的 scheduling-framework 調度框架,發現還是十分有意思的,我自己也實現了一個基於 scheduling-framework 調度框架的自定義調度器,希望感興趣的同學一起學習:https://github.com/NJUPT-ISL/Yoda-Scheduler

Scheduling-framework 調度框架

Kubernetes 的scheduling-framework 調度框架(以下簡稱調度框架)是針對當前Kubernetes 調度器的增強,它不同於之前的scheduler-extender,用戶可以編寫多個插件,這些插件可以在調度的不同階段作為原有調度器的擴展,並且這些插件會和Kubernetes 原有的調度器源代碼會一起編譯到調度程序中。

調度框架設計目標

  • 增強 Kubernetes 原有調度器的可擴展性。
  • 通過將調度程序的某些功能移至插件,可以簡化調度程序核心。
  • 調度框架中可設置多個擴展點。
  • 調度框架通過插件機制來接收插件結果,並根據接收到的結果繼續或中止。
  • 提出一種處理錯誤並將其與插件進行通信的機制。

Proposal

調度框架在 Kubernetes 調度器中定義了很多 Go 的接口和 Go API,用於用戶設計插件使用。這些用戶設計的插件將會被添加到調度程序中,並在編譯時包含在內。可以通過配置調度程序的 ComponentConfig 將允許啟用、禁用和重新排序插件。自定義調度程序可以“ 在樹外 ” 編寫其插件並編譯包含其自己的插件的調度程序二進製文件。

調度週期和綁定週期

調度器調度一個 Pod 的過程分為兩個階段:調度週期綁定週期

在調度週期中,調度器會為 Pod 選擇一個最合適它運行的節點,然後調度過程將進入綁定週期。

在綁定週期中,調度器會檢測調度週期中選中的那個“最合適的節點”是不是真的可以讓這個Pod 穩定的運行(比如檢測PV、檢測是否有端口衝突等),或者需不需要做一些初始化操作(比如設置這個節點上的FPGA 板子的狀態、設置GPU 顯卡的驅動版本、CUDA 的版本等)。

擴展點

Kubernetes 調度框架在調度週期和綁定週期都為我們提供了豐富的擴展點,這些擴展點可以“插上”我們自己設計的調度插件,一個插件可以在多個擴展點註冊以執行更複雜或有狀態的任務,實現我們想要的調度功能:

淺談 Kubernetes Scheduling-Framework 插件的實現 1

下面闡述下各個擴展點可以實現的功能。

Sort 排序

排序擴展點,由於調度器是按照 FIFO 的順序調度Pod 的,因此當隊列裡出現多個等待調度的Pod 時,可以對這些Pod 的先後順序進行排序,把我們想要的Pod(可能優先級比較高)往出隊方向移動,讓它可以更快地被調度。

目前的 Sort 擴展點只能啟用一個,不可以啟用多個 Sort 擴展插件。

我們可以看下 Sort 的接口,代碼位於 Kubernetes 項目的 /pkg/scheduler/framework/interface.go 中:

type QueueSortPlugin interface {
    Plugin
    // Less are used to sort pods in the scheduling queue.
    Less(*PodInfo, *PodInfo) bool
}

也就是只需要實現 Less 方法即可,比如如下的實現:

func Less(podInfo1, podInfo2 *framework.PodInfo) bool {
    return GetPodPriority(podInfo1) > GetPodPriority(podInfo2)
}

Pre-filter 預過濾

該擴展點用於預處理有關 Pod 的信息,或檢查集群或 Pod 必須滿足的某些條件。預過濾器插件應實現 PreFilter 函數,如果 PreFilter 返回錯誤,則調度週期將中止。注意,在每個調度週期中,只會調用一次 PreFilter。

Pre-filter 插件可以選擇實現 PreFilterExtensions 接口,這個接口定義了 AddPodRemovePod 方法以增量方式修改其預處理信息。

type PreFilterPlugin interface {
    Plugin
    PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status
    PreFilterExtensions() PreFilterExtensions
}

這裡的CycleState ,表示調度的上下文,其實是一個map 的封裝,結構體內部通過讀寫鎖實現了並發安全,開發者可以通過CycleState 來實現多個調度插件直接的數據傳遞,也就是多個插件可以共享狀態或通過此機制進行通信。

 type CycleState struct {
     mx      sync.RWMutex
     storage map[StateKey]StateData
     recordFrameworkMetrics bool// 该值为 true, 则调度框架会记录此次调度周期的数据
 }

這裡的 StateKey 是 string 類型,StateData 是一個接口類型:

 type StateData interface {
     // Clone is an interface to make a copy of StateData. For performance reasons,
     // clone should make shallow copies for members (e.g., slices or maps) that are not
     // impacted by PreFilter's optional AddPod/RemovePod methods.
     Clone() StateData
 }

我們可以做一個簡單的接口實現,來實現 StateData:

 type Data struct {
     Value int64
 }

 func (s *Data) Clone() framework.StateData {
     c := &Data{
         Value: s.Value,
     }
     return c
 }

那麼當插件在該擴展點想傳遞數據時就可以使用如下類似的代碼實現數據的傳遞:

 Max := Data{Value: 0}
 state.Lock()
 state.Write(framework.StateKey("Max"), &Max)
 defer state.Unlock()

Filter 過濾

用於過濾不能滿足當前被調度 Pod 運行需求的節點。對於每個節點,調度程序將按配置的順序調用該類插件。如果有任何過濾器插件將節點標記為不可行,則不會為該節點調用其餘插件。可以同時評估節點,並且在同一調度週期中可以多次調用 Filter 插件。這塊其實是調度器會啟動多個 go 協程以實現對多個節點並發調用 filter,來提高過濾效率。過濾插件其實類似於上一代 Kubernetes 調度器中的預選環節,即 Predicates。

我們看下接口定義:

type FilterPlugin interface {
    Plugin
    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
}

我們可以對應的實現,比如我這裡需要做 GPU 的調度,我需要檢查每個節點的 GPU 是否滿足 Pod 的運行要求:

func (y *Yoda) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, node *nodeinfo.NodeInfo) *framework.Status {
    klog.V(3).Infof("filter pod: %v, node: %v", pod.Name, node.Node().Name)
  // 检查节点 GPU 的健康状态
    if ok, msg := filter.CheckGPUHealth(node); ok {
    // 节点的 GPU 是否符合Pod 运行等级
        if !filter.PodFitsLevel(pod, node) {
            return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+" GPU Level Not Fit")
        }
    // 节点的 GPU 显存是否符合 Pod 运行
        if !filter.PodFitsMemory(pod, node) {
            return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+" GPU Memory Not Fit")
        }
    // 节点的 GPU 数量是否符合 Pod 运行
        if !filter.PodFitsNumber(pod, node) {
            return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+" GPU Number Not Fit")
        }
        return framework.NewStatus(framework.Success, "")
    } else {
        return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+msg)
    }
}

Pre-Score 預打分 (v1alpha1 版本稱為 Post-Filter)

注意:Pre-Score 從 v1alpha2 開始可用。

該擴展點將使用 通過 Filter 階段的節點列表 來調用插件。插件可以使用此數據來更新內部狀態或生成日誌、指標。比如可以通過該擴展點收集各個節點中性能指標,所有節點中最大的內存的節點,性能最好的 CPU 節點等。

我們繼續來看接口里長什麼樣子(我這裡是v1alpha1):

type PostFilterPlugin interface {
    Plugin
    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
}

針對這個擴展點,通過傳遞的參數可以看出,接口傳入了節點的切片,因此開發者可以通過啟動多個並發協程來獲取數據,並且可以把這些數據存在CycleState 中,給之後的插件擴展點使用:

func (y *Yoda) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
    klog.V(3).Infof("collect info for scheduling  pod: %v", pod.Name)
    return collection.ParallelCollection(collection.Workers, state, nodes, filteredNodesStatuses)
}

並發這塊我們也可以參考 1.13 調度器中經常使用的經典並發模型:

func ParallelCollection(workers int, state *framework.CycleState, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
    var (
        stop <-chan struct{}
        mx   sync.RWMutex
        msg  = ""
    )
  // 数据存入管道
    pieces := len(Sum)
    toProcess := make(chan string, pieces)
    for _, v := range Sum {
        toProcess <- v
    }
    close(toProcess)
  // 并发协程数限制
    if pieces < workers {
        workers = pieces
    }
    wg := sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
      // 协程消费管道数据
            for value := range toProcess {
                select {
                case <-stop:
                    return
                default:
          // state 并发安全,调用的时候可以不用加锁
                    if re := CollectMaxValue(value, state, nodes, filteredNodesStatuses); !re.IsSuccess() {
                        klog.V(3).Infof(re.Message())
                        mx.Lock()
            // message非并发安全,加锁
                        msg += re.Message()
                        mx.Unlock()
                    }
                }
            }
            wg.Done()
        }()
    }
    wg.Wait()
    if msg != "" {
        return framework.NewStatus(framework.Error, msg)
    }
    return framework.NewStatus(framework.Success, "")
}

Score 打分

Score 擴展點和上一代的調度器的優選流程很像,它分為兩個階段:

  1. 第一階段稱為 “打分”,用於對已通過過濾階段的節點進行排名。調度程序將為 Score 每個節點調用每個計分插件。
  2. 第二階段是 “歸一化”,用於在調度程序計算節點的最終排名之前修改分數,可以不實現, 但是需要保證 Score 插件的輸出必須是 [MinNodeScore,MaxNodeScore][0-100]) 範圍內的整數 。如果不是,則調度器會報錯,你需要實現 NormalizeScore 來保證最後的得分範圍。如果不實現 NormalizeScore,則 Score 的輸出必須在此範圍內。調度程序將根據配置的插件權重合併所有插件的節點分數。

看看接口的定義:

type ScorePlugin interface {
    Plugin
    // Score is called on each filtered node. It must return success and an integer
    // indicating the rank of the node. All scoring plugins must return success or
    // the pod will be rejected.
    Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
    // ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.
    ScoreExtensions() ScoreExtensions
}

我們也可以做如下簡單的實現:

func (y *Yoda) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := y.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
    }
    s, err := score.Score(state, nodeInfo)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Score Node Error: %v", err))
    }
    klog.V(3).Infof("node : %v yoda-score: %v",nodeName,s)
    return s, framework.NewStatus(framework.Success, "")
}

如果最後的分數不在範圍內,我們可能需要實現 NormalizeScore 函數做進一步處理:

func (y *Yoda) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    var (
        highest int64 = 0
    )
    // 归一化 
    for i, nodeScore := range scores {
        scores[i].Score = nodeScore.Score * framework.MaxNodeScore / highest
    }
    return framework.NewStatus(framework.Success, "")
}

Reserve 保留

為給定的 Pod 保留節點上的資源時,維護運行時狀態的插件可以應實現此擴展點,以由調度程序通知。這是在調度程序實際將 Pod 綁定到 Node 之前發生的,它的存在是為了防止在調度程序等待綁定成功時發生爭用情況。

type ReservePlugin interface {
    Plugin
    // Reserve is called by the scheduling framework when the scheduler cache is
    // updated.
    Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}

這里和上面的 Score 類似,函數並沒有提供 nodeInfo 接口,我們可以通過調用 handle.SnapshotSharedLister 來獲取節點的信息。

nodeInfo, err := y.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)

那麼以上就是調度週期的插件與實現,其實綁定週期的插件實現和上述的方法也都類似,實現相關的函數即可。

插件註冊

每個插件必須定義一個構造函數,並將其添加到硬編碼的註冊表中。

type PluginFactory = func(runtime.Unknown, FrameworkHandle) (Plugin, error)

type Registry map[string]PluginFactory

func NewRegistry() Registry {
   return Registry{
      fooplugin.Name: fooplugin.New,
      barplugin.Name: barplugin.New,
      // New plugins are registered here.
   }
}

那麼在編譯的時候,編譯器會將我們的插件和調度源碼一起編譯成我們的自定義調度器。

在聲明插件的時候也需要實現構造函數和對應的方法:

type Yoda struct {
    args   *Args
    handle framework.FrameworkHandle
}

func (y *Yoda) Name() string {
    return Name
}

func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) {
    args := &Args{}
    if err := framework.DecodeInto(configuration, args); err != nil {
        return nil, err
    }
    klog.V(3).Infof("get plugin config args: %+v", args)
    return &Yoda{
        args:   args,
        handle: f,
    }, nil
}

編譯小技巧

由於最終的調度器還是以容器的方式運行的,我們可以寫一個 Makefile 來簡化編譯流程:

all: local

local:
    GOOS=linux GOARCH=amd64 go build  -o=my-scheduler ./cmd/scheduler

build:
    sudo docker build --no-cache . -t registry.cn-hangzhou.aliyuncs.com/my/scheduler

push:
    sudo docker push registry.cn-hangzhou.aliyuncs.com/my/scheduler

format:
    sudo gofmt -l -w .
clean:
    sudo rm -f my-scheduler

編寫調度器的Dockerfile:

FROM debian:stretch-slim

WORKDIR /

COPY my-scheduler /usr/local/bin

CMD ["my-scheduler"]

那麼編譯 -> 構建就可以三步走了:

  • 編譯

    make local
    
  • 構建鏡像

    make build
    
  • 上傳鏡像

    make push
    

自定義調度器的配置

首先需要設置一個 ConfigMap ,用於存放調度器的配置文件:

apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1alpha1
    kind: KubeSchedulerConfiguration
    schedulerName: yoda-scheduler
    leaderElection:
      leaderElect: true
      lockObjectName: yoda-scheduler
      lockObjectNamespace: kube-system
    plugins:
      queueSort:
        enabled:
          - name: "yoda"
      filter:
        enabled:
        - name: "yoda"
      score:
        enabled:
        - name: "yoda"
      postFilter:
        enabled:
        - name: "yoda"
    pluginConfig:
    - name: "yoda"
      args: {"master": "master", "kubeconfig": "kubeconfig"}

這裡主要需要修改的就是 schedulerName 字段的調度器名稱和 plugins 字段中各個擴展點的插件名稱,enable 才能保證該擴展點運行了你的插件。

接著為調度器創建 RBAC:

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: yoda-cr
rules:
  - apiGroups:
      - ""
    resources:
      - endpoints
      - events
    verbs:
      - create
      - get
      - update
  - apiGroups:
      - ""
    resourceNames:
      - yoda-scheduler
    resources:
      - endpoints
    verbs:
      - delete
      - get
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - delete
      - get
      - list
      - watch
      - update
  - apiGroups:
      - ""
    resources:
      - bindings
      - pods/binding
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - pods/status
    verbs:
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - replicationcontrollers
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
      - extensions
    resources:
      - replicasets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - policy
    resources:
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "storage.k8s.io"
    resources:
      - storageclasses
      - csinodes
    verbs:
      - watch
      - list
      - get
  - apiGroups:
      - "coordination.k8s.io"
    resources:
      - leases
    verbs:
      - create
      - get
      - list
      - update
  - apiGroups:
      - "events.k8s.io"
    resources:
      - events
    verbs:
      - create
      - patch
      - update
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: yoda-sa
  namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: yoda-crb
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: yoda-cr
subjects:
  - kind: ServiceAccount
    name: yoda-sa
    namespace: kube-system

最後配置調度器的 Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: yoda-scheduler
  namespace: kube-system
  labels:
    component: yoda-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      component: yoda-scheduler
  template:
    metadata:
      labels:
        component: yoda-scheduler
    spec:
      serviceAccount: yoda-sa
      priorityClassName: system-cluster-critical
      volumes:
        - name: scheduler-config
          configMap:
            name: scheduler-config
      containers:
        - name: yoda-scheduler
          image: registry.cn-hangzhou.aliyuncs.com/geekcloud/yoda-scheduler
          imagePullPolicy: Always
          args:
            - yoda-scheduler
            - --config=/scheduler/scheduler-config.yaml
            - --v=3
          resources:
            requests:
              cpu: "50m"
          volumeMounts:
            - name: scheduler-config
              mountPath: /scheduler

隨著雲計算技術的不斷發展,Kubernetes scheduler 也在根據各種複雜的需求不斷進化,未來也會湧現更多各種各樣的豐富的、支持不同功能的調度器在不同的生產環境中發揮著更多強勁的作用,一起期待吧!

作者介紹

李俊江

Kubernetes & istio member

南京郵電大學物聯網學院研究生,熱衷於 Kubernetes 與雲原生相關技術。

微信:FUNKY-STARS 歡迎交流!

參考

  • Scheduling Framework
  • enhancements/624
  • scheduler-framework-sample
  • Kubernetes 1.13 源碼分析

致謝

感謝 Scheduler-SIG Leader HuangWei 大佬在 kubecon 2018 的 Q&A 和指導!

感謝張磊、車漾大佬在 kubecon 2018 的分享和討論!

作者介紹

本文轉載自公眾號ServiceMesher(ID:ServiceMesher)。

原文鏈接

https://mp.weixin.qq.com/s/If-Y72rnZxXfQvEjd1UsFQ