
⚠️ [Warm Replicas] Implement warm replica support for controllers. #3192

Open
wants to merge 10 commits into
base: main
from
10 changes: 10 additions & 0 deletions pkg/config/controller.go
@@ -60,6 +60,16 @@ type Controller struct {
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// NeedWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// When set to true, the controller will start its sources without transitioning to be leader.
Member

Suggested change
- // When set to true, the controller will start its sources without transitioning to be leader.
+ // When set to true, the controller will start its sources without waiting to become leader.

Maybe?

// Defaults to false.
NeedWarmup *bool
Member

Should we also document what happens if NeedLeaderElection = false and NeedWarmup = true?

If I read this correctly, Warmup will be a no-op in that case, because non-leader-election runnables are started before we call Warmup?


// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
18 changes: 16 additions & 2 deletions pkg/controller/controller.go
@@ -1,5 +1,4 @@
- /*
- Copyright 2018 The Kubernetes Authors.
+ /* Copyright 2018 The Kubernetes Authors.

Member

nit: should not be changed

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -93,6 +92,16 @@ type TypedOptions[request comparable] struct {
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool

// NeedWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// When set to true, the controller will start its sources without transitioning to be leader.
// Defaults to false.
NeedWarmup *bool
}

// DefaultFromConfig defaults the config from a config.Controller
@@ -124,6 +133,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller
if options.NeedLeaderElection == nil {
options.NeedLeaderElection = config.NeedLeaderElection
}

if options.NeedWarmup == nil {
options.NeedWarmup = config.NeedWarmup
}
}
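
The defaulting rule in this hunk (a controller-level value wins, otherwise the manager-level value is inherited, otherwise the field stays nil and is treated as false) can be sketched in isolation. This is a minimal stand-alone model, not the controller-runtime code: `controllerConfig`, `options`, `warmupEnabled`, `resolve` and `boolPtr` are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// controllerConfig is a hypothetical, trimmed-down stand-in for the manager-level config.
type controllerConfig struct {
	NeedWarmup *bool
}

// options is a hypothetical, trimmed-down stand-in for the per-controller options.
type options struct {
	NeedWarmup *bool
}

// defaultFromConfig copies the manager-level value only when the controller left the field unset.
func (o *options) defaultFromConfig(cfg controllerConfig) {
	if o.NeedWarmup == nil {
		o.NeedWarmup = cfg.NeedWarmup
	}
}

// warmupEnabled treats an unset pointer as the documented default of false.
func warmupEnabled(o options) bool {
	return o.NeedWarmup != nil && *o.NeedWarmup
}

// resolve applies the defaulting and reports the effective setting.
func resolve(ctrl, mgr *bool) bool {
	o := options{NeedWarmup: ctrl}
	o.defaultFromConfig(controllerConfig{NeedWarmup: mgr})
	return warmupEnabled(o)
}

func boolPtr(b bool) *bool { return &b }

func main() {
	fmt.Println(resolve(nil, nil))                      // neither set: false
	fmt.Println(resolve(nil, boolPtr(true)))            // inherited from manager: true
	fmt.Println(resolve(boolPtr(false), boolPtr(true))) // controller overrides manager: false
}
```

Using a pointer rather than a plain bool is what lets "unset" be told apart from an explicit false, which the override case depends on.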

// Controller implements an API. A Controller manages a work queue fed reconcile.Requests
@@ -253,6 +266,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
NeedWarmup: options.NeedWarmup,
}, nil
}

70 changes: 70 additions & 0 deletions pkg/controller/controller_test.go
@@ -474,5 +474,75 @@ var _ = Describe("controller.Controller", func() {
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeFalse())
})

It("should set ShouldWarmupWithoutLeadership correctly", func() {
Member

Suggested change
- It("should set ShouldWarmupWithoutLeadership correctly", func() {
+ It("should set NeedWarmup correctly", func() {

nit, same below (also in a few comments)

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

// Test with ShouldWarmupWithoutLeadership set to true
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(true),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlWithWarmup.NeedWarmup).To(BeTrue())
Comment on lines +491 to +492
Contributor

I think you can achieve this more succinctly with HaveValue, which IIRC checks that a pointer is not nil and then passes the dereferenced value through to the next matcher


// Test with ShouldWarmupWithoutLeadership set to false
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithoutWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlWithoutWarmup.NeedWarmup).To(BeFalse())

// Test with ShouldWarmupWithoutLeadership not set (should default to nil)
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithDefaultWarmup.NeedWarmup).To(BeNil())
})

It("should inherit ShouldWarmupWithoutLeadership from manager config", func() {
// Test with manager default setting ShouldWarmupWithoutLeadership to true
managerWithWarmup, err := manager.New(cfg, manager.Options{
Controller: config.Controller{
NeedWarmup: ptr.To(true),
},
})
Expect(err).NotTo(HaveOccurred())
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlInheritingWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlInheritingWarmup.NeedWarmup).To(BeTrue())

// Test that explicit controller setting overrides manager setting
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlOverridingWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlOverridingWarmup.NeedWarmup).To(BeFalse())
})
})
})
69 changes: 67 additions & 2 deletions pkg/internal/controller/controller.go
@@ -29,7 +29,9 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -38,6 +40,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
// syncedPollPeriod is the period to poll for cache sync
syncedPollPeriod = 100 * time.Millisecond
)

// Controller implements controller.Controller.
type Controller[request comparable] struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
@@ -83,6 +90,16 @@ type Controller[request comparable] struct {
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []source.TypedSource[request]

// didStartEventSources is used to indicate whether the event sources have been started.
didStartEventSources atomic.Bool

// didEventSourcesFinishSync is used to indicate whether the event sources have finished
// successfully. It stores a *bool where
// - nil: not finished syncing
// - true: finished syncing without error
// - false: finished syncing with error
didEventSourcesFinishSync atomic.Value
Member

Suggested change
- didEventSourcesFinishSync atomic.Value
+ didEventSourcesFinishSyncSuccessfully atomic.Value

nit: Would this be slightly better? (I think it might fit better to the values)
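
For reference, the three states in the field comment (nil, true, false) can be modelled with a typed atomic pointer. The sketch below assumes Go 1.19+ and uses `atomic.Pointer[bool]` instead of the `atomic.Value` in the diff. `syncState`, `finish`, `describe`, `errSync` and `zeroValueAssertionOK` are hypothetical names. The last helper shows a property of `atomic.Value` worth keeping in mind here: `Load` on a value that was never stored returns a nil interface, so a `.(*bool)` assertion reports `ok == false` until something has been stored.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errSync is a placeholder error used by the demo.
var errSync = errors.New("sync error")

// syncState is a hypothetical stand-in for the tri-state field:
// nil = not finished, true = finished without error, false = finished with error.
type syncState struct {
	result atomic.Pointer[bool]
}

// finish records the outcome of a sync attempt.
func (s *syncState) finish(err error) {
	ok := err == nil
	s.result.Store(&ok)
}

// describe reports the current state as a string.
func (s *syncState) describe() string {
	p := s.result.Load()
	switch {
	case p == nil:
		return "pending"
	case *p:
		return "succeeded"
	default:
		return "failed"
	}
}

// zeroValueAssertionOK reports the ok result of asserting *bool on a never-stored atomic.Value.
func zeroValueAssertionOK() bool {
	var v atomic.Value
	_, ok := v.Load().(*bool)
	return ok
}

func main() {
	var s syncState
	fmt.Println(s.describe()) // pending
	s.finish(nil)
	fmt.Println(s.describe()) // succeeded
	s.finish(errSync)
	fmt.Println(s.describe()) // failed
	fmt.Println(zeroValueAssertionOK()) // false
}
```

With the typed pointer, "not yet stored" and "stored nil" collapse into the same pending state, so no type assertion is needed on the read path.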


// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
@@ -95,6 +112,12 @@ type Controller[request comparable] struct {

// LeaderElected indicates whether the controller is leader elected or always running.
LeaderElected *bool

// NeedWarmup specifies whether the controller should start its sources
// when the manager is not the leader.
// Defaults to false, which means that the controller will wait for leader election to start
// before starting sources.
NeedWarmup *bool
}

// Reconcile implements reconcile.Reconciler.
@@ -144,6 +167,38 @@ func (c *Controller[request]) NeedLeaderElection() bool {
return *c.LeaderElected
}

// Warmup implements the manager.WarmupRunnable interface.
func (c *Controller[request]) Warmup(ctx context.Context) error {
if c.NeedWarmup == nil || !*c.NeedWarmup {
return nil
}
return c.startEventSources(ctx)
}

// DidFinishWarmup implements the manager.WarmupRunnable interface.
Member

Does not match the method name on the interface

func (c *Controller[request]) DidFinishWarmup(ctx context.Context) bool {
Member

Probably dead code, but should we handle the if c.NeedWarmup == nil || !*c.NeedWarmup case here in some way specifically?

Member

Hm. Maybe we have to.

Controller will always be added to the Warmup runnable group.

If Warmup is a no-op, DidFinishWarmup will never return, right?

Which means the manager will get stuck when starting the warmup runnable group

Member

Yeah we need some way to short-circuit this code if the warmup is not enabled
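
One possible shape for that short-circuit, as a stand-alone sketch rather than a proposed patch: `warmupController`, `didFinishWarmup` and the two demo helpers are hypothetical, and treating "warmup disabled" as "finished" is an assumption about the desired semantics.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// warmupController is a hypothetical, trimmed-down model of the controller in this diff.
type warmupController struct {
	needWarmup *bool
	synced     atomic.Pointer[bool] // nil until the sources finished syncing
}

// didFinishWarmup returns immediately when warmup is disabled, so a caller
// waiting on it cannot block on a state that a no-op Warmup never sets.
func (c *warmupController) didFinishWarmup(ctx context.Context) bool {
	if c.needWarmup == nil || !*c.needWarmup {
		return true // assumption: nothing to warm up counts as finished
	}
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		if p := c.synced.Load(); p != nil {
			return *p
		}
		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
		}
	}
}

// disabledReturnsPromptly checks that a controller with warmup unset does not wait for the timeout.
func disabledReturnsPromptly() bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	c := &warmupController{} // needWarmup left nil, i.e. disabled
	return c.didFinishWarmup(ctx) && ctx.Err() == nil
}

// enabledWaitsForSync checks that an enabled controller reports the stored sync result.
func enabledWaitsForSync() bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	enabled := true
	c := &warmupController{needWarmup: &enabled}
	go func() {
		time.Sleep(30 * time.Millisecond)
		ok := true
		c.synced.Store(&ok)
	}()
	return c.didFinishWarmup(ctx)
}

func main() {
	fmt.Println(disabledReturnsPromptly())
	fmt.Println(enabledWaitsForSync())
}
```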

err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (bool, error) {
Member

What does this func currently return when the context is cancelled?

kind.WaitForSync is returning nil, should this func here do the same?
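
To make the question concrete, here is a small stdlib-only polling loop that treats cancellation as "stop waiting" and returns nil, which is the behaviour the comment above attributes to kind.WaitForSync. `waitForSync`, `cancelledWait` and `failedWait` are hypothetical helpers. Whether nil is the right answer for DidFinishWarmup is a design choice this sketch does not settle.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForSync polls check until it reports done. It returns check's error if
// there is one, and returns nil when ctx is cancelled first.
func waitForSync(ctx context.Context, period time.Duration, check func() (bool, error)) error {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		done, err := check()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			// Treat shutdown as "stop waiting", not as a sync failure.
			return nil
		case <-ticker.C:
		}
	}
}

// cancelledWait polls a check that never completes and cancels the context shortly after.
func cancelledWait() error {
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(20*time.Millisecond, cancel)
	return waitForSync(ctx, 5*time.Millisecond, func() (bool, error) { return false, nil })
}

// failedWait polls a check that reports a sync error on the first call.
func failedWait() error {
	return waitForSync(context.Background(), 5*time.Millisecond, func() (bool, error) {
		return false, errors.New("event sources did not finish syncing successfully")
	})
}

func main() {
	fmt.Println(cancelledWait())
	fmt.Println(failedWait())
}
```

A caller that has to collapse this into a bool still needs to decide how cancellation maps onto true or false; returning `err == nil` would report a cancelled wait as finished under this variant.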

didFinishSync, ok := c.didEventSourcesFinishSync.Load().(*bool)
if !ok {
return false, errors.New("unexpected error: didEventSourcesFinishSync is not a bool pointer")
}

if didFinishSync == nil {
// event sources not finished syncing
return false, nil
}

if !*didFinishSync {
// event sources finished syncing with an error
return true, errors.New("event sources did not finish syncing successfully")
}

return true, nil
})

return err == nil
}

// Start implements controller.Controller.
func (c *Controller[request]) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
@@ -221,13 +276,19 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// startEventSources launches all the sources registered with this controller and waits
// for them to sync. It returns an error if any of the sources fail to start or sync.
func (c *Controller[request]) startEventSources(ctx context.Context) error {
// CAS returns false if value is already true, so early exit since another goroutine must have
// called startEventSources previously
if !c.didStartEventSources.CompareAndSwap(false, true) {
c.LogConstructor(nil).Info("Skipping starting event sources since it was already started")
Member

Suggested change
- c.LogConstructor(nil).Info("Skipping starting event sources since it was already started")
+ c.LogConstructor(nil).Info("Skipping starting event sources since they were already started")

return nil
}

errGroup := &errgroup.Group{}
for _, watch := range c.startWatches {
log := c.LogConstructor(nil)
_, ok := watch.(interface {
String() string
})

if !ok {
log = log.WithValues("source", fmt.Sprintf("%T", watch))
} else {
@@ -274,7 +335,11 @@ func (c *Controller[request]) startEventSources(ctx context.Context) error {
}
})
}
- return errGroup.Wait()
+ err := errGroup.Wait()
+
+ c.didEventSourcesFinishSync.Store(ptr.To(err == nil))
+
+ return err
}
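
The start-once guard at the top of startEventSources relies on `CompareAndSwap(false, true)` succeeding for exactly one caller. A minimal stand-alone sketch of that pattern, assuming Go 1.19+ for `atomic.Bool` and `atomic.Int32`; `sourceStarter` and `concurrentStarts` are hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sourceStarter is a hypothetical model of the start-once guard.
type sourceStarter struct {
	started atomic.Bool
	runs    atomic.Int32
}

// start runs the guarded section at most once; later callers return nil immediately.
func (s *sourceStarter) start() error {
	if !s.started.CompareAndSwap(false, true) {
		// Another caller already flipped the flag.
		return nil
	}
	s.runs.Add(1) // stands in for starting and syncing the sources
	return nil
}

// concurrentStarts calls start from n goroutines and returns how often the guarded section ran.
func concurrentStarts(n int) int32 {
	var s sourceStarter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.start()
		}()
	}
	wg.Wait()
	return s.runs.Load()
}

func main() {
	fmt.Println(concurrentStarts(8)) // prints 1
}
```

Note that a caller losing the race returns nil right away, without waiting for the winner to finish. In this sketch that is harmless, but with real sources it means the second caller cannot assume syncing has completed when its call returns.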

// processNextWorkItem will read a single work item off the workqueue and
85 changes: 84 additions & 1 deletion pkg/internal/controller/controller_test.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/go-logr/logr"
@@ -403,7 +404,7 @@ var _ = Describe("controller", func() {
return expectedErr
})

- // // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
+ // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
ctrl.CacheSyncTimeout = 5 * time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
err := ctrl.startEventSources(ctx)
@@ -1014,6 +1015,88 @@ var _ = Describe("controller", func() {
})
})
})

Describe("Warmup", func() {
JustBeforeEach(func() {
ctrl.NeedWarmup = ptr.To(true)
})

It("should track warmup status correctly with successful sync", func() {
// Setup controller with sources that complete successfully
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
return nil
}),
}

err := ctrl.Warmup(ctx)
Expect(err).NotTo(HaveOccurred())

// Verify DidFinishWarmup returns true for successful sync
result := ctrl.DidFinishWarmup(ctx)
Expect(result).To(BeTrue())
})

It("should track warmup status correctly with unsuccessful sync", func() {
// Setup controller with sources that complete with error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
return errors.New("sync error")
}),
}

err := ctrl.Warmup(ctx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sync error"))

// Verify DidFinishWarmup returns false for unsuccessful sync
result := ctrl.DidFinishWarmup(ctx)
Expect(result).To(BeFalse())
})
Member

Let's add a test case where the context is cancelled while the source start is still ongoing

})

Describe("Warmup with warmup disabled", func() {
JustBeforeEach(func() {
ctrl.NeedWarmup = ptr.To(false)
})

It("should not start sources when Warmup is called if warmup is disabled but start it when Start is called.", func() {
// Setup controller with sources that complete successfully
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.CacheSyncTimeout = time.Second
var isSourceStarted atomic.Bool
isSourceStarted.Store(false)
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
isSourceStarted.Store(true)
return nil
}),
}

By("Calling Warmup when NeedWarmup is false")
err := ctrl.Warmup(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(isSourceStarted.Load()).To(BeFalse())

By("Calling Start when NeedWarmup is false")
// Now call Start
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).To(Succeed())
}()
Eventually(isSourceStarted.Load).Should(BeTrue())
})
})
})

var _ = Describe("ReconcileIDFromContext function", func() {
9 changes: 9 additions & 0 deletions pkg/manager/internal.go
@@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
return fmt.Errorf("failed to start other runnables: %w", err)
}

// Start and wait for sources to start.
Member

Suggested change
- // Start and wait for sources to start.
+ // Start WarmupRunnables and wait for warmup to complete.

if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil {
return fmt.Errorf("failed to start warmup runnables: %w", err)
}

// Start the leader election and all required runnables.
{
ctx, cancel := context.WithCancel(context.Background())
@@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
cm.runnables.LeaderElection.startOnce.Do(func() {})
cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)

// Stop the warmup runnables
Member

Should this be done after l.545? (and before we stop leaderelection runnables)

cm.logger.Info("Stopping and waiting for warmup runnables")
cm.runnables.Warmup.StopAndWait(cm.shutdownCtx)
Member

Just to confirm my understanding. This will:

  • cancel the context that was used to call Warmup()
  • wait until all Warmup funcs returned

On the happy path where all Warmup funcs completed during controller startup this will do effectively nothing
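
The two steps described above (cancel the context handed to Warmup, then wait for every Warmup func to return) can be modelled with a context and a WaitGroup. This is a simplified sketch, not the manager's runnable group: `warmupGroup`, `add`, `stopAndWait` and the demo helpers are hypothetical, and the shutdown context that the real StopAndWait receives is left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// warmupGroup is a hypothetical, simplified runnable group.
type warmupGroup struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newWarmupGroup() *warmupGroup {
	ctx, cancel := context.WithCancel(context.Background())
	return &warmupGroup{ctx: ctx, cancel: cancel}
}

// add runs fn in its own goroutine with the group's shared context.
func (g *warmupGroup) add(fn func(ctx context.Context)) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		fn(g.ctx)
	}()
}

// stopAndWait cancels the shared context and blocks until every added func has returned.
func (g *warmupGroup) stopAndWait() {
	g.cancel()
	g.wg.Wait()
}

// stopBlockedWarmup covers a warmup func that is still running when the group is stopped.
func stopBlockedWarmup() bool {
	g := newWarmupGroup()
	var returned atomic.Bool
	g.add(func(ctx context.Context) {
		<-ctx.Done() // unblocked by stopAndWait cancelling the context
		returned.Store(true)
	})
	g.stopAndWait()
	return returned.Load()
}

// stopFinishedWarmup covers the happy path where the warmup func already returned.
func stopFinishedWarmup() bool {
	g := newWarmupGroup()
	done := make(chan struct{})
	g.add(func(ctx context.Context) { close(done) })
	<-done
	g.stopAndWait() // nothing left to cancel or wait for
	return true
}

func main() {
	fmt.Println(stopBlockedWarmup())
	fmt.Println(stopFinishedWarmup())
}
```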


// Stop the caches before the leader election runnables, this is an important
// step to make sure that we don't race with the reconcilers by receiving more events
// from the API servers and enqueueing them.