Skip to content

Commit 5bdea47

Browse files
frzifusdjaglowski
authored andcommitted
[receiver/jaegerreceiver] Remove jaeger remote sampling from receiver (open-telemetry#14163)
Signed-off-by: Benedikt Bongartz <[email protected]>
1 parent 1bb29b8 commit 5bdea47

7 files changed

+41
-347
lines changed

receiver/jaegerreceiver/factory.go

+4-38
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@ package jaegerreceiver // import "github.com/open-telemetry/opentelemetry-collec
1818

1919
import (
2020
"context"
21-
"sync"
2221

2322
"go.opentelemetry.io/collector/component"
2423
"go.opentelemetry.io/collector/config"
2524
"go.opentelemetry.io/collector/config/configgrpc"
2625
"go.opentelemetry.io/collector/config/confighttp"
2726
"go.opentelemetry.io/collector/config/confignet"
2827
"go.opentelemetry.io/collector/consumer"
29-
"go.uber.org/zap"
3028
)
3129

3230
const (
@@ -40,11 +38,10 @@ const (
4038
protoThriftCompact = "thrift_compact"
4139

4240
// Default endpoints to bind to.
43-
defaultGRPCBindEndpoint = "0.0.0.0:14250"
44-
defaultHTTPBindEndpoint = "0.0.0.0:14268"
45-
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
46-
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
47-
defaultAgentRemoteSamplingHTTPEndpoint = "0.0.0.0:5778"
41+
defaultGRPCBindEndpoint = "0.0.0.0:14250"
42+
defaultHTTPBindEndpoint = "0.0.0.0:14268"
43+
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
44+
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
4845
)
4946

5047
// NewFactory creates a new Jaeger receiver factory.
@@ -94,7 +91,6 @@ func createTracesReceiver(
9491
// Error handling for the conversion is done in the Validate function from the Config object itself.
9592

9693
rCfg := cfg.(*Config)
97-
remoteSamplingConfig := rCfg.RemoteSampling
9894

9995
var config configuration
10096
// Set ports
@@ -114,36 +110,6 @@ func createTracesReceiver(
114110
config.AgentCompactThrift = *rCfg.ThriftCompact
115111
}
116112

117-
if remoteSamplingConfig != nil {
118-
logSamplingDeprecation(set.Logger)
119-
120-
config.RemoteSamplingClientSettings = remoteSamplingConfig.GRPCClientSettings
121-
if config.RemoteSamplingClientSettings.Endpoint == "" {
122-
config.RemoteSamplingClientSettings.Endpoint = defaultGRPCBindEndpoint
123-
}
124-
125-
config.AgentHTTPEndpoint = remoteSamplingConfig.HostEndpoint
126-
if config.AgentHTTPEndpoint == "" {
127-
config.AgentHTTPEndpoint = defaultAgentRemoteSamplingHTTPEndpoint
128-
}
129-
130-
// strategies are served over grpc so if grpc is not enabled and strategies are present return an error
131-
if len(remoteSamplingConfig.StrategyFile) != 0 {
132-
config.RemoteSamplingStrategyFile = remoteSamplingConfig.StrategyFile
133-
config.RemoteSamplingStrategyFileReloadInterval = remoteSamplingConfig.StrategyFileReloadInterval
134-
}
135-
}
136-
137113
// Create the receiver.
138114
return newJaegerReceiver(rCfg.ID(), &config, nextConsumer, set), nil
139115
}
140-
141-
var once sync.Once
142-
143-
func logSamplingDeprecation(logger *zap.Logger) {
144-
once.Do(func() {
145-
logger.Warn(
146-
"Jaeger remote sampling support is deprecated and will be removed in release v0.61.0. Use the jaegerremotesampling extension instead.",
147-
)
148-
})
149-
}

receiver/jaegerreceiver/factory_test.go

-69
Original file line numberDiff line numberDiff line change
@@ -187,72 +187,3 @@ func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
187187
assert.NoError(t, err, "unexpected error creating receiver")
188188
assert.Equal(t, defaultThriftCompactBindEndpoint, r.(*jReceiver).config.AgentCompactThrift.Endpoint, "thrift port should be default")
189189
}
190-
191-
func TestDefaultAgentRemoteSamplingEndpointAndPort(t *testing.T) {
192-
factory := NewFactory()
193-
cfg := factory.CreateDefaultConfig()
194-
rCfg := cfg.(*Config)
195-
196-
rCfg.Protocols.ThriftCompact = &ProtocolUDP{
197-
Endpoint: defaultThriftCompactBindEndpoint,
198-
}
199-
rCfg.RemoteSampling = &RemoteSamplingConfig{}
200-
set := componenttest.NewNopReceiverCreateSettings()
201-
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
202-
203-
assert.NoError(t, err, "create trace receiver should not error")
204-
assert.Equal(t, defaultGRPCBindEndpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
205-
assert.Equal(t, defaultAgentRemoteSamplingHTTPEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be default")
206-
}
207-
208-
func TestAgentRemoteSamplingEndpoint(t *testing.T) {
209-
factory := NewFactory()
210-
cfg := factory.CreateDefaultConfig()
211-
rCfg := cfg.(*Config)
212-
213-
endpoint := "localhost:1234"
214-
rCfg.Protocols.ThriftCompact = &ProtocolUDP{
215-
Endpoint: defaultThriftCompactBindEndpoint,
216-
}
217-
rCfg.RemoteSampling = &RemoteSamplingConfig{
218-
GRPCClientSettings: configgrpc.GRPCClientSettings{
219-
Endpoint: endpoint,
220-
},
221-
}
222-
set := componenttest.NewNopReceiverCreateSettings()
223-
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
224-
225-
assert.NoError(t, err, "create trace receiver should not error")
226-
assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
227-
assert.Equal(t, defaultAgentRemoteSamplingHTTPEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be default")
228-
}
229-
230-
func TestRemoteSamplingConfigPropagation(t *testing.T) {
231-
factory := NewFactory()
232-
cfg := factory.CreateDefaultConfig()
233-
rCfg := cfg.(*Config)
234-
235-
hostEndpoint := "localhost:5778"
236-
endpoint := "localhost:1234"
237-
strategyFile := "strategies.json"
238-
rCfg.Protocols.GRPC = &configgrpc.GRPCServerSettings{
239-
NetAddr: confignet.NetAddr{
240-
Endpoint: defaultGRPCBindEndpoint,
241-
Transport: "tcp",
242-
},
243-
}
244-
rCfg.RemoteSampling = &RemoteSamplingConfig{
245-
GRPCClientSettings: configgrpc.GRPCClientSettings{
246-
Endpoint: endpoint,
247-
},
248-
HostEndpoint: hostEndpoint,
249-
StrategyFile: strategyFile,
250-
}
251-
set := componenttest.NewNopReceiverCreateSettings()
252-
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
253-
254-
assert.NoError(t, err, "create trace receiver should not error")
255-
assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
256-
assert.Equal(t, hostEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be configured value")
257-
assert.Equal(t, strategyFile, r.(*jReceiver).config.RemoteSamplingStrategyFile)
258-
}

receiver/jaegerreceiver/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ require (
1313
go.opentelemetry.io/collector/pdata v0.60.0
1414
go.opentelemetry.io/collector/semconv v0.60.0
1515
go.uber.org/multierr v1.8.0
16-
go.uber.org/zap v1.23.0
1716
google.golang.org/grpc v1.49.0
1817
)
1918

@@ -72,6 +71,7 @@ require (
7271
go.opentelemetry.io/otel/metric v0.31.0 // indirect
7372
go.opentelemetry.io/otel/trace v1.9.0 // indirect
7473
go.uber.org/atomic v1.10.0 // indirect
74+
go.uber.org/zap v1.23.0 // indirect
7575
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
7676
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
7777
golang.org/x/text v0.3.7 // indirect

receiver/jaegerreceiver/jaeger_agent_test.go

+4-22
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ import (
3333
"github.com/stretchr/testify/require"
3434
"go.opentelemetry.io/collector/component/componenttest"
3535
"go.opentelemetry.io/collector/config"
36-
"go.opentelemetry.io/collector/config/configgrpc"
37-
"go.opentelemetry.io/collector/config/configtls"
3836
"go.opentelemetry.io/collector/consumer/consumertest"
3937
"go.opentelemetry.io/collector/pdata/ptrace"
4038
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
@@ -140,20 +138,14 @@ func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.Samplin
140138
}
141139

142140
func TestJaegerHTTP(t *testing.T) {
143-
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
141+
s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
144142
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
145143
})
146144
defer s.GracefulStop()
147145

148146
endpoint := testutil.GetAvailableLocalAddress(t)
149147
config := &configuration{
150148
AgentHTTPEndpoint: endpoint,
151-
RemoteSamplingClientSettings: configgrpc.GRPCClientSettings{
152-
Endpoint: addr.String(),
153-
TLSSetting: configtls.TLSClientSetting{
154-
Insecure: true,
155-
},
156-
},
157149
}
158150
set := componenttest.NewNopReceiverCreateSettings()
159151
jr := newJaegerReceiver(jaegerAgent, config, nil, set)
@@ -174,20 +166,10 @@ func TestJaegerHTTP(t *testing.T) {
174166
resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
175167
assert.NoError(t, err, "should not have failed to make request")
176168
if resp != nil {
177-
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
178-
}
179-
180-
resp, err = http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
181-
assert.NoError(t, err, "should not have failed to make request")
182-
if resp != nil {
183-
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
184-
}
185-
186-
resp, err = http.Get(fmt.Sprintf("http://%s/baggageRestrictions?service=test", endpoint))
187-
assert.NoError(t, err, "should not have failed to make request")
188-
if resp != nil {
189-
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
169+
assert.Equal(t, 500, resp.StatusCode, "should have returned 200")
170+
return
190171
}
172+
t.Fail()
191173
}
192174

193175
func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) {

receiver/jaegerreceiver/trace_receiver.go

+19-58
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,17 @@ import (
2323
"mime"
2424
"net/http"
2525
"sync"
26-
"time"
2726

2827
apacheThrift "github.com/apache/thrift/lib/go/thrift"
2928
"github.com/gorilla/mux"
3029
"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
31-
jSamplingConfig "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
3230
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
3331
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
3432
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
3533
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
3634
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
37-
collectorSampling "github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
3835
"github.com/jaegertracing/jaeger/model"
3936
"github.com/jaegertracing/jaeger/pkg/metrics"
40-
staticStrategyStore "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
4137
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
4238
"github.com/jaegertracing/jaeger/thrift-gen/agent"
4339
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
@@ -51,7 +47,6 @@ import (
5147
"go.opentelemetry.io/collector/consumer"
5248
"go.opentelemetry.io/collector/obsreport"
5349
"go.uber.org/multierr"
54-
"go.uber.org/zap"
5550
"google.golang.org/grpc"
5651

5752
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
@@ -63,12 +58,9 @@ type configuration struct {
6358
CollectorHTTPSettings confighttp.HTTPServerSettings
6459
CollectorGRPCServerSettings configgrpc.GRPCServerSettings
6560

66-
AgentCompactThrift ProtocolUDP
67-
AgentBinaryThrift ProtocolUDP
68-
AgentHTTPEndpoint string
69-
RemoteSamplingClientSettings configgrpc.GRPCClientSettings
70-
RemoteSamplingStrategyFile string
71-
RemoteSamplingStrategyFileReloadInterval time.Duration
61+
AgentCompactThrift ProtocolUDP
62+
AgentBinaryThrift ProtocolUDP
63+
AgentHTTPEndpoint string
7264
}
7365

7466
// Receiver type is used to receive spans that were originally intended to be sent to Jaeger.
@@ -82,9 +74,8 @@ type jReceiver struct {
8274
grpc *grpc.Server
8375
collectorServer *http.Server
8476

85-
agentSamplingManager *jSamplingConfig.SamplingManager
86-
agentProcessors []processors.Processor
87-
agentServer *http.Server
77+
agentProcessors []processors.Processor
78+
agentServer *http.Server
8879

8980
goroutines sync.WaitGroup
9081

@@ -183,7 +174,19 @@ func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.T
183174

184175
var _ agent.Agent = (*agentHandler)(nil)
185176
var _ api_v2.CollectorServiceServer = (*jReceiver)(nil)
186-
var _ configmanager.ClientConfigManager = (*jReceiver)(nil)
177+
var _ configmanager.ClientConfigManager = (*notImplementedConfigManager)(nil)
178+
179+
var errNotImplemented = fmt.Errorf("not implemented")
180+
181+
type notImplementedConfigManager struct{}
182+
183+
func (notImplementedConfigManager) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
184+
return nil, errNotImplemented
185+
}
186+
187+
func (notImplementedConfigManager) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
188+
return nil, errNotImplemented
189+
}
187190

188191
type agentHandler struct {
189192
nextConsumer consumer.Traces
@@ -204,21 +207,6 @@ func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error
204207
return err
205208
}
206209

207-
func (jr *jReceiver) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
208-
return jr.agentSamplingManager.GetSamplingStrategy(ctx, serviceName)
209-
}
210-
211-
func (jr *jReceiver) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
212-
br, err := jr.agentSamplingManager.GetBaggageRestrictions(ctx, serviceName)
213-
if err != nil {
214-
// Baggage restrictions are not yet implemented - refer to - https://github.com/jaegertracing/jaeger/issues/373
215-
// As of today, GetBaggageRestrictions() always returns an error.
216-
// However, we `return nil, nil` here in order to serve a valid `200 OK` response.
217-
return nil, nil
218-
}
219-
return br, nil
220-
}
221-
222210
func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
223211
ctx = jr.grpcObsrecv.StartTracesOp(ctx)
224212

@@ -283,25 +271,8 @@ func (jr *jReceiver) startAgent(host component.Host) error {
283271
}(processor)
284272
}
285273

286-
// Start upstream grpc client before serving sampling endpoints over HTTP
287-
if jr.config.RemoteSamplingClientSettings.Endpoint != "" {
288-
grpcOpts, err := jr.config.RemoteSamplingClientSettings.ToDialOptions(host, jr.settings.TelemetrySettings)
289-
if err != nil {
290-
jr.settings.Logger.Error("Error creating grpc dial options for remote sampling endpoint", zap.Error(err))
291-
return err
292-
}
293-
jr.config.RemoteSamplingClientSettings.SanitizedEndpoint()
294-
conn, err := grpc.Dial(jr.config.RemoteSamplingClientSettings.Endpoint, grpcOpts...)
295-
if err != nil {
296-
jr.settings.Logger.Error("Error creating grpc connection to jaeger remote sampling endpoint", zap.String("endpoint", jr.config.RemoteSamplingClientSettings.Endpoint))
297-
return err
298-
}
299-
300-
jr.agentSamplingManager = jSamplingConfig.NewConfigManager(conn)
301-
}
302-
303274
if jr.config.AgentHTTPEndpoint != "" {
304-
jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, jr, metrics.NullFactory, jr.settings.Logger)
275+
jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, &notImplementedConfigManager{}, metrics.NullFactory, jr.settings.Logger)
305276

306277
jr.goroutines.Add(1)
307278
go func() {
@@ -434,16 +405,6 @@ func (jr *jReceiver) startCollector(host component.Host) error {
434405

435406
api_v2.RegisterCollectorServiceServer(jr.grpc, jr)
436407

437-
// init and register sampling strategy store
438-
ss, err := staticStrategyStore.NewStrategyStore(staticStrategyStore.Options{
439-
StrategiesFile: jr.config.RemoteSamplingStrategyFile,
440-
ReloadInterval: jr.config.RemoteSamplingStrategyFileReloadInterval,
441-
}, jr.settings.Logger)
442-
if err != nil {
443-
return fmt.Errorf("failed to create collector strategy store: %w", err)
444-
}
445-
api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss))
446-
447408
jr.goroutines.Add(1)
448409
go func() {
449410
defer jr.goroutines.Done()

0 commit comments

Comments
 (0)