-
Notifications
You must be signed in to change notification settings - Fork 1.2k
⚠️ [Warm Replicas] Implement warm replica support for controllers. #3192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8239300
73fc8fa
be1b1c2
c9b99eb
e7a2bbf
854987c
072ad4b
43118a3
b67bc65
fc7c8c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,16 @@ type Controller struct { | |
// Defaults to true, which means the controller will use leader election. | ||
NeedLeaderElection *bool | ||
|
||
// NeedWarmup specifies whether the controller should start its sources when the manager is not | ||
// the leader. This is useful for cases where sources take a long time to start, as it allows | ||
// for the controller to warm up its caches even before it is elected as the leader. This | ||
// improves leadership failover time, as the caches will be prepopulated before the controller | ||
// transitions to be leader. | ||
// | ||
// When set to true, the controller will start its sources without transitioning to be leader. | ||
// Defaults to false. | ||
NeedWarmup *bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also document what happens if NeedLeaderElection = false and NeedWarmup = true? If I see correctly the Warmup will be a no-op then? (because non-leader election runnables are started before we call warmup) |
||
|
||
// UsePriorityQueue configures the controllers queue to use the controller-runtime provided | ||
// priority queue. | ||
// | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
/* | ||
Copyright 2018 The Kubernetes Authors. | ||
/* Copyright 2018 The Kubernetes Authors. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: should not be changed |
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
|
@@ -93,6 +92,16 @@ type TypedOptions[request comparable] struct { | |
// | ||
// Note: This flag is disabled by default until a future version. It's currently in beta. | ||
UsePriorityQueue *bool | ||
|
||
// NeedWarmup specifies whether the controller should start its sources when the manager is not | ||
// the leader. This is useful for cases where sources take a long time to start, as it allows | ||
// for the controller to warm up its caches even before it is elected as the leader. This | ||
// improves leadership failover time, as the caches will be prepopulated before the controller | ||
// transitions to be leader. | ||
// | ||
// When set to true, the controller will start its sources without transitioning to be leader. | ||
// Defaults to false. | ||
NeedWarmup *bool | ||
} | ||
|
||
// DefaultFromConfig defaults the config from a config.Controller | ||
|
@@ -124,6 +133,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller | |
if options.NeedLeaderElection == nil { | ||
options.NeedLeaderElection = config.NeedLeaderElection | ||
} | ||
|
||
if options.NeedWarmup == nil { | ||
options.NeedWarmup = config.NeedWarmup | ||
} | ||
} | ||
|
||
// Controller implements an API. A Controller manages a work queue fed reconcile.Requests | ||
|
@@ -253,6 +266,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req | |
LogConstructor: options.LogConstructor, | ||
RecoverPanic: options.RecoverPanic, | ||
LeaderElected: options.NeedLeaderElection, | ||
NeedWarmup: options.NeedWarmup, | ||
}, nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -474,5 +474,75 @@ var _ = Describe("controller.Controller", func() { | |||||
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request]) | ||||||
Expect(ok).To(BeFalse()) | ||||||
}) | ||||||
|
||||||
It("should set ShouldWarmupWithoutLeadership correctly", func() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
nit, same below (also in a few comments) |
||||||
m, err := manager.New(cfg, manager.Options{}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
|
||||||
// Test with ShouldWarmupWithoutLeadership set to true | ||||||
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{ | ||||||
Reconciler: reconcile.Func(nil), | ||||||
NeedWarmup: ptr.To(true), | ||||||
}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
|
||||||
internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request]) | ||||||
Expect(ok).To(BeTrue()) | ||||||
Expect(internalCtrlWithWarmup.NeedWarmup).NotTo(BeNil()) | ||||||
Expect(*internalCtrlWithWarmup.NeedWarmup).To(BeTrue()) | ||||||
Comment on lines
+491
to
+492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can achieve this more succinctly with |
||||||
|
||||||
// Test with ShouldWarmupWithoutLeadership set to false | ||||||
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{ | ||||||
Reconciler: reconcile.Func(nil), | ||||||
NeedWarmup: ptr.To(false), | ||||||
}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
|
||||||
internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request]) | ||||||
Expect(ok).To(BeTrue()) | ||||||
Expect(internalCtrlWithoutWarmup.NeedWarmup).NotTo(BeNil()) | ||||||
Expect(*internalCtrlWithoutWarmup.NeedWarmup).To(BeFalse()) | ||||||
|
||||||
// Test with ShouldWarmupWithoutLeadership not set (should default to nil) | ||||||
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{ | ||||||
Reconciler: reconcile.Func(nil), | ||||||
}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
|
||||||
internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request]) | ||||||
Expect(ok).To(BeTrue()) | ||||||
Expect(internalCtrlWithDefaultWarmup.NeedWarmup).To(BeNil()) | ||||||
}) | ||||||
|
||||||
It("should inherit ShouldWarmupWithoutLeadership from manager config", func() { | ||||||
// Test with manager default setting ShouldWarmupWithoutLeadership to true | ||||||
managerWithWarmup, err := manager.New(cfg, manager.Options{ | ||||||
Controller: config.Controller{ | ||||||
NeedWarmup: ptr.To(true), | ||||||
}, | ||||||
}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{ | ||||||
Reconciler: reconcile.Func(nil), | ||||||
}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
|
||||||
internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request]) | ||||||
Expect(ok).To(BeTrue()) | ||||||
Expect(internalCtrlInheritingWarmup.NeedWarmup).NotTo(BeNil()) | ||||||
Expect(*internalCtrlInheritingWarmup.NeedWarmup).To(BeTrue()) | ||||||
|
||||||
// Test that explicit controller setting overrides manager setting | ||||||
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{ | ||||||
Reconciler: reconcile.Func(nil), | ||||||
NeedWarmup: ptr.To(false), | ||||||
}) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
|
||||||
internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request]) | ||||||
Expect(ok).To(BeTrue()) | ||||||
Expect(internalCtrlOverridingWarmup.NeedWarmup).NotTo(BeNil()) | ||||||
Expect(*internalCtrlOverridingWarmup.NeedWarmup).To(BeFalse()) | ||||||
}) | ||||||
}) | ||||||
}) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -29,7 +29,9 @@ import ( | |||||
"k8s.io/apimachinery/pkg/types" | ||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||||
"k8s.io/apimachinery/pkg/util/uuid" | ||||||
"k8s.io/apimachinery/pkg/util/wait" | ||||||
"k8s.io/client-go/util/workqueue" | ||||||
"k8s.io/utils/ptr" | ||||||
|
||||||
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" | ||||||
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" | ||||||
|
@@ -38,6 +40,11 @@ import ( | |||||
"sigs.k8s.io/controller-runtime/pkg/source" | ||||||
) | ||||||
|
||||||
const ( | ||||||
// syncedPollPeriod is the period to poll for cache sync | ||||||
syncedPollPeriod = 100 * time.Millisecond | ||||||
) | ||||||
|
||||||
// Controller implements controller.Controller. | ||||||
type Controller[request comparable] struct { | ||||||
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. | ||||||
|
@@ -83,6 +90,16 @@ type Controller[request comparable] struct { | |||||
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. | ||||||
startWatches []source.TypedSource[request] | ||||||
|
||||||
// didStartEventSources is used to indicate whether the event sources have been started. | ||||||
didStartEventSources atomic.Bool | ||||||
|
||||||
// didEventSourcesFinishSync is used to indicate whether the event sources have finished | ||||||
// successfully. It stores a *bool where | ||||||
// - nil: not finished syncing | ||||||
// - true: finished syncing without error | ||||||
// - false: finished syncing with error | ||||||
didEventSourcesFinishSync atomic.Value | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
nit: Would this be slightly better? (I think it might fit better to the values) |
||||||
|
||||||
// LogConstructor is used to construct a logger to then log messages to users during reconciliation, | ||||||
// or for example when a watch is started. | ||||||
// Note: LogConstructor has to be able to handle nil requests as we are also using it | ||||||
|
@@ -95,6 +112,12 @@ type Controller[request comparable] struct { | |||||
|
||||||
// LeaderElected indicates whether the controller is leader elected or always running. | ||||||
LeaderElected *bool | ||||||
|
||||||
// NeedWarmup specifies whether the controller should start its sources | ||||||
// when the manager is not the leader. | ||||||
// Defaults to false, which means that the controller will wait for leader election to start | ||||||
// before starting sources. | ||||||
NeedWarmup *bool | ||||||
} | ||||||
|
||||||
// Reconcile implements reconcile.Reconciler. | ||||||
|
@@ -144,6 +167,38 @@ func (c *Controller[request]) NeedLeaderElection() bool { | |||||
return *c.LeaderElected | ||||||
} | ||||||
|
||||||
// Warmup implements the manager.WarmupRunnable interface. | ||||||
func (c *Controller[request]) Warmup(ctx context.Context) error { | ||||||
if c.NeedWarmup == nil || !*c.NeedWarmup { | ||||||
return nil | ||||||
} | ||||||
return c.startEventSources(ctx) | ||||||
} | ||||||
|
||||||
// DidFinishWarmup implements the manager.WarmupRunnable interface. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does not match the method name on the interface |
||||||
func (c *Controller[request]) DidFinishWarmup(ctx context.Context) bool { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably dead code, but should we handle the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm. Maybe we have to. Controller will always be added to the Warmup runnable group. If Warmup is a no-op, DidFinishWarmup will never return, right? Which means the manager will get stuck when starting the warmup runnable group There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah we need some way to short-circuit this code if the warmup is not enabled |
||||||
err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (bool, error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this func currently return when the context is cancelled? kind.WaitForSync is returning nil, should this func here do the same? |
||||||
didFinishSync, ok := c.didEventSourcesFinishSync.Load().(*bool) | ||||||
if !ok { | ||||||
return false, errors.New("unexpected error: didEventSourcesFinishSync is not a bool pointer") | ||||||
} | ||||||
|
||||||
if didFinishSync == nil { | ||||||
// event sources not finished syncing | ||||||
return false, nil | ||||||
} | ||||||
|
||||||
if !*didFinishSync { | ||||||
// event sources finished syncing with an error | ||||||
return true, errors.New("event sources did not finish syncing successfully") | ||||||
} | ||||||
|
||||||
return true, nil | ||||||
}) | ||||||
|
||||||
return err == nil | ||||||
} | ||||||
|
||||||
// Start implements controller.Controller. | ||||||
func (c *Controller[request]) Start(ctx context.Context) error { | ||||||
// use an IIFE to get proper lock handling | ||||||
|
@@ -221,13 +276,19 @@ func (c *Controller[request]) Start(ctx context.Context) error { | |||||
// startEventSources launches all the sources registered with this controller and waits | ||||||
// for them to sync. It returns an error if any of the sources fail to start or sync. | ||||||
func (c *Controller[request]) startEventSources(ctx context.Context) error { | ||||||
// CAS returns false if value is already true, so early exit since another goroutine must have | ||||||
// called startEventSources previously | ||||||
if !c.didStartEventSources.CompareAndSwap(false, true) { | ||||||
c.LogConstructor(nil).Info("Skipping starting event sources since it was already started") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return nil | ||||||
} | ||||||
|
||||||
errGroup := &errgroup.Group{} | ||||||
for _, watch := range c.startWatches { | ||||||
log := c.LogConstructor(nil) | ||||||
_, ok := watch.(interface { | ||||||
String() string | ||||||
}) | ||||||
|
||||||
if !ok { | ||||||
log = log.WithValues("source", fmt.Sprintf("%T", watch)) | ||||||
} else { | ||||||
|
@@ -274,7 +335,11 @@ func (c *Controller[request]) startEventSources(ctx context.Context) error { | |||||
} | ||||||
}) | ||||||
} | ||||||
return errGroup.Wait() | ||||||
err := errGroup.Wait() | ||||||
|
||||||
c.didEventSourcesFinishSync.Store(ptr.To(err == nil)) | ||||||
|
||||||
return err | ||||||
} | ||||||
|
||||||
// processNextWorkItem will read a single work item off the workqueue and | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"errors" | ||
"fmt" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/go-logr/logr" | ||
|
@@ -403,7 +404,7 @@ var _ = Describe("controller", func() { | |
return expectedErr | ||
}) | ||
|
||
// // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned | ||
// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned | ||
ctrl.CacheSyncTimeout = 5 * time.Second | ||
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} | ||
err := ctrl.startEventSources(ctx) | ||
|
@@ -1014,6 +1015,88 @@ var _ = Describe("controller", func() { | |
}) | ||
}) | ||
}) | ||
|
||
Describe("Warmup", func() { | ||
godwinpang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
JustBeforeEach(func() { | ||
ctrl.NeedWarmup = ptr.To(true) | ||
}) | ||
|
||
It("should track warmup status correctly with successful sync", func() { | ||
// Setup controller with sources that complete successfully | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
ctrl.CacheSyncTimeout = time.Second | ||
ctrl.startWatches = []source.TypedSource[reconcile.Request]{ | ||
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { | ||
return nil | ||
}), | ||
} | ||
|
||
err := ctrl.Warmup(ctx) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
// Verify DidFinishWarmup returns true for successful sync | ||
result := ctrl.DidFinishWarmup(ctx) | ||
Expect(result).To(BeTrue()) | ||
}) | ||
|
||
It("should track warmup status correctly with unsuccessful sync", func() { | ||
// Setup controller with sources that complete with error | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
ctrl.CacheSyncTimeout = time.Second | ||
ctrl.startWatches = []source.TypedSource[reconcile.Request]{ | ||
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { | ||
return errors.New("sync error") | ||
}), | ||
} | ||
|
||
err := ctrl.Warmup(ctx) | ||
Expect(err).To(HaveOccurred()) | ||
Expect(err.Error()).To(ContainSubstring("sync error")) | ||
|
||
// Verify DidFinishWarmup returns false for unsuccessful sync | ||
result := ctrl.DidFinishWarmup(ctx) | ||
Expect(result).To(BeFalse()) | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a test case where the context is cancelled while the source start is still onging |
||
}) | ||
|
||
Describe("Warmup with warmup disabled", func() { | ||
JustBeforeEach(func() { | ||
ctrl.NeedWarmup = ptr.To(false) | ||
}) | ||
|
||
It("should not start sources when Warmup is called if warmup is disabled but start it when Start is called.", func() { | ||
// Setup controller with sources that complete successfully | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
ctrl.CacheSyncTimeout = time.Second | ||
var isSourceStarted atomic.Bool | ||
isSourceStarted.Store(false) | ||
ctrl.startWatches = []source.TypedSource[reconcile.Request]{ | ||
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error { | ||
isSourceStarted.Store(true) | ||
return nil | ||
}), | ||
} | ||
|
||
By("Calling Warmup when NeedWarmup is false") | ||
err := ctrl.Warmup(ctx) | ||
Expect(err).NotTo(HaveOccurred()) | ||
Expect(isSourceStarted.Load()).To(BeFalse()) | ||
|
||
By("Calling Start when NeedWarmup is false") | ||
// Now call Start | ||
go func() { | ||
defer GinkgoRecover() | ||
Expect(ctrl.Start(ctx)).To(Succeed()) | ||
}() | ||
Eventually(isSourceStarted.Load).Should(BeTrue()) | ||
}) | ||
}) | ||
}) | ||
|
||
var _ = Describe("ReconcileIDFromContext function", func() { | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { | |||||
return fmt.Errorf("failed to start other runnables: %w", err) | ||||||
} | ||||||
|
||||||
// Start and wait for sources to start. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil { | ||||||
godwinpang marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return fmt.Errorf("failed to start warmup runnables: %w", err) | ||||||
} | ||||||
|
||||||
// Start the leader election and all required runnables. | ||||||
{ | ||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||
|
@@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e | |||||
cm.runnables.LeaderElection.startOnce.Do(func() {}) | ||||||
cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx) | ||||||
|
||||||
// Stop the warmup runnables | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be done after l.545? (and before we stop leaderelection runnables) |
||||||
cm.logger.Info("Stopping and waiting for warmup runnables") | ||||||
cm.runnables.Warmup.StopAndWait(cm.shutdownCtx) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to confirm my understanding. This will:
On the happy path where all Warmup funcs completed during controller startup this will do effectively nothing |
||||||
|
||||||
// Stop the caches before the leader election runnables, this is an important | ||||||
// step to make sure that we don't race with the reconcilers by receiving more events | ||||||
// from the API servers and enqueueing them. | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe?