
Commit 12a6fb3

earwin authored and sbylica-splunk committed
[exporter/clickhouseexporter] Sort attribute maps before insertion open-telemetry#33634 (open-telemetry#35725)
#### Description

Our attributes are stored as Map(String, String) in ClickHouse. By default the order of keys is undefined and, as described in open-telemetry#33634, this leads to worse compression and to duplicates in `group by` (unless carefully accounted for). This PR uses the `column.IterableOrderedMap` facility from clickhouse-go to ensure a fixed attribute key order. It is a reimplementation of open-telemetry#34598 that uses fewer allocations and is (arguably) somewhat more straightforward.

I'm **opening this as a draft**, because this PR (and open-telemetry#34598) are blocked by ClickHouse/clickhouse-go#1365 (fixed in ClickHouse/clickhouse-go#1418).

In addition, I'm trying to get the `column.IterableOrderedMap` implementation used here accepted upstream in clickhouse-go: ClickHouse/clickhouse-go#1417. If it is accepted, I will amend this PR accordingly.

#### Link to tracking issue

Fixes open-telemetry#33634

#### Testing

The IOM implementation was used in production independently. I'm planning to build otelcollector with this PR and cut my production over to it in the next few days.
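To make the root cause concrete, here is a minimal, illustrative Go snippet (not part of this commit): iteration order over a plain Go map is unspecified, so the previous `attributesToMap` helper could hand clickhouse-go the same logical attribute set with a different key order on every insert.

```go
package main

import "fmt"

func main() {
	// The same logical attribute set stored as a plain Go map. Iteration order is
	// unspecified (and deliberately randomized by the runtime), so the
	// Map(String, String) value previously written to ClickHouse could list its
	// keys in a different order on every insert.
	attrs := map[string]string{
		"service.name":           "test-service",
		"deployment.environment": "prod",
		"host.name":              "host-1",
	}
	for run := 0; run < 3; run++ {
		for k := range attrs {
			fmt.Print(k, " ")
		}
		fmt.Println()
	}
}
```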
1 parent ff49142 commit 12a6fb3

12 files changed, +101 −107 lines changed
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: clickhouseexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Exporter now sorts attribute maps' keys during INSERT, yielding better compression and predictable aggregates"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [33634]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/clickhouseexporter/exporter_logs.go

+4 −13

@@ -11,11 +11,11 @@ import (

 	_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
 	conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
 	"go.uber.org/zap"

+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
 )

@@ -76,7 +76,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
 		logs := ld.ResourceLogs().At(i)
 		res := logs.Resource()
 		resURL := logs.SchemaUrl()
-		resAttr := attributesToMap(res.Attributes())
+		resAttr := internal.AttributesToMap(res.Attributes())
 		var serviceName string
 		if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
 			serviceName = v.Str()
@@ -87,7 +87,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
 			scopeURL := logs.ScopeLogs().At(j).SchemaUrl()
 			scopeName := logs.ScopeLogs().At(j).Scope().Name()
 			scopeVersion := logs.ScopeLogs().At(j).Scope().Version()
-			scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())
+			scopeAttr := internal.AttributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())

 			for k := 0; k < rs.Len(); k++ {
 				r := rs.At(k)
@@ -97,7 +97,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
 					timestamp = r.ObservedTimestamp()
 				}

-				logAttr := attributesToMap(r.Attributes())
+				logAttr := internal.AttributesToMap(r.Attributes())
 				_, err = statement.ExecContext(ctx,
 					timestamp.AsTime(),
 					traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
@@ -129,15 +129,6 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
 	return err
 }

-func attributesToMap(attributes pcommon.Map) map[string]string {
-	m := make(map[string]string, attributes.Len())
-	attributes.Range(func(k string, v pcommon.Value) bool {
-		m[k] = v.AsString()
-		return true
-	})
-	return m
-}
-
 const (
 	// language=ClickHouse SQL
 	createLogsTableSQL = `
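For orientation, here is a rough sketch of the insert path that exporter_logs.go drives through database/sql (illustrative only, not part of the commit): the DSN, table, and column names are made up, and it assumes a clickhouse-go release containing the fix from ClickHouse/clickhouse-go#1418 so that ordered-map values are accepted as statement arguments.

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" database/sql driver
	"github.com/ClickHouse/clickhouse-go/v2/lib/column/orderedmap"
)

func main() {
	ctx := context.Background()

	// Hypothetical DSN and table; the real exporter builds these from its config.
	db, err := sql.Open("clickhouse", "clickhouse://127.0.0.1:9000/default")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same pattern the exporter uses: prepare the INSERT inside a transaction,
	// then Exec one row per log record.
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		log.Fatal(err)
	}
	stmt, err := tx.PrepareContext(ctx, "INSERT INTO example_logs (Body, LogAttributes) VALUES (?, ?)")
	if err != nil {
		log.Fatal(err)
	}

	// The attribute map is bound as an ordered map, so the Map(String, String)
	// column is written with a deterministic key order.
	attrs := orderedmap.FromMap(map[string]string{
		"service.name": "test-service",
		"lib":          "clickhouse",
	})
	if _, err := stmt.ExecContext(ctx, "hello world", attrs); err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}
```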

exporter/clickhouseexporter/exporter_logs_test.go

+5 −4

@@ -12,6 +12,7 @@ import (
 	"testing"
 	"time"

+	"github.com/ClickHouse/clickhouse-go/v2/lib/column/orderedmap"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -93,9 +94,9 @@ func TestExporter_pushLogsData(t *testing.T) {
 		initClickhouseTestServer(t, func(query string, values []driver.Value) error {
 			if strings.HasPrefix(query, "INSERT") {
 				require.Equal(t, "https://opentelemetry.io/schemas/1.4.0", values[8])
-				require.Equal(t, map[string]string{
+				require.Equal(t, orderedmap.FromMap(map[string]string{
 					"service.name": "test-service",
-				}, values[9])
+				}), values[9])
 			}
 			return nil
 		})
@@ -108,9 +109,9 @@ func TestExporter_pushLogsData(t *testing.T) {
 				require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[10])
 				require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[11])
 				require.Equal(t, "1.0.0", values[12])
-				require.Equal(t, map[string]string{
+				require.Equal(t, orderedmap.FromMap(map[string]string{
 					"lib": "clickhouse",
-				}, values[13])
+				}), values[13])
 			}
 			return nil
 		})

exporter/clickhouseexporter/exporter_metrics.go

+1 −1

@@ -77,7 +77,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
 	metricsMap := internal.NewMetricsModel(e.tablesConfig)
 	for i := 0; i < md.ResourceMetrics().Len(); i++ {
 		metrics := md.ResourceMetrics().At(i)
-		resAttr := attributesToMap(metrics.Resource().Attributes())
+		resAttr := metrics.Resource().Attributes()
 		for j := 0; j < metrics.ScopeMetrics().Len(); j++ {
 			rs := metrics.ScopeMetrics().At(j).Metrics()
 			scopeInstr := metrics.ScopeMetrics().At(j).Scope()

exporter/clickhouseexporter/exporter_traces.go

+12 −24

@@ -11,11 +11,13 @@ import (
 	"time"

 	_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
+	"github.com/ClickHouse/clickhouse-go/v2/lib/column"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
 	"go.uber.org/zap"

+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
 )

@@ -74,18 +76,15 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
 	for i := 0; i < td.ResourceSpans().Len(); i++ {
 		spans := td.ResourceSpans().At(i)
 		res := spans.Resource()
-		resAttr := attributesToMap(res.Attributes())
-		var serviceName string
-		if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
-			serviceName = v.Str()
-		}
+		resAttr := internal.AttributesToMap(res.Attributes())
+		serviceName, _ := res.Attributes().Get(conventions.AttributeServiceName)
 		for j := 0; j < spans.ScopeSpans().Len(); j++ {
 			rs := spans.ScopeSpans().At(j).Spans()
 			scopeName := spans.ScopeSpans().At(j).Scope().Name()
 			scopeVersion := spans.ScopeSpans().At(j).Scope().Version()
 			for k := 0; k < rs.Len(); k++ {
 				r := rs.At(k)
-				spanAttr := attributesToMap(r.Attributes())
+				spanAttr := internal.AttributesToMap(r.Attributes())
 				status := r.Status()
 				eventTimes, eventNames, eventAttrs := convertEvents(r.Events())
 				linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links())
@@ -97,7 +96,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
 					r.TraceState().AsRaw(),
 					r.Name(),
 					r.Kind().String(),
-					serviceName,
+					serviceName.AsString(),
 					resAttr,
 					scopeName,
 					scopeVersion,
@@ -127,36 +126,25 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
 	return err
 }

-func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) {
-	var (
-		times []time.Time
-		names []string
-		attrs []map[string]string
-	)
+func convertEvents(events ptrace.SpanEventSlice) (times []time.Time, names []string, attrs []column.IterableOrderedMap) {
 	for i := 0; i < events.Len(); i++ {
 		event := events.At(i)
 		times = append(times, event.Timestamp().AsTime())
 		names = append(names, event.Name())
-		attrs = append(attrs, attributesToMap(event.Attributes()))
+		attrs = append(attrs, internal.AttributesToMap(event.Attributes()))
 	}
-	return times, names, attrs
+	return
 }

-func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) {
-	var (
-		traceIDs []string
-		spanIDs  []string
-		states   []string
-		attrs    []map[string]string
-	)
+func convertLinks(links ptrace.SpanLinkSlice) (traceIDs []string, spanIDs []string, states []string, attrs []column.IterableOrderedMap) {
 	for i := 0; i < links.Len(); i++ {
 		link := links.At(i)
 		traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID()))
 		spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID()))
 		states = append(states, link.TraceState().AsRaw())
-		attrs = append(attrs, attributesToMap(link.Attributes()))
+		attrs = append(attrs, internal.AttributesToMap(link.Attributes()))
 	}
-	return traceIDs, spanIDs, states, attrs
+	return
 }

 const (

exporter/clickhouseexporter/internal/exponential_histogram_metrics.go

+6 −9

@@ -130,27 +130,24 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error {
 	}()

 	for _, model := range e.expHistogramModels {
-		var serviceName string
-		if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok {
-			serviceName = v
-		}
+		serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName)

 		for i := 0; i < model.expHistogram.DataPoints().Len(); i++ {
 			dp := model.expHistogram.DataPoints().At(i)
 			attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars())
 			_, err = statement.ExecContext(ctx,
-				model.metadata.ResAttr,
+				AttributesToMap(model.metadata.ResAttr),
 				model.metadata.ResURL,
 				model.metadata.ScopeInstr.Name(),
 				model.metadata.ScopeInstr.Version(),
-				attributesToMap(model.metadata.ScopeInstr.Attributes()),
+				AttributesToMap(model.metadata.ScopeInstr.Attributes()),
 				model.metadata.ScopeInstr.DroppedAttributesCount(),
 				model.metadata.ScopeURL,
-				serviceName,
+				serviceName.AsString(),
 				model.metricName,
 				model.metricDescription,
 				model.metricUnit,
-				attributesToMap(dp.Attributes()),
+				AttributesToMap(dp.Attributes()),
 				dp.StartTimestamp().AsTime(),
 				dp.Timestamp().AsTime(),
 				dp.Count(),
@@ -190,7 +187,7 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }

-func (e *expHistogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
+func (e *expHistogramMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	expHistogram, ok := metrics.(pmetric.ExponentialHistogram)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of ExponentialHistogram")

exporter/clickhouseexporter/internal/gauge_metrics.go

+6 −9

@@ -109,27 +109,24 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error {
 	}()

 	for _, model := range g.gaugeModels {
-		var serviceName string
-		if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok {
-			serviceName = v
-		}
+		serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName)

 		for i := 0; i < model.gauge.DataPoints().Len(); i++ {
 			dp := model.gauge.DataPoints().At(i)
 			attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars())
 			_, err = statement.ExecContext(ctx,
-				model.metadata.ResAttr,
+				AttributesToMap(model.metadata.ResAttr),
 				model.metadata.ResURL,
 				model.metadata.ScopeInstr.Name(),
 				model.metadata.ScopeInstr.Version(),
-				attributesToMap(model.metadata.ScopeInstr.Attributes()),
+				AttributesToMap(model.metadata.ScopeInstr.Attributes()),
 				model.metadata.ScopeInstr.DroppedAttributesCount(),
 				model.metadata.ScopeURL,
-				serviceName,
+				serviceName.AsString(),
 				model.metricName,
 				model.metricDescription,
 				model.metricUnit,
-				attributesToMap(dp.Attributes()),
+				AttributesToMap(dp.Attributes()),
 				dp.StartTimestamp().AsTime(),
 				dp.Timestamp().AsTime(),
 				getValue(dp.IntValue(), dp.DoubleValue(), dp.ValueType()),
@@ -155,7 +152,7 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }

-func (g *gaugeMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
+func (g *gaugeMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	gauge, ok := metrics.(pmetric.Gauge)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of Gauge")

exporter/clickhouseexporter/internal/histogram_metrics.go

+6 −9

@@ -121,27 +121,24 @@ func (h *histogramMetrics) insert(ctx context.Context, db *sql.DB) error {
 	}()

 	for _, model := range h.histogramModel {
-		var serviceName string
-		if v, ok := model.metadata.ResAttr[conventions.AttributeServiceName]; ok {
-			serviceName = v
-		}
+		serviceName, _ := model.metadata.ResAttr.Get(conventions.AttributeServiceName)

 		for i := 0; i < model.histogram.DataPoints().Len(); i++ {
 			dp := model.histogram.DataPoints().At(i)
 			attrs, times, values, traceIDs, spanIDs := convertExemplars(dp.Exemplars())
 			_, err = statement.ExecContext(ctx,
-				model.metadata.ResAttr,
+				AttributesToMap(model.metadata.ResAttr),
 				model.metadata.ResURL,
 				model.metadata.ScopeInstr.Name(),
 				model.metadata.ScopeInstr.Version(),
-				attributesToMap(model.metadata.ScopeInstr.Attributes()),
+				AttributesToMap(model.metadata.ScopeInstr.Attributes()),
 				model.metadata.ScopeInstr.DroppedAttributesCount(),
 				model.metadata.ScopeURL,
-				serviceName,
+				serviceName.AsString(),
 				model.metricName,
 				model.metricDescription,
 				model.metricUnit,
-				attributesToMap(dp.Attributes()),
+				AttributesToMap(dp.Attributes()),
 				dp.StartTimestamp().AsTime(),
 				dp.Timestamp().AsTime(),
 				dp.Count(),
@@ -177,7 +174,7 @@ func (h *histogramMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }

-func (h *histogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
+func (h *histogramMetrics) Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	histogram, ok := metrics.(pmetric.Histogram)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of Histogram")

exporter/clickhouseexporter/internal/metrics_model.go

+11 −10

@@ -13,6 +13,8 @@ import (
 	"sync"

 	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/column"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/column/orderedmap"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
@@ -38,14 +40,14 @@ type MetricTypeConfig struct {
 // any type of metrics need implement it.
 type MetricsModel interface {
 	// Add used to bind MetricsMetaData to a specific metric then put them into a slice
-	Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error
+	Add(resAttr pcommon.Map, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error
 	// insert is used to insert metric data to clickhouse
 	insert(ctx context.Context, db *sql.DB) error
 }

 // MetricsMetaData contain specific metric data
 type MetricsMetaData struct {
-	ResAttr    map[string]string
+	ResAttr    pcommon.Map
 	ResURL     string
 	ScopeURL   string
 	ScopeInstr pcommon.InstrumentationScope
@@ -118,7 +120,7 @@ func convertExemplars(exemplars pmetric.ExemplarSlice) (clickhouse.ArraySet, cli
 	)
 	for i := 0; i < exemplars.Len(); i++ {
 		exemplar := exemplars.At(i)
-		attrs = append(attrs, attributesToMap(exemplar.FilteredAttributes()))
+		attrs = append(attrs, AttributesToMap(exemplar.FilteredAttributes()))
 		times = append(times, exemplar.Timestamp().AsTime())
 		values = append(values, getValue(exemplar.IntValue(), exemplar.DoubleValue(), exemplar.ValueType()))

@@ -165,13 +167,12 @@ func getValue(intValue int64, floatValue float64, dataType any) float64 {
 	}
 }

-func attributesToMap(attributes pcommon.Map) map[string]string {
-	m := make(map[string]string, attributes.Len())
-	attributes.Range(func(k string, v pcommon.Value) bool {
-		m[k] = v.AsString()
-		return true
-	})
-	return m
+func AttributesToMap(attributes pcommon.Map) column.IterableOrderedMap {
+	return orderedmap.CollectN(func(yield func(string, string) bool) {
+		attributes.Range(func(k string, v pcommon.Value) bool {
+			return yield(k, v.AsString())
+		})
+	}, attributes.Len())
 }

 func convertSliceToArraySet[T any](slice []T) clickhouse.ArraySet {
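A small, self-contained sketch of how the new AttributesToMap behaves (illustrative, not part of the commit; the helper is copied here because internal packages cannot be imported from outside the exporter module). The updated tests rely on the ordered map collected from pcommon attributes comparing equal to one built with orderedmap.FromMap, since both should end up holding the same pairs in the same deterministic key order.

```go
package main

import (
	"fmt"
	"reflect"

	"github.com/ClickHouse/clickhouse-go/v2/lib/column"
	"github.com/ClickHouse/clickhouse-go/v2/lib/column/orderedmap"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

// attributesToOrderedMap mirrors the commit's internal.AttributesToMap.
func attributesToOrderedMap(attributes pcommon.Map) column.IterableOrderedMap {
	return orderedmap.CollectN(func(yield func(string, string) bool) {
		attributes.Range(func(k string, v pcommon.Value) bool {
			return yield(k, v.AsString())
		})
	}, attributes.Len())
}

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr("service.name", "test-service")
	attrs.PutStr("deployment.environment", "prod")

	// What the exporter now binds into INSERT statements.
	om := attributesToOrderedMap(attrs)

	// What the updated unit tests build as the expected value.
	expected := orderedmap.FromMap(map[string]string{
		"deployment.environment": "prod",
		"service.name":           "test-service",
	})

	// require.Equal in the tests reduces to a deep comparison like this one;
	// it should print true if both helpers order keys the same way.
	fmt.Println(reflect.DeepEqual(expected, om))
}
```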
