Skip to content

Commit fe395c2

Browse files
authored
Merge branch 'main' into s3exporter-canned-acl
2 parents fa49b11 + 2c186cc commit fe395c2

13 files changed

+280
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Support retrieval of available components via the OpAMP supervisor.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37247]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/opampsupervisor/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ For a list of open issues related to the Supervisor, see [these issues](https://
7777
| AcceptsRestartCommand | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21077> |
7878
| ReportsHealth | ⚠️ |
7979
| ReportsRemoteConfig | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079> |
80+
| ReportsAvailableComponents | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37247> |
8081

8182
### Supervisor specification features
8283

cmd/opampsupervisor/e2e_test.go

+98
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,104 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
598598
}, 5*time.Second, 250*time.Millisecond)
599599
}
600600

601+
func TestSupervisorBootstrapsCollectorAvailableComponents(t *testing.T) {
602+
agentDescription := atomic.Value{}
603+
availableComponents := atomic.Value{}
604+
605+
// Load the Supervisor config so we can get the location of
606+
// the Collector that will be run.
607+
var cfg config.Supervisor
608+
cfgFile := getSupervisorConfig(t, "reports_available_components", map[string]string{})
609+
k := koanf.New("::")
610+
err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
611+
require.NoError(t, err)
612+
err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
613+
Tag: "mapstructure",
614+
})
615+
require.NoError(t, err)
616+
617+
// Get the binary name and version from the Collector binary
618+
// using the `components` command that prints a YAML-encoded
619+
// map of information about the Collector build. Some of this
620+
// information will be used as defaults for the telemetry
621+
// attributes.
622+
agentPath := cfg.Agent.Executable
623+
componentsInfo, err := exec.Command(agentPath, "components").Output()
624+
require.NoError(t, err)
625+
k = koanf.New("::")
626+
err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
627+
require.NoError(t, err)
628+
buildinfo := k.StringMap("buildinfo")
629+
command := buildinfo["command"]
630+
version := buildinfo["version"]
631+
632+
server := newOpAMPServer(
633+
t,
634+
defaultConnectingHandler,
635+
types.ConnectionCallbacks{
636+
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
637+
if message.AgentDescription != nil {
638+
agentDescription.Store(message.AgentDescription)
639+
}
640+
641+
response := &protobufs.ServerToAgent{}
642+
if message.AvailableComponents != nil {
643+
availableComponents.Store(message.AvailableComponents)
644+
645+
if message.GetAvailableComponents().GetComponents() == nil {
646+
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
647+
}
648+
}
649+
650+
return response
651+
},
652+
})
653+
654+
s := newSupervisor(t, "reports_available_components", map[string]string{"url": server.addr})
655+
656+
require.Nil(t, s.Start())
657+
defer s.Shutdown()
658+
659+
waitForSupervisorConnection(server.supervisorConnected, true)
660+
661+
require.Eventually(t, func() bool {
662+
ac, ok := availableComponents.Load().(*protobufs.AvailableComponents)
663+
if !ok {
664+
return false
665+
}
666+
667+
if ac.GetComponents() == nil {
668+
return false
669+
}
670+
671+
require.Len(t, ac.GetComponents(), 5) // connectors, exporters, extensions, processors, receivers
672+
require.NotNil(t, ac.GetComponents()["extensions"])
673+
require.NotNil(t, ac.GetComponents()["extensions"].GetSubComponentMap())
674+
require.NotNil(t, ac.GetComponents()["extensions"].GetSubComponentMap()["opamp"])
675+
676+
ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
677+
if !ok {
678+
return false
679+
}
680+
681+
var agentName, agentVersion string
682+
identAttr := ad.IdentifyingAttributes
683+
for _, attr := range identAttr {
684+
switch attr.Key {
685+
case semconv.AttributeServiceName:
686+
agentName = attr.Value.GetStringValue()
687+
case semconv.AttributeServiceVersion:
688+
agentVersion = attr.Value.GetStringValue()
689+
}
690+
}
691+
692+
// By default the Collector should report its name and version
693+
// from the component.BuildInfo struct built into the Collector
694+
// binary.
695+
return agentName == command && agentVersion == version
696+
}, 10*time.Second, 250*time.Millisecond)
697+
}
698+
601699
func TestSupervisorReportsEffectiveConfig(t *testing.T) {
602700
var agentConfig atomic.Value
603701
server := newOpAMPServer(

cmd/opampsupervisor/supervisor/config/config.go

+6
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ type Capabilities struct {
9595
ReportsOwnMetrics bool `mapstructure:"reports_own_metrics"`
9696
ReportsHealth bool `mapstructure:"reports_health"`
9797
ReportsRemoteConfig bool `mapstructure:"reports_remote_config"`
98+
ReportsAvailableComponents bool `mapstructure:"reports_available_components"`
9899
}
99100

100101
func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
@@ -128,6 +129,10 @@ func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
128129
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
129130
}
130131

132+
if c.ReportsAvailableComponents {
133+
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
134+
}
135+
131136
return supportedCapabilities
132137
}
133138

@@ -245,6 +250,7 @@ func DefaultSupervisor() Supervisor {
245250
ReportsOwnMetrics: true,
246251
ReportsHealth: true,
247252
ReportsRemoteConfig: false,
253+
ReportsAvailableComponents: false,
248254
},
249255
Storage: Storage{
250256
Directory: defaultStorageDir,

cmd/opampsupervisor/supervisor/config/config_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
460460
ReportsOwnMetrics: true,
461461
ReportsHealth: true,
462462
ReportsRemoteConfig: true,
463+
ReportsAvailableComponents: true,
463464
},
464465
expectedAgentCapabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus |
465466
protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig |
@@ -468,7 +469,8 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
468469
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig |
469470
protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig |
470471
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand |
471-
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
472+
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
473+
protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
472474
},
473475
}
474476

cmd/opampsupervisor/supervisor/server.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
type flattenedSettings struct {
16-
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer)
16+
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent
1717
onConnecting func(request *http.Request) (shouldConnect bool, rejectStatusCode int)
1818
onConnectionClose func(conn serverTypes.Connection)
1919
endpoint string
@@ -54,7 +54,7 @@ func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connect
5454

5555
func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
5656
if fs.onMessage != nil {
57-
fs.onMessage(conn, message)
57+
return fs.onMessage(conn, message)
5858
}
5959

6060
return &protobufs.ServerToAgent{}

cmd/opampsupervisor/supervisor/server_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) {
6060
func Test_flattenedSettings_OnMessage(t *testing.T) {
6161
onMessageFuncCalled := false
6262
fs := flattenedSettings{
63-
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) {
63+
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
6464
onMessageFuncCalled = true
65+
return &protobufs.ServerToAgent{}
6566
},
6667
}
6768

cmd/opampsupervisor/supervisor/supervisor.go

+61-11
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ type Supervisor struct {
103103
// Supervisor's own config.
104104
config config.Supervisor
105105

106-
agentDescription *atomic.Value
106+
agentDescription *atomic.Value
107+
availableComponents *atomic.Value
107108

108109
// Supervisor's persistent state
109110
persistentState *persistentState
@@ -174,6 +175,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
174175
cfgState: &atomic.Value{},
175176
effectiveConfig: &atomic.Value{},
176177
agentDescription: &atomic.Value{},
178+
availableComponents: &atomic.Value{},
177179
doneChan: make(chan struct{}),
178180
customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages),
179181
agentConn: &atomic.Value{},
@@ -300,16 +302,22 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
300302

301303
done := make(chan error, 1)
302304
var connected atomic.Bool
305+
var doneReported atomic.Bool
303306

304307
// Start a one-shot server to get the Collector's agent description
305-
// using the Collector's OpAMP extension.
308+
// and available components using the Collector's OpAMP extension.
306309
err = srv.Start(flattenedSettings{
307310
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
308311
onConnecting: func(_ *http.Request) (bool, int) {
309312
connected.Store(true)
310313
return true, http.StatusOK
311314
},
312-
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) {
315+
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
316+
response := &protobufs.ServerToAgent{}
317+
if message.GetAvailableComponents() != nil {
318+
s.setAvailableComponents(message.AvailableComponents)
319+
}
320+
313321
if message.AgentDescription != nil {
314322
instanceIDSeen := false
315323
s.setAgentDescription(message.AgentDescription)
@@ -324,19 +332,47 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
324332
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)",
325333
attr.Value.GetStringValue(),
326334
s.persistentState.InstanceID.String())
327-
return
335+
return response
328336
}
329337
instanceIDSeen = true
330338
}
331339
}
332340

333341
if !instanceIDSeen {
334342
done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
335-
return
343+
return response
336344
}
345+
}
346+
347+
// agent description must be defined
348+
_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
349+
if !ok {
350+
return response
351+
}
352+
353+
// if available components have not been reported, agent description is sufficient to continue
354+
availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
355+
if availableComponentsOk {
356+
// must have a full list of components if available components have been reported
357+
if availableComponents.GetComponents() != nil {
358+
if !doneReported.Load() {
359+
done <- nil
360+
doneReported.Store(true)
361+
}
362+
} else {
363+
// if we don't have a full component list, ask for it
364+
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
365+
}
366+
return response
367+
}
337368

369+
// need to only report done once, not on each message - otherwise, we get a hung thread
370+
if !doneReported.Load() {
338371
done <- nil
372+
doneReported.Store(true)
339373
}
374+
375+
return response
340376
},
341377
}.toServerSettings())
342378
if err != nil {
@@ -456,6 +492,12 @@ func (s *Supervisor) startOpAMPClient() error {
456492
return err
457493
}
458494

495+
if ac, ok := s.availableComponents.Load().(*protobufs.AvailableComponents); ok && ac != nil {
496+
if err = s.opampClient.SetAvailableComponents(ac); err != nil {
497+
return err
498+
}
499+
}
500+
459501
s.logger.Debug("Starting OpAMP client...")
460502
if err = s.opampClient.Start(context.Background(), settings); err != nil {
461503
return err
@@ -505,7 +547,7 @@ func (s *Supervisor) startOpAMPServer() error {
505547
return nil
506548
}
507549

508-
func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) {
550+
func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
509551
s.agentConn.Store(conn)
510552

511553
s.logger.Debug("Received OpAMP message from the agent")
@@ -551,6 +593,8 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag
551593
s.logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy))
552594
s.lastHealthFromClient = message.Health
553595
}
596+
597+
return &protobufs.ServerToAgent{}
554598
}
555599

556600
func (s *Supervisor) forwardCustomMessagesToServerLoop() {
@@ -584,6 +628,11 @@ func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) {
584628
s.agentDescription.Store(ad)
585629
}
586630

631+
// setAvailableComponents sets the available components of the OpAMP agent
632+
func (s *Supervisor) setAvailableComponents(ac *protobufs.AvailableComponents) {
633+
s.availableComponents.Store(ac)
634+
}
635+
587636
// applyKeyValueOverrides merges the overrides map into the array of key value pairs.
588637
// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map.
589638
// An array of KeyValue pair is returned, with each key value pair having a distinct key.
@@ -766,10 +815,11 @@ func (s *Supervisor) composeOpAMPExtensionConfig() []byte {
766815

767816
var cfg bytes.Buffer
768817
tplVars := map[string]any{
769-
"InstanceUid": s.persistentState.InstanceID.String(),
770-
"SupervisorPort": s.opampServerPort,
771-
"PID": s.pidProvider.PID(),
772-
"PPIDPollInterval": orphanPollInterval,
818+
"InstanceUid": s.persistentState.InstanceID.String(),
819+
"SupervisorPort": s.opampServerPort,
820+
"PID": s.pidProvider.PID(),
821+
"PPIDPollInterval": orphanPollInterval,
822+
"ReportsAvailableComponents": s.config.Capabilities.ReportsAvailableComponents,
773823
}
774824
err := s.opampextensionTemplate.Execute(
775825
&cfg,
@@ -1020,7 +1070,7 @@ func (s *Supervisor) startAgent() (agentStartStatus, error) {
10201070
err := s.commander.Start(context.Background())
10211071
if err != nil {
10221072
s.logger.Error("Cannot start the agent", zap.Error(err))
1023-
startErr := fmt.Errorf("Cannot start the agent: %w", err)
1073+
startErr := fmt.Errorf("cannot start the agent: %w", err)
10241074
err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: startErr.Error()})
10251075
if err != nil {
10261076
s.logger.Error("Failed to report OpAMP client health", zap.Error(err))

0 commit comments

Comments
 (0)