rrdnsd/
lib.rs

1// rrdnsd - monitoring and failover for Round Robin DNS records
2// Copyright 2024-2025 Federico Ceratto <federico@debian.org>
3// Released under AGPLv3
4
5use anyhow::{bail, Context, Result};
6use axum::extract::{Json, State};
7use futures::future::join_all;
8// TODO: switch to tracing?
9use log::{debug, error, info, warn, LevelFilter};
10use reqwest::redirect::Policy;
11use reqwest::Client as Reqw;
12use serde::{Deserialize, Serialize};
13use statsd::client::Client as Statsd;
14use std::collections::BTreeMap;
15use std::collections::HashMap;
16use std::fs;
17use std::net::{IpAddr, SocketAddr};
18use std::process; // TODO: tokio exit?
19use std::str;
20use std::time::Instant;
21use std::time::SystemTime;
22use systemd_journal_logger::JournalLog;
23use tokio::signal::unix::{signal, SignalKind};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::time;
27use tokio::time::Duration;
28use tokio_util::sync::CancellationToken;
29
30mod core;
31mod dash;
32mod providers;
33
34use crate::providers::SetProvider;
35
36#[cfg(test)]
37mod tests;
38
/// Crate version, read from `CARGO_PKG_VERSION` at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
40
/// Health state of a service endpoint, either as reported by a single node
/// or as computed from the reports of all nodes.
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, Copy)]
pub enum Status {
    /// No datapoints yet, or not enough of them to reach a quorum
    Unknown,
    /// The probe request failed
    Down,
    /// The probe request succeeded
    Up,
}
47
/// Status of one endpoint as reported by one node, with the timestamp
/// carried by the most recent datapoint from that node.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
struct Enst {
    status: Status,
    last_update_time: SystemTime,
}
53
/// Message exchanged with peers: one probe result for one endpoint.
/// Serialized as JSON and POSTed to the `/qmsg` route of every node.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct QuorumMsg {
    // Probes currently emit version 1
    format_version: u8,
    // Service the endpoint belongs to
    fqdn: String,
    // Endpoint that was probed
    ipaddr: IpAddr,
    // Outcome of the probe
    status: Status,
    // Time at which the message was built by the probing node
    tstamp: SystemTime,
    // Node that ran the probe
    node_addr: NodeAddr,
}
64
/// Parsed `<ipaddr>:<port>` address. Identifies a node (and is also used
/// for resolver addresses).
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Serialize, Deserialize)]
struct NodeAddr {
    ipa: IpAddr,
    port: u16,
}
70
71/*
72Arrows represent hashmaps:
73
74ServiceStatus
75 computed_status: Status
76 computed_status_time: SystemTime
77 dns_ttl: u32
78 dns_zone: String
79 fqdn: String
80 endpoint_statuses[IpAddr] -> EndpointStatus
81                                computed_status: Status
82                                computed_status_time: SystemTime
83                                node_to_ens[node_name] -> Enst
84                                                            status: Status
85                                                            last_update_time: SystemTime
86*/
87
/// Per-node view of a single endpoint, keyed by the reporting node.
type NodeToEnst = HashMap<NodeAddr, Enst>;
89
/// State tracked for one endpoint (IP address) of a service.
#[derive(Debug, Clone)]
struct EndpointStatus {
    // Latest status reported by each node
    node_to_ens: NodeToEnst,
    // Status derived from `node_to_ens` by quorum
    computed_status: Status,
    // When `computed_status` was last recomputed
    computed_status_time: SystemTime,
    // Cancelled on drop to stop the probe task of this endpoint
    probe_cancel: tokio_util::sync::CancellationToken,
}
97
impl Drop for EndpointStatus {
    // send cancellation token to the related probe task
    // when an endpoint or the whole service are deleted
    //
    // NOTE(review): `EndpointStatus` derives `Clone`, and clones of a
    // `CancellationToken` share the same cancellation state. Dropping a
    // *clone* of an `EndpointStatus` (e.g. a snapshot handed to the dashboard
    // or the list API) would therefore also cancel the live probe. Confirm
    // that the snapshots sent on `CoreMsg::Fetch` do not carry the live token.
    fn drop(&mut self) {
        self.probe_cancel.cancel();
    }
}
105
/// Endpoints of one service, keyed by endpoint IP address.
type EndpointStatuses = HashMap<IpAddr, EndpointStatus>;
107
/// Runtime state of one monitored service.
#[derive(Debug, Clone)]
struct ServiceStatus {
    // TTL used when writing DNS records
    dns_ttl: u32,
    // DNS zone the service fqdn belongs to
    dns_zone: String,
    // Per-endpoint state
    endpoint_statuses: EndpointStatuses,
    // Probing interval; exposed in milliseconds by the list API
    probe_interval: Duration,
    // URL probed on each endpoint; exposed by the list API
    probe_target_url: String,
}
116
/// All monitored services, keyed by fqdn.
type FqdnToStatuses = HashMap<String, ServiceStatus>;
118
/// Internal message to the core reactor
#[derive(Debug)]
enum CoreMsg {
    /// A probe result received on the `/qmsg` route
    Datapoint {
        datapoint: Datapoint,
    },
    /// Request for the current service statuses; the reply is sent on
    /// `chan_dash_tx`
    Fetch {
        chan_dash_tx: ServiceStatusCloneSend,
    },
    /// Request to change the running configuration; the per-action results
    /// are sent back on `resp_chan_tx`
    APIChangeRequestWrap {
        request: APIChangeRequest,
        resp_chan_tx: tokio::sync::oneshot::Sender<APIChangeResponse>,
    },
}
133
/// Monitoring datapoint received from or sent to other nodes.
/// Carries the same fields as [`QuorumMsg`] minus the format version.
#[derive(Debug)]
struct Datapoint {
    fqdn: String,
    ipaddr: IpAddr,
    status: Status,
    tstamp: SystemTime,
    node_addr: NodeAddr,
}
143
// Channels
// One-shot reply channel carrying a snapshot of all service statuses
type ServiceStatusCloneSend = tokio::sync::oneshot::Sender<FqdnToStatuses>;
// Sending/receiving halves of the channel feeding the core reactor
type CoreSend = Sender<CoreMsg>;
type CoreRec = Receiver<CoreMsg>;
// Sending/receiving halves of the channel feeding the outbox sender
type OutboxSend = Sender<QuorumMsg>;
type OutboxRec = Receiver<QuorumMsg>;
// TODO: harmonize send/rec tx/rx naming
151
/// State shared with the HTTP handlers: a sender into the core reactor.
#[derive(Debug, Clone)]
struct Shared {
    chan_core_tx: CoreSend,
}
/// Axum extractor type for [`Shared`]
type SharedState = axum::extract::State<Shared>;
157
// Unwraps a `Result`; on `Err` logs the error and returns from the
// enclosing function. Only usable in functions returning `()`.
macro_rules! unwrap_or_return {
    ($res:expr) => {
        match $res {
            Ok(val) => val,
            Err(e) => {
                error!("error: {}", e);
                return;
            }
        }
    };
}
169
// Like expect(".."), but instead of panicking it logs the error and
// terminates the process with exit status 1.
macro_rules! exp {
    ($res:expr) => {
        match $res {
            Ok(val) => val,
            Err(e) => {
                error!("{:?}", e);
                process::exit(1);
            }
        }
    };
}
182
183// UNIX signal handlers
184
185/// Shuts down the daemon when requested
186async fn handle_sig(sig: SignalKind) {
187    exp!(signal(sig)).recv().await;
188    info!("[main] exiting");
189    process::exit(0);
190}
191
192/// Statsd timer
193fn timer(statsd: &Statsd, name: &str, t0: Instant) {
194    let delta = t0.elapsed().as_millis() as f64;
195    statsd.timer(name, delta);
196}
197
198// Configuration handling //
199
/// Service-specific configuration block
#[derive(Deserialize, Debug, Clone)]
struct ServiceConf {
    // Name of the monitored record; leading/trailing dots are trimmed on load
    fqdn: String,
    // Healthcheck target, forwarded to the core when the service is created
    healthcheck: String,
    // Endpoint IP addresses, parsed when the probes are set up
    ipaddrs: Vec<String>,
    // DNS record TTL
    ttl: u32,
    // DNS zone; `fqdn` must end with it
    zone: String,
    // Overrides the global `probe_interval_ms` when set
    probe_interval_ms: Option<u64>,
}
210
/// API configuration block
#[derive(Debug, Deserialize, Clone)]
struct APIConfig {
    // When true the `/api/v0/...` routes are registered
    enabled: bool,
}
217
/// How to send an update to the DNS resolver e.g. nsupdate
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
enum ResolverUpdateMethod {
    /// Dynu HTTP API (see `providers::Dynu`)
    Dynu,
    /// Dynv6 HTTP API (see `providers::Dynv6`)
    Dynv6,
    /// Runs `/usr/bin/nsupdate`
    Nsupdate,
    /// Runs `/usr/bin/knsupdate`
    Knsupdate,
    /// Runs the user-supplied executable at `path`
    Script { path: String },
}
228
/// Configuration
#[derive(Deserialize, Debug, Clone)]
struct Conf {
    // Configuration format version; only version 1 is accepted
    conf_version: u8,
    // `<ipaddr>:<port>` of this node; also used as the HTTP listen address
    local_node: String,
    // Filled in by `prepare_conf` from `local_node`
    #[serde(skip_deserializing)]
    parsed_local_node: Option<NodeAddr>,
    // `<ipaddr>:<port>` of every node that receives probe results
    nodes: Vec<String>,
    // Default probe interval; services without their own value fall back to
    // this, then to 1000 ms
    probe_interval_ms: Option<u64>,
    // Filled in by `prepare_conf` from `nodes`
    #[serde(skip_deserializing)]
    parsed_nodes: Vec<NodeAddr>,
    services: Vec<ServiceConf>,
    update_method: ResolverUpdateMethod,
    // `<ipaddr>:<port>` of the resolvers updated via [k]nsupdate
    update_resolvers: Vec<String>,
    // TODO: parsed_resolvers
    // Credentials handed to the Dynu/Dynv6 providers
    update_credentials: String,
    // When true, a failing endpoint may be kept in DNS (see `fail_open`)
    enable_fail_open: bool,
    api: APIConfig,
}
248
/// Returns `s` with every leading and trailing `.` removed.
fn trim_dots(s: &str) -> String {
    let trimmed = s.trim_matches('.');
    String::from(trimmed)
}
252
253fn prepare_conf(jc: &str, local_node: Option<String>) -> Result<Conf> {
254    let mut conf: Conf = serde_json::from_str(jc).context("Unable to parse configuration")?;
255    debug!("[main] conf loaded");
256    if conf.conf_version != 1 {
257        bail!("Invalid configuration format version");
258    }
259    if let Some(ln) = local_node {
260        conf.local_node = ln;
261    }
262    conf.parsed_local_node = Some(parse_ipaddr_port(&conf.local_node)?);
263    conf.parsed_nodes = conf
264        .nodes
265        .iter()
266        .map(|n| parse_ipaddr_port(n).expect("Unable to parse {n}"))
267        .collect();
268    for n in &conf.update_resolvers {
269        parse_ipaddr_port(n)?;
270    }
271    for sv in &mut conf.services {
272        sv.fqdn = trim_dots(&sv.fqdn);
273        sv.zone = trim_dots(&sv.zone);
274        if sv.fqdn.ends_with(&sv.zone) {
275            continue;
276        }
277        bail!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
278    }
279    Ok(conf)
280}
281
282/// Loads configuration from file. Supports overriding the filename and local node name.
283fn load_conf(conf_fn: &str, local_node: Option<String>) -> Result<Conf> {
284    info!("[load_conf] reading {conf_fn}");
285    let jc = fs::read_to_string(conf_fn).context("Unable to read config file")?;
286    prepare_conf(&jc, local_node)
287}
288
289/// Outbox: receives from `chan_outbox_rx` and send updates to peers
290async fn outbox_sender(mut chan_outbox_rx: OutboxRec, nodes: Vec<String>) {
291    // let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
292    let client_ = Reqw::builder()
293        .user_agent("rrdnsd")
294        .connect_timeout(Duration::new(5, 0))
295        .timeout(Duration::new(5, 0))
296        .tcp_keepalive(Duration::new(15, 0))
297        .build();
298    let client = unwrap_or_return!(client_);
299
300    while let Some(msg) = chan_outbox_rx.recv().await {
301        for node in &nodes {
302            let client = client.clone();
303            let msg = msg.clone();
304            let url = format!("http://{node}/qmsg");
305            let node = node.clone();
306            tokio::spawn(async move {
307                if let Err(e) = client.post(&url).json(&msg).send().await {
308                    info!("Error sending msg to {node}: {e:?}");
309                }
310                // TODO generate metrics // statsd.incr("outbox_sent");
311            });
312        }
313    }
314    info!("[outbox] exiting");
315    drop(chan_outbox_rx);
316}
317
318// Probe
319
320/// Runs an HTTP[S] probe. Sends datapoints into `chan_outbox_tx`
321async fn probe(
322    interval: Duration,
323    local_node: NodeAddr,
324    endpoint_ipaddr: IpAddr,
325    target_uri: String,
326    fqdn: String,
327    chan_outbox_tx: OutboxSend,
328    cancel_probe: CancellationToken,
329) {
330    // FIXME stagger runs!
331    // TODO make statsd ipaddr/port configurable
332    // TODO export service status in statsd?
333    let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
334
335    // Port number in endpoint_saddr is ignored. Conventionally set to 0.
336    // See https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.resolve
337    let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
338
339    // TODO configure timeouts
340    info!("[probe] creating probe for uri {target_uri} at ipaddr {endpoint_saddr}");
341    let client_ = Reqw::builder()
342        .user_agent("rrdnsd")
343        .connect_timeout(Duration::new(5, 0))
344        .timeout(Duration::new(5, 0))
345        .tcp_keepalive(Duration::new(15, 0))
346        .redirect(Policy::none())
347        .resolve(&fqdn, endpoint_saddr)
348        .build();
349    let client = unwrap_or_return!(client_);
350
351    debug!("[probe] starting");
352    let mut ticker = time::interval(interval);
353    loop {
354        if cancel_probe.is_cancelled() {
355            info!("[probe] exited probe for uri {target_uri} at ipaddr {endpoint_saddr}");
356            return;
357        }
358        let t0 = Instant::now();
359        ticker.tick().await;
360        let t1 = Instant::now();
361
362        debug!("[probe] probing {target_uri} at endpoint {endpoint_ipaddr}");
363        let resp = client.get(&target_uri).send().await;
364        let success = resp.is_ok();
365        // statsd metric `probe.duration` - internal
366        timer(&statsd, "probe.duration", t0);
367        if success {
368            // statsd metric `probe.success.cnt` - Successful service probes count
369            statsd.incr("probe.success.cnt");
370        } else {
371            // statsd metric `probe.failure.cnt` - Failed service probes count
372            statsd.incr("probe.failure.cnt");
373        }
374        let msg = QuorumMsg {
375            format_version: 1,
376            fqdn: fqdn.clone(),
377            ipaddr: endpoint_ipaddr,
378            status: if success { Status::Up } else { Status::Down },
379            tstamp: SystemTime::now(),
380            node_addr: local_node,
381        };
382        // debug!("[probe] enqueing msg to send to nodes");
383        // TODO send datapoint to myself using channel instead of HTTP
384        // let x = exp!(serde_json::to_string(&msg));
385        if chan_outbox_tx.send(msg).await.is_err() {
386            info!("[probe] send error");
387            drop(chan_outbox_tx);
388            return;
389        }
390
391        let loop_elapsed = t0.elapsed();
392        let probing = t1.elapsed();
393        // statsd metric `probe_time_msec` - Service probing elapsed time in milliseconds
394        statsd.timer("probe_time_msec", probing.as_millis() as f64);
395        let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
396        // statsd metric `probe.load_factor` - debugging
397        statsd.histogram("probe.load_factor", lf);
398    }
399}
400
401/// Parses an `<ipaddr>:<port>` string
402fn parse_ipaddr_port(n: &str) -> Result<NodeAddr> {
403    let (ipa_str, port_str) = n.rsplit_once(':').context("Invalid ipaddr:port format")?;
404    let ipa = ipa_str.parse::<IpAddr>().context("Invalid IP address")?;
405    let port = port_str.parse::<u16>().context("Invalid port number")?;
406    Ok(NodeAddr { ipa, port })
407}
408
409/// Generates a config/script file for nsupdate to add or delete a record
410fn create_nsupdate_blob(
411    resolver_addr: NodeAddr,
412    fqdn: &str,
413    zone: &str,
414    ttl: u32,
415    endpoint_ipaddr: &IpAddr,
416    status: Status,
417) -> String {
418    let record_type = if endpoint_ipaddr.is_ipv4() {
419        "A"
420    } else {
421        "AAAA"
422    };
423
424    let operation = match status {
425        Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
426        Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
427        Status::Unknown => "exit".to_string(),
428    };
429    format!(
430        "server {ipa} {port}\nzone {zone}\nttl {ttl}\n{op}\nsend\nexit\n",
431        ipa = resolver_addr.ipa,
432        port = resolver_addr.port,
433        zone = zone,
434        ttl = ttl,
435        op = operation,
436    )
437}
438
439/// Updates resolver[s] using nsupdate or knsupdate
440async fn run_nsupdates(
441    update_method: &ResolverUpdateMethod,
442    resolvers: &Vec<String>,
443    srv: &ServiceStatus,
444    fqdn: &str,
445    endpoint_ipaddr: &IpAddr,
446) -> Result<()> {
447    // TODO pass status from caller
448    let endpoint_that_changed = srv
449        .endpoint_statuses
450        .get(endpoint_ipaddr)
451        .context("Endpoint not found")?;
452    let status = endpoint_that_changed.computed_status;
453
454    let cmd = match update_method {
455        ResolverUpdateMethod::Nsupdate => "/usr/bin/nsupdate",
456        ResolverUpdateMethod::Knsupdate => "/usr/bin/knsupdate",
457        _ => bail!("Incorrect function call"),
458    };
459
460    for resolver in resolvers {
461        let nsupdate_blob = create_nsupdate_blob(
462            parse_ipaddr_port(resolver)?,
463            fqdn,
464            &srv.dns_zone,
465            srv.dns_ttl,
466            endpoint_ipaddr,
467            status,
468        );
469        run_nsupdate_once(cmd, nsupdate_blob).await?;
470    }
471    Ok(())
472}
473
474/// Write configuration file and execute [k]nsupdate
475#[cfg(not(test))]
476async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
477    use tokio::process::Command;
478    let tmpfile = tempfile::NamedTempFile::new().context("Failed to create tmpfile")?;
479    let conf_fn = tmpfile.path();
480    fs::write(conf_fn, &conf).context(format!("Failed to write {}", conf_fn.display()))?;
481    info!(
482        "[dns] calling {} {} with commands:\n{}",
483        cmd,
484        conf_fn.display(),
485        conf
486    );
487    let out = Command::new(cmd).arg(conf_fn).output().await;
488    match out {
489        Ok(_) => info!("[dns] success"),
490        Err(_) => warn!("[dns] failure"),
491    }
492    Ok(())
493}
494
/// Test-only stand-in for `run_nsupdate_once`: logs the command and the
/// generated script instead of executing anything.
#[cfg(test)]
#[allow(clippy::unused_async)]
async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
    info!("--cmd--{cmd}--conf--\n{conf}--");
    Ok(())
}
501
502// Logic
503
/// Calculates the quorum for `n` voters: half of `n`, rounded up
/// (e.g. 5 -> 3, 4 -> 2, 3 -> 2, 1 -> 1, 0 -> 0).
#[must_use]
const fn quorum(n: u16) -> u16 {
    // Same as ceil(n / 2), written so that it cannot overflow
    n / 2 + n % 2
}
509
510/// Computes endpoint status using quorum (majority)
511#[must_use]
512#[allow(clippy::cast_possible_truncation)]
513#[allow(clippy::match_same_arms)]
514fn compute_status(endp_statuses: &NodeToEnst) -> Status {
515    if endp_statuses.is_empty() {
516        return Status::Unknown;
517    }
518    let q = quorum(endp_statuses.len() as u16);
519    let up_cnt = endp_statuses
520        .values()
521        .filter(|ens| ens.status == Status::Up)
522        .count() as u16;
523    let down_cnt = endp_statuses
524        .values()
525        .filter(|ens| ens.status == Status::Down)
526        .count() as u16;
527
528    match (up_cnt >= q, down_cnt >= q) {
529        (true, false) => Status::Up,
530        (false, true) => Status::Down,
531        (true, true) => Status::Up, // possible when the number of nodes is even
532        (false, false) => Status::Unknown, // not enough datapoints yet
533    }
534}
535
536/// Stores a probe datapoint. Returns `true` if a service endpoint change is detected.
537fn ingest_datapoint(end: &mut EndpointStatus, msg: &Datapoint) -> bool {
538    let Some(s) = end.node_to_ens.get_mut(&msg.node_addr) else {
539        warn!("Received datapoint from unknown node {:?}", &msg.node_addr);
540        return false;
541    };
542
543    s.status = msg.status;
544    s.last_update_time = msg.tstamp;
545
546    // Computes updated status
547    let prev_computed_status = end.computed_status;
548    end.computed_status = compute_status(&end.node_to_ens);
549    end.computed_status_time = SystemTime::now();
550
551    if prev_computed_status == Status::Unknown {
552        return false;
553    }
554    if prev_computed_status == end.computed_status {
555        return false;
556    }
557    true
558}
559
560/// This is called when an enpdoint is going down. Decide if we want to
561/// delete its record from DNS or fail open (aka do nothing and keep it around).
562#[must_use]
563fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
564    let up = estatuses
565        .values()
566        .filter(|s| s.computed_status == Status::Up)
567        .count();
568    let down = estatuses
569        .values()
570        .filter(|s| s.computed_status == Status::Down)
571        .count();
572    let unknown = estatuses
573        .values()
574        .filter(|s| s.computed_status == Status::Unknown)
575        .count();
576
577    // Threshold for majority
578    let th = estatuses.len().div_ceil(2);
579    let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {th}");
580    {
581        if unknown >= th {
582            info!("{msg}: too many endpoints in unknown state");
583            true
584        } else if down >= th {
585            info!("{msg}: too many endpoints in down state");
586            true
587        } else if up <= th {
588            info!("{msg}: not enough endpoints in up state");
589            true
590        } else {
591            debug!("No {msg}");
592            false
593        }
594    }
595}
596
597/// Runs user-supplied command or script
598async fn run_update_command(
599    script_path: &str,
600    endpoint_ipaddr: IpAddr,
601    fqdn: &str,
602    dns_zone: &str,
603    status: Status,
604) {
605    let args = [
606        &endpoint_ipaddr.to_string(),
607        fqdn,
608        dns_zone,
609        &format!("{status:?}"),
610    ];
611    info!("[script] running with {:?}", &args);
612
613    let mut cmd = tokio::process::Command::new(script_path);
614    cmd.args(args);
615    let Ok(output) = cmd.output().await else {
616        error!("[script] failed to execute '{script_path}'");
617        return;
618    };
619    if !output.status.success() {
620        error!("[script] failed with {}", output.status);
621    }
622    let stderr = String::from_utf8_lossy(&output.stderr);
623    let stdout = String::from_utf8_lossy(&output.stdout);
624    if !stderr.is_empty() {
625        error!("[script] stderr: {stderr}");
626    }
627    if !stdout.is_empty() {
628        info!("[script] output: {stdout}");
629    }
630}
631
632/// Endpoint status change detected: Update DNS
633async fn handle_endpoint_change(
634    conf: &Conf,
635    srv: &ServiceStatus,
636    endpoint_ipaddr: IpAddr,
637    fqdn: &str,
638) -> Result<()> {
639    let Some(endpoint_that_changed) = srv.endpoint_statuses.get(&endpoint_ipaddr) else {
640        return Ok(());
641    };
642    let status = endpoint_that_changed.computed_status;
643    if status == Status::Unknown {
644        return Ok(());
645    }
646    if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
647        // Do not delete the endpoint record
648        // TODO: log
649        return Ok(());
650    }
651
652    let zone = &srv.dns_zone;
653    match &conf.update_method {
654        // HTTP DNS APIs
655        ResolverUpdateMethod::Dynu => match status {
656            Status::Up => {
657                providers::Dynu::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
658                    .await?;
659            }
660            Status::Down => {
661                providers::Dynu::delete_record(
662                    &conf.update_credentials,
663                    fqdn,
664                    zone,
665                    endpoint_ipaddr,
666                )
667                .await?;
668            }
669            Status::Unknown => {}
670        },
671        ResolverUpdateMethod::Dynv6 => match status {
672            Status::Up => {
673                providers::Dynv6::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
674                    .await?;
675            }
676            Status::Down => {
677                providers::Dynv6::delete_record(
678                    &conf.update_credentials,
679                    fqdn,
680                    zone,
681                    endpoint_ipaddr,
682                )
683                .await?;
684            }
685            Status::Unknown => {}
686        },
687
688        ResolverUpdateMethod::Nsupdate | ResolverUpdateMethod::Knsupdate => {
689            run_nsupdates(
690                &conf.update_method,
691                &conf.update_resolvers,
692                srv,
693                fqdn,
694                &endpoint_ipaddr,
695            )
696            .await?;
697        }
698        ResolverUpdateMethod::Script { path } => {
699            run_update_command(path, endpoint_ipaddr, fqdn, &srv.dns_zone, status).await;
700        }
701    }
702    Ok(())
703}
704
705// HTTP routes //
706
707async fn fetch_status(chan_core_tx: CoreSend) -> Result<FqdnToStatuses> {
708    // send a request to core_task and wait
709    let (chan_dash_tx, chan_dash_rx) = tokio::sync::oneshot::channel();
710    let wu = CoreMsg::Fetch { chan_dash_tx };
711    chan_core_tx.send(wu).await?;
712    Ok(chan_dash_rx.await?)
713}
714
715async fn handle_http_get_dashboard(State(shared): SharedState) -> axum::response::Html<String> {
716    let resp = fetch_status(shared.chan_core_tx).await;
717    match resp {
718        Err(e) => {
719            error!("dashboard: failed to fetch: {e}");
720            axum::response::Html("Error".into())
721        }
722        Ok(r) => {
723            let dash = dash::render_html_dashboard(&r);
724            axum::response::Html(dash)
725        }
726    }
727}
728
/// `GET /health`: liveness endpoint, always answers with the body `ok`.
async fn handle_http_get_health() -> &'static str {
    const HEALTH_BODY: &str = "ok";
    HEALTH_BODY
}
732
733async fn handle_http_post_qmsg(State(shared): SharedState, Json(msg): Json<QuorumMsg>) {
734    // send to core
735    let wu = CoreMsg::Datapoint {
736        datapoint: Datapoint {
737            fqdn: msg.fqdn.to_string(),
738            ipaddr: msg.ipaddr,
739            status: msg.status,
740            tstamp: msg.tstamp,
741            node_addr: msg.node_addr,
742        },
743    };
744    if shared.chan_core_tx.send(wu).await.is_err() {
745        info!("[http] exiting");
746        // TODO handle error
747    };
748}
749
750// HTTP routes: API //
751
752// API - List endpoints
753
/// Represents a service under monitoring including the state of each endpoint
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct APIListItem {
    /// Computed status of every endpoint, keyed by endpoint IP address
    pub ipaddrs: HashMap<IpAddr, Status>,
    // DNS record TTL of the service
    dns_ttl: u32,
    // DNS zone of the service
    dns_zone: String,
    // Probing interval in milliseconds
    probe_interval_ms: u128,
    // URL probed on each endpoint
    probe_target_url: String,
}
763
/// Response body of the list API: services keyed (and ordered) by fqdn.
pub type APIListOut = BTreeMap<String, APIListItem>;
765
766/// Lists monitored services. Includes the current state of each endpoint
767async fn handle_http_api_list(State(shared): SharedState) -> Json<APIListOut> {
768    let resp = fetch_status(shared.chan_core_tx).await.unwrap();
769
770    let out = resp
771        .into_iter()
772        .map(|(fqdn, srvs)| {
773            // collect ipaddr -> status hashmap
774            let ipaddrs = srvs
775                .endpoint_statuses
776                .into_iter()
777                .map(|(srv_ipaddr, ep)| (srv_ipaddr, ep.computed_status))
778                .collect();
779
780            (
781                fqdn,
782                APIListItem {
783                    ipaddrs,
784                    dns_ttl: srvs.dns_ttl,
785                    dns_zone: srvs.dns_zone,
786                    probe_interval_ms: srvs.probe_interval.as_millis(),
787                    probe_target_url: srvs.probe_target_url,
788                },
789            )
790        })
791        .collect();
792
793    Json(out)
794}
795
796// API - Update endpoints
797
/// Struct sent to the API to modify the running configuration: add or remove services or endpoints
///
/// Serialized with a `method` tag holding the snake_case variant name
/// (e.g. `"method": "add_ip_addr"`).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum APIRPCUpdateAction {
    /// Starts monitoring a new service (without endpoints)
    CreateService {
        fqdn: String,
        healthcheck: String,
        ttl: u32,
        zone: String,
        probe_interval_ms: u64,
    },
    /// Stops monitoring a service
    DeleteService {
        fqdn: String,
    },
    /// Adds an endpoint to an existing service
    AddIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
    /// Removes an endpoint from a service
    DeleteIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
}
821
/// Request body of the update API: a list of actions to apply.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct APIChangeRequest {
    pub actions: Vec<APIRPCUpdateAction>,
}
826
/// Outcome of a single action. Serialized with a lowercase `status` tag
/// (`"ok"` or `"error"`).
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum APIChangeResult {
    Ok,
    Error { error: String },
}
833
/// Response body of the update API: the outcomes of the requested actions.
// NOTE(review): presumably one result per action, in request order (the
// startup code zips them with the actions) — confirm against the core.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct APIChangeResponse {
    results: Vec<APIChangeResult>,
}
838
839impl APIChangeResponse {
840    pub fn all_ok(&self) -> bool {
841        self.results
842            .iter()
843            .all(|r| matches!(r, APIChangeResult::Ok))
844    }
845}
846
847// HTTP API receives an update request and sends it into chan_core_tx
848async fn handle_http_api_update(
849    State(shared): SharedState,
850    axum::Json(req): Json<APIChangeRequest>,
851) -> axum::Json<APIChangeResponse> {
852    let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
853    let wu = CoreMsg::APIChangeRequestWrap {
854        request: req,
855        resp_chan_tx,
856    };
857    debug!("sending API posnt into chan_core_tx");
858    let _ = shared.chan_core_tx.send(wu).await;
859    debug!("waiting from resp_chan_rx");
860    let resp = resp_chan_rx
861        .await
862        .unwrap_or_else(|_| APIChangeResponse { results: vec![] });
863
864    debug!("received a reply from resp_chan_rx");
865    Json(resp)
866}
867
868// end of API routes
869
870fn create_router(chan_core_tx: CoreSend, api_enabled: bool) -> axum::Router {
871    let r = axum::Router::new()
872        .route("/dash", axum::routing::get(handle_http_get_dashboard))
873        .route("/health", axum::routing::get(handle_http_get_health))
874        .route("/qmsg", axum::routing::post(handle_http_post_qmsg));
875
876    if api_enabled {
877        r.route(
878            "/api/v0/list_endpoints",
879            axum::routing::get(handle_http_api_list),
880        )
881        .route(
882            "/api/v0/update_endpoints",
883            axum::routing::post(handle_http_api_update),
884        )
885    } else {
886        r
887    }
888    .with_state(Shared { chan_core_tx })
889}
890
891async fn wait_for_shutdown(token: CancellationToken) {
892    token.cancelled().await;
893    debug!("Shutdown token triggered. Axum server exiting.");
894}
895
896async fn run_axum_server(
897    http_listener: tokio::net::TcpListener,
898    http_router: axum::Router,
899    shutdown_token: CancellationToken,
900) {
901    if let Err(e) = axum::serve(http_listener, http_router)
902        .with_graceful_shutdown(wait_for_shutdown(shutdown_token))
903        .await
904    {
905        error!("Axum server failed: {e}");
906        process::exit(1);
907    }
908}
909
910// init section //
/// Loads the configuration and starts every long-running task: outbox
/// sender, core reactor and HTTP server. The spawned task handles are
/// appended to `tasks`. The services found in the configuration are then
/// registered with the core.
///
/// Returns the token whose cancellation makes the HTTP server shut down.
///
/// # Errors
/// Fails when the statsd client cannot be created, when the listener cannot
/// bind to the `local_node` address, or when registering the configured
/// services fails. A configuration that cannot be loaded does not return an
/// error: it is logged and terminates the process with exit status 1.
pub async fn init(
    conf_fn: &str,
    local_node: Option<String>,
    tasks: &mut Vec<tokio::task::JoinHandle<()>>,
) -> Result<CancellationToken> {
    // Load and parse conf
    let conf = load_conf(conf_fn, local_node).unwrap_or_else(|err| {
        error!("Failed to load config: {err}");
        process::exit(1)
    });

    // Init
    let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd")?;
    let service_statuses = HashMap::new();

    // Setup channels
    let (chan_core_tx, chan_core_rx): (CoreSend, CoreRec) = mpsc::channel(10);
    let (chan_outbox_tx, chan_outbox_rx): (OutboxSend, OutboxRec) = mpsc::channel(100);

    // Outbox sender
    let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
    tasks.push(tokio::spawn(sender));

    // Core reactor
    let core_t = core::core(
        statsd,
        conf.clone(),
        service_statuses,
        chan_core_rx,
        chan_outbox_tx,
    );
    tasks.push(tokio::spawn(core_t));

    // NOTE(review): presumably gives the spawned tasks time to start before
    // the listener is created — confirm whether this pause is still needed
    time::sleep(Duration::from_millis(100)).await;

    // Create cancellation token
    // The server watches a child token: cancelling the returned parent
    // token cancels the child as well
    let axum_shutdown = CancellationToken::new();
    let shutdown_token_child = axum_shutdown.child_token();

    // HTTP receiver setup
    debug!("creating listener");
    let http_router = create_router(chan_core_tx.clone(), conf.api.enabled);
    let http_listener = tokio::net::TcpListener::bind(conf.local_node.clone()).await?;

    // Spawn Axum server
    debug!("starting axum");
    let fut = tokio::spawn(run_axum_server(
        http_listener,
        http_router,
        shutdown_token_child,
    ));

    tasks.push(fut);

    // Scan config to setup services and spawn probes
    debug!("starting probes");
    setup_and_spawn_probes(conf, chan_core_tx).await?;
    debug!("starting probes: done");

    Ok(axum_shutdown)
}
972
973pub async fn run() -> Result<()> {
974    // Setup logging and metrics
975    JournalLog::new()
976        .context("Failed to create JournalLog")?
977        .install()
978        .context("Failed to setup journald logging")?;
979
980    // TODO: configurable loglevel
981    log::set_max_level(LevelFilter::Info);
982    info!("[main] starting");
983
984    // Start signal handlers
985    let mut tasks = Vec::new();
986    for s in [SignalKind::interrupt(), SignalKind::hangup()] {
987        tasks.push(tokio::spawn(handle_sig(s)));
988    }
989
990    let default_conf_fn = "/etc/rrdnsd.json";
991    let conf_fn = std::env::var("CONF").ok();
992    let conf_fn = conf_fn.as_deref().unwrap_or(default_conf_fn);
993
994    let local_node = std::env::var("LOCAL_NODE").ok();
995
996    init(conf_fn, local_node, &mut tasks).await?;
997
998    join_all(tasks).await;
999    Ok(())
1000}
1001
1002/// Setup services from conf file and spawn related probes
1003async fn setup_and_spawn_probes(conf: Conf, chan_core_tx: Sender<CoreMsg>) -> Result<()> {
1004    for service in &conf.services {
1005        let probe_interval_ms = service
1006            .probe_interval_ms
1007            .or(conf.probe_interval_ms)
1008            .unwrap_or(1000);
1009
1010        let request = APIChangeRequest {
1011            actions: vec![APIRPCUpdateAction::CreateService {
1012                fqdn: service.fqdn.clone(),
1013                healthcheck: service.healthcheck.clone(),
1014                ttl: service.ttl,
1015                zone: service.zone.clone(),
1016                probe_interval_ms,
1017            }],
1018        };
1019        send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1020
1021        for ipaddr_str in &service.ipaddrs {
1022            let request = APIChangeRequest {
1023                actions: vec![APIRPCUpdateAction::AddIpAddr {
1024                    fqdn: service.fqdn.clone(),
1025                    ipaddr: ipaddr_str.parse()?,
1026                }],
1027            };
1028            send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1029        }
1030    }
1031    Ok(())
1032}
1033
1034async fn send_api_change_request_then_check_resp(
1035    chan_core_tx: &Sender<CoreMsg>,
1036    request: APIChangeRequest,
1037) -> Result<(), anyhow::Error> {
1038    let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
1039    let msg = CoreMsg::APIChangeRequestWrap {
1040        request: request.clone(),
1041        resp_chan_tx,
1042    };
1043    chan_core_tx.send(msg).await?;
1044    let resp = resp_chan_rx.await?;
1045
1046    for (action, result) in request.actions.iter().zip(resp.results.iter()) {
1047        if let APIChangeResult::Error { error } = result {
1048            error!("Action {action:?} failed: {error}");
1049        }
1050    }
1051
1052    Ok(())
1053}