[GH-ISSUE #1947] Question: Why does trustdns open and close a socket each time it is sending a request? #827

Open
opened 2026-03-16 00:27:22 +03:00 by kerem · 4 comments

Originally created by @CalderWhite on GitHub (May 28, 2023).
Original GitHub issue: https://github.com/hickory-dns/hickory-dns/issues/1947

PLATFORM: x86_64
OS: Ubuntu 22
Misc: Running in Linode cloud on a 4 vCPU machine with 8 GB of RAM.

I am running some performance tests on this library. I already ran into issues with the resolver copying too much memory (5GB+), which I tracked down to it calling `.clone()` on the LRU cache too often. So instead I have been using the client directly, and after profiling I noticed that much of the program's runtime (~50%) is spent opening and closing UDP sockets.

My code creates one client for each DNS resolver server and distributes queries amongst them. I can reach 40k QPS with 4096 resolver servers, but beyond that the overhead of UDP socket creation and destruction overwhelms the machine. This shows up both in the flamegraph of my code and in the kernel usage in htop (mostly red bars, meaning the kernel is doing most of the work instead of the user threads).

<img width="1503" alt="Screenshot 2023-05-28 at 1 13 24 AM" src="https://github.com/bluejekyll/trust-dns/assets/15067287/b6fd10e2-d854-4298-938a-e1e9f7a727ee">
<img width="588" alt="Screenshot 2023-05-28 at 1 15 47 AM" src="https://github.com/bluejekyll/trust-dns/assets/15067287/d58ca46b-0b8d-4fa0-b66a-dd43519a2b32">
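The socket-per-request cost described above is easy to reproduce outside the library. A minimal stdlib-only sketch (no trust-dns involved, sending to a local sink socket) that times fresh-bind sends against a reused socket:

```rust
use std::net::UdpSocket;
use std::time::Instant;

fn main() -> std::io::Result<()> {
    // A local sink socket so the sends have a real destination.
    let sink = UdpSocket::bind("127.0.0.1:0")?;
    let target = sink.local_addr()?;
    let payload = [0u8; 32];
    let n = 10_000;

    // Fresh socket per request: bind + send + drop (close) each time.
    let t = Instant::now();
    for _ in 0..n {
        let s = UdpSocket::bind("127.0.0.1:0")?;
        s.send_to(&payload, target)?;
        // `s` is dropped (and the socket closed) at the end of each iteration.
    }
    let fresh = t.elapsed();

    // One socket reused for all requests.
    let s = UdpSocket::bind("127.0.0.1:0")?;
    let t = Instant::now();
    for _ in 0..n {
        s.send_to(&payload, target)?;
    }
    let reused = t.elapsed();

    println!("fresh-bind per send: {fresh:?}, reused socket: {reused:?}");
    Ok(())
}
```

The fresh-bind loop spends its extra time in kernel syscalls for socket creation and teardown, which matches the red (kernel) bars seen in htop.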

Here's my code for reference:

```rust
struct MassDnsWorker<T>
where
    T: Iterator<Item = String>,
{
    nameserver_str: String,
    buffer_size: usize,
    reader: Arc<Mutex<T>>,
    tx: Sender<(Option<DnsResponse>, String, Option<String>)>,
}
impl<T> MassDnsWorker<T>
where
    T: Iterator<Item = String>,
{
    pub fn new(
        nameserver_str: String,
        reader: Arc<Mutex<T>>,
        tx: Sender<(Option<DnsResponse>, String, Option<String>)>,
    ) -> Self {
        Self {
            nameserver_str,
            buffer_size: 128,
            reader,
            tx,
        }
    }

    pub async fn run(&self) {
        let address = self.nameserver_str.parse().expect(&format!(
            "Nameserver [{}] address could not be parsed. Should be [ip:port].",
            self.nameserver_str,
        ));
        let conn = UdpClientConnection::new(address).expect(&format!(
            "Could not make UdpClientConnection for address [{}]",
            self.nameserver_str
        ));
        let stream = conn.new_stream(None);
        let (mut client, bg) = AsyncClient::connect(stream).await.expect(&format!(
            "Could not create AsyncClient via ::connect for domain {}",
            self.nameserver_str
        ));
        let background_future = tokio::spawn(bg);
        let mut has_more_domains = true;
        while has_more_domains {
            let mut read_lock = self.reader.lock().await;
            let mut domain_chunk = Vec::with_capacity(self.buffer_size);
            for _i in 0..self.buffer_size {
                domain_chunk.push(read_lock.next());
            }
            drop(read_lock);

            for domain_wrapped in domain_chunk {
                let domain = match domain_wrapped {
                    Some(inner) => inner,
                    None => {
                        // If we get None from the iterator then we are done reading!
                        has_more_domains = false;
                        continue;
                    }
                };
                self.process_domain(&mut client, domain).await;
            }
        }

        background_future.abort();
    }

    pub async fn process_domain(&self, client: &mut AsyncClient, domain: String) {
        // we own domain now. Time to look it up and send it back to the channel
        let domain_as_name = match Name::from_str(&format!("{domain}.")) {
            Ok(inner) => inner,
            Err(_error) => {
                self.tx
                    .send((None, domain, Some(_error.to_string())))
                    .await
                    .expect("Could not send failure to failure channel");
                return;
            } // TODO: Write these to a mpsc channel so the caller knows which domains couldn't be parsed
        };

        match client
            .query(domain_as_name, DNSClass::IN, RecordType::A)
            .await
        {
            Ok(dns_message) => {
                self.tx
                    .send((Some(dns_message), domain, None))
                    .await
                    .expect("Couldn't send dns_message to success channel.");
            }
            Err(_error) => {
                self.tx
                    .send((
                        None,
                        domain,
                        Some(format!("[{}] {}", self.nameserver_str, _error.to_string())),
                    ))
                    .await
                    .expect("Could not send failure to failure channel");
                return;
            }
        }
    }
}

struct MassDnsResolver {
    nameservers: Vec<String>,
}

impl MassDnsResolver {
    pub fn new(nameservers: std::vec::IntoIter<String>) -> Self {
        Self {
            nameservers: nameservers.collect(),
        }
    }

    pub fn run(
        &self,
        domains_iter: std::vec::IntoIter<String>,
        result_tx: Sender<(Option<DnsResponse>, String, Option<String>)>,
    ) {
        let locked_domains_iter = Arc::new(Mutex::new(domains_iter));
        // init the background tasks that handle each DNS server
        self.nameservers
            .iter()
            .map(|s| format!("{s}:53"))
            .for_each(move |nameserver| {
                let domains_ref = locked_domains_iter.clone();
                let tx_ref = result_tx.clone();
                tokio::spawn(async move {
                    let worker = MassDnsWorker::new(nameserver, domains_ref, tx_ref);
                    worker.run().await;
                });
            });
    }
}

async fn run() {
    let mut start = SystemTime::now();
    let (tx, mut rx) = mpsc::channel(10_000);
    let domains = get_cb_domains();
    info!("Reading domains: {:?} elapsed", start.elapsed());
    start = SystemTime::now();
    let num_domains = domains.len();
    let resolver_strs = get_resolvers();
    resolver_strs
        .iter()
        .for_each(|domain| info!("Using resolver: {}", domain));
    let resolver = MassDnsResolver::new(resolver_strs.into_iter());
    resolver.run(domains.into_iter(), tx);

    let mut good = 0;
    let mut last = SystemTime::now();
    for total in 0..num_domains {
        if total % 10_000 == 0 {
            info!(
                "{total} {:.1} good: {:.1}%",
                10_000.0 / last.elapsed().unwrap().as_secs_f64(),
                100.0 * good as f64 / total as f64
            );
            last = SystemTime::now();
        }

        let (wrapped_response, _domain, _err_str) = match rx.recv().await {
            Some(msg) => msg,
            None => continue,
        };

        if wrapped_response.is_none() {
            // handle error
            // warn!("Bad domain: {domain}, {}", err_str.unwrap());
            continue;
        }

        good += 1;
    }

    info!("processing domains: {:?} elapsed", start.elapsed());
}

fn main() {
    env_logger::Builder::new()
        .filter_level(LevelFilter::Info)
        .init();

    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .expect("Could not create tokio runtime");

    rt.block_on(async {
        run().await;
    })
}
```


@CalderWhite commented on GitHub (May 28, 2023):

I'll also mention that I tried TCP but it would throw a proto error "busy" after a few requests and I'm not sure how I should handle this.


@djc commented on GitHub (May 28, 2023):

Thanks for investigating performance with the trust-dns libraries! Sounds like you've run into some interesting results.

I believe the behavior for creating new UDP sockets is because of potential security issues where it's possible to forge DNS responses if a socket is reused over time. Maybe search the issue tracker for previous discussions?

In general the resolver library has seen more work and would probably be a better place to invest in improvements assuming it can meet your performance budget. Would be interesting to dig into the cache cloning issue more to see how it can be mitigated. Maybe you want a set of (one per core) independently caching resolvers rather than resolvers sharing a single cache?
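The "one independently caching resolver per core" idea can be read as hash-sharding names across workers that each own a private cache, so nothing is shared or cloned across threads. A stdlib-only sketch of that pattern (here `resolve_uncached` is a hypothetical stand-in for the real resolver call, and plain threads stand in for tokio tasks):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::mpsc;
use std::thread;

// Hypothetical uncached lookup standing in for an actual DNS query.
fn resolve_uncached(name: &str) -> String {
    format!("addr-of-{name}")
}

// The same name always hashes to the same shard, so its cache entry
// lives in exactly one worker and never needs cross-thread sharing.
fn shard_for(name: &str, shards: usize) -> usize {
    let mut h = DefaultHasher::new();
    name.hash(&mut h);
    (h.finish() as usize) % shards
}

fn main() {
    let shards = 4;
    let mut senders = Vec::new();
    let (done_tx, done_rx) = mpsc::channel();

    for _ in 0..shards {
        let (tx, rx) = mpsc::channel::<String>();
        senders.push(tx);
        let done = done_tx.clone();
        thread::spawn(move || {
            // Each worker owns its cache outright; no Arc, no clone of the cache.
            let mut cache: HashMap<String, String> = HashMap::new();
            for name in rx {
                let addr = cache
                    .entry(name.clone())
                    .or_insert_with(|| resolve_uncached(&name))
                    .clone();
                done.send((name, addr)).unwrap();
            }
        });
    }
    drop(done_tx);

    for q in ["example.com", "example.org", "example.com"] {
        senders[shard_for(q, shards)].send(q.to_string()).unwrap();
    }
    drop(senders); // workers exit once their queues drain

    for (name, addr) in done_rx {
        println!("{name} -> {addr}");
    }
}
```

The second lookup of `example.com` is served from the owning worker's cache; whether this maps cleanly onto the resolver crate's cache is an open question in this thread, so treat it as a design sketch only.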

As for the busy issue, I assume this means that the server dislikes the rate at which you're sending requests and asking you to slow down (rate limiting), but I'm a little unclear on what you're using as the server here.


@CalderWhite commented on GitHub (May 28, 2023):

Thanks for the quick response!

  1. I will see if there is mention of this before.
  2. I'm not entirely sure what is causing the massive memory usage. The only thing I know for sure is that it happened as a result of using many different resolver servers for the same AsyncResolver. When I have 1 server for 1 resolver on separate tokio tasks, everything is fine. Weird.
  3. Yeah makes sense. I suppose I should just ignore them and sleep. In UDP my requests just get slower and slower but in TCP it actually sends back this error response causing my code to panic. I think I could catch the error if I wanted to though.
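The "ignore them and sleep" approach in point 3 amounts to retrying with backoff when the error is the transient "busy" one. A minimal stdlib sketch, assuming the error's `Display` text is how "busy" would be detected (the real error type and message are not confirmed in this thread):

```rust
use std::thread;
use std::time::Duration;

/// Retry a fallible call, sleeping with exponential backoff whenever the
/// error text looks like a transient "busy" condition; other errors and
/// the final attempt's error propagate unchanged.
fn with_backoff<T, E: std::fmt::Display>(
    mut call: impl FnMut() -> Result<T, E>,
    max_attempts: u32,
) -> Result<T, E> {
    let mut delay = Duration::from_millis(50);
    let mut attempt = 1;
    loop {
        match call() {
            Ok(v) => return Ok(v),
            Err(e) if e.to_string().contains("busy") && attempt < max_attempts => {
                thread::sleep(delay);
                delay *= 2; // 50ms, 100ms, 200ms, ...
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    // Simulated query that reports "busy" twice before succeeding.
    let mut calls = 0;
    let result = with_backoff(
        || {
            calls += 1;
            if calls < 3 {
                Err("proto error: busy".to_string())
            } else {
                Ok(calls)
            }
        },
        5,
    );
    assert_eq!(result, Ok(3));
    println!("succeeded after {calls} attempts");
}
```

In async code the `thread::sleep` would become the runtime's async sleep, but the retry-or-propagate shape is the same.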

@bluejekyll commented on GitHub (May 30, 2023):

Yeah, for UDP the default behavior is to select a new UDP socket and port on all requests in order to reduce the chance of spoofed/forged responses. It's something that we could theoretically add an option to disable, but the aim of the library has generally been one of resilience.
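The spoof-resistance mechanism described here relies on source-port randomization: binding to port 0 lets the OS pick a fresh ephemeral port, so an off-path attacker cannot predict which port a response must arrive on. A tiny stdlib illustration of that behavior:

```rust
use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    // Port 0 asks the OS for an ephemeral port; while both sockets are
    // alive they necessarily hold different, hard-to-predict ports.
    let a = UdpSocket::bind("127.0.0.1:0")?;
    let b = UdpSocket::bind("127.0.0.1:0")?;
    println!(
        "request 1 source port: {}, request 2 source port: {}",
        a.local_addr()?.port(),
        b.local_addr()?.port()
    );
    Ok(())
}
```

Reusing one socket would pin the source port for all queries, leaving only the 16-bit query ID as entropy against forged responses, which is why the library defaults to a fresh socket per request.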

On TCP, I'm going to agree with @djc and say you're probably being rate limited.
