Kubernetes

Custom Controller 3 - CronJob 구현하기

seungkyua@gmail.com 2023. 7. 12. 09:51
반응형

Kubernetes 에는 이미 CronJob 이라는 리소스 타입이 있지만, Kubebuilder 을 이용하여 Custom Controller 로 재작성 해보는 연습을 해보도록 하자.

Project 구조 만들기

먼저, Project 구조를 만들기 위해 아래와 같이 kubebuilder init 명령어를 실행한다.

$ mkdir -p cronjob-kubebuilder
$ cd cronjob-kubebuilder

$ kubebuilder init --domain tutorial.kubebuilder.io --repo tutorial.kubebuilder.io/project

도메인을 tutorial.kubebuilder.io 로 했으므로 모든 API Group 은 <group>.tutorial.kubebuilder.io 방식으로 정해지게 된다. 또한 특별히 프로젝트 이름은 지정하지 않았는데, --project-name=<dns1123-label-string> 과 같이 옵션을 추가하지 않으면 폴더의 이름이 기본적으로 프로젝트 이름이 된다. (여기서 프로젝트명은 DNS-1123 label 규칙을 따라야 한다)

한가지 주의해야 할 점은 cronjob-kubebuilder 디렉토리는 $GOPATH 경로 아래에 있어서는 안된다. 이는 Go modules 의 규칙 때문인데 좀 더 자세히 알고 싶으면 https://go.dev/blog/using-go-modules 블로그 포스트를 읽어보자.

만들어진 프로젝트의 구조는 다음과 같다.

$ tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── cmd
│   └── main.go
├── config
│   ├── default
│   ├── manager
│   ├── prometheus
│   └── rbac
├── go.mod
├── go.sum
└── hack
    └── boilerplate.go.txt

7 directories, 8 files

go.mod 파일은 모듈 디펜던시를 표시하고, Makefile 은 custom controller 를 빌드하고 디플로이 할 수 있다.

config 디렉토리 아래에는 Kustomize 로 작성되어 CustomResourceDefinition, RBAC, WebhookConfiguration 등의 yaml 파일들이 정의되어 있다.

특히, config/manager 디렉토리에는 Cluster 에 Custom Controller 를 파드 형태로 배포할 수 있는 Kustomize yaml 이 있고, config/rbac 디렉토리에는 서비스 어카운트로 Custom Controller 의 권한이 정의되어 있다.

Custom Controller 의 Entrypoint 는 cmd/main.go 파일이다.

처음 필요한 모듈을 임포트 한 것을 보면 아래 2개가 보인다.

  • core controller-runtime 라이브러리
  • 기본 controller-runtime 로깅인 Zap
package main

import (
    "flag"
    "fmt"
    "os"

    _ "k8s.io/client-go/plugin/pkg/client/auth"

    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/healthz"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
    // +kubebuilder:scaffold:imports
)

모든 컨트롤러에는 Scheme 이 필요하다. 스킴은 KindGo types 간의 매핑을 제공해 준다.

var (
    scheme   = runtime.NewScheme()
    setupLog = ctrl.Log.WithName("setup")
)

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))

    //+kubebuilder:scaffold:scheme
}

main function 에는 아래의 내용들이 들어가 있다.

  • 기본 플래그 셋업
  • manager 를 생성하여 모든 Custom Controller 의 실행을 추적하고, shared cache 세팅하고, scheme 을 아규먼트로 넘기주어 클라이언트를 API 서버에 설정한다.
  • manager 를 실행하면 manager 가 모든 컨트롤러와 웹혹을 실행한다.
func main() {
    var metricsAddr string
    var enableLeaderElection bool
    var probeAddr string
    flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
    flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
    flag.BoolVar(&enableLeaderElection, "leader-elect", false,
        "Enable leader election for controller manager. "+
            "Enabling this will ensure there is only one active controller manager.")
    opts := zap.Options{
        Development: true,
    }
    opts.BindFlags(flag.CommandLine)
    flag.Parse()

    ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "80807133.tutorial.kubebuilder.io",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }

manager 생성 시에 컨트롤러가 특정 네임스페이스의 리소스만을 감시할 수 있도록 할 수 있다.

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        Namespace:              namespace,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "80807133.tutorial.kubebuilder.io",
    })

이렇게 특정 네임스페이스를 지정한 경우에는 권한을 ClusterRoleClusterRoleBinding 에서 RoleRoleBinding 으로 변경하는 것을 권장한다.

그리고 MutiNamespacedCacheBuilder 를 사용하면 특정 네임스페이스의 묶음의 리소스만을 감시하게 제한할 수 있다.

    var namespaces []string // List of Namespaces
    cache.Options.Namespaces = namespaces

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        NewCache:               cache.MultiNamespacedCacheBuilder(namespaces),
        MetricsBindAddress:     fmt.Sprintf("%s:%d", metricsHost, metricsPort),
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "80807133.tutorial.kubebuilder.io",
    }) 

MultiNamespacedCacheBuilder 는 deprecated api 이므로 cache.Options.Namespaces 를 사용한다. (https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/cache#Options)

Groups, Versions, Kinds and Resources

쿠버네티스에서 API 에 대해서 이야기할 때는 groups, versions, kinds and resources 4개의 용어를 사용한다.

쿠버네티스의 API Group 은 단순히 관련 기능의 모음이다. 각 Group 에는 하나 이상의 Version 이 있으며, 이름에서 알 수 있듯이 시간이 지남에 따라 API의 작동 방식을 변경할 수 있다.

각 API group-version 에는 하나 이상의 API type 이 포함되며, 이를 Kind 라고 부른다. Kind 는 Version 간에 양식을 변경할 수 있지만, 각 양식은 어떻게든 다른 양식의 모든 데이터를 저장할 수 있어야 한다(데이터를 필드 또는 주석에 저장할 수 있음). 즉, 이전 API 버전을 사용해도 최신 데이터가 손실되거나 손상되지 않는다.

Resource 란 간단히 말해서 API 안에서 Kind 를 사용하는 것이다. 종종, Kind 와 Resource 는 일대일로 매핑된다. 예를 들어, Pod Resource 는 Pod Kind 에 해당한다. 그러나 때로는 여러 Resource 에서 동일한 Kind를 반환할 수도 있다. 예를 들어, Scale Kind 는 deployments/scale 또는 replicasets/scale 과 같은 모든 scale 하위 리소스에 의해 반환된다. 이것이 바로 Kubernetes HorizontalPodAutoscaler 가 서로 다른 resource 와 상호 작용할 수 있는 이유다. 그러나 CRD를 사용하면 각 Kind 는 단일 resource 에 해당한다.

resource 는 항상 소문자이며, 관례에 따라 소문자 형태의 Kind를 사용한다.

특정 group-version 에서 어떤 kind 를 지칭할 때는 줄여서 GroupVersionKind 혹은 줄여서 GVK 라고 부른다. 같은 방식으로 resource 도 GroupVersionResource 혹은 GVR 이라고 부른다.

GVK 는 패키지에서 Go type 에 해당한다.

API 는 왜 만들어야 할까?

Kind 에 대해서 Custom Resource (CR) 과 Custom Resource Definition (CRD) 을 만들어야 한다. 그 이유는 CustomResourceDefinitions 으로 Kubernetes API 를 확장할 수 있기 때문이다.

새롭게 만드는 API 는 쿠버네티스에게 custom object 를 가리치는 방법이다.

기본으로 CRD 는 customized Objects 의 정의이며, CR 은 그것에 대한 인스턴스이다.

API 추가

아래 명령으로 새로운 Kind 를 추가하자.

$ kubebuilder create api --group batch --version v1 --kind CronJob

Create Resource 와 Create Controller 를 하겠냐고 물으면 y 로 대답한다.

$ tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1
├── bin
│   └── controller-gen
├── cmd
│   └── main.go
├── config
│   ├── crd
│   ├── default
│   ├── manager
│   ├── prometheus
│   ├── rbac
│   └── samples
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── internal
    └── controller 

이 경우 batch.tutorial.kubebuilder.io/v1 에 해당하는 api/v1 디렉토리가 생성된다.

api/v1/cronjob_types.go 파일을 보면, 모든 쿠버네티스 Kind 에 공통으로 포함된 metadata 를 가리키는 meta/v1 API group 을 임포트 하고 있다.

package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

다음으로 Kind 의 SpecStatus 에 대한 type 을 정의 한다.

쿠버네티스는 원하는 상태(Spec)를 실제 클러스터 상태(Status) 및 외부 상태와 조정한 다음 관찰한 것(Status)를 기록하는 방식으로 작동한다. 따라서 모든 기능 object 는 spec 과 status 를 포함한다. ConfigMap 과 같은 몇몇 타입은 원하는 상태를 인코딩하지 않기 때문에 이 패턴을 따르지 않지만 대부분의 타입은 이 패턴을 따른다.

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
    // Important: Run "make" to regenerate code after modifying this file
}

// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
    // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
    // Important: Run "make" to regenerate code after modifying this file
}

실제 Kind 에 해당하는 타입인 CronJob 과 CronJobList 를 정의한다. CronJob 은 루트 타입이며, CronJob kind를 설명한다. 모든 쿠버네티스 오브젝트와 마찬가지로, API version 과 Kind 를 설명하는 TypeMeta를 포함하며, name, namespace, labes 과 같은 것을 보유하는 ObjectMeta 도 포함한다.

CronJobList 는 단순히 여러 CronJob 을 위한 컨테이너이다. LIST와 같은 대량 작업에 사용되는 Kind 이다.

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// CronJob is the Schema for the cronjobs API
type CronJob struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   CronJobSpec   `json:"spec,omitempty"`
    Status CronJobStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// CronJobList contains a list of CronJob
type CronJobList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []CronJob `json:"items"`
}

마지막으로 API group 에 Go 타입을 추가한다. 이렇게 하면 이 API group 의 타입을 모든 Scheme 에 추가할 수 있다.

func init() {
    SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}

API 설계

쿠버네티스에는 API를 설계하는 방법에 대한 몇 가지 규칙이 있다. 즉, 직렬화된 모든 필드는 camelCase 여야 하며 JSON 구조체 태그를 사용하여 이를 지정한다. 또한, 필드가 비어 있을 때 직렬화에서 필드를 생략해야 한다는 것을 표시하기 위해 omitempty 구조체 태그를 사용할 수도 있다.

필드는 대부분의 기본 유형을 사용할 수 있다. 다만 숫자는 예외이다. API 호환성을 위해 정수의 경우 int32int64, 소수의 경우 resource.Quantity 와 같이 3가지 형식의 숫자를 허용한다.

Quantity 는 10진수에 대한 특수 표기법으로, 머신 간에 이식성을 높이기 위해 명시적으로 고정된 표현을 가지고 있다.

예를 들어 2m 값은 십진수 표기법에서 0.002 를 의미한다. 2Ki 는 십진수로 2048 을 의미하고, 2K 는 십진수로 2000 을 의미한다. 분수를 지정하려면 정수를 사용할 수 있는 접미사로 전환하면 된다(예: 2.52500m).

지원되는 베이스는 두 가지이다: 10과 2(각각 10진수 및 2진수라고 함)이다. 10진수는 "nomal" SI 접미사(예: MK)로 표시되며, 2진수는 "mebi" 표기법(예: MiKi)으로 지정된다. 메가바이트와 메비바이트를 생각하면 된다.

우리가 사용하는 또 다른 특수 유형이 하나 더 있는데, 바로 metav1.Time 이다. 이것은 고정된 이식 가능한 직렬화 형식을 가지고 있다는 점을 제외하면 time.Time 과 동일하게 작동한다.

package v1

import (
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to b

CronJob 을 세부적으로 살펴보자.

먼저 spec 을 보면, spec 에는 원하는 상태가 저장되므로 controller 에 대한 모든 "입력" 은 여기에 저장된다.

기본적으로 크론잡에는 다음과 같은 요소가 필요하다:

  • 스케줄 (CronJob 내의 cron)
  • 실행할 Job 에 대한 template (CronJob 내의 job)

편하게 만들어줄 몇 가지 추가 기능도 필요하다:

  • job 시작에 대한 deadline (이 deadline 을 놓치면 다음 예정된 시간까지 기다리게 된다)
  • 여러 job 이 한 번에 실행될 경우 어떻게 할 것인가(기다릴 것인가? 기존 job 을 중지할 것인가? 둘 다 실행할 것인가?)
  • CronJob 에 문제가 있을 경우, CronJob 실행을 일시 중지하는 방법
  • 이전 job 기록에 대한 limit

자신의 상태를 읽지 않기 때문에 job 이 실행되었는지 여부를 추적할 수 있는 다른 방법이 필요하다. 이를 위해 적어도 하나의 이전 job 을 사용할 수 있다.

// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
    //+kubebuilder:validation:MinLength=0

    // The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
    Schedule string `json:"schedule"`

    //+kubebuilder:validation:Minimum=0

    // Optional deadline in seconds for starting the job if it misses scheduled
    // time for any reason.  Missed jobs executions will be counted as failed ones.
    // +optional
    StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`

    // Specifies how to treat concurrent executions of a Job.
    // Valid values are:
    // - "Allow" (default): allows CronJobs to run concurrently;
    // - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
    // - "Replace": cancels currently running job and replaces it with a new one
    // +optional
    ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`

    // This flag tells the controller to suspend subsequent executions, it does
    // not apply to already started executions.  Defaults to false.
    // +optional
    Suspend *bool `json:"suspend,omitempty"`

    // Specifies the job that will be created when executing a CronJob.
    JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"`

    //+kubebuilder:validation:Minimum=0

    // The number of successful finished jobs to retain.
    // This is a pointer to distinguish between explicit zero and not specified.
    // +optional
    SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`

    //+kubebuilder:validation:Minimum=0

    // The number of failed finished jobs to retain.
    // This is a pointer to distinguish between explicit zero and not specified.
    // +optional
    FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
}

ConcurrencyPolicy 는 실제로는 string 이지만, 재사용과 유효성 검사를 쉽게 할 수 있으므로 타입을 재정의 했다.

// ConcurrencyPolicy describes how the job will be handled.
// Only one of the following concurrent policies may be specified.
// If none of the following policies is specified, the default one
// is AllowConcurrent.
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string

const (
    // AllowConcurrent allows CronJobs to run concurrently.
    AllowConcurrent ConcurrencyPolicy = "Allow"

    // ForbidConcurrent forbids concurrent runs, skipping next run if previous
    // hasn't finished yet.
    ForbidConcurrent ConcurrencyPolicy = "Forbid"

    // ReplaceConcurrent cancels currently running job and replaces it with a new one.
    ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

다음은 관찰된 상태를 저장하는 status 를 디자인해 보자.

현재 실행중인 job 목록과 마지막으로 job 을 성공적으로 실행한 시간을 유지한다. 그리고 직렬화를 위해서 time.Time 대신 metav1.Time 을 사용한다.

// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
    // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
    // Important: Run "make" to regenerate code after modifying this file

    // A list of pointers to currently running jobs.
    // +optional
    Active []corev1.ObjectReference `json:"active,omitempty"`

    // Information when was the last time the job was successfully scheduled.
    // +optional
    LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}

Controller 구현

컨트롤러는 쿠버네티스와 모든 operator 의 핵심이다.

컨트롤러의 역할은 주어진 오브젝트에 대해 실세계의 실제 상태(클러스터 상태와 잠재적으로 외부 상태(예: Kubelet의 경우 컨테이너 실행 또는 Cloud Provider 의 경우 로드밸런서)가 오브젝트의 원하는 상태와 일치하는지 확인하는 것이다. 각 컨트롤러는 하나의 루트 Kind 에 중점을 두지만 다른 Kind 와 상호 작용할 수 있다.

이 프로세스를 reconciling 이라고 부른다.

controller-runtime 에서 특정 kind 에 대한 reconciling 을 구현하는 로직을 Reconciler 라고 한다.

internal/controller/cronjob_controller.go 파일을 살펴 보자.

기본으로 임포트하는 모듈이 있는데, core controller-runtime 라이브러리와 client 패키지, API 타입 패키지가 있다.

package controllers

import (
    "context"

    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"

    batchv1 "tutorial.kubebuilder.io/project/api/v1"
)

컨트롤러의 기본 로직은 다음과 같다.

  1. 명명된 CronJob을 로드한다.
  2. 모든 active job 을 나열하고, status 를 업데이트 한다.
  3. 히스토리 수 제한에 따라 오래된 job 을 정리한다.
  4. Suspend 값이 세팅되었는지 확인 (값이 세팅된 경우 다른 작업을 수행하지 않음)
  5. 다음 예약된 실행 가져오기
  6. 새로운 job 이 스케줄에 맞고, deadline 이 지나지 않았으며, 동시성 정책에 의해 차단되지 않은 경우 실행
  7. 실행 중인 job 이 보이거나 (자동으로 수행됨) 다음 예약된 실행 시간이 되면 Requeue 한다.

임포트 모듈을 추가한다.

package controller

import (
    "context"
    "fmt"
    "sort"
    "time"

    "github.com/robfig/cron"
    kbatch "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ref "k8s.io/client-go/tools/reference"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"

    batchv1 "tutorial.kubebuilder.io/project/api/v1"
)

테스트를 위해서 Clock 을 추가한다.

// CronJobReconciler reconciles a CronJob object
type CronJobReconciler struct {
    client.Client
    Scheme *runtime.Scheme
    Clock
}

type realClock struct{}

func (_ realClock) Now() time.Time { return time.Now() }

// clock knows how to get the current time.
// It can be used to fake out timing for testing.
type Clock interface {
    Now() time.Time
}

RBAC 을 위해 batch group 의 job 을 핸들링 할 수 있는 권한을 추가한다.

//+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get

annotation 을 위한 변수를 추가한다.

var (
    scheduledTimeAnnotation = "batch.tutorial.kubebuilder.io/scheduled-at"
)

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the CronJob object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

1. 이름으로 CronJob 을 로드한다

client 를 사용하여 CronJob 을 가져온다. 모든 client 의 메소드에는 취소가 가능하게 context 를 아규먼트로 받는다.

    var cronJob batchv1.CronJob
    if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
        log.Error(err, "unable to fetch CronJob")
        // we'll ignore not-found errors, since they can't be fixed by an immediate
        // requeue (we'll need to wait for a new notification), and we can get them
        // on deleted requests.
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

2. 모든 active job 을 나열하고 status 를 업데이트 한다.

    var childJobs kbatch.JobList
    if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
        log.Error(err, "unable to list child Jobs")
        return ctrl.Result{}, err
    }

active job 을 조회했으면 이를 active, successful, failded job 으로 분류한다.

    // find the active list of jobs
    var activeJobs []*kbatch.Job
    var successfulJobs []*kbatch.Job
    var failedJobs []*kbatch.Job
    var mostRecentTime *time.Time // find the last run so we can update the status
    isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
        for _, c := range job.Status.Conditions {
            if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
                return true, c.Type
            }
        }

        return false, ""
    }
    getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
        timeRaw := job.Annotations[scheduledTimeAnnotation]
        if len(timeRaw) == 0 {
            return nil, nil
        }

        timeParsed, err := time.Parse(time.RFC3339, timeRaw)
        if err != nil {
            return nil, err
        }
        return &timeParsed, nil
    }
    for i, job := range childJobs.Items {
        _, finishedType := isJobFinished(&job)
        switch finishedType {
        case "": // ongoing
            activeJobs = append(activeJobs, &childJobs.Items[i])
        case kbatch.JobFailed:
            failedJobs = append(failedJobs, &childJobs.Items[i])
        case kbatch.JobComplete:
            successfulJobs = append(successfulJobs, &childJobs.Items[i])
        }

        // We'll store the launch time in an annotation, so we'll reconstitute that from
        // the active jobs themselves.
        scheduledTimeForJob, err := getScheduledTimeForJob(&job)
        if err != nil {
            log.Error(err, "unable to parse schedule time for child job", "job", &job)
            continue
        }
        if scheduledTimeForJob != nil {
            if mostRecentTime == nil {
                mostRecentTime = scheduledTimeForJob
            } else if mostRecentTime.Before(*scheduledTimeForJob) {
                mostRecentTime = scheduledTimeForJob
            }
        }
    }

    if mostRecentTime != nil {
        cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
    } else {
        cronJob.Status.LastScheduleTime = nil
    }
    cronJob.Status.Active = nil
    for _, activeJob := range activeJobs {
        jobRef, err := ref.GetReference(r.Scheme, activeJob)
        if err != nil {
            log.Error(err, "unable to make reference to active job", "job", activeJob)
            continue
        }
        cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
    }

디버깅을 위해서 log 를 남긴다.

    log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

status 를 업데이트 한다.

    if err := r.Status().Update(ctx, &cronJob); err != nil {
        log.Error(err, "unable to update CronJob status")
        return ctrl.Result{}, err
    }

3. 히스토리 수 제한에 따른 오래된 job 삭제하기

    // NB: deleting these are "best effort" -- if we fail on a particular one,
    // we won't requeue just to finish the deleting.
    if cronJob.Spec.FailedJobsHistoryLimit != nil {
        sort.Slice(failedJobs, func(i, j int) bool {
            if failedJobs[i].Status.StartTime == nil {
                return failedJobs[j].Status.StartTime != nil
            }
            return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
        })
        for i, job := range failedJobs {
            if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
                break
            }
            if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
                log.Error(err, "unable to delete old failed job", "job", job)
            } else {
                log.V(0).Info("deleted old failed job", "job", job)
            }
        }
    }

    if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
        sort.Slice(successfulJobs, func(i, j int) bool {
            if successfulJobs[i].Status.StartTime == nil {
                return successfulJobs[j].Status.StartTime != nil
            }
            return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
        })
        for i, job := range successfulJobs {
            if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
                break
            }
            if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
                log.Error(err, "unable to delete old successful job", "job", job)
            } else {
                log.V(0).Info("deleted old successful job", "job", job)
            }
        }
    }

4. Suspend 값이 세팅되었는지 확인

CronJob 객체에 suspend 값이 세팅되어 있다면 CronJob 을 일시 중단한다. CronJob 을 삭제하지 않고 잠시 멈추고 싶을 때 사용할 수 있다.

    if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
        log.V(1).Info("cronjob suspended, skipping")
        return ctrl.Result{}, nil
    }

5. 다음 예약된 실행 가져오기

잠시 멈춤 상태가 아니라면 다음 스케줄을 가져온다.

    getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
        sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
        if err != nil {
            return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
        }

        // for optimization purposes, cheat a bit and start from our last observed run time
        // we could reconstitute this here, but there's not much point, since we've
        // just updated it.
        var earliestTime time.Time
        if cronJob.Status.LastScheduleTime != nil {
            earliestTime = cronJob.Status.LastScheduleTime.Time
        } else {
            earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
        }
        if cronJob.Spec.StartingDeadlineSeconds != nil {
            // controller is not going to schedule anything below this point
            schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))

            if schedulingDeadline.After(earliestTime) {
                earliestTime = schedulingDeadline
            }
        }
        if earliestTime.After(now) {
            return time.Time{}, sched.Next(now), nil
        }

        starts := 0
        for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
            lastMissed = t
            // An object might miss several starts. For example, if
            // controller gets wedged on Friday at 5:01pm when everyone has
            // gone home, and someone comes in on Tuesday AM and discovers
            // the problem and restarts the controller, then all the hourly
            // jobs, more than 80 of them for one hourly scheduledJob, should
            // all start running with no further intervention (if the scheduledJob
            // allows concurrency and late starts).
            //
            // However, if there is a bug somewhere, or incorrect clock
            // on controller's server or apiservers (for setting creationTimestamp)
            // then there could be so many missed start times (it could be off
            // by decades or more), that it would eat up all the CPU and memory
            // of this controller. In that case, we want to not try to list
            // all the missed start times.
            starts++
            if starts > 100 {
                // We can't get the most recent times so just return an empty slice
                return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
            }
        }
        return lastMissed, sched.Next(now), nil
    }
    // figure out the next times that we need to create
    // jobs at (or anything we missed).
    missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
    if err != nil {
        log.Error(err, "unable to figure out CronJob schedule")
        // we don't really care about requeuing until we get an update that
        // fixes the schedule, so don't return an error
        return ctrl.Result{}, nil
    }

requeue 할 값을 준비만 해 놓는다.

    scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
    log = log.WithValues("now", r.Now(), "next run", nextRun)

6. 새로운 job 이 스케줄에 맞고, deadline 이 지나지 않았으며, 동시성 정책에 의해 차단되지 않은 경우 실행

    if missedRun.IsZero() {
        log.V(1).Info("no upcoming scheduled times, sleeping until next")
        return scheduledResult, nil
    }

    // make sure we're not too late to start the run
    log = log.WithValues("current run", missedRun)
    tooLate := false
    if cronJob.Spec.StartingDeadlineSeconds != nil {
        tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
    }
    if tooLate {
        log.V(1).Info("missed starting deadline for last run, sleeping till next")
        // TODO(directxman12): events
        return scheduledResult, nil
    }
    // figure out how to run this job -- concurrency policy might forbid us from running
    // multiple at the same time...
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
        log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
        return scheduledResult, nil
    }

    // ...or instruct us to replace existing ones...
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
        for _, activeJob := range activeJobs {
            // we don't care if the job was already deleted
            if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
                log.Error(err, "unable to delete active job", "job", activeJob)
                return ctrl.Result{}, err
            }
        }
    }
    constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
        // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
        name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

        job := &kbatch.Job{
            ObjectMeta: metav1.ObjectMeta{
                Labels:      make(map[string]string),
                Annotations: make(map[string]string),
                Name:        name,
                Namespace:   cronJob.Namespace,
            },
            Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
        }
        for k, v := range cronJob.Spec.JobTemplate.Annotations {
            job.Annotations[k] = v
        }
        job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
        for k, v := range cronJob.Spec.JobTemplate.Labels {
            job.Labels[k] = v
        }
        if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
            return nil, err
        }

        return job, nil
    }
    // actually make the job...
    job, err := constructJobForCronJob(&cronJob, missedRun)
    if err != nil {
        log.Error(err, "unable to construct job from template")
        // don't bother requeuing until we get a change to the spec
        return scheduledResult, nil
    }

    // ...and create it on the cluster
    if err := r.Create(ctx, job); err != nil {
        log.Error(err, "unable to create Job for CronJob", "job", job)
        return ctrl.Result{}, err
    }

    log.V(1).Info("created Job for CronJob run", "job", job)

7. 실행 중인 job 이 보이거나 (자동으로 수행됨) 다음 예약된 실행 시간이 되면 Requeue 한다.

    // we'll requeue once we see the running job, and update our status
    return scheduledResult, nil
}

Setup

var (
    jobOwnerKey = ".metadata.controller"
    apiGVStr    = batchv1.GroupVersion.String()
)

// SetupWithManager sets up the controller with the Manager.
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    // set up a real clock, since we're not in a test
    if r.Clock == nil {
        r.Clock = realClock{}
    }

    if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj client.Object) []string {
        // grab the job object, extract the owner...
        job := rawObj.(*kbatch.Job)
        owner := metav1.GetControllerOf(job)
        if owner == nil {
            return nil
        }
        // ...make sure it's a CronJob...
        if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
            return nil
        }

        // ...and if so, return it
        return []string{owner.Name}
    }); err != nil {
        return err
    }

    return ctrl.NewControllerManagedBy(mgr).
        For(&batchv1.CronJob{}).
        Owns(&kbatch.Job{}).
        Complete(r)
}

Webhook 생성

$ kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation

api/v1/cronjob_webhook.go 파일이 생성된다. 해당 파일에 체크 로직을 추가한다.

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *CronJob) Default() {
    cronjoblog.Info("default", "name", r.Name)

    if r.Spec.ConcurrencyPolicy == "" {
        r.Spec.ConcurrencyPolicy = AllowConcurrent
    }
    if r.Spec.Suspend == nil {
        r.Spec.Suspend = new(bool)
    }
    if r.Spec.SuccessfulJobsHistoryLimit == nil {
        r.Spec.SuccessfulJobsHistoryLimit = new(int32)
        *r.Spec.SuccessfulJobsHistoryLimit = 3
    }
    if r.Spec.FailedJobsHistoryLimit == nil {
        r.Spec.FailedJobsHistoryLimit = new(int32)
        *r.Spec.FailedJobsHistoryLimit = 1
    }
}
var _ webhook.Validator = &CronJob{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateCreate() error {
    cronjoblog.Info("validate create", "name", r.Name)

    return r.validateCronJob()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateUpdate(old runtime.Object) error {
    cronjoblog.Info("validate update", "name", r.Name)

    return r.validateCronJob()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateDelete() error {
    cronjoblog.Info("validate delete", "name", r.Name)

    // TODO(user): fill in your validation logic upon object deletion.
    return nil
}
func (r *CronJob) validateCronJob() error {
    var allErrs field.ErrorList
    if err := r.validateCronJobName(); err != nil {
        allErrs = append(allErrs, err)
    }
    if err := r.validateCronJobSpec(); err != nil {
        allErrs = append(allErrs, err)
    }
    if len(allErrs) == 0 {
        return nil
    }

    return apierrors.NewInvalid(
        schema.GroupKind{Group: "batch.tutorial.kubebuilder.io", Kind: "CronJob"},
        r.Name, allErrs)
}
func (r *CronJob) validateCronJobSpec() *field.Error {
    // The field helpers from the kubernetes API machinery help us return nicely
    // structured validation errors.
    return validateScheduleFormat(
        r.Spec.Schedule,
        field.NewPath("spec").Child("schedule"))
}
func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
    if _, err := cron.ParseStandard(schedule); err != nil {
        return field.Invalid(fldPath, schedule, err.Error())
    }
    return nil
}
func (r *CronJob) validateCronJobName() *field.Error {
    if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
        // The job name length is 63 character like all Kubernetes objects
        // (which must fit in a DNS subdomain). The cronjob controller appends
        // a 11-character suffix to the cronjob (`-$TIMESTAMP`) when creating
        // a job. The job name length limit is 63 characters. Therefore cronjob
        // names must have length <= 63-11=52. If we don't validate this here,
        // then job creation will fail later.
        return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
    }
    return nil
}

Controller 배포 및 실행

CR 과 CRD yaml 을 만드는 명령어를 수행한다.

$ make manifests

CRD 를 배포한다.


$ make install

WebHook 를 로컬에서 다른 터미널로 실행한다.

$ export ENABLE_WEBHOOKS=false
$ make run

config/samples/batch_v1_cronjob.yaml 파일에 값을 추가한다.

apiVersion: batch.tutorial.kubebuilder.io/v1
kind: CronJob
metadata:
  labels:
    app.kubernetes.io/name: cronjob
    app.kubernetes.io/instance: cronjob-sample
    app.kubernetes.io/part-of: cronjob-kubebuilder
    app.kubernetes.io/managed-by: kustomize
    app.kubernetes.io/created-by: cronjob-kubebuilder
  name: cronjob-sample
spec:
  schedule: "*/1 * * * *"
  startingDeadlineSeconds: 60
  concurrencyPolicy: Allow # explicitly specify, but Allow is also default.
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: hello
              image: busybox
              args:
                - /bin/sh
                - -c
                - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

Reference site

반응형