Custom Controller 3 - CronJob 구현하기
Kubernetes 에는 이미 CronJob 이라는 리소스 타입이 있지만, Kubebuilder 을 이용하여 Custom Controller 로 재작성 해보는 연습을 해보도록 하자.
Project 구조 만들기
먼저, Project 구조를 만들기 위해 아래와 같이 kubebuilder init
명령어를 실행한다.
$ mkdir -p cronjob-kubebuilder
$ cd cronjob-kubebuilder
$ kubebuilder init --domain tutorial.kubebuilder.io --repo tutorial.kubebuilder.io/project
도메인을 tutorial.kubebuilder.io
로 했으므로 모든 API Group 은 <group>.tutorial.kubebuilder.io
방식으로 정해지게 된다. 또한 특별히 프로젝트 이름은 지정하지 않았는데, --project-name=<dns1123-label-string>
과 같이 옵션을 추가하지 않으면 폴더의 이름이 기본적으로 프로젝트 이름이 된다. (여기서 프로젝트명은 DNS-1123 label 규칙을 따라야 한다)
한가지 주의해야 할 점은 cronjob-kubebuilder
디렉토리는 $GOPATH
경로 아래에 있어서는 안된다. 이는 Go modules
의 규칙 때문인데 좀 더 자세히 알고 싶으면 https://go.dev/blog/using-go-modules 블로그 포스트를 읽어보자.
만들어진 프로젝트의 구조는 다음과 같다.
$ tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── cmd
│ └── main.go
├── config
│ ├── default
│ ├── manager
│ ├── prometheus
│ └── rbac
├── go.mod
├── go.sum
└── hack
└── boilerplate.go.txt
7 directories, 8 files
go.mod
파일은 모듈 디펜던시를 표시하고, Makefile
은 custom controller 를 빌드하고 디플로이 할 수 있다.
config
디렉토리 아래에는 Kustomize 로 작성되어 CustomResourceDefinition
, RBAC
, WebhookConfiguration
등의 yaml 파일들이 정의되어 있다.
특히, config/manager
디렉토리에는 Cluster 에 Custom Controller 를 파드 형태로 배포할 수 있는 Kustomize yaml 이 있고, config/rbac
디렉토리에는 서비스 어카운트로 Custom Controller 의 권한이 정의되어 있다.
Custom Controller 의 Entrypoint 는 cmd/main.go
파일이다.
처음 필요한 모듈을 임포트 한 것을 보면 아래 2개가 보인다.
- core
controller-runtime
라이브러리 - 기본 controller-runtime 로깅인
Zap
package main
import (
"flag"
"fmt"
"os"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
// +kubebuilder:scaffold:imports
)
모든 컨트롤러에는 Scheme
이 필요하다. 스킴은 Kind
와 Go types
간의 매핑을 제공해 준다.
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
main function
에는 아래의 내용들이 들어가 있다.
- 기본 플래그 셋업
manager
를 생성하여 모든 Custom Controller 의 실행을 추적하고, shared cache 세팅하고,scheme
을 아규먼트로 넘기주어 클라이언트를 API 서버에 설정한다.- manager 를 실행하면 manager 가 모든 컨트롤러와 웹혹을 실행한다.
func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "80807133.tutorial.kubebuilder.io",
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
manager
생성 시에 컨트롤러가 특정 네임스페이스의 리소스만을 감시할 수 있도록 할 수 있다.
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Namespace: namespace,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "80807133.tutorial.kubebuilder.io",
})
이렇게 특정 네임스페이스를 지정한 경우에는 권한을 ClusterRole
과 ClusterRoleBinding
에서 Role
과 RoleBinding
으로 변경하는 것을 권장한다.
그리고 MutiNamespacedCacheBuilder
를 사용하면 특정 네임스페이스의 묶음의 리소스만을 감시하게 제한할 수 있다.
var namespaces []string // List of Namespaces
cache.Options.Namespaces = namespaces
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
NewCache: cache.MultiNamespacedCacheBuilder(namespaces),
MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort),
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "80807133.tutorial.kubebuilder.io",
})
MultiNamespacedCacheBuilder
는 deprecated api 이므로 cache.Options.Namespaces
를 사용한다. (https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/cache#Options)
Groups, Versions, Kinds and Resources
쿠버네티스에서 API 에 대해서 이야기할 때는 groups, versions, kinds and resources 4개의 용어를 사용한다.
쿠버네티스의 API Group 은 단순히 관련 기능의 모음이다. 각 Group 에는 하나 이상의 Version 이 있으며, 이름에서 알 수 있듯이 시간이 지남에 따라 API의 작동 방식을 변경할 수 있다.
각 API group-version 에는 하나 이상의 API type 이 포함되며, 이를 Kind 라고 부른다. Kind 는 Version 간에 양식을 변경할 수 있지만, 각 양식은 어떻게든 다른 양식의 모든 데이터를 저장할 수 있어야 한다(데이터를 필드 또는 주석에 저장할 수 있음). 즉, 이전 API 버전을 사용해도 최신 데이터가 손실되거나 손상되지 않는다.
Resource 란 간단히 말해서 API 안에서 Kind 를 사용하는 것이다. 종종, Kind 와 Resource 는 일대일로 매핑된다. 예를 들어, Pod Resource 는 Pod Kind 에 해당한다. 그러나 때로는 여러 Resource 에서 동일한 Kind를 반환할 수도 있다. 예를 들어, Scale Kind 는 deployments/scale 또는 replicasets/scale 과 같은 모든 scale 하위 리소스에 의해 반환된다. 이것이 바로 Kubernetes HorizontalPodAutoscaler 가 서로 다른 resource 와 상호 작용할 수 있는 이유다. 그러나 CRD를 사용하면 각 Kind 는 단일 resource 에 해당한다.
resource 는 항상 소문자이며, 관례에 따라 소문자 형태의 Kind를 사용한다.
특정 group-version 에서 어떤 kind 를 지칭할 때는 줄여서 GroupVersionKind 혹은 줄여서 GVK 라고 부른다. 같은 방식으로 resource 도 GroupVersionResource 혹은 GVR 이라고 부른다.
GVK 는 패키지에서 Go type 에 해당한다.
API 는 왜 만들어야 할까?
Kind 에 대해서 Custom Resource (CR) 과 Custom Resource Definition (CRD) 을 만들어야 한다. 그 이유는 CustomResourceDefinitions 으로 Kubernetes API 를 확장할 수 있기 때문이다.
새롭게 만드는 API 는 쿠버네티스에게 custom object 를 가리치는 방법이다.
기본으로 CRD 는 customized Objects 의 정의이며, CR 은 그것에 대한 인스턴스이다.
API 추가
아래 명령으로 새로운 Kind 를 추가하자.
$ kubebuilder create api --group batch --version v1 --kind CronJob
Create Resource 와 Create Controller 를 하겠냐고 물으면 y
로 대답한다.
$ tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│ └── v1
├── bin
│ └── controller-gen
├── cmd
│ └── main.go
├── config
│ ├── crd
│ ├── default
│ ├── manager
│ ├── prometheus
│ ├── rbac
│ └── samples
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
└── internal
└── controller
이 경우 batch.tutorial.kubebuilder.io/v1
에 해당하는 api/v1
디렉토리가 생성된다.
api/v1/cronjob_types.go
파일을 보면, 모든 쿠버네티스 Kind 에 공통으로 포함된 metadata 를 가리키는 meta/v1
API group 을 임포트 하고 있다.
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
다음으로 Kind 의 Spec
과 Status
에 대한 type 을 정의 한다.
쿠버네티스는 원하는 상태(Spec)를 실제 클러스터 상태(Status) 및 외부 상태와 조정한 다음 관찰한 것(Status)를 기록하는 방식으로 작동한다. 따라서 모든 기능 object 는 spec 과 status 를 포함한다. ConfigMap 과 같은 몇몇 타입은 원하는 상태를 인코딩하지 않기 때문에 이 패턴을 따르지 않지만 대부분의 타입은 이 패턴을 따른다.
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
실제 Kind 에 해당하는 타입인 CronJob 과 CronJobList 를 정의한다. CronJob 은 루트 타입이며, CronJob kind를 설명한다. 모든 쿠버네티스 오브젝트와 마찬가지로, API version 과 Kind 를 설명하는 TypeMeta를 포함하며, name, namespace, labes 과 같은 것을 보유하는 ObjectMeta 도 포함한다.
CronJobList 는 단순히 여러 CronJob 을 위한 컨테이너이다. LIST와 같은 대량 작업에 사용되는 Kind 이다.
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// CronJob is the Schema for the cronjobs API
type CronJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CronJobSpec `json:"spec,omitempty"`
Status CronJobStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// CronJobList contains a list of CronJob
type CronJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []CronJob `json:"items"`
}
마지막으로 API group 에 Go 타입을 추가한다. 이렇게 하면 이 API group 의 타입을 모든 Scheme 에 추가할 수 있다.
func init() {
SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}
API 설계
쿠버네티스에는 API를 설계하는 방법에 대한 몇 가지 규칙이 있다. 즉, 직렬화된 모든 필드는 camelCase
여야 하며 JSON 구조체 태그를 사용하여 이를 지정한다. 또한, 필드가 비어 있을 때 직렬화에서 필드를 생략해야 한다는 것을 표시하기 위해 omitempty
구조체 태그를 사용할 수도 있다.
필드는 대부분의 기본 유형을 사용할 수 있다. 다만 숫자는 예외이다. API 호환성을 위해 정수의 경우 int32
및 int64
, 소수의 경우 resource.Quantity
와 같이 3가지 형식의 숫자를 허용한다.
Quantity 는 10진수에 대한 특수 표기법으로, 머신 간에 이식성을 높이기 위해 명시적으로 고정된 표현을 가지고 있다.
예를 들어 2m
값은 십진수 표기법에서 0.002
를 의미한다. 2Ki
는 십진수로 2048
을 의미하고, 2K
는 십진수로 2000
을 의미한다. 분수를 지정하려면 정수를 사용할 수 있는 접미사로 전환하면 된다(예: 2.5
는 2500m
).
지원되는 베이스는 두 가지이다: 10과 2(각각 10진수 및 2진수라고 함)이다. 10진수는 "nomal" SI 접미사(예: M
및 K
)로 표시되며, 2진수는 "mebi" 표기법(예: Mi
및 Ki
)으로 지정된다. 메가바이트와 메비바이트를 생각하면 된다.
우리가 사용하는 또 다른 특수 유형이 하나 더 있는데, 바로 metav1.Time
이다. 이것은 고정된 이식 가능한 직렬화 형식을 가지고 있다는 점을 제외하면 time.Time
과 동일하게 작동한다.
package v1
import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to b
CronJob 을 세부적으로 살펴보자.
먼저 spec 을 보면, spec 에는 원하는 상태가 저장되므로 controller 에 대한 모든 "입력" 은 여기에 저장된다.
기본적으로 크론잡에는 다음과 같은 요소가 필요하다:
- 스케줄 (CronJob 내의 cron)
- 실행할 Job 에 대한 template (CronJob 내의 job)
편하게 만들어줄 몇 가지 추가 기능도 필요하다:
- job 시작에 대한 deadline (이 deadline 을 놓치면 다음 예정된 시간까지 기다리게 된다)
- 여러 job 이 한 번에 실행될 경우 어떻게 할 것인가(기다릴 것인가? 기존 job 을 중지할 것인가? 둘 다 실행할 것인가?)
- CronJob 에 문제가 있을 경우, CronJob 실행을 일시 중지하는 방법
- 이전 job 기록에 대한 limit
자신의 상태를 읽지 않기 때문에 job 이 실행되었는지 여부를 추적할 수 있는 다른 방법이 필요하다. 이를 위해 적어도 하나의 이전 job 을 사용할 수 있다.
// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
//+kubebuilder:validation:MinLength=0
// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
Schedule string `json:"schedule"`
//+kubebuilder:validation:Minimum=0
// Optional deadline in seconds for starting the job if it misses scheduled
// time for any reason. Missed jobs executions will be counted as failed ones.
// +optional
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`
// Specifies how to treat concurrent executions of a Job.
// Valid values are:
// - "Allow" (default): allows CronJobs to run concurrently;
// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
// - "Replace": cancels currently running job and replaces it with a new one
// +optional
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
// This flag tells the controller to suspend subsequent executions, it does
// not apply to already started executions. Defaults to false.
// +optional
Suspend *bool `json:"suspend,omitempty"`
// Specifies the job that will be created when executing a CronJob.
JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"`
//+kubebuilder:validation:Minimum=0
// The number of successful finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`
//+kubebuilder:validation:Minimum=0
// The number of failed finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
}
ConcurrencyPolicy 는 실제로는 string 이지만, 재사용과 유효성 검사를 쉽게 할 수 있으므로 타입을 재정의 했다.
// ConcurrencyPolicy describes how the job will be handled.
// Only one of the following concurrent policies may be specified.
// If none of the following policies is specified, the default one
// is AllowConcurrent.
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string
const (
// AllowConcurrent allows CronJobs to run concurrently.
AllowConcurrent ConcurrencyPolicy = "Allow"
// ForbidConcurrent forbids concurrent runs, skipping next run if previous
// hasn't finished yet.
ForbidConcurrent ConcurrencyPolicy = "Forbid"
// ReplaceConcurrent cancels currently running job and replaces it with a new one.
ReplaceConcurrent ConcurrencyPolicy = "Replace"
)
다음은 관찰된 상태를 저장하는 status 를 디자인해 보자.
현재 실행중인 job 목록과 마지막으로 job 을 성공적으로 실행한 시간을 유지한다. 그리고 직렬화를 위해서 time.Time
대신 metav1.Time
을 사용한다.
// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
// A list of pointers to currently running jobs.
// +optional
Active []corev1.ObjectReference `json:"active,omitempty"`
// Information when was the last time the job was successfully scheduled.
// +optional
LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}
Controller 구현
컨트롤러는 쿠버네티스와 모든 operator 의 핵심이다.
컨트롤러의 역할은 주어진 오브젝트에 대해 실세계의 실제 상태(클러스터 상태와 잠재적으로 외부 상태(예: Kubelet의 경우 컨테이너 실행 또는 Cloud Provider 의 경우 로드밸런서)가 오브젝트의 원하는 상태와 일치하는지 확인하는 것이다. 각 컨트롤러는 하나의 루트 Kind 에 중점을 두지만 다른 Kind 와 상호 작용할 수 있다.
이 프로세스를 reconciling
이라고 부른다.
controller-runtime 에서 특정 kind 에 대한 reconciling 을 구현하는 로직을 Reconciler
라고 한다.
internal/controller/cronjob_controller.go
파일을 살펴 보자.
기본으로 임포트하는 모듈이 있는데, core controller-runtime 라이브러리와 client 패키지, API 타입 패키지가 있다.
package controllers
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
batchv1 "tutorial.kubebuilder.io/project/api/v1"
)
컨트롤러의 기본 로직은 다음과 같다.
- 명명된 CronJob을 로드한다.
- 모든 active job 을 나열하고, status 를 업데이트 한다.
- 히스토리 수 제한에 따라 오래된 job 을 정리한다.
- Suspend 값이 세팅되었는지 확인 (값이 세팅된 경우 다른 작업을 수행하지 않음)
- 다음 예약된 실행 가져오기
- 새로운 job 이 스케줄에 맞고, deadline 이 지나지 않았으며, 동시성 정책에 의해 차단되지 않은 경우 실행
- 실행 중인 job 이 보이거나 (자동으로 수행됨) 다음 예약된 실행 시간이 되면 Requeue 한다.
임포트 모듈을 추가한다.
package controller
import (
"context"
"fmt"
"sort"
"time"
"github.com/robfig/cron"
kbatch "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ref "k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
batchv1 "tutorial.kubebuilder.io/project/api/v1"
)
테스트를 위해서 Clock 을 추가한다.
// CronJobReconciler reconciles a CronJob object
type CronJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Clock
}
type realClock struct{}
func (_ realClock) Now() time.Time { return time.Now() }
// clock knows how to get the current time.
// It can be used to fake out timing for testing.
type Clock interface {
Now() time.Time
}
RBAC 을 위해 batch group 의 job 을 핸들링 할 수 있는 권한을 추가한다.
//+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get
annotation 을 위한 변수를 추가한다.
var (
scheduledTimeAnnotation = "batch.tutorial.kubebuilder.io/scheduled-at"
)
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the CronJob object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
1. 이름으로 CronJob 을 로드한다
client 를 사용하여 CronJob 을 가져온다. 모든 client 의 메소드에는 취소가 가능하게 context 를 아규먼트로 받는다.
var cronJob batchv1.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
log.Error(err, "unable to fetch CronJob")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
2. 모든 active job 을 나열하고 status 를 업데이트 한다.
var childJobs kbatch.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
log.Error(err, "unable to list child Jobs")
return ctrl.Result{}, err
}
active job 을 조회했으면 이를 active, successful, failded job 으로 분류한다.
// find the active list of jobs
var activeJobs []*kbatch.Job
var successfulJobs []*kbatch.Job
var failedJobs []*kbatch.Job
var mostRecentTime *time.Time // find the last run so we can update the status
isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
for _, c := range job.Status.Conditions {
if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
timeRaw := job.Annotations[scheduledTimeAnnotation]
if len(timeRaw) == 0 {
return nil, nil
}
timeParsed, err := time.Parse(time.RFC3339, timeRaw)
if err != nil {
return nil, err
}
return &timeParsed, nil
}
for i, job := range childJobs.Items {
_, finishedType := isJobFinished(&job)
switch finishedType {
case "": // ongoing
activeJobs = append(activeJobs, &childJobs.Items[i])
case kbatch.JobFailed:
failedJobs = append(failedJobs, &childJobs.Items[i])
case kbatch.JobComplete:
successfulJobs = append(successfulJobs, &childJobs.Items[i])
}
// We'll store the launch time in an annotation, so we'll reconstitute that from
// the active jobs themselves.
scheduledTimeForJob, err := getScheduledTimeForJob(&job)
if err != nil {
log.Error(err, "unable to parse schedule time for child job", "job", &job)
continue
}
if scheduledTimeForJob != nil {
if mostRecentTime == nil {
mostRecentTime = scheduledTimeForJob
} else if mostRecentTime.Before(*scheduledTimeForJob) {
mostRecentTime = scheduledTimeForJob
}
}
}
if mostRecentTime != nil {
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
jobRef, err := ref.GetReference(r.Scheme, activeJob)
if err != nil {
log.Error(err, "unable to make reference to active job", "job", activeJob)
continue
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}
디버깅을 위해서 log 를 남긴다.
log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
status 를 업데이트 한다.
if err := r.Status().Update(ctx, &cronJob); err != nil {
log.Error(err, "unable to update CronJob status")
return ctrl.Result{}, err
}
3. 히스토리 수 제한에 따른 오래된 job 삭제하기
// NB: deleting these are "best effort" -- if we fail on a particular one,
// we won't requeue just to finish the deleting.
if cronJob.Spec.FailedJobsHistoryLimit != nil {
sort.Slice(failedJobs, func(i, j int) bool {
if failedJobs[i].Status.StartTime == nil {
return failedJobs[j].Status.StartTime != nil
}
return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
})
for i, job := range failedJobs {
if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
log.Error(err, "unable to delete old failed job", "job", job)
} else {
log.V(0).Info("deleted old failed job", "job", job)
}
}
}
if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
sort.Slice(successfulJobs, func(i, j int) bool {
if successfulJobs[i].Status.StartTime == nil {
return successfulJobs[j].Status.StartTime != nil
}
return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
})
for i, job := range successfulJobs {
if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
log.Error(err, "unable to delete old successful job", "job", job)
} else {
log.V(0).Info("deleted old successful job", "job", job)
}
}
}
4. Suspend 값이 세팅되었는지 확인
CronJob 객체에 suspend 값이 세팅되어 있다면 CronJob 을 일시 중단한다. CronJob 을 삭제하지 않고 잠시 멈추고 싶을 때 사용할 수 있다.
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
log.V(1).Info("cronjob suspended, skipping")
return ctrl.Result{}, nil
}
5. 다음 예약된 실행 가져오기
잠시 멈춤 상태가 아니라면 다음 스케줄을 가져온다.
getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
}
// for optimization purposes, cheat a bit and start from our last observed run time
// we could reconstitute this here, but there's not much point, since we've
// just updated it.
var earliestTime time.Time
if cronJob.Status.LastScheduleTime != nil {
earliestTime = cronJob.Status.LastScheduleTime.Time
} else {
earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
}
if cronJob.Spec.StartingDeadlineSeconds != nil {
// controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
if earliestTime.After(now) {
return time.Time{}, sched.Next(now), nil
}
starts := 0
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
lastMissed = t
// An object might miss several starts. For example, if
// controller gets wedged on Friday at 5:01pm when everyone has
// gone home, and someone comes in on Tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly scheduledJob, should
// all start running with no further intervention (if the scheduledJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we want to not try to list
// all the missed start times.
starts++
if starts > 100 {
// We can't get the most recent times so just return an empty slice
return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
}
}
return lastMissed, sched.Next(now), nil
}
// figure out the next times that we need to create
// jobs at (or anything we missed).
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
log.Error(err, "unable to figure out CronJob schedule")
// we don't really care about requeuing until we get an update that
// fixes the schedule, so don't return an error
return ctrl.Result{}, nil
}
requeue 할 값을 준비만 해 놓는다.
scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
log = log.WithValues("now", r.Now(), "next run", nextRun)
6. 새로운 job 이 스케줄에 맞고, deadline 이 지나지 않았으며, 동시성 정책에 의해 차단되지 않은 경우 실행
if missedRun.IsZero() {
log.V(1).Info("no upcoming scheduled times, sleeping until next")
return scheduledResult, nil
}
// make sure we're not too late to start the run
log = log.WithValues("current run", missedRun)
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
}
if tooLate {
log.V(1).Info("missed starting deadline for last run, sleeping till next")
// TODO(directxman12): events
return scheduledResult, nil
}
// figure out how to run this job -- concurrency policy might forbid us from running
// multiple at the same time...
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
return scheduledResult, nil
}
// ...or instruct us to replace existing ones...
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, activeJob := range activeJobs {
// we don't care if the job was already deleted
if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
log.Error(err, "unable to delete active job", "job", activeJob)
return ctrl.Result{}, err
}
}
}
constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())
job := &kbatch.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: make(map[string]string),
Annotations: make(map[string]string),
Name: name,
Namespace: cronJob.Namespace,
},
Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
}
for k, v := range cronJob.Spec.JobTemplate.Annotations {
job.Annotations[k] = v
}
job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
for k, v := range cronJob.Spec.JobTemplate.Labels {
job.Labels[k] = v
}
if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
return nil, err
}
return job, nil
}
// actually make the job...
job, err := constructJobForCronJob(&cronJob, missedRun)
if err != nil {
log.Error(err, "unable to construct job from template")
// don't bother requeuing until we get a change to the spec
return scheduledResult, nil
}
// ...and create it on the cluster
if err := r.Create(ctx, job); err != nil {
log.Error(err, "unable to create Job for CronJob", "job", job)
return ctrl.Result{}, err
}
log.V(1).Info("created Job for CronJob run", "job", job)
7. 실행 중인 job 이 보이거나 (자동으로 수행됨) 다음 예약된 실행 시간이 되면 Requeue 한다.
// we'll requeue once we see the running job, and update our status
return scheduledResult, nil
}
Setup
var (
jobOwnerKey = ".metadata.controller"
apiGVStr = batchv1.GroupVersion.String()
)
// SetupWithManager sets up the controller with the Manager.
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
// set up a real clock, since we're not in a test
if r.Clock == nil {
r.Clock = realClock{}
}
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj client.Object) []string {
// grab the job object, extract the owner...
job := rawObj.(*kbatch.Job)
owner := metav1.GetControllerOf(job)
if owner == nil {
return nil
}
// ...make sure it's a CronJob...
if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
return nil
}
// ...and if so, return it
return []string{owner.Name}
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.CronJob{}).
Owns(&kbatch.Job{}).
Complete(r)
}
Webhook 생성
$ kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
api/v1/cronjob_webhook.go
파일이 생성된다. 해당 파일에 체크 로직을 추가한다.
// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *CronJob) Default() {
cronjoblog.Info("default", "name", r.Name)
if r.Spec.ConcurrencyPolicy == "" {
r.Spec.ConcurrencyPolicy = AllowConcurrent
}
if r.Spec.Suspend == nil {
r.Spec.Suspend = new(bool)
}
if r.Spec.SuccessfulJobsHistoryLimit == nil {
r.Spec.SuccessfulJobsHistoryLimit = new(int32)
*r.Spec.SuccessfulJobsHistoryLimit = 3
}
if r.Spec.FailedJobsHistoryLimit == nil {
r.Spec.FailedJobsHistoryLimit = new(int32)
*r.Spec.FailedJobsHistoryLimit = 1
}
}
var _ webhook.Validator = &CronJob{}
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateCreate() error {
cronjoblog.Info("validate create", "name", r.Name)
return r.validateCronJob()
}
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateUpdate(old runtime.Object) error {
cronjoblog.Info("validate update", "name", r.Name)
return r.validateCronJob()
}
// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateDelete() error {
cronjoblog.Info("validate delete", "name", r.Name)
// TODO(user): fill in your validation logic upon object deletion.
return nil
}
func (r *CronJob) validateCronJob() error {
var allErrs field.ErrorList
if err := r.validateCronJobName(); err != nil {
allErrs = append(allErrs, err)
}
if err := r.validateCronJobSpec(); err != nil {
allErrs = append(allErrs, err)
}
if len(allErrs) == 0 {
return nil
}
return apierrors.NewInvalid(
schema.GroupKind{Group: "batch.tutorial.kubebuilder.io", Kind: "CronJob"},
r.Name, allErrs)
}
func (r *CronJob) validateCronJobSpec() *field.Error {
// The field helpers from the kubernetes API machinery help us return nicely
// structured validation errors.
return validateScheduleFormat(
r.Spec.Schedule,
field.NewPath("spec").Child("schedule"))
}
func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
if _, err := cron.ParseStandard(schedule); err != nil {
return field.Invalid(fldPath, schedule, err.Error())
}
return nil
}
func (r *CronJob) validateCronJobName() *field.Error {
if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
// The job name length is 63 character like all Kubernetes objects
// (which must fit in a DNS subdomain). The cronjob controller appends
// a 11-character suffix to the cronjob (`-$TIMESTAMP`) when creating
// a job. The job name length limit is 63 characters. Therefore cronjob
// names must have length <= 63-11=52. If we don't validate this here,
// then job creation will fail later.
return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
}
return nil
}
Controller 배포 및 실행
CR 과 CRD yaml 을 만드는 명령어를 수행한다.
$ make manifests
CRD 를 배포한다.
$ make install
WebHook 를 로컬에서 다른 터미널로 실행한다.
$ export ENABLE_WEBHOOKS=false
$ make run
config/samples/batch_v1_cronjob.yaml
파일에 값을 추가한다.
apiVersion: batch.tutorial.kubebuilder.io/v1
kind: CronJob
metadata:
labels:
app.kubernetes.io/name: cronjob
app.kubernetes.io/instance: cronjob-sample
app.kubernetes.io/part-of: cronjob-kubebuilder
app.kubernetes.io/managed-by: kustomize
app.kubernetes.io/created-by: cronjob-kubebuilder
name: cronjob-sample
spec:
schedule: "*/1 * * * *"
startingDeadlineSeconds: 60
concurrencyPolicy: Allow # explicitly specify, but Allow is also default.
jobTemplate:
spec:
template:
spec:
containers:
- name: hello
image: busybox
args:
- /bin/sh
- -c
- date; echo Hello from the Kubernetes cluster
restartPolicy: OnFailure
Reference site
- https://book.kubebuilder.io/cronjob-tutorial/
- cronJob webhook source:
https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/book/src/cronjob-tutorial/testdata/project/api/v1/cronjob_webhook.go - Writing a Kubernetes Operator from Scratch Using Kubebuilder:
https://www.youtube.com/watch?v=LLVoyXjYlYM&list=PL8pIiPkgexmmHAppEre9eHAiYqLxGFbOY&index=7 - PVC Operator Sample Source:
https://github.com/civo/operator-demo/blob/main/controllers/demovolume_controller.go - elastic cloud-on-k8s source:
https://github.com/elastic/cloud-on-k8s/blob/main/pkg/apis/elasticsearch/v1/elasticsearch_types.go
https://github.com/elastic/cloud-on-k8s/blob/main/pkg/controller/elasticsearch/elasticsearch_controller.go - Develop on Kubernetes Series — Operator Dev — Understanding and Dissecting Kubebuilder:
https://yash-kukreja-98.medium.com/develop-on-kubernetes-series-operator-dev-understanding-and-dissecting-kubebuilder-4321d3ecd7d6 - Learning Concurrent Reconciling:
https://openkruise.io/blog/learning-concurrent-reconciling/ - Operator Pattern: https://kubernetes.io/docs/concepts/extend-kubernetes/operator/
- Best practices for building Kubernetes Operators and stateful apps:
https://cloud.google.com/blog/products/containers-kubernetes/best-practices-for-building-kubernetes-operators-and-stateful-apps - Kubernetes Controllers at Scale: Clients, Caches, Conflicts, Patches Explained:
- https://medium.com/@timebertt/kubernetes-controllers-at-scale-clients-caches-conflicts-patches-explained-aa0f7a8b4332
- Kubernetes API guidelines: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md
- Golang controller-runtime: https://pkg.go.dev/sigs.k8s.io/controller-runtime
- Extend the Kubernetes API with CustomResourceDefinitions: https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/