Skip to content

Commit bfc1f6e

Browse files
committed
Fix token passthrough for HEC
1 parent e8dee66 commit bfc1f6e

File tree

11 files changed

+212
-53
lines changed

11 files changed

+212
-53
lines changed

exporter/splunkhecexporter/client.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ func (c *client) pushMetricsData(
9090
req.Header.Set("Content-Encoding", "gzip")
9191
}
9292

93+
if md.ResourceMetrics().Len() != 0 {
94+
accessToken, found := md.ResourceMetrics().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
95+
if found {
96+
req.Header.Set("Authorization", splunk.HECTokenHeader+" "+accessToken.StringVal())
97+
}
98+
}
99+
93100
resp, err := c.client.Do(req)
94101
if err != nil {
95102
return err
@@ -136,6 +143,18 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
136143

137144
// Callback when each batch is to be sent.
138145
send := func(ctx context.Context, buf *bytes.Buffer, headers map[string]string) (err error) {
146+
localHeaders := headers
147+
if ld.ResourceLogs().Len() != 0 {
148+
accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
149+
if found {
150+
localHeaders = map[string]string{}
151+
for k, v := range headers {
152+
localHeaders[k] = v
153+
}
154+
localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.StringVal()
155+
}
156+
}
157+
139158
shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression
140159

141160
if shouldCompress {
@@ -150,10 +169,10 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
150169
return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
151170
}
152171

153-
return c.postEvents(ctx, gzipBuffer, headers, shouldCompress)
172+
return c.postEvents(ctx, gzipBuffer, localHeaders, shouldCompress)
154173
}
155174

156-
return c.postEvents(ctx, buf, headers, shouldCompress)
175+
return c.postEvents(ctx, buf, localHeaders, shouldCompress)
157176
}
158177

159178
return c.pushLogDataInBatches(ctx, ld, send)

exporter/splunkhecexporter/factory.go

+37-2
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ import (
2121

2222
"go.opentelemetry.io/collector/component"
2323
"go.opentelemetry.io/collector/config"
24+
"go.opentelemetry.io/collector/consumer"
2425
"go.opentelemetry.io/collector/exporter/exporterhelper"
2526
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"
2627

2728
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
29+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr"
2830
)
2931

3032
const (
@@ -34,6 +36,18 @@ const (
3436
defaultHTTPTimeout = 10 * time.Second
3537
)
3638

39+
// TODO: Find a place for this to be shared.
40+
type baseMetricsExporter struct {
41+
component.Component
42+
consumer.Metrics
43+
}
44+
45+
// TODO: Find a place for this to be shared.
46+
type baseLogsExporter struct {
47+
component.Component
48+
consumer.Logs
49+
}
50+
3751
// NewFactory creates a factory for Splunk HEC exporter.
3852
func NewFactory() component.ExporterFactory {
3953
return exporterhelper.NewFactory(
@@ -112,7 +126,7 @@ func createMetricsExporter(
112126
return nil, err
113127
}
114128

115-
return exporterhelper.NewMetricsExporter(
129+
exporter, err := exporterhelper.NewMetricsExporter(
116130
expCfg,
117131
set,
118132
exp.pushMetricsData,
@@ -122,6 +136,16 @@ func createMetricsExporter(
122136
exporterhelper.WithQueue(expCfg.QueueSettings),
123137
exporterhelper.WithStart(exp.start),
124138
exporterhelper.WithShutdown(exp.stop))
139+
if err != nil {
140+
return nil, err
141+
}
142+
143+
wrapped := &baseMetricsExporter{
144+
Component: exporter,
145+
Metrics: batchperresourceattr.NewBatchPerResourceMetrics(splunk.HecTokenLabel, exporter),
146+
}
147+
148+
return wrapped, nil
125149
}
126150

127151
func createLogsExporter(
@@ -140,7 +164,7 @@ func createLogsExporter(
140164
return nil, err
141165
}
142166

143-
return exporterhelper.NewLogsExporter(
167+
logsExporter, err := exporterhelper.NewLogsExporter(
144168
expCfg,
145169
set,
146170
exp.pushLogData,
@@ -150,4 +174,15 @@ func createLogsExporter(
150174
exporterhelper.WithQueue(expCfg.QueueSettings),
151175
exporterhelper.WithStart(exp.start),
152176
exporterhelper.WithShutdown(exp.stop))
177+
178+
if err != nil {
179+
return nil, err
180+
}
181+
182+
wrapped := &baseLogsExporter{
183+
Component: logsExporter,
184+
Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, logsExporter),
185+
}
186+
187+
return wrapped, nil
153188
}

exporter/splunkhecexporter/go.mod

+4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ go 1.17
55
require (
66
github.com/census-instrumentation/opencensus-proto v0.3.0
77
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0
8+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.39.0
10+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0
911
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.39.0
1012
github.com/stretchr/testify v1.7.0
1113
go.opentelemetry.io/collector v0.39.0
@@ -48,4 +50,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/splun
4850

4951
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus => ../../pkg/translator/opencensus
5052

53+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr => ../../pkg/batchperresourceattr
54+
5155
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

exporter/splunkhecexporter/logdata_to_splunk.go

+4
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *C
8282
sourcetype = v.StringVal()
8383
case indexKey:
8484
index = v.StringVal()
85+
case splunk.HecTokenLabel:
86+
// ignore
8587
default:
8688
fields[k] = convertAttributeValue(v, logger)
8789
}
@@ -98,6 +100,8 @@ func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *C
98100
sourcetype = v.StringVal()
99101
case indexKey:
100102
index = v.StringVal()
103+
case splunk.HecTokenLabel:
104+
// ignore
101105
default:
102106
fields[k] = convertAttributeValue(v, logger)
103107
}

exporter/splunkhecexporter/logdata_to_splunk_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,27 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
8585
"myhost", "myapp", "myapp-type"),
8686
},
8787
},
88+
{
89+
name: "with_hec_token",
90+
logRecordFn: func() pdata.LogRecord {
91+
logRecord := pdata.NewLogRecord()
92+
logRecord.Body().SetStringVal("mylog")
93+
logRecord.Attributes().InsertString(splunk.HecTokenLabel, "mytoken")
94+
logRecord.SetTimestamp(ts)
95+
return logRecord
96+
},
97+
logResourceFn: pdata.NewResource,
98+
configDataFn: func() *Config {
99+
config := createDefaultConfig().(*Config)
100+
config.Source = "source"
101+
config.SourceType = "sourcetype"
102+
return config
103+
},
104+
wantSplunkEvents: []*splunk.Event{
105+
commonLogSplunkEvent("mylog", ts, map[string]interface{}{},
106+
"unknown", "source", "sourcetype"),
107+
},
108+
},
88109
{
89110
name: "non-string attribute",
90111
logRecordFn: func() pdata.LogRecord {

exporter/splunkhecexporter/metricdata_to_splunk.go

+2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ func metricDataToSplunk(logger *zap.Logger, data pdata.Metrics, config *Config)
8686
sourceType = v.StringVal()
8787
case indexKey:
8888
index = v.StringVal()
89+
case splunk.HecTokenLabel:
90+
// ignore
8991
default:
9092
commonFields[k] = v.AsString()
9193
}

exporter/splunkhecexporter/tracedata_to_splunk.go

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ func traceDataToSplunk(logger *zap.Logger, data pdata.Traces, config *Config) ([
8585
sourceType = v.StringVal()
8686
case indexKey:
8787
index = v.StringVal()
88+
case splunk.HecTokenLabel:
89+
// ignore
8890
default:
8991
commonFields[k] = v.AsString()
9092
}

receiver/splunkhecreceiver/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter v0.39.0
88
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0
99
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.39.0
10+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0
1011
github.com/stretchr/testify v1.7.0
1112
go.opentelemetry.io/collector v0.39.0
1213
go.opentelemetry.io/collector/model v0.39.0

receiver/splunkhecreceiver/go.sum

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/splunkhecreceiver/receiver.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"io/ioutil"
2525
"net"
2626
"net/http"
27+
"strings"
2728
"sync"
2829
"time"
2930

@@ -367,10 +368,11 @@ func (r *splunkReceiver) consumeLogs(ctx context.Context, events []*splunk.Event
367368

368369
func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resource pdata.Resource) {
369370
if r.config.AccessTokenPassthrough {
370-
accessToken := req.Header.Get(splunk.HECTokenHeader)
371-
if accessToken != "" {
371+
accessToken := req.Header.Get("Authorization")
372+
if strings.HasPrefix(accessToken, splunk.HECTokenHeader+" ") {
373+
accessTokenValue := accessToken[len(splunk.HECTokenHeader)+1:]
372374
return func(resource pdata.Resource) {
373-
resource.Attributes().InsertString(splunk.HecTokenLabel, accessToken)
375+
resource.Attributes().InsertString(splunk.HecTokenLabel, accessTokenValue)
374376
}
375377
}
376378
}

0 commit comments

Comments
 (0)