Skip to content

Commit f166fb9

Browse files
authored
Introduce metricsConsumer and gaugeMetricConsumer. (#5426)
* Introduce metricsConsumer and gaugeMetricConsumer. * Add a test to get 100% test coverage. * Mostly Formatting changes. Test for duplicate consumers. * Fix lint errors.
1 parent de841b9 commit f166fb9

File tree

2 files changed

+483
-0
lines changed

2 files changed

+483
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tanzuobservabilityexporter
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
22+
"go.opentelemetry.io/collector/model/pdata"
23+
"go.uber.org/multierr"
24+
)
25+
26+
// metricsConsumer instances consume OTEL metrics
27+
type metricsConsumer struct {
28+
consumerMap map[pdata.MetricDataType]metricConsumer
29+
sender flushCloser
30+
}
31+
32+
// newMetricsConsumer returns a new metricsConsumer. consumers are the
33+
// consumers responsible for consuming each type metric. The Consume method
34+
// of returned consumer calls the Flush method on sender after consuming
35+
// all the metrics. Calling Close on the returned metricsConsumer calls Close
36+
// on sender. sender can be nil.
37+
func newMetricsConsumer(consumers []metricConsumer, sender flushCloser) *metricsConsumer {
38+
consumerMap := make(map[pdata.MetricDataType]metricConsumer, len(consumers))
39+
for _, consumer := range consumers {
40+
if consumerMap[consumer.Type()] != nil {
41+
panic("duplicate consumer type detected: " + consumer.Type().String())
42+
}
43+
consumerMap[consumer.Type()] = consumer
44+
}
45+
return &metricsConsumer{
46+
consumerMap: consumerMap,
47+
sender: sender,
48+
}
49+
}
50+
51+
// Consume consumes OTEL metrics. For each metric in md, it delegates to the
52+
// metricConsumer that consumes that type of metric. Once Consume consumes
53+
// all the metrics, it calls Flush() on the sender passed to
54+
// newMetricsConsumer.
55+
func (c *metricsConsumer) Consume(ctx context.Context, md pdata.Metrics) error {
56+
var errs []error
57+
rms := md.ResourceMetrics()
58+
for i := 0; i < rms.Len(); i++ {
59+
ilms := rms.At(i).InstrumentationLibraryMetrics()
60+
for j := 0; j < ilms.Len(); j++ {
61+
ms := ilms.At(j).Metrics()
62+
for k := 0; k < ms.Len(); k++ {
63+
m := ms.At(k)
64+
select {
65+
case <-ctx.Done():
66+
return multierr.Combine(append(errs, errors.New("context canceled"))...)
67+
default:
68+
c.pushSingleMetric(m, &errs)
69+
}
70+
}
71+
}
72+
}
73+
if c.sender != nil {
74+
if err := c.sender.Flush(); err != nil {
75+
errs = append(errs, err)
76+
}
77+
}
78+
return multierr.Combine(errs...)
79+
}
80+
81+
// Close closes this metricsConsumer by calling Close on the sender passed
82+
// to newMetricsConsumer.
83+
func (c *metricsConsumer) Close() {
84+
if c.sender != nil {
85+
c.sender.Close()
86+
}
87+
}
88+
89+
func (c *metricsConsumer) pushSingleMetric(m pdata.Metric, errs *[]error) {
90+
dataType := m.DataType()
91+
consumer := c.consumerMap[dataType]
92+
if consumer == nil {
93+
*errs = append(
94+
*errs, fmt.Errorf("no support for metric type %v", dataType))
95+
96+
} else {
97+
consumer.Consume(m, errs)
98+
}
99+
}
100+
101+
// metricConsumer consumes one specific type of OTEL metric
102+
type metricConsumer interface {
103+
104+
// Type returns the type of metric this consumer consumes. For example
105+
// Gauge, Sum, or Histogram
106+
Type() pdata.MetricDataType
107+
108+
// Consume consumes the metric and appends any errors encountered to errs
109+
Consume(m pdata.Metric, errs *[]error)
110+
}
111+
112+
// flushCloser is the interface for the Flush and Close method
113+
type flushCloser interface {
114+
Flush() error
115+
Close()
116+
}
117+
118+
// gaugeSender sends gauge metrics to tanzu observability
119+
type gaugeSender interface {
120+
SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error
121+
}
122+
123+
type gaugeConsumer struct {
124+
sender gaugeSender
125+
}
126+
127+
// newGaugeConsumer returns a metricConsumer that consumes gauge metrics
128+
// by sending them to tanzu observability
129+
func newGaugeConsumer(sender gaugeSender) metricConsumer {
130+
return &gaugeConsumer{sender: sender}
131+
}
132+
133+
func (g *gaugeConsumer) Type() pdata.MetricDataType {
134+
return pdata.MetricDataTypeGauge
135+
}
136+
137+
func (g *gaugeConsumer) Consume(metric pdata.Metric, errs *[]error) {
138+
gauge := metric.Gauge()
139+
numberDataPoints := gauge.DataPoints()
140+
for i := 0; i < numberDataPoints.Len(); i++ {
141+
g.pushSingleNumberDataPoint(metric, numberDataPoints.At(i), errs)
142+
}
143+
}
144+
145+
func (g *gaugeConsumer) pushSingleNumberDataPoint(
146+
metric pdata.Metric, numberDataPoint pdata.NumberDataPoint, errs *[]error) {
147+
tags := attributesToTags(numberDataPoint.Attributes())
148+
ts := numberDataPoint.Timestamp().AsTime().Unix()
149+
value, err := getValue(numberDataPoint)
150+
if err != nil {
151+
*errs = append(*errs, err)
152+
return
153+
}
154+
err = g.sender.SendMetric(metric.Name(), value, ts, "", tags)
155+
if err != nil {
156+
*errs = append(*errs, err)
157+
}
158+
}
159+
160+
func getValue(numberDataPoint pdata.NumberDataPoint) (float64, error) {
161+
switch numberDataPoint.Type() {
162+
case pdata.MetricValueTypeInt:
163+
return float64(numberDataPoint.IntVal()), nil
164+
case pdata.MetricValueTypeDouble:
165+
return numberDataPoint.DoubleVal(), nil
166+
default:
167+
return 0.0, errors.New("unsupported metric value type")
168+
}
169+
}

0 commit comments

Comments
 (0)