Skip to content

Commit 0f7acb1

Browse files
committed
Lazily initalize cadence client when sending stats in dogstatsd-client
APMSP-1805. Sidecar potentially has 100s of sessions, each of which creates dogstatsd clients. Defer initalizing new cadence clients until we are actually ready to send. Cadence creates a new OS thread every time a client is initialized.
1 parent 4ccb7b6 commit 0f7acb1

File tree

5 files changed

+137
-73
lines changed

5 files changed

+137
-73
lines changed

Cargo.lock

+1-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-pipeline/src/trace_exporter/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use ddcommon::header::{
2323
};
2424
use ddcommon::tag::Tag;
2525
use ddcommon::{connector, tag, Endpoint};
26-
use dogstatsd_client::{new_flusher, Client, DogStatsDAction};
26+
use dogstatsd_client::{new, Client, DogStatsDAction};
2727
use either::Either;
2828
use hyper::body::HttpBody;
2929
use hyper::http::uri::PathAndQuery;
@@ -921,8 +921,8 @@ impl TraceExporterBuilder {
921921
.build()?;
922922

923923
let dogstatsd = self.dogstatsd_url.and_then(|u| {
924-
new_flusher(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return
925-
// None
924+
new(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return
925+
// None
926926
});
927927

928928
let base_url = self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL);

dogstatsd-client/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ serde = { version = "1.0", features = ["derive", "rc"] }
1616
tracing = { version = "0.1", default-features = false }
1717
anyhow = { version = "1.0" }
1818
http = "0.2"
19+
20+
21+
[dev-dependencies]
22+
tokio = {version = "1.23", features = ["rt", "time", "test-util", "rt-multi-thread"], default-features = false}

dogstatsd-client/src/lib.rs

+127-66
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use ddcommon::tag::Tag;
99
use ddcommon::Endpoint;
1010
use serde::{Deserialize, Serialize};
1111
use std::fmt::Debug;
12-
use tracing::{debug, error, info};
12+
use tracing::error;
1313

1414
use anyhow::anyhow;
1515
use cadence::prelude::*;
@@ -21,6 +21,7 @@ use ddcommon::connector::uds::socket_path_from_uri;
2121
use std::net::{ToSocketAddrs, UdpSocket};
2222
#[cfg(unix)]
2323
use std::os::unix::net::UnixDatagram;
24+
use std::sync::{Arc, Mutex};
2425

2526
// Queue with a maximum capacity of 32K elements
2627
const QUEUE_SIZE: usize = 32 * 1024;
@@ -67,62 +68,57 @@ pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
6768
Set(T, i64, V),
6869
}
6970

70-
/// A dogstatsd-client that flushes stats to a given endpoint. Use `new_flusher` to build one.
71-
#[derive(Debug)]
71+
/// A dogstatsd-client that flushes stats to a given endpoint.
72+
#[derive(Debug, Default)]
7273
pub struct Client {
73-
client: StatsdClient,
74+
client: Arc<Mutex<Option<StatsdClient>>>,
75+
endpoint: Option<Endpoint>,
7476
}
7577

7678
/// Build a new flusher instance pointed at the provided endpoint.
7779
/// Returns error if the provided endpoint is not valid.
78-
pub fn new_flusher(endpoint: Endpoint) -> anyhow::Result<Client> {
80+
pub fn new(endpoint: Endpoint) -> anyhow::Result<Client> {
81+
// defer initialization of the client until the first metric is sent and we definitely know the
82+
// client is going to be used to communicate with the endpoint.
7983
Ok(Client {
80-
client: create_client(&endpoint)?,
84+
endpoint: Some(endpoint),
85+
..Default::default()
8186
})
8287
}
8388

8489
impl Client {
85-
/// Set the destination for dogstatsd metrics, if an API Key is provided the client is disabled
86-
/// as dogstatsd is not allowed in agentless mode. Returns an error if the provided endpoint
87-
/// is invalid.
88-
pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> {
89-
self.client = match endpoint.api_key {
90-
Some(_) => {
91-
info!("DogStatsD is not available in agentless mode");
92-
anyhow::bail!("DogStatsD is not available in agentless mode");
93-
}
94-
None => {
95-
debug!("Updating DogStatsD endpoint to {}", endpoint.url);
96-
create_client(&endpoint)?
97-
}
98-
};
99-
Ok(())
100-
}
101-
10290
/// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned"
10391
/// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details.
10492
pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
105-
let client = &self.client;
93+
let client_guard = match self.get_or_init_client() {
94+
Ok(guard) => guard,
95+
Err(e) => {
96+
error!("Failed to get client: {}", e);
97+
return;
98+
}
99+
};
106100

107-
for action in actions {
108-
if let Err(err) = match action {
109-
DogStatsDActionOwned::Count(metric, value, tags) => {
110-
do_send(client.count_with_tags(metric.as_ref(), value), &tags)
111-
}
112-
DogStatsDActionOwned::Distribution(metric, value, tags) => {
113-
do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
101+
if let Some(client) = &*client_guard {
102+
for action in actions {
103+
if let Err(err) = match action {
104+
DogStatsDActionOwned::Count(metric, value, tags) => {
105+
do_send(client.count_with_tags(metric.as_ref(), value), &tags)
106+
}
107+
DogStatsDActionOwned::Distribution(metric, value, tags) => {
108+
do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
109+
}
110+
DogStatsDActionOwned::Gauge(metric, value, tags) => {
111+
do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
112+
}
113+
DogStatsDActionOwned::Histogram(metric, value, tags) => {
114+
do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
115+
}
116+
DogStatsDActionOwned::Set(metric, value, tags) => {
117+
do_send(client.set_with_tags(metric.as_ref(), value), &tags)
118+
}
119+
} {
120+
error!("Error while sending metric: {}", err);
114121
}
115-
DogStatsDActionOwned::Gauge(metric, value, tags) => {
116-
do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
117-
}
118-
DogStatsDActionOwned::Histogram(metric, value, tags) => {
119-
do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
120-
}
121-
DogStatsDActionOwned::Set(metric, value, tags) => {
122-
do_send(client.set_with_tags(metric.as_ref(), value), &tags)
123-
}
124-
} {
125-
error!("Error while sending metric: {}", err);
126122
}
127123
}
128124
}
@@ -133,31 +129,53 @@ impl Client {
133129
&self,
134130
actions: Vec<DogStatsDAction<'a, T, V>>,
135131
) {
136-
let client = &self.client;
137-
138-
for action in actions {
139-
if let Err(err) = match action {
140-
DogStatsDAction::Count(metric, value, tags) => {
141-
let metric_builder = client.count_with_tags(metric.as_ref(), value);
142-
do_send(metric_builder, tags)
143-
}
144-
DogStatsDAction::Distribution(metric, value, tags) => {
145-
do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
146-
}
147-
DogStatsDAction::Gauge(metric, value, tags) => {
148-
do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
149-
}
150-
DogStatsDAction::Histogram(metric, value, tags) => {
151-
do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
152-
}
153-
DogStatsDAction::Set(metric, value, tags) => {
154-
do_send(client.set_with_tags(metric.as_ref(), value), tags)
132+
let client_guard = match self.get_or_init_client() {
133+
Ok(guard) => guard,
134+
Err(e) => {
135+
error!("Failed to get client: {}", e);
136+
return;
137+
}
138+
};
139+
if let Some(client) = &*client_guard {
140+
for action in actions {
141+
if let Err(err) = match action {
142+
DogStatsDAction::Count(metric, value, tags) => {
143+
let metric_builder = client.count_with_tags(metric.as_ref(), value);
144+
do_send(metric_builder, tags)
145+
}
146+
DogStatsDAction::Distribution(metric, value, tags) => {
147+
do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
148+
}
149+
DogStatsDAction::Gauge(metric, value, tags) => {
150+
do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
151+
}
152+
DogStatsDAction::Histogram(metric, value, tags) => {
153+
do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
154+
}
155+
DogStatsDAction::Set(metric, value, tags) => {
156+
do_send(client.set_with_tags(metric.as_ref(), value), tags)
157+
}
158+
} {
159+
error!("Error while sending metric: {}", err);
155160
}
156-
} {
157-
error!("Error while sending metric: {}", err);
158161
}
159162
}
160163
}
164+
165+
fn get_or_init_client(&self) -> anyhow::Result<std::sync::MutexGuard<Option<StatsdClient>>> {
166+
let mut client_guard = self
167+
.client
168+
.lock()
169+
.map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {}", e))?;
170+
171+
if client_guard.is_none() {
172+
if let Some(endpoint) = &self.endpoint {
173+
*client_guard = Some(create_client(endpoint)?);
174+
}
175+
}
176+
177+
Ok(client_guard)
178+
}
161179
}
162180

163181
fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
@@ -229,13 +247,14 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
229247
#[cfg(test)]
230248
mod test {
231249
use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
232-
use crate::{create_client, new_flusher, DogStatsDActionOwned};
250+
use crate::{create_client, new, DogStatsDActionOwned};
233251
#[cfg(unix)]
234252
use ddcommon::connector::uds::socket_path_to_uri;
235253
use ddcommon::{tag, Endpoint};
236254
#[cfg(unix)]
237255
use http::Uri;
238256
use std::net;
257+
use std::sync::Arc;
239258
use std::time::Duration;
240259

241260
#[test]
@@ -244,7 +263,7 @@ mod test {
244263
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
245264
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
246265

247-
let flusher = new_flusher(Endpoint::from_slice(
266+
let flusher = new(Endpoint::from_slice(
248267
socket.local_addr().unwrap().to_string().as_str(),
249268
))
250269
.unwrap();
@@ -333,12 +352,54 @@ mod test {
333352
Histogram(_, _, _) => {}
334353
Set(_, _, _) => {}
335354
}
336-
337355
// TODO: when std::mem::variant_count is in stable we can do this instead
338356
// assert_eq!(
339357
// std::mem::variant_count::<DogStatsDActionOwned>(),
340358
// std::mem::variant_count::<DogStatsDAction<String, Vec<&Tag>>>(),
341359
// "DogStatsDActionOwned and DogStatsDAction should have the same number of variants,
342360
// did you forget to update one?", );
343361
}
362+
363+
#[tokio::test]
364+
async fn test_thread_safety() {
365+
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
366+
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
367+
let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str());
368+
let flusher = Arc::new(new(endpoint.clone()).unwrap());
369+
370+
{
371+
let client = flusher
372+
.client
373+
.lock()
374+
.expect("failed to obtain lock on client");
375+
assert!(client.is_none());
376+
}
377+
378+
let tasks: Vec<_> = (0..10)
379+
.map(|_| {
380+
let flusher_clone = Arc::clone(&flusher);
381+
tokio::spawn(async move {
382+
flusher_clone.send(vec![
383+
Count("test_count", 3, &vec![tag!("foo", "bar")]),
384+
Count("test_neg_count", -2, &vec![]),
385+
Distribution("test_distribution", 4.2, &vec![]),
386+
Gauge("test_gauge", 7.6, &vec![]),
387+
Histogram("test_histogram", 8.0, &vec![]),
388+
Set("test_set", 9, &vec![tag!("the", "end")]),
389+
Set("test_neg_set", -1, &vec![]),
390+
]);
391+
392+
let client = flusher_clone
393+
.client
394+
.lock()
395+
.expect("failed to obtain lock on client within send thread");
396+
assert!(client.is_some());
397+
})
398+
})
399+
.collect();
400+
401+
for task in tasks {
402+
task.await.unwrap();
403+
}
404+
}
344405
}

sidecar/src/service/sidecar_server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use datadog_live_debugger::sender::DebuggerType;
5454
use datadog_remote_config::fetch::{ConfigInvariants, MultiTargetStats};
5555
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
5656
use ddcommon::tag::Tag;
57-
use dogstatsd_client::{new_flusher, DogStatsDActionOwned};
57+
use dogstatsd_client::{new, DogStatsDActionOwned};
5858
use tinybytes;
5959

6060
type NoResponse = Ready<()>;
@@ -706,7 +706,7 @@ impl SidecarInterface for SidecarServer {
706706
cfg.set_endpoint(endpoint).ok();
707707
});
708708
session.configure_dogstatsd(|dogstatsd| {
709-
let d = new_flusher(config.dogstatsd_endpoint.clone()).ok();
709+
let d = new(config.dogstatsd_endpoint.clone()).ok();
710710
*dogstatsd = d;
711711
});
712712
session.modify_debugger_config(|cfg| {

0 commit comments

Comments
 (0)