rrdnsd/
lib.rs

1// rrdnsd - monitoring and failover for Round Robin DNS records
2// Copyright 2024-2025 Federico Ceratto <federico@debian.org>
3// Released under AGPLv3
4
5use anyhow::{bail, Context, Result};
6use axum::extract::{Json, State};
7use futures::future::join_all;
8// TODO: switch to tracing?
9use log::{debug, error, info, warn, LevelFilter};
10use reqwest::redirect::Policy;
11use reqwest::Client as Reqw;
12use serde::{Deserialize, Serialize};
13use statsd::client::Client as Statsd;
14use std::collections::BTreeMap;
15use std::collections::HashMap;
16use std::fs;
17use std::net::{IpAddr, SocketAddr};
18use std::process; // TODO: tokio exit?
19use std::str;
20use std::time::Instant;
21use std::time::SystemTime;
22use systemd_journal_logger::JournalLog;
23use tokio::signal::unix::{signal, SignalKind};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::time;
27use tokio::time::Duration;
28use tokio_util::sync::CancellationToken;
29
30mod api_clients;
31mod core;
32mod dash;
33
34#[cfg(test)]
35mod tests;
36
/// Crate version, read from `Cargo.toml` (`CARGO_PKG_VERSION`) at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
38
/// Health state of an endpoint: either one node's probe result, or the
/// status computed from all nodes' results by `compute_status`.
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, Copy)]
pub enum Status {
    /// No decision yet (e.g. no quorum reached in `compute_status`).
    Unknown,
    /// The probe failed / a quorum of nodes reports the endpoint as down.
    Down,
    /// The probe succeeded / a quorum of nodes reports the endpoint as up.
    Up,
}
45
/// Latest status of one endpoint as reported by one node.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
struct Enst {
    status: Status,
    /// Timestamp carried by the datapoint that set `status` (sender's clock).
    last_update_time: SystemTime,
}
51
/// Used to exchange messages with peers: a probe datapoint that
/// `outbox_sender` POSTs as JSON to `/qmsg` on every node, where
/// `handle_http_post_qmsg` turns it into an `Update`.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct QuorumMsg {
    /// Message format version; `probe` currently sends 1.
    format_version: u8,
    /// FQDN of the service the probed endpoint belongs to.
    fqdn: String,
    /// Address of the probed endpoint.
    ipaddr: IpAddr,
    /// Probe result.
    status: Status,
    /// When the datapoint was produced, on the sending node's clock.
    tstamp: SystemTime,
    /// Node that ran the probe.
    node_addr: NodeAddr,
}
62
/// IP address and port of a cluster node (also used for DNS resolver
/// addresses), as produced by `parse_ipaddr_port`.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Serialize, Deserialize)]
struct NodeAddr {
    ipa: IpAddr,
    port: u16,
}
68
/*
Status data model. `name[Key] -> Value` denotes a HashMap:

FqdnToStatuses[fqdn: String] -> ServiceStatus
 dns_ttl: u32
 dns_zone: String
 probe_interval: Duration
 probe_target_url: String
 endpoint_statuses[IpAddr] -> EndpointStatus
                                computed_status: Status
                                computed_status_time: SystemTime
                                probe_cancel: CancellationToken
                                node_to_ens[NodeAddr] -> Enst
                                                           status: Status
                                                           last_update_time: SystemTime
*/
85
/// Latest datapoint for one endpoint from each reporting node.
type NodeToEnst = HashMap<NodeAddr, Enst>;
87
88#[derive(Debug, Clone)]
89struct EndpointStatus {
90    node_to_ens: NodeToEnst,
91    computed_status: Status,
92    computed_status_time: SystemTime,
93    probe_cancel: tokio_util::sync::CancellationToken,
94}
95
96impl Drop for EndpointStatus {
97    // send cancellation token to the related probe task
98    // when an endpoint or the whole service are deleted
99    fn drop(&mut self) {
100        self.probe_cancel.cancel();
101    }
102}
103
/// Status of every endpoint of one service, keyed by endpoint IP address.
type EndpointStatuses = HashMap<IpAddr, EndpointStatus>;

/// Runtime state of one monitored service.
#[derive(Debug, Clone)]
struct ServiceStatus {
    /// TTL used when writing this service's DNS records.
    dns_ttl: u32,
    /// DNS zone the service's FQDN belongs to.
    dns_zone: String,
    endpoint_statuses: EndpointStatuses,
    /// Interval between probes of each endpoint.
    probe_interval: Duration,
    /// URL probed on each endpoint (presumably the service's `healthcheck`).
    probe_target_url: String,
}

/// All monitored services, keyed by FQDN.
type FqdnToStatuses = HashMap<String, ServiceStatus>;
116
/// Messages consumed by the updater task (`core::updater`) from its input channel.
#[derive(Debug)]
enum UpWrap {
    /// A probe datapoint received from a node.
    Update {
        update: Update,
    },
    /// Request for a snapshot of all service statuses, answered on `chan_dash_tx`.
    Fetch {
        chan_dash_tx: ServiceStatusCloneSend,
    },
    /// A configuration change (from the HTTP API or from startup), answered on `resp_chan_tx`.
    APIChangeRequestWrap {
        request: APIChangeRequest,
        resp_chan_tx: tokio::sync::oneshot::Sender<APIChangeResponse>,
    },
}
130
/// Probe datapoint forwarded to the updater; the fields of `QuorumMsg`
/// without `format_version`.
#[derive(Debug)]
struct Update {
    fqdn: String,
    ipaddr: IpAddr,
    status: Status,
    tstamp: SystemTime,
    node_addr: NodeAddr,
}
139
// Channels
/// One-shot reply carrying a snapshot of all service statuses (`UpWrap::Fetch`).
type ServiceStatusCloneSend = tokio::sync::oneshot::Sender<FqdnToStatuses>;
/// Sending side of the updater task's input channel.
type UpdateSend = Sender<UpWrap>;
/// Receiving side of the updater task's input channel.
type UpdateRec = Receiver<UpWrap>;
/// Sending side of the outbox: datapoints to broadcast to peers.
type OutboxSend = Sender<QuorumMsg>;
/// Receiving side of the outbox, drained by `outbox_sender`.
type OutboxRec = Receiver<QuorumMsg>;
// TODO: harmonize send/rec tx/rx naming
147
/// State shared with every HTTP handler.
#[derive(Debug, Clone)]
struct Shared {
    /// Handle used to send requests to the updater task.
    chan_update_tx: UpdateSend,
}
/// Axum extractor type for `Shared`.
type SharedState = axum::extract::State<Shared>;
153
/// Unwraps a `Result`; on `Err`, logs it and returns `()` from the enclosing function.
macro_rules! unwrap_or_return {
    ($res:expr) => {
        match $res {
            Err(err) => {
                error!("error: {}", err);
                return;
            }
            Ok(inner) => inner,
        }
    };
}
165
/// Like `expect("..")`: unwraps a `Result`; on `Err`, logs it (Debug format)
/// and terminates the process with exit status 1.
macro_rules! exp {
    ($res:expr) => {
        match $res {
            Err(err) => {
                error!("{:?}", err);
                process::exit(1);
            }
            Ok(inner) => inner,
        }
    };
}
178
179// UNIX signal handlers
180
181/// Shuts down the daemon when requested
182async fn handle_sig(sig: SignalKind) {
183    exp!(signal(sig)).recv().await;
184    info!("[main] exiting");
185    process::exit(0);
186}
187
188/// Statsd timer
189fn timer(statsd: &Statsd, name: &str, t0: Instant) {
190    let delta = t0.elapsed().as_millis() as f64;
191    statsd.timer(name, delta);
192}
193
194// Configuration handling //
195
/// Service-specific configuration block (one entry of `Conf::services`).
#[derive(Deserialize, Debug, Clone)]
struct ServiceConf {
    /// Service FQDN; leading/trailing dots are stripped by `prepare_conf`.
    fqdn: String,
    /// Health-check URL (sent to the updater as `CreateService::healthcheck`).
    healthcheck: String,
    /// Endpoint IP addresses, as strings.
    ipaddrs: Vec<String>,
    /// DNS record TTL.
    ttl: u32,
    /// DNS zone; `prepare_conf` checks that `fqdn` lies inside it.
    zone: String,
    /// Probe interval; falls back to `Conf::probe_interval_ms`, then to 1000 ms.
    probe_interval_ms: Option<u64>,
}
206
/// API configuration block
#[derive(Debug, Deserialize, Clone)]
struct APIConfig {
    /// Enables the `/api/v0/...` HTTP routes (see `create_router`).
    enabled: bool,
}
213
/// How to send an update to the DNS resolver e.g. nsupdate.
/// Variant names are lowercase in the configuration (e.g. `"nsupdate"`).
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
enum ResolverUpdateMethod {
    /// Dynu HTTP API, authenticated with `Conf::update_credentials`.
    Dynu,
    /// Runs `/usr/bin/nsupdate` once per entry of `Conf::update_resolvers`.
    Nsupdate,
    /// Runs `/usr/bin/knsupdate` once per entry of `Conf::update_resolvers`.
    Knsupdate,
    /// Runs the executable at `path` with `<ipaddr> <fqdn> <zone> <status>` arguments.
    Script { path: String },
}
223
/// Configuration, deserialized from JSON and validated by `prepare_conf`.
#[derive(Deserialize, Debug, Clone)]
struct Conf {
    /// Configuration format version; must be 1.
    conf_version: u8,
    /// `<ipaddr>:<port>` of this node (HTTP listen address); the
    /// `LOCAL_NODE` environment variable overrides it.
    local_node: String,
    /// Parsed `local_node`, filled in by `prepare_conf`.
    #[serde(skip_deserializing)]
    parsed_local_node: Option<NodeAddr>,
    /// `<ipaddr>:<port>` of the nodes that receive this node's datapoints.
    nodes: Vec<String>,
    /// Default probe interval for services that do not set their own.
    probe_interval_ms: Option<u64>,
    /// Parsed `nodes`, filled in by `prepare_conf`.
    #[serde(skip_deserializing)]
    parsed_nodes: Vec<NodeAddr>,
    services: Vec<ServiceConf>,
    update_method: ResolverUpdateMethod,
    /// `<ipaddr>:<port>` of the resolvers updated by the [k]nsupdate methods.
    update_resolvers: Vec<String>,
    // TODO: parsed_resolvers
    /// Credentials for the Dynu update method.
    update_credentials: String,
    /// When true, `fail_open` may keep a down endpoint's record in DNS.
    enable_fail_open: bool,
    api: APIConfig,
}
243
/// Returns an owned copy of `s` with every leading and trailing `.` removed.
fn trim_dots(s: &str) -> String {
    String::from(s.trim_matches('.'))
}
247
248fn prepare_conf(jc: &str, local_node: Option<String>) -> Result<Conf> {
249    let mut conf: Conf = serde_json::from_str(jc).context("Unable to parse configuration")?;
250    debug!("[main] conf loaded");
251    if conf.conf_version != 1 {
252        bail!("Invalid configuration format version");
253    }
254    if let Some(ln) = local_node {
255        conf.local_node = ln;
256    }
257    conf.parsed_local_node = Some(parse_ipaddr_port(&conf.local_node)?);
258    conf.parsed_nodes = conf
259        .nodes
260        .iter()
261        .map(|n| parse_ipaddr_port(n).expect("Unable to parse {n}"))
262        .collect();
263    for n in &conf.update_resolvers {
264        parse_ipaddr_port(n)?;
265    }
266    for sv in &mut conf.services {
267        sv.fqdn = trim_dots(&sv.fqdn);
268        sv.zone = trim_dots(&sv.zone);
269        if sv.fqdn.ends_with(&sv.zone) {
270            continue;
271        }
272        bail!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
273    }
274    Ok(conf)
275}
276
277/// Loads configuration from file. Supports overriding the filename and local node name.
278fn load_conf(conf_fn: &str, local_node: Option<String>) -> Result<Conf> {
279    info!("[load_conf] reading {conf_fn}");
280    let jc = fs::read_to_string(conf_fn).context("Unable to read config file")?;
281    prepare_conf(&jc, local_node)
282}
283
284/// Outbox: receives from `chan_outbox_rx` and send updates to peers
285async fn outbox_sender(mut chan_outbox_rx: OutboxRec, nodes: Vec<String>) {
286    // let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
287    let client_ = Reqw::builder()
288        .user_agent("rrdnsd")
289        .connect_timeout(Duration::new(5, 0))
290        .timeout(Duration::new(5, 0))
291        .tcp_keepalive(Duration::new(15, 0))
292        .build();
293    let client = unwrap_or_return!(client_);
294
295    while let Some(msg) = chan_outbox_rx.recv().await {
296        for node in &nodes {
297            let client = client.clone();
298            let msg = msg.clone();
299            let url = format!("http://{node}/qmsg");
300            let node = node.clone();
301            tokio::spawn(async move {
302                if let Err(e) = client.post(&url).json(&msg).send().await {
303                    info!("Error sending msg to {node}: {e:?}");
304                }
305                // TODO generate metrics // statsd.incr("outbox_sent");
306            });
307        }
308    }
309    info!("[outbox] exiting");
310    drop(chan_outbox_rx);
311}
312
313// Probe
314
315/// Runs an HTTP[S] probe. Sends datapoints into `chan_outbox_tx`
316async fn probe(
317    interval: Duration,
318    local_node: NodeAddr,
319    endpoint_ipaddr: IpAddr,
320    target_uri: String,
321    fqdn: String,
322    chan_outbox_tx: OutboxSend,
323    cancel_probe: CancellationToken,
324) {
325    // FIXME stagger runs!
326    // TODO make statsd ipaddr/port configurable
327    // TODO export service status in statsd?
328    let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
329
330    // Port number in endpoint_saddr is ignored. Conventionally set to 0.
331    // See https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.resolve
332    let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
333
334    // TODO configure timeouts
335    info!("[probe] creating probe for uri {target_uri} at ipaddr {endpoint_saddr}");
336    let client_ = Reqw::builder()
337        .user_agent("rrdnsd")
338        .connect_timeout(Duration::new(5, 0))
339        .timeout(Duration::new(5, 0))
340        .tcp_keepalive(Duration::new(15, 0))
341        .redirect(Policy::none())
342        .resolve(&fqdn, endpoint_saddr)
343        .build();
344    let client = unwrap_or_return!(client_);
345
346    debug!("[probe] starting");
347    let mut ticker = time::interval(interval);
348    loop {
349        if cancel_probe.is_cancelled() {
350            info!("[probe] exited probe for uri {target_uri} at ipaddr {endpoint_saddr}");
351            return;
352        }
353        let t0 = Instant::now();
354        ticker.tick().await;
355        let t1 = Instant::now();
356
357        debug!("[probe] probing {target_uri} at endpoint {endpoint_ipaddr}");
358        let resp = client.get(&target_uri).send().await;
359        let success = resp.is_ok();
360        // statsd metric `probe.duration` - internal
361        timer(&statsd, "probe.duration", t0);
362        if success {
363            // statsd metric `probe.success.cnt` - Successful service probes count
364            statsd.incr("probe.success.cnt");
365        } else {
366            // statsd metric `probe.failure.cnt` - Failed service probes count
367            statsd.incr("probe.failure.cnt");
368        }
369        let msg = QuorumMsg {
370            format_version: 1,
371            fqdn: fqdn.clone(),
372            ipaddr: endpoint_ipaddr,
373            status: if success { Status::Up } else { Status::Down },
374            tstamp: SystemTime::now(),
375            node_addr: local_node,
376        };
377        // debug!("[probe] enqueing msg to send to nodes");
378        // TODO send datapoint to myself using channel instead of HTTP
379        // let x = exp!(serde_json::to_string(&msg));
380        if chan_outbox_tx.send(msg).await.is_err() {
381            info!("[probe] send error");
382            drop(chan_outbox_tx);
383            return;
384        }
385
386        let loop_elapsed = t0.elapsed();
387        let probing = t1.elapsed();
388        // statsd metric `probe_time_msec` - Service probing elapsed time in milliseconds
389        statsd.timer("probe_time_msec", probing.as_millis() as f64);
390        let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
391        // statsd metric `probe.load_factor` - debugging
392        statsd.histogram("probe.load_factor", lf);
393    }
394}
395
396/// Parses an `<ipaddr>:<port>` string
397fn parse_ipaddr_port(n: &str) -> Result<NodeAddr> {
398    let (ipa_str, port_str) = n.rsplit_once(':').context("Invalid ipaddr:port format")?;
399    let ipa = ipa_str.parse::<IpAddr>().context("Invalid IP address")?;
400    let port = port_str.parse::<u16>().context("Invalid port number")?;
401    Ok(NodeAddr { ipa, port })
402}
403
404/// Generates a config/script file for nsupdate to add or delete a record
405fn create_nsupdate_blob(
406    resolver_addr: NodeAddr,
407    fqdn: &str,
408    zone: &str,
409    ttl: u32,
410    endpoint_ipaddr: &IpAddr,
411    status: Status,
412) -> String {
413    let record_type = if endpoint_ipaddr.is_ipv4() {
414        "A"
415    } else {
416        "AAAA"
417    };
418
419    let operation = match status {
420        Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
421        Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
422        Status::Unknown => "exit".to_string(),
423    };
424    format!(
425        "server {ipa} {port}\nzone {zone}\nttl {ttl}\n{op}\nsend\nexit\n",
426        ipa = resolver_addr.ipa,
427        port = resolver_addr.port,
428        zone = zone,
429        ttl = ttl,
430        op = operation,
431    )
432}
433
434/// Updates resolver[s] using nsupdate or knsupdate
435async fn run_nsupdates(
436    update_method: &ResolverUpdateMethod,
437    resolvers: &Vec<String>,
438    srv: &ServiceStatus,
439    fqdn: &str,
440    endpoint_ipaddr: &IpAddr,
441) -> Result<()> {
442    // TODO pass status from caller
443    let endpoint_that_changed = srv
444        .endpoint_statuses
445        .get(endpoint_ipaddr)
446        .context("Endpoint not found")?;
447    let status = endpoint_that_changed.computed_status;
448
449    let cmd = match update_method {
450        ResolverUpdateMethod::Nsupdate => "/usr/bin/nsupdate",
451        ResolverUpdateMethod::Knsupdate => "/usr/bin/knsupdate",
452        _ => bail!("Incorrect function call"),
453    };
454
455    for resolver in resolvers {
456        let nsupdate_blob = create_nsupdate_blob(
457            parse_ipaddr_port(resolver)?,
458            fqdn,
459            &srv.dns_zone,
460            srv.dns_ttl,
461            endpoint_ipaddr,
462            status,
463        );
464        run_nsupdate_once(cmd, nsupdate_blob).await?;
465    }
466    Ok(())
467}
468
469/// Write configuration file and execute [k]nsupdate
470#[cfg(not(test))]
471async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
472    use tokio::process::Command;
473    let tmpfile = tempfile::NamedTempFile::new().context("Failed to create tmpfile")?;
474    let conf_fn = tmpfile.path();
475    fs::write(conf_fn, &conf).context(format!("Failed to write {}", conf_fn.display()))?;
476    info!(
477        "[dns] calling {} {} with commands:\n{}",
478        cmd,
479        conf_fn.display(),
480        conf
481    );
482    let out = Command::new(cmd).arg(conf_fn).output().await;
483    match out {
484        Ok(_) => info!("[dns] success"),
485        Err(_) => warn!("[dns] failure"),
486    }
487    Ok(())
488}
489
/// Test-build stand-in for `run_nsupdate_once`: logs `cmd` and the generated
/// script instead of executing anything.
#[cfg(test)]
#[allow(clippy::unused_async)]
async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
    info!("--cmd--{}--conf--\n{}--", cmd, conf);
    Ok(())
}
496
497// Logic
498
/// Calculates the quorum value: the number of agreeing votes needed among `n`
/// voters, `ceil(n / 2)`. Examples: 5 -> 3, 4 -> 2, 3 -> 2, 1 -> 1, 0 -> 0.
///
/// For even `n` this is exactly half, so "up" and "down" can both reach it
/// at once; `compute_status` resolves that tie as `Up`.
#[must_use]
const fn quorum(n: u16) -> u16 {
    n.div_ceil(2)
}
504
505/// Computes endpoint status using quorum (majority)
506#[must_use]
507#[allow(clippy::cast_possible_truncation)]
508#[allow(clippy::match_same_arms)]
509fn compute_status(endp_statuses: &NodeToEnst) -> Status {
510    if endp_statuses.is_empty() {
511        return Status::Unknown;
512    }
513    let q = quorum(endp_statuses.len() as u16);
514    let up_cnt = endp_statuses
515        .values()
516        .filter(|ens| ens.status == Status::Up)
517        .count() as u16;
518    let down_cnt = endp_statuses
519        .values()
520        .filter(|ens| ens.status == Status::Down)
521        .count() as u16;
522
523    match (up_cnt >= q, down_cnt >= q) {
524        (true, false) => Status::Up,
525        (false, true) => Status::Down,
526        (true, true) => Status::Up, // possible when the number of nodes is even
527        (false, false) => Status::Unknown, // not enough datapoints yet
528    }
529}
530
531/// Stores a probe datapoint. Returns `true` if a service endpoint change is detected.
532fn ingest_datapoint(end: &mut EndpointStatus, msg: &Update) -> bool {
533    let Some(s) = end.node_to_ens.get_mut(&msg.node_addr) else {
534        warn!("Received datapoint from unknown node {:?}", &msg.node_addr);
535        return false;
536    };
537
538    s.status = msg.status;
539    s.last_update_time = msg.tstamp;
540
541    // Computes updated status
542    let prev_computed_status = end.computed_status;
543    end.computed_status = compute_status(&end.node_to_ens);
544    end.computed_status_time = SystemTime::now();
545
546    if prev_computed_status == Status::Unknown {
547        return false;
548    }
549    if prev_computed_status == end.computed_status {
550        return false;
551    }
552    true
553}
554
555/// This is called when an enpdoint is going down. Decide if we want to
556/// delete its record from DNS or fail open (aka do nothing and keep it around).
557#[must_use]
558fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
559    let up = estatuses
560        .values()
561        .filter(|s| s.computed_status == Status::Up)
562        .count();
563    let down = estatuses
564        .values()
565        .filter(|s| s.computed_status == Status::Down)
566        .count();
567    let unknown = estatuses
568        .values()
569        .filter(|s| s.computed_status == Status::Unknown)
570        .count();
571
572    // Threshold for majority
573    let th = estatuses.len().div_ceil(2);
574    let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {th}");
575    {
576        if unknown >= th {
577            info!("{msg}: too many endpoints in unknown state");
578            true
579        } else if down >= th {
580            info!("{msg}: too many endpoints in down state");
581            true
582        } else if up <= th {
583            info!("{msg}: not enough endpoints in up state");
584            true
585        } else {
586            debug!("No {msg}");
587            false
588        }
589    }
590}
591
592/// Runs user-supplied command or script
593async fn run_update_command(
594    script_path: &str,
595    endpoint_ipaddr: IpAddr,
596    fqdn: &str,
597    dns_zone: &str,
598    status: Status,
599) {
600    let args = [
601        &endpoint_ipaddr.to_string(),
602        fqdn,
603        dns_zone,
604        &format!("{status:?}"),
605    ];
606    info!("[script] running with {:?}", &args);
607
608    let mut cmd = tokio::process::Command::new(script_path);
609    cmd.args(args);
610    let Ok(output) = cmd.output().await else {
611        error!("[script] failed to execute '{script_path}'");
612        return;
613    };
614    if !output.status.success() {
615        error!("[script] failed with {}", output.status);
616    }
617    let stderr = String::from_utf8_lossy(&output.stderr);
618    let stdout = String::from_utf8_lossy(&output.stdout);
619    if !stderr.is_empty() {
620        error!("[script] stderr: {stderr}");
621    }
622    if !stdout.is_empty() {
623        info!("[script] output: {stdout}");
624    }
625}
626
627fn run_dynu_update(
628    conf: &Conf,
629    srv: &ServiceStatus,
630    endpoint_ipaddr: IpAddr,
631    fqdn: &str,
632    status: Status,
633) {
634    let client = api_clients::Dynu::new(conf.update_credentials.to_string());
635    let fqdn = fqdn.to_string();
636    let zone = srv.dns_zone.clone();
637
638    tokio::spawn(async move {
639        match status {
640            Status::Up => client.add_record(&fqdn, &zone, endpoint_ipaddr).await,
641            Status::Down => client.delete_record(&fqdn, &zone, endpoint_ipaddr).await,
642            Status::Unknown => {}
643        }
644    });
645}
646
647/// Endpoint status change detected: Update DNS
648async fn handle_endpoint_change(
649    conf: &Conf,
650    srv: &ServiceStatus,
651    endpoint_ipaddr: IpAddr,
652    fqdn: &str,
653) {
654    let Some(endpoint_that_changed) = srv.endpoint_statuses.get(&endpoint_ipaddr) else {
655        return;
656    };
657    let status = endpoint_that_changed.computed_status;
658    if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
659        // Do not delete the endpoint record
660        return;
661    }
662
663    match &conf.update_method {
664        ResolverUpdateMethod::Dynu => {
665            run_dynu_update(conf, srv, endpoint_ipaddr, fqdn, status);
666        }
667        ResolverUpdateMethod::Nsupdate | ResolverUpdateMethod::Knsupdate => {
668            let _ = run_nsupdates(
669                &conf.update_method,
670                &conf.update_resolvers,
671                srv,
672                fqdn,
673                &endpoint_ipaddr,
674            )
675            .await;
676        }
677        ResolverUpdateMethod::Script { path } => {
678            run_update_command(path, endpoint_ipaddr, fqdn, &srv.dns_zone, status).await;
679        }
680    }
681}
682
683// HTTP routes //
684
685async fn fetch_status(chan_update_tx: UpdateSend) -> Result<FqdnToStatuses> {
686    // send a request to updater_task and wait
687    let (chan_dash_tx, chan_dash_rx) = tokio::sync::oneshot::channel();
688    let wu = UpWrap::Fetch { chan_dash_tx };
689    chan_update_tx.send(wu).await?;
690    Ok(chan_dash_rx.await?)
691}
692
693async fn handle_http_get_dashboard(State(shared): SharedState) -> axum::response::Html<String> {
694    let resp = fetch_status(shared.chan_update_tx).await;
695    match resp {
696        Err(e) => {
697            error!("dashboard: failed to fetch: {e}");
698            axum::response::Html("Error".into())
699        }
700        Ok(r) => {
701            let dash = dash::render_html_dashboard(&r);
702            axum::response::Html(dash)
703        }
704    }
705}
706
/// `GET /health`: liveness check; always answers `ok`.
async fn handle_http_get_health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
710
711async fn handle_http_post_qmsg(State(shared): SharedState, Json(msg): Json<QuorumMsg>) {
712    // send to updater
713    let wu = UpWrap::Update {
714        update: Update {
715            fqdn: msg.fqdn.to_string(),
716            ipaddr: msg.ipaddr,
717            status: msg.status,
718            tstamp: msg.tstamp,
719            node_addr: msg.node_addr,
720        },
721    };
722    if shared.chan_update_tx.send(wu).await.is_err() {
723        info!("[http] exiting");
724        // TODO handle error
725    };
726}
727
728// HTTP routes: API //
729
730// API - List endpoints
731
/// Represents a service under monitoring including the state of each endpoint
/// (one entry of the `/api/v0/list_endpoints` response).
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct APIListItem {
    /// Computed status of each endpoint, keyed by IP address.
    pub ipaddrs: HashMap<IpAddr, Status>,
    /// TTL of the service's DNS records.
    dns_ttl: u32,
    /// DNS zone of the service.
    dns_zone: String,
    /// Probe interval in milliseconds.
    probe_interval_ms: u128,
    /// URL probed on each endpoint.
    probe_target_url: String,
}

/// `/api/v0/list_endpoints` response: services keyed (and ordered) by FQDN.
pub type APIListOut = BTreeMap<String, APIListItem>;
743
744/// Lists monitored services. Includes the current state of each endpoint
745async fn handle_http_api_list(State(shared): SharedState) -> Json<APIListOut> {
746    let resp = fetch_status(shared.chan_update_tx).await.unwrap();
747
748    let out = resp
749        .into_iter()
750        .map(|(fqdn, srvs)| {
751            // collect ipaddr -> status hashmap
752            let ipaddrs = srvs
753                .endpoint_statuses
754                .into_iter()
755                .map(|(srv_ipaddr, ep)| (srv_ipaddr, ep.computed_status))
756                .collect();
757
758            (
759                fqdn,
760                APIListItem {
761                    ipaddrs,
762                    dns_ttl: srvs.dns_ttl,
763                    dns_zone: srvs.dns_zone,
764                    probe_interval_ms: srvs.probe_interval.as_millis(),
765                    probe_target_url: srvs.probe_target_url,
766                },
767            )
768        })
769        .collect();
770
771    Json(out)
772}
773
774// API - Update endpoints
775
/// Struct sent to the API to modify the running configuration: add or remove services or endpoints
///
/// Internally tagged: a `"method"` field holds the snake_case variant name,
/// e.g. `{"method": "add_ip_addr", "fqdn": "www.example.com", "ipaddr": "192.0.2.1"}`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum APIRPCUpdateAction {
    /// Adds a service to monitor.
    CreateService {
        fqdn: String,
        /// Health-check URL.
        healthcheck: String,
        /// DNS record TTL.
        ttl: u32,
        zone: String,
        probe_interval_ms: u64,
    },
    /// Removes a service.
    DeleteService {
        fqdn: String,
    },
    /// Adds an endpoint address to a service.
    AddIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
    /// Removes an endpoint address from a service.
    DeleteIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
}
799
/// Batch of configuration changes, delivered to the updater as `UpWrap::APIChangeRequestWrap`.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct APIChangeRequest {
    pub actions: Vec<APIRPCUpdateAction>,
}

/// Outcome of one action; serialized with a `"status"` field of `"ok"` or
/// `"error"` (the latter carrying an `error` message).
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum APIChangeResult {
    Ok,
    Error { error: String },
}

/// Reply to an `APIChangeRequest`; `results` are matched to the request's
/// `actions` by position (see `send_api_change_request_then_check_resp`).
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct APIChangeResponse {
    results: Vec<APIChangeResult>,
}
816
817impl APIChangeResponse {
818    pub fn all_ok(&self) -> bool {
819        self.results
820            .iter()
821            .all(|r| matches!(r, APIChangeResult::Ok))
822    }
823}
824
825// HTTP API receives an update request and sends it into chan_update_tx
826async fn handle_http_api_update(
827    State(shared): SharedState,
828    axum::Json(req): Json<APIChangeRequest>,
829) -> axum::Json<APIChangeResponse> {
830    let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
831    let wu = UpWrap::APIChangeRequestWrap {
832        request: req,
833        resp_chan_tx,
834    };
835    debug!("sending API posnt into chan_update_tx");
836    let _ = shared.chan_update_tx.send(wu).await;
837    debug!("waiting from resp_chan_rx");
838    let resp = resp_chan_rx
839        .await
840        .unwrap_or_else(|_| APIChangeResponse { results: vec![] });
841
842    debug!("received a reply from resp_chan_rx");
843    Json(resp)
844}
845
846// end of API routes
847
848fn create_router(chan_update_tx: UpdateSend, api_enabled: bool) -> axum::Router {
849    let r = axum::Router::new()
850        .route("/dash", axum::routing::get(handle_http_get_dashboard))
851        .route("/health", axum::routing::get(handle_http_get_health))
852        .route("/qmsg", axum::routing::post(handle_http_post_qmsg));
853
854    if api_enabled {
855        r.route(
856            "/api/v0/list_endpoints",
857            axum::routing::get(handle_http_api_list),
858        )
859        .route(
860            "/api/v0/update_endpoints",
861            axum::routing::post(handle_http_api_update),
862        )
863    } else {
864        r
865    }
866    .with_state(Shared { chan_update_tx })
867}
868
869async fn wait_for_shutdown(token: CancellationToken) {
870    token.cancelled().await;
871    debug!("Shutdown token triggered. Axum server exiting.");
872}
873
874async fn run_axum_server(
875    http_listener: tokio::net::TcpListener,
876    http_router: axum::Router,
877    shutdown_token: CancellationToken,
878) {
879    if let Err(e) = axum::serve(http_listener, http_router)
880        .with_graceful_shutdown(wait_for_shutdown(shutdown_token))
881        .await
882    {
883        error!("Axum server failed: {e}");
884        process::exit(1);
885    }
886}
887
888// init section //
889pub async fn init(
890    conf_fn: &str,
891    local_node: Option<String>,
892    tasks: &mut Vec<tokio::task::JoinHandle<()>>,
893) -> Result<CancellationToken> {
894    // Load and parse conf
895    let conf = load_conf(conf_fn, local_node).unwrap_or_else(|err| {
896        error!("Failed to load config: {err}");
897        process::exit(1)
898    });
899
900    // Init
901    let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd")?;
902    let service_statuses = HashMap::new();
903
904    // Setup channels
905    let (chan_update_tx, chan_update_rx): (UpdateSend, UpdateRec) = mpsc::channel(10);
906    let (chan_outbox_tx, chan_outbox_rx): (OutboxSend, OutboxRec) = mpsc::channel(100);
907
908    // Outbox sender
909    let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
910    tasks.push(tokio::spawn(sender));
911
912    // Updater
913    let updater_t = core::updater(
914        statsd,
915        conf.clone(),
916        service_statuses,
917        chan_update_rx,
918        chan_outbox_tx,
919    );
920    tasks.push(tokio::spawn(updater_t));
921
922    time::sleep(Duration::from_millis(100)).await;
923
924    // Create cancellation token
925    let axum_shutdown = CancellationToken::new();
926    let shutdown_token_child = axum_shutdown.child_token();
927
928    // HTTP receiver setup
929    debug!("creating listener");
930    let http_router = create_router(chan_update_tx.clone(), conf.api.enabled);
931    let http_listener = tokio::net::TcpListener::bind(conf.local_node.clone()).await?;
932
933    // Spawn Axum server
934    debug!("starting axum");
935    let fut = tokio::spawn(run_axum_server(
936        http_listener,
937        http_router,
938        shutdown_token_child,
939    ));
940
941    tasks.push(fut);
942
943    // Scan config to setup services and spawn probes
944    debug!("starting probes");
945    setup_and_spawn_probes(conf, chan_update_tx).await?;
946    debug!("starting probes: done");
947
948    Ok(axum_shutdown)
949}
950
951pub async fn run() -> Result<()> {
952    // Setup logging and metrics
953    JournalLog::new()
954        .context("Failed to create JournalLog")?
955        .install()
956        .context("Failed to setup journald logging")?;
957
958    // TODO: configurable loglevel
959    log::set_max_level(LevelFilter::Info);
960    info!("[main] starting");
961
962    // Start signal handlers
963    let mut tasks = Vec::new();
964    for s in [SignalKind::interrupt(), SignalKind::hangup()] {
965        tasks.push(tokio::spawn(handle_sig(s)));
966    }
967
968    let default_conf_fn = "/etc/rrdnsd.json";
969    let conf_fn = std::env::var("CONF").ok();
970    let conf_fn = conf_fn.as_deref().unwrap_or(default_conf_fn);
971
972    let local_node = std::env::var("LOCAL_NODE").ok();
973
974    init(conf_fn, local_node, &mut tasks).await?;
975
976    join_all(tasks).await;
977    Ok(())
978}
979
980/// Setup services from conf file and spawn related probes
981async fn setup_and_spawn_probes(conf: Conf, chan_update_tx: Sender<UpWrap>) -> Result<()> {
982    for service in &conf.services {
983        let probe_interval_ms = service
984            .probe_interval_ms
985            .or(conf.probe_interval_ms)
986            .unwrap_or(1000);
987
988        let request = APIChangeRequest {
989            actions: vec![APIRPCUpdateAction::CreateService {
990                fqdn: service.fqdn.clone(),
991                healthcheck: service.healthcheck.clone(),
992                ttl: service.ttl,
993                zone: service.zone.clone(),
994                probe_interval_ms,
995            }],
996        };
997        send_api_change_request_then_check_resp(&chan_update_tx, request).await?;
998
999        for ipaddr_str in &service.ipaddrs {
1000            let request = APIChangeRequest {
1001                actions: vec![APIRPCUpdateAction::AddIpAddr {
1002                    fqdn: service.fqdn.clone(),
1003                    ipaddr: ipaddr_str.parse()?,
1004                }],
1005            };
1006            send_api_change_request_then_check_resp(&chan_update_tx, request).await?;
1007        }
1008    }
1009    Ok(())
1010}
1011
1012async fn send_api_change_request_then_check_resp(
1013    chan_update_tx: &Sender<UpWrap>,
1014    request: APIChangeRequest,
1015) -> Result<(), anyhow::Error> {
1016    let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
1017    let msg = UpWrap::APIChangeRequestWrap {
1018        request: request.clone(),
1019        resp_chan_tx,
1020    };
1021    chan_update_tx.send(msg).await?;
1022    let resp = resp_chan_rx.await?;
1023
1024    for (action, result) in request.actions.iter().zip(resp.results.iter()) {
1025        if let APIChangeResult::Error { error } = result {
1026            error!("Action {action:?} failed: {error}");
1027        }
1028    }
1029
1030    Ok(())
1031}