Skip to content

Commit 68a18b8

Browse files
cbandyFiery-Fenix
authored andcommitted
[receiver/sqlqueryreceiver] Use one connection pool per receiver (open-telemetry#39312)
#### Description Rather than creating a separate `*sql.DB` provider function for each query, this creates one provider for the receiver and passes a method value to each query. The method uses a mutex to ensure exactly one `*sql.DB` is created when it is called concurrently. The `max_open_conn` config is defined in `sqlqueryreceiver`, so I put the implementation in its `internal` package. #### Link to tracking issue Fixes open-telemetry#39270 #### Testing Using the configuration and steps in open-telemetry#39270, I see only one connection to Postgres: ```console $ psql -h localhost -U postgres -c 'SELECT pid, backend_type, datname, usename, query, state FROM pg_stat_activity' ┌─────┬──────────────────────────────┬──────────┬──────────┬────────────────────────────────────────────────────────────────────────────────┬────────┐ │ pid │ backend_type │ datname │ usename │ query │ state │ ├─────┼──────────────────────────────┼──────────┼──────────┼────────────────────────────────────────────────────────────────────────────────┼────────┤ │ 68 │ client backend │ postgres │ postgres │ SELECT 3 AS number │ idle │ │ 70 │ client backend │ postgres │ postgres │ SELECT pid, backend_type, datname, usename, query, state FROM pg_stat_activity │ active │ │ 65 │ autovacuum launcher │ ␀ │ ␀ │ │ ␀ │ │ 66 │ logical replication launcher │ ␀ │ postgres │ │ ␀ │ │ 61 │ checkpointer │ ␀ │ ␀ │ │ ␀ │ │ 62 │ background writer │ ␀ │ ␀ │ │ ␀ │ │ 64 │ walwriter │ ␀ │ ␀ │ │ ␀ │ └─────┴──────────────────────────────┴──────────┴──────────┴────────────────────────────────────────────────────────────────────────────────┴────────┘ (7 rows) ``` Signed-off-by: Chris Bandy <[email protected]>
1 parent 9c8ddab commit 68a18b8

File tree

3 files changed

+66
-12
lines changed

3 files changed

+66
-12
lines changed

.chloggen/39270-sqlquery-pool.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
change_type: bug_fix
2+
component: sqlqueryreceiver
3+
issues: [39270]
4+
change_logs: [user]
5+
6+
note: respect `max_open_conn` configuration for multiple queries
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal"
5+
6+
import (
7+
"database/sql"
8+
"sync"
9+
10+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
11+
)
12+
13+
func NewPool(opener sqlquery.SQLOpenerFunc, driver string, dsn string, maxOpenConns int) interface {
14+
DB() (*sql.DB, error)
15+
} {
16+
return &sqlPool{
17+
DriverName: driver,
18+
DataSourceName: dsn,
19+
MaxOpenConns: maxOpenConns,
20+
Opener: opener,
21+
}
22+
}
23+
24+
type sqlPool struct {
25+
DriverName string
26+
DataSourceName string
27+
MaxOpenConns int
28+
Opener sqlquery.SQLOpenerFunc
29+
30+
mutex sync.Mutex
31+
db *sql.DB
32+
}
33+
34+
// DB initializes and returns the [sql.DB] pool. It is safe to call concurrently.
35+
// Depending on the driver, this might also connect to the database backend.
36+
//
37+
// This method exists to satisfy [sqlquery.DbProviderFunc], but the way
38+
// [sqlquery.Scraper] closes [sql.DB] can interfere with other Scrapers.
39+
func (sp *sqlPool) DB() (*sql.DB, error) {
40+
sp.mutex.Lock()
41+
defer sp.mutex.Unlock()
42+
43+
if sp.db == nil {
44+
db, err := sp.Opener(sp.DriverName, sp.DataSourceName)
45+
46+
if err == nil && db != nil {
47+
db.SetMaxOpenConns(sp.MaxOpenConns)
48+
sp.db = db
49+
}
50+
51+
return sp.db, err
52+
}
53+
54+
return sp.db, nil
55+
}

receiver/sqlqueryreceiver/receiver.go

+5-12
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-coll
55

66
import (
77
"context"
8-
"database/sql"
98
"fmt"
109

1110
"go.opentelemetry.io/collector/component"
@@ -15,6 +14,7 @@ import (
1514
"go.opentelemetry.io/collector/scraper/scraperhelper"
1615

1716
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
17+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal"
1818
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal/metadata"
1919
)
2020

@@ -39,29 +39,22 @@ func createMetricsReceiverFunc(sqlOpenerFunc sqlquery.SQLOpenerFunc, clientProvi
3939
) (receiver.Metrics, error) {
4040
sqlCfg := cfg.(*Config)
4141
var opts []scraperhelper.ControllerOption
42+
pool := internal.NewPool(sqlOpenerFunc, sqlCfg.Driver, sqlCfg.DataSource, sqlCfg.MaxOpenConn)
43+
4244
for i, query := range sqlCfg.Queries {
4345
if len(query.Metrics) == 0 {
4446
continue
4547
}
4648
id := component.MustNewIDWithName("sqlqueryreceiver", fmt.Sprintf("query-%d: %s", i, query.SQL))
47-
dbProviderFunc := func() (*sql.DB, error) {
48-
dbPool, err := sqlOpenerFunc(sqlCfg.Driver, sqlCfg.DataSource)
49-
if err != nil {
50-
return nil, err
51-
}
5249

53-
if dbPool != nil {
54-
dbPool.SetMaxOpenConns(sqlCfg.MaxOpenConn)
55-
}
56-
return dbPool, nil
57-
}
5850
scope := pcommon.NewInstrumentationScope()
5951
scope.SetName(metadata.ScopeName)
60-
mp := sqlquery.NewScraper(id, query, sqlCfg.ControllerConfig, settings.Logger, sqlCfg.Telemetry, dbProviderFunc, clientProviderFunc, scope)
52+
mp := sqlquery.NewScraper(id, query, sqlCfg.ControllerConfig, settings.Logger, sqlCfg.Telemetry, pool.DB, clientProviderFunc, scope)
6153

6254
opt := scraperhelper.AddScraper(metadata.Type, mp)
6355
opts = append(opts, opt)
6456
}
57+
6558
return scraperhelper.NewMetricsController(
6659
&sqlCfg.ControllerConfig,
6760
settings,

0 commit comments

Comments
 (0)