rrdnsd/
lib.rs

1// rrdnsd - monitoring and failover for Round Robin DNS records
2// Copyright 2024-2025 Federico Ceratto <federico@debian.org>
3// Released under AGPLv3
4
5use anyhow::{bail, Context, Result};
6use axum::extract::{Json, State};
7use futures::future::join_all;
8// TODO: switch to tracing?
9use log::{debug, error, info, warn, LevelFilter};
10use reqwest::redirect::Policy;
11use reqwest::Client as Reqw;
12use serde::{Deserialize, Serialize};
13use statsd::client::Client as Statsd;
14use std::collections::BTreeMap;
15use std::collections::HashMap;
16use std::fs;
17use std::net::{IpAddr, SocketAddr};
18use std::process; // TODO: tokio exit?
19use std::str;
20use std::time::Instant;
21use std::time::SystemTime;
22use systemd_journal_logger::JournalLog;
23use tokio::signal::unix::{signal, SignalKind};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::time;
27use tokio::time::Duration;
28use tokio_util::sync::CancellationToken;
29
30mod core;
31mod dash;
32mod providers;
33
34use crate::providers::SetProvider;
35
36#[cfg(test)]
37mod tests;
38
39const VERSION: &str = env!("CARGO_PKG_VERSION");
40
41#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, Copy)]
42pub enum Status {
43    Unknown,
44    Down,
45    Up,
46}
47
48#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
49struct Enst {
50    status: Status,
51    last_update_time: SystemTime,
52}
53
54/// Used to exchange message with peers
55// TODO: dudupe QuorumMsg vs Datapoint
56#[derive(Debug, Serialize, Deserialize, Clone)]
57struct QuorumMsg {
58    format_version: u8,
59    fqdn: String,
60    ipaddr: IpAddr,
61    status: Status,
62    tstamp: SystemTime,
63    node_addr: NodeAddr,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Serialize, Deserialize)]
67struct NodeAddr {
68    ipa: IpAddr,
69    port: u16,
70}
71
72/*
73Arrows represent hashmaps:
74
75ServiceStatus
76 computed_status: Status
77 computed_status_time: SystemTime
78 dns_ttl: u32
79 dns_zone: String
80 fqdn: String
81 endpoint_statuses[IpAddr] -> EndpointStatus
82                                computed_status: Status
83                                computed_status_time: SystemTime
84                                node_to_ens[node_name] -> Enst
85                                                            status: Status
86                                                            last_update_time: SystemTime
87*/
88
89type NodeToEnst = HashMap<NodeAddr, Enst>;
90
91#[derive(Debug, Clone)]
92struct EndpointStatus {
93    node_to_ens: NodeToEnst,
94    computed_status: Status,
95    computed_status_time: SystemTime,
96    probe_cancel: tokio_util::sync::CancellationToken,
97}
98
99type EndpointStatuses = HashMap<IpAddr, EndpointStatus>;
100
101#[derive(Debug, Clone)]
102struct ServiceStatus {
103    dns_ttl: u32,
104    dns_zone: String,
105    endpoint_statuses: EndpointStatuses,
106    probe_interval: Duration,
107    probe_target_url: String,
108}
109
110type FqdnToStatuses = HashMap<String, ServiceStatus>;
111
112/// Internal message to the core reactor
113#[derive(Debug)]
114enum CoreMsg {
115    Datapoint {
116        datapoint: Datapoint,
117    },
118    Fetch {
119        chan_dash_tx: DashTx,
120    },
121    APIChangeRequestWrap {
122        request: APIChangeRequest,
123        resp_chan_tx: tokio::sync::oneshot::Sender<APIChangeResponse>,
124    },
125}
126
127/// Monitoring datapoint received from or set to other nodes
128#[derive(Debug)]
129struct Datapoint {
130    fqdn: String,
131    ipaddr: IpAddr,
132    status: Status,
133    tstamp: SystemTime,
134    node_addr: NodeAddr,
135}
136
137// Channels
138type DashTx = tokio::sync::oneshot::Sender<FqdnToStatuses>; // chan_dash_tx
139type CoreTx = Sender<CoreMsg>; // chan_core_tx
140type CoreRx = Receiver<CoreMsg>; // chan_core_rx
141type OutboxTx = Sender<QuorumMsg>; // chan_outbox_tx
142type OutboxRx = Receiver<QuorumMsg>; // chan_outbox_rx
143
144#[derive(Debug, Clone)]
145struct Shared {
146    chan_core_tx: CoreTx,
147}
148type SharedState = axum::extract::State<Shared>;
149
// Unwraps a `Result`: yields the `Ok` value, otherwise logs the error and
// returns from the enclosing function (which therefore must return `()`).
macro_rules! unwrap_or_return {
    ($result:expr) => {
        match $result {
            Err(err) => {
                error!("error: {}", err);
                return;
            }
            Ok(value) => value,
        }
    };
}
161
// Like expect(".."), but logs the error and exits the process with status 1
// instead of panicking.
macro_rules! exp {
    ($result:expr) => {
        match $result {
            Err(err) => {
                error!("{:?}", err);
                process::exit(1);
            }
            Ok(value) => value,
        }
    };
}
174
175// UNIX signal handlers
176
177/// Shuts down the daemon when requested
178async fn handle_sig(sig: SignalKind) {
179    exp!(signal(sig)).recv().await;
180    info!("[main] exiting");
181    process::exit(0);
182}
183
184/// Statsd timer
185fn timer(statsd: &Statsd, name: &str, t0: Instant) {
186    let delta = t0.elapsed().as_millis() as f64;
187    statsd.timer(name, delta);
188}
189
190// Configuration handling //
191
192/// Service-specific configuration block
193#[derive(Deserialize, Debug, Clone)]
194struct ServiceConf {
195    fqdn: String,
196    healthcheck: String,
197    ipaddrs: Vec<String>,
198    ttl: u32,
199    zone: String,
200    probe_interval_ms: Option<u64>,
201}
202
203/// API configuration block
204
205#[derive(Debug, Deserialize, Clone)]
206struct APIConfig {
207    enabled: bool,
208}
209
210/// How to send an update to the DNS resolver e.g. nsupdate
211#[derive(Deserialize, Debug, Clone)]
212#[serde(rename_all = "lowercase")]
213enum ResolverUpdateMethod {
214    Dynu,
215    Dynv6,
216    Nsupdate,
217    Knsupdate,
218    Script { path: String },
219}
220
221/// Configuration
222#[derive(Deserialize, Debug, Clone)]
223struct Conf {
224    conf_version: u8,
225    local_node: String,
226    #[serde(skip_deserializing)]
227    parsed_local_node: Option<NodeAddr>,
228    nodes: Vec<String>,
229    probe_interval_ms: Option<u64>,
230    #[serde(skip_deserializing)]
231    parsed_nodes: Vec<NodeAddr>,
232    services: Vec<ServiceConf>,
233    update_method: ResolverUpdateMethod,
234    update_resolvers: Vec<String>,
235    // TODO: parsed_resolvers
236    update_credentials: String,
237    enable_fail_open: bool,
238    api: APIConfig,
239}
240
/// Returns `s` without its leading and trailing `.` characters.
fn trim_dots(s: &str) -> String {
    String::from(s.trim_matches('.'))
}
244
245fn prepare_conf(jc: &str, local_node: Option<String>) -> Result<Conf> {
246    let mut conf: Conf = serde_json::from_str(jc).context("Unable to parse configuration")?;
247    debug!("[main] conf loaded");
248    if conf.conf_version != 1 {
249        bail!("Invalid configuration format version");
250    }
251    if let Some(ln) = local_node {
252        conf.local_node = ln;
253    }
254    conf.parsed_local_node = Some(parse_ipaddr_port(&conf.local_node)?);
255    conf.parsed_nodes = conf
256        .nodes
257        .iter()
258        .map(|n| parse_ipaddr_port(n).expect("Unable to parse {n}"))
259        .collect();
260    for n in &conf.update_resolvers {
261        parse_ipaddr_port(n)?;
262    }
263    for sv in &mut conf.services {
264        sv.fqdn = trim_dots(&sv.fqdn);
265        sv.zone = trim_dots(&sv.zone);
266        if sv.fqdn.ends_with(&sv.zone) {
267            continue;
268        }
269        bail!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
270    }
271    Ok(conf)
272}
273
274/// Loads configuration from file. Supports overriding the filename and local node name.
275fn load_conf(conf_fn: &str, local_node: Option<String>) -> Result<Conf> {
276    info!("[load_conf] reading {conf_fn}");
277    let jc = fs::read_to_string(conf_fn).context("Unable to read config file")?;
278    prepare_conf(&jc, local_node)
279}
280
281/// Outbox: receives from `chan_outbox_rx` and send updates to peers
282async fn outbox_sender(mut chan_outbox_rx: OutboxRx, nodes: Vec<String>) {
283    // let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
284    // TODO reuse connections
285    let client_ = Reqw::builder()
286        .connect_timeout(Duration::new(5, 0))
287        .pool_max_idle_per_host(9)
288        .tcp_keepalive(Duration::new(15, 0))
289        .timeout(Duration::new(5, 0))
290        .user_agent("rrdnsd")
291        .build();
292    let client = unwrap_or_return!(client_);
293
294    while let Some(msg) = chan_outbox_rx.recv().await {
295        for node in &nodes {
296            let client = client.clone();
297            let msg = msg.clone();
298            let url = format!("http://{node}/qmsg");
299            let node = node.clone();
300            tokio::spawn(async move {
301                if let Err(e) = client.post(&url).json(&msg).send().await {
302                    info!("Error sending msg to {node}: {e:?}");
303                }
304                // TODO generate metrics // statsd.incr("outbox_sent");
305            });
306        }
307    }
308    info!("[outbox] exiting");
309    drop(chan_outbox_rx);
310}
311
312// Probe
313
314/// Runs an HTTP[S] probe. Sends datapoints into `chan_outbox_tx`
315async fn probe(
316    interval: Duration,
317    local_node: NodeAddr,
318    endpoint_ipaddr: IpAddr,
319    target_uri: String,
320    fqdn: String,
321    chan_outbox_tx: OutboxTx,
322    cancel_probe: CancellationToken,
323) {
324    // FIXME stagger runs!
325    // TODO make statsd ipaddr/port configurable
326    // TODO export service status in statsd?
327    let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
328
329    // Port number in endpoint_saddr is ignored. Conventionally set to 0.
330    // See https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.resolve
331    let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
332
333    // TODO configure timeouts
334    info!("[probe] creating probe for uri {target_uri} at ipaddr {endpoint_saddr}");
335    let client_ = Reqw::builder()
336        .connect_timeout(Duration::new(5, 0))
337        .pool_max_idle_per_host(2)
338        .redirect(Policy::none())
339        .resolve(&fqdn, endpoint_saddr)
340        .tcp_keepalive(Duration::new(15, 0))
341        .timeout(Duration::new(5, 0))
342        .user_agent("rrdnsd")
343        .build();
344    let client = unwrap_or_return!(client_);
345
346    debug!("[probe] starting");
347    let mut ticker = time::interval(interval);
348    loop {
349        if cancel_probe.is_cancelled() {
350            info!("[probe] exited probe for uri {target_uri} at ipaddr {endpoint_saddr}");
351            return;
352        }
353        let t0 = Instant::now();
354        ticker.tick().await;
355        let t1 = Instant::now();
356
357        debug!("[probe] probing {target_uri} at endpoint {endpoint_ipaddr}");
358        let resp = client.get(&target_uri).send().await;
359        let success = resp.is_ok();
360        // statsd metric `probe.duration` - internal
361        timer(&statsd, "probe.duration", t0);
362        if success {
363            // statsd metric `probe.success.cnt` - Successful service probes count
364            statsd.incr("probe.success.cnt");
365        } else {
366            // statsd metric `probe.failure.cnt` - Failed service probes count
367            statsd.incr("probe.failure.cnt");
368        }
369        let msg = QuorumMsg {
370            format_version: 1,
371            fqdn: fqdn.clone(),
372            ipaddr: endpoint_ipaddr,
373            status: if success { Status::Up } else { Status::Down },
374            tstamp: SystemTime::now(),
375            node_addr: local_node,
376        };
377        // TODO send datapoint to myself using channel instead of HTTP?
378        if chan_outbox_tx.send(msg).await.is_err() {
379            info!("[probe] send error");
380            drop(chan_outbox_tx);
381            return;
382        }
383
384        let loop_elapsed = t0.elapsed();
385        let probing = t1.elapsed();
386        // statsd metric `probe_time_msec` - Service probing elapsed time in milliseconds
387        statsd.timer("probe_time_msec", probing.as_millis() as f64);
388        let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
389        // statsd metric `probe.load_factor` - debugging
390        statsd.histogram("probe.load_factor", lf);
391    }
392}
393
394/// Parses an `<ipaddr>:<port>` string
395fn parse_ipaddr_port(n: &str) -> Result<NodeAddr> {
396    let (ipa_str, port_str) = n.rsplit_once(':').context("Invalid ipaddr:port format")?;
397    let ipa = ipa_str.parse::<IpAddr>().context("Invalid IP address")?;
398    let port = port_str.parse::<u16>().context("Invalid port number")?;
399    Ok(NodeAddr { ipa, port })
400}
401
402/// Generates a config/script file for nsupdate to add or delete a record
403fn create_nsupdate_blob(
404    resolver_addr: NodeAddr,
405    fqdn: &str,
406    zone: &str,
407    ttl: u32,
408    endpoint_ipaddr: &IpAddr,
409    status: Status,
410) -> String {
411    let record_type = if endpoint_ipaddr.is_ipv4() {
412        "A"
413    } else {
414        "AAAA"
415    };
416
417    let operation = match status {
418        Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
419        Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
420        Status::Unknown => "exit".to_string(),
421    };
422    format!(
423        "server {ipa} {port}\nzone {zone}\nttl {ttl}\n{op}\nsend\nexit\n",
424        ipa = resolver_addr.ipa,
425        port = resolver_addr.port,
426        zone = zone,
427        ttl = ttl,
428        op = operation,
429    )
430}
431
432/// Updates resolver[s] using nsupdate or knsupdate
433async fn run_nsupdates(
434    update_method: &ResolverUpdateMethod,
435    resolvers: &Vec<String>,
436    srv: &ServiceStatus,
437    fqdn: &str,
438    endpoint_ipaddr: &IpAddr,
439) -> Result<()> {
440    // TODO pass status from caller
441    let endpoint_that_changed = srv
442        .endpoint_statuses
443        .get(endpoint_ipaddr)
444        .context("Endpoint not found")?;
445    let status = endpoint_that_changed.computed_status;
446
447    let cmd = match update_method {
448        ResolverUpdateMethod::Nsupdate => "/usr/bin/nsupdate",
449        ResolverUpdateMethod::Knsupdate => "/usr/bin/knsupdate",
450        _ => bail!("Incorrect function call"),
451    };
452
453    for resolver in resolvers {
454        let nsupdate_blob = create_nsupdate_blob(
455            parse_ipaddr_port(resolver)?,
456            fqdn,
457            &srv.dns_zone,
458            srv.dns_ttl,
459            endpoint_ipaddr,
460            status,
461        );
462        run_nsupdate_once(cmd, nsupdate_blob).await?;
463    }
464    Ok(())
465}
466
467/// Write configuration file and execute [k]nsupdate
468#[cfg(not(test))]
469async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
470    use tokio::process::Command;
471    let tmpfile = tempfile::NamedTempFile::new().context("Failed to create tmpfile")?;
472    let conf_fn = tmpfile.path();
473    fs::write(conf_fn, &conf).context(format!("Failed to write {}", conf_fn.display()))?;
474    info!(
475        "[dns] calling {} {} with commands:\n{}",
476        cmd,
477        conf_fn.display(),
478        conf
479    );
480    let out = Command::new(cmd).arg(conf_fn).output().await;
481    match out {
482        Ok(_) => info!("[dns] success"),
483        Err(_) => warn!("[dns] failure"),
484    }
485    Ok(())
486}
487
/// Test double for `run_nsupdate_once`: logs the command and the script it
/// would have run, executes nothing and always succeeds.
#[cfg(test)]
#[allow(clippy::unused_async)]
async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
    info!("--cmd--{cmd}--conf--\n{conf}--", cmd = cmd, conf = conf);
    Result::Ok(())
}
494
495// Logic
496
/// Calculates the quorum value: half of `n`, rounded up.
/// E.g. 5 -> 3, 4 -> 2, 3 -> 2, 1 -> 1.
#[must_use]
const fn quorum(n: u16) -> u16 {
    // Same as a ceiling division by 2, written so that it cannot overflow.
    n / 2 + n % 2
}
502
503/// Computes endpoint status using quorum (majority)
504#[must_use]
505#[allow(clippy::cast_possible_truncation)]
506#[allow(clippy::match_same_arms)]
507fn compute_status(endp_statuses: &NodeToEnst) -> Status {
508    if endp_statuses.is_empty() {
509        return Status::Unknown;
510    }
511    let q = quorum(endp_statuses.len() as u16);
512    let up_cnt = endp_statuses
513        .values()
514        .filter(|ens| ens.status == Status::Up)
515        .count() as u16;
516    let down_cnt = endp_statuses
517        .values()
518        .filter(|ens| ens.status == Status::Down)
519        .count() as u16;
520
521    match (up_cnt >= q, down_cnt >= q) {
522        (true, false) => Status::Up,
523        (false, true) => Status::Down,
524        (true, true) => Status::Up, // possible when the number of nodes is even
525        (false, false) => Status::Unknown, // not enough datapoints yet
526    }
527}
528
529/// Stores a probe datapoint. Returns `true` if a service endpoint change is detected.
530fn ingest_datapoint(end: &mut EndpointStatus, msg: &Datapoint) -> bool {
531    let Some(s) = end.node_to_ens.get_mut(&msg.node_addr) else {
532        warn!("Received datapoint from unknown node {:?}", &msg.node_addr);
533        return false;
534    };
535
536    s.status = msg.status;
537    s.last_update_time = msg.tstamp;
538
539    // Computes updated status
540    let prev_computed_status = end.computed_status;
541    end.computed_status = compute_status(&end.node_to_ens);
542    end.computed_status_time = SystemTime::now();
543
544    if prev_computed_status == Status::Unknown {
545        return false;
546    }
547    if prev_computed_status == end.computed_status {
548        return false;
549    }
550    true
551}
552
553/// This is called when an enpdoint is going down. Decide if we want to
554/// delete its record from DNS or fail open (aka do nothing and keep it around).
555#[must_use]
556fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
557    let up = estatuses
558        .values()
559        .filter(|s| s.computed_status == Status::Up)
560        .count();
561    let down = estatuses
562        .values()
563        .filter(|s| s.computed_status == Status::Down)
564        .count();
565    let unknown = estatuses
566        .values()
567        .filter(|s| s.computed_status == Status::Unknown)
568        .count();
569
570    // Threshold for majority
571    let th = estatuses.len().div_ceil(2);
572    let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {th}");
573    {
574        if unknown >= th {
575            info!("{msg}: too many endpoints in unknown state");
576            true
577        } else if down >= th {
578            info!("{msg}: too many endpoints in down state");
579            true
580        } else if up <= th {
581            info!("{msg}: not enough endpoints in up state");
582            true
583        } else {
584            debug!("No {msg}");
585            false
586        }
587    }
588}
589
590/// Runs user-supplied command or script
591async fn run_update_command(
592    script_path: &str,
593    endpoint_ipaddr: IpAddr,
594    fqdn: &str,
595    dns_zone: &str,
596    status: Status,
597) {
598    let args = [
599        &endpoint_ipaddr.to_string(),
600        fqdn,
601        dns_zone,
602        &format!("{status:?}"),
603    ];
604    info!("[script] running with {:?}", &args);
605
606    let mut cmd = tokio::process::Command::new(script_path);
607    cmd.args(args);
608    let Ok(output) = cmd.output().await else {
609        error!("[script] failed to execute '{script_path}'");
610        return;
611    };
612    if !output.status.success() {
613        error!("[script] failed with {}", output.status);
614    }
615    let stderr = String::from_utf8_lossy(&output.stderr);
616    let stdout = String::from_utf8_lossy(&output.stdout);
617    if !stderr.is_empty() {
618        error!("[script] stderr: {stderr}");
619    }
620    if !stdout.is_empty() {
621        info!("[script] output: {stdout}");
622    }
623}
624
625/// Endpoint status change detected: Update DNS
626async fn handle_endpoint_change(
627    conf: &Conf,
628    srv: &ServiceStatus,
629    endpoint_ipaddr: IpAddr,
630    fqdn: &str,
631) -> Result<()> {
632    let Some(endpoint_that_changed) = srv.endpoint_statuses.get(&endpoint_ipaddr) else {
633        return Ok(());
634    };
635    let status = endpoint_that_changed.computed_status;
636    if status == Status::Unknown {
637        return Ok(());
638    }
639    if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
640        // Do not delete the endpoint record
641        // TODO: log
642        return Ok(());
643    }
644
645    let zone = &srv.dns_zone;
646    match &conf.update_method {
647        // HTTP DNS APIs
648        ResolverUpdateMethod::Dynu => match status {
649            Status::Up => {
650                providers::Dynu::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
651                    .await?;
652            }
653            Status::Down => {
654                providers::Dynu::delete_record(
655                    &conf.update_credentials,
656                    fqdn,
657                    zone,
658                    endpoint_ipaddr,
659                )
660                .await?;
661            }
662            Status::Unknown => {}
663        },
664        ResolverUpdateMethod::Dynv6 => match status {
665            Status::Up => {
666                providers::Dynv6::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
667                    .await?;
668            }
669            Status::Down => {
670                providers::Dynv6::delete_record(
671                    &conf.update_credentials,
672                    fqdn,
673                    zone,
674                    endpoint_ipaddr,
675                )
676                .await?;
677            }
678            Status::Unknown => {}
679        },
680
681        ResolverUpdateMethod::Nsupdate | ResolverUpdateMethod::Knsupdate => {
682            run_nsupdates(
683                &conf.update_method,
684                &conf.update_resolvers,
685                srv,
686                fqdn,
687                &endpoint_ipaddr,
688            )
689            .await?;
690        }
691        ResolverUpdateMethod::Script { path } => {
692            run_update_command(path, endpoint_ipaddr, fqdn, &srv.dns_zone, status).await;
693        }
694    }
695    Ok(())
696}
697
698// HTTP routes //
699
700async fn fetch_status(chan_core_tx: CoreTx) -> Result<FqdnToStatuses> {
701    // send a request to core_task and wait
702    let (chan_dash_tx, chan_dash_rx) = tokio::sync::oneshot::channel();
703    let wu = CoreMsg::Fetch { chan_dash_tx };
704    chan_core_tx.send(wu).await?;
705    Ok(chan_dash_rx.await?)
706}
707
708async fn handle_http_get_dashboard(State(shared): SharedState) -> axum::response::Html<String> {
709    let resp = fetch_status(shared.chan_core_tx).await;
710    match resp {
711        Err(e) => {
712            error!("dashboard: failed to fetch: {e}");
713            axum::response::Html("Error".into())
714        }
715        Ok(r) => {
716            let dash = dash::render_html_dashboard(&r);
717            axum::response::Html(dash)
718        }
719    }
720}
721
/// `GET /health`: always answers `ok`.
async fn handle_http_get_health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
725
726async fn handle_http_post_qmsg(State(shared): SharedState, Json(msg): Json<QuorumMsg>) {
727    // send to core
728    let wu = CoreMsg::Datapoint {
729        datapoint: Datapoint {
730            fqdn: msg.fqdn.to_string(),
731            ipaddr: msg.ipaddr,
732            status: msg.status,
733            tstamp: msg.tstamp,
734            node_addr: msg.node_addr,
735        },
736    };
737    if shared.chan_core_tx.send(wu).await.is_err() {
738        info!("[http] exiting");
739        // TODO handle error
740    };
741}
742
743// HTTP routes: API //
744
745// API - List endpoints
746
747/// Represents a service under monitoring including the state of each endpoint
748#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
749pub struct APIListItem {
750    pub ipaddrs: HashMap<IpAddr, Status>,
751    dns_ttl: u32,
752    dns_zone: String,
753    probe_interval_ms: u128,
754    probe_target_url: String,
755}
756
757pub type APIListOut = BTreeMap<String, APIListItem>;
758
759/// Lists monitored services. Includes the current state of each endpoint
760async fn handle_http_api_list(State(shared): SharedState) -> Json<APIListOut> {
761    let resp = fetch_status(shared.chan_core_tx).await.unwrap();
762
763    let out = resp
764        .into_iter()
765        .map(|(fqdn, srvs)| {
766            // collect ipaddr -> status hashmap
767            let ipaddrs = srvs
768                .endpoint_statuses
769                .into_iter()
770                .map(|(srv_ipaddr, ep)| (srv_ipaddr, ep.computed_status))
771                .collect();
772
773            (
774                fqdn,
775                APIListItem {
776                    ipaddrs,
777                    dns_ttl: srvs.dns_ttl,
778                    dns_zone: srvs.dns_zone,
779                    probe_interval_ms: srvs.probe_interval.as_millis(),
780                    probe_target_url: srvs.probe_target_url,
781                },
782            )
783        })
784        .collect();
785
786    Json(out)
787}
788
789// API - Update endpoints
790
791/// Struct sent to the API to modify the running configuration: add or remove services or endpoints
792#[derive(Debug, Serialize, Deserialize, Clone)]
793#[serde(tag = "method", rename_all = "snake_case")]
794pub enum APIRPCUpdateAction {
795    CreateService {
796        fqdn: String,
797        healthcheck: String,
798        ttl: u32,
799        zone: String,
800        probe_interval_ms: u64,
801    },
802    DeleteService {
803        fqdn: String,
804    },
805    AddIpAddr {
806        fqdn: String,
807        ipaddr: IpAddr,
808    },
809    DeleteIpAddr {
810        fqdn: String,
811        ipaddr: IpAddr,
812    },
813}
814
815#[derive(Deserialize, Serialize, Debug, Clone)]
816pub struct APIChangeRequest {
817    pub actions: Vec<APIRPCUpdateAction>,
818}
819
820#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
821#[serde(tag = "status", rename_all = "lowercase")]
822pub enum APIChangeResult {
823    Ok,
824    Error { error: String },
825}
826
827#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
828pub struct APIChangeResponse {
829    results: Vec<APIChangeResult>,
830}
831
832impl APIChangeResponse {
833    pub fn all_ok(&self) -> bool {
834        self.results
835            .iter()
836            .all(|r| matches!(r, APIChangeResult::Ok))
837    }
838}
839
840// HTTP API receives an update request and sends it into chan_core_tx
841async fn handle_http_api_update(
842    State(shared): SharedState,
843    axum::Json(req): Json<APIChangeRequest>,
844) -> axum::Json<APIChangeResponse> {
845    let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
846    let wu = CoreMsg::APIChangeRequestWrap {
847        request: req,
848        resp_chan_tx,
849    };
850    debug!("sending API change request into chan_core_tx");
851    let _ = shared.chan_core_tx.send(wu).await;
852    debug!("waiting from resp_chan_rx");
853    let resp = resp_chan_rx
854        .await
855        .unwrap_or_else(|_| APIChangeResponse { results: vec![] });
856
857    debug!("received a reply from resp_chan_rx");
858    Json(resp)
859}
860
861// end of API routes
862
863fn create_router(chan_core_tx: CoreTx, api_enabled: bool) -> axum::Router {
864    let r = axum::Router::new()
865        .route("/dash", axum::routing::get(handle_http_get_dashboard))
866        .route("/health", axum::routing::get(handle_http_get_health))
867        .route("/qmsg", axum::routing::post(handle_http_post_qmsg));
868
869    if api_enabled {
870        r.route(
871            "/api/v0/list_endpoints",
872            axum::routing::get(handle_http_api_list),
873        )
874        .route(
875            "/api/v0/update_endpoints",
876            axum::routing::post(handle_http_api_update),
877        )
878    } else {
879        r
880    }
881    .with_state(Shared { chan_core_tx })
882}
883
884async fn wait_for_shutdown(token: CancellationToken) {
885    token.cancelled().await;
886    debug!("Shutdown token triggered. Axum server exiting.");
887}
888
889async fn run_axum_server(
890    http_listener: tokio::net::TcpListener,
891    http_router: axum::Router,
892    shutdown_token: CancellationToken,
893) {
894    if let Err(e) = axum::serve(http_listener, http_router)
895        .with_graceful_shutdown(wait_for_shutdown(shutdown_token))
896        .await
897    {
898        error!("Axum server failed: {e}");
899        process::exit(1);
900    }
901}
902
903// init section //
904pub async fn init(
905    conf_fn: &str,
906    local_node: Option<String>,
907    tasks: &mut Vec<tokio::task::JoinHandle<()>>,
908) -> Result<CancellationToken> {
909    // Load and parse conf
910    let conf = load_conf(conf_fn, local_node).unwrap_or_else(|err| {
911        error!("Failed to load config: {err}");
912        process::exit(1)
913    });
914
915    // Init
916    let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd")?;
917    let service_statuses = HashMap::new();
918
919    // Setup channels
920    let (chan_core_tx, chan_core_rx): (CoreTx, CoreRx) = mpsc::channel(10);
921    let (chan_outbox_tx, chan_outbox_rx): (OutboxTx, OutboxRx) = mpsc::channel(100);
922
923    // Outbox sender
924    let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
925    tasks.push(tokio::spawn(sender));
926
927    // Core reactor
928    let core_t = core::core(
929        statsd,
930        conf.clone(),
931        service_statuses,
932        chan_core_rx,
933        chan_outbox_tx,
934    );
935    tasks.push(tokio::spawn(core_t));
936
937    time::sleep(Duration::from_millis(100)).await;
938
939    // Create cancellation token
940    let axum_shutdown = CancellationToken::new();
941    let shutdown_token_child = axum_shutdown.child_token();
942
943    // HTTP receiver setup
944    debug!("creating listener");
945    let http_router = create_router(chan_core_tx.clone(), conf.api.enabled);
946    let http_listener = tokio::net::TcpListener::bind(conf.local_node.clone()).await?;
947
948    // Spawn Axum server
949    debug!("starting axum");
950    let fut = tokio::spawn(run_axum_server(
951        http_listener,
952        http_router,
953        shutdown_token_child,
954    ));
955
956    tasks.push(fut);
957
958    // Scan config to setup services and spawn probes
959    debug!("starting probes");
960    setup_and_spawn_probes(conf, chan_core_tx).await?;
961    debug!("starting probes: done");
962
963    Ok(axum_shutdown)
964}
965
966pub async fn run() -> Result<()> {
967    // Setup logging and metrics
968    JournalLog::new()
969        .context("Failed to create JournalLog")?
970        .install()
971        .context("Failed to setup journald logging")?;
972
973    // TODO: configurable loglevel
974    log::set_max_level(LevelFilter::Info);
975    info!("[main] starting");
976
977    // Start signal handlers
978    let mut tasks = Vec::new();
979    for s in [SignalKind::interrupt(), SignalKind::hangup()] {
980        tasks.push(tokio::spawn(handle_sig(s)));
981    }
982
983    let default_conf_fn = "/etc/rrdnsd.json";
984    let conf_fn = std::env::var("CONF").ok();
985    let conf_fn = conf_fn.as_deref().unwrap_or(default_conf_fn);
986
987    let local_node = std::env::var("LOCAL_NODE").ok();
988
989    init(conf_fn, local_node, &mut tasks).await?;
990
991    join_all(tasks).await;
992    Ok(())
993}
994
995/// Setup services from conf file and spawn related probes
996async fn setup_and_spawn_probes(conf: Conf, chan_core_tx: Sender<CoreMsg>) -> Result<()> {
997    for service in &conf.services {
998        let probe_interval_ms = service
999            .probe_interval_ms
1000            .or(conf.probe_interval_ms)
1001            .unwrap_or(1000);
1002
1003        let request = APIChangeRequest {
1004            actions: vec![APIRPCUpdateAction::CreateService {
1005                fqdn: service.fqdn.clone(),
1006                healthcheck: service.healthcheck.clone(),
1007                ttl: service.ttl,
1008                zone: service.zone.clone(),
1009                probe_interval_ms,
1010            }],
1011        };
1012        send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1013
1014        for ipaddr_str in &service.ipaddrs {
1015            let request = APIChangeRequest {
1016                actions: vec![APIRPCUpdateAction::AddIpAddr {
1017                    fqdn: service.fqdn.clone(),
1018                    ipaddr: ipaddr_str.parse()?,
1019                }],
1020            };
1021            send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1022        }
1023    }
1024    Ok(())
1025}
1026
1027async fn send_api_change_request_then_check_resp(
1028    chan_core_tx: &Sender<CoreMsg>,
1029    request: APIChangeRequest,
1030) -> Result<(), anyhow::Error> {
1031    let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
1032    let msg = CoreMsg::APIChangeRequestWrap {
1033        request: request.clone(),
1034        resp_chan_tx,
1035    };
1036    chan_core_tx.send(msg).await?;
1037    let resp = resp_chan_rx.await?;
1038
1039    for (action, result) in request.actions.iter().zip(resp.results.iter()) {
1040        if let APIChangeResult::Error { error } = result {
1041            error!("Action {action:?} failed: {error}");
1042        }
1043    }
1044
1045    Ok(())
1046}