rrdnsd/
core.rs

1// rrdnsd - monitoring and failover for Round Robin DNS records
2// Copyright 2024-2025 Federico Ceratto <federico@debian.org>
3// Released under AGPLv3
4
5use anyhow::anyhow;
6use log::{debug, error, info, warn};
7use statsd::client::Client as Statsd;
8use std::collections::HashMap;
9// TODO: tokio exit?
10use std::time::SystemTime;
11
12use crate::*;
13/// Receives:
14///  - an incoming datapoint from a probe
15///  - a request to generate the dashboard page
16///  - a Change action from the RPC API
17///
18/// Triggers DNS failover / RR change if needed.
19async fn core_handle_one_msg(
20    statsd: &Statsd,
21    conf: &Conf,
22    coremsg: CoreMsg,
23    service_statuses: &mut FqdnToStatuses,
24    chan_outbox_tx: &OutboxSend,
25) -> Result<()> {
26    match coremsg {
27        // Respond to a request from the HTTP service to generate the HTML dashboard
28        CoreMsg::Fetch { chan_dash_tx } => {
29            core_handle_fetch(statsd, service_statuses, chan_dash_tx);
30            return Ok(());
31        }
32        // Handle a RPC API Change request
33        CoreMsg::APIChangeRequestWrap {
34            request,
35            resp_chan_tx,
36        } => {
37            debug!("[core] processing {request:?}");
38
39            let results = request
40                .actions
41                .iter()
42                .map(
43                    |action| match handle_change(conf, service_statuses, action, chan_outbox_tx) {
44                        Ok(()) => APIChangeResult::Ok,
45                        Err(e) => APIChangeResult::Error {
46                            error: e.to_string(),
47                        },
48                    },
49                )
50                .collect();
51
52            let resp = APIChangeResponse { results };
53            if resp_chan_tx.send(resp).is_err() {
54                error!("[core] failed to send response");
55            }
56            return Ok(());
57        }
58
59        // Receive a monitoring datapoint
60        CoreMsg::Datapoint { datapoint } => {
61            // debug!("[core] doing update")
62            // statsd metric `received_update.cnt` - internal
63            statsd.incr("received_update.cnt");
64            let msg = datapoint; //.context("Empty update")?;
65
66            debug!("[core] received a datapoint for {:?}", msg.ipaddr);
67            let endpoint_ipaddr = msg.ipaddr;
68
69            let srv = service_statuses
70                .get_mut(&msg.fqdn)
71                .context(format!("Unknown FQDN in quorum msg '{}'", msg.fqdn))?;
72
73            let end = srv
74                .endpoint_statuses
75                .get_mut(&endpoint_ipaddr)
76                .context("Unknown endpoind ipaddr in quorum msg")?;
77
78            // Store received datapoint
79            let change = ingest_datapoint(end, &msg);
80            if !change {
81                return Ok(());
82            }
83
84            // Endopoint status change detected: Update DNS
85            // statsd metric `status_change.cnt` - Endpoint status change count
86            statsd.incr("status_change.cnt");
87            info!(
88                "[core] Service '{}' endpoint {} changed to {:?}",
89                &msg.fqdn, endpoint_ipaddr, end.computed_status
90            );
91            let _ = handle_endpoint_change(conf, srv, endpoint_ipaddr, &msg.fqdn).await;
92        }
93    }
94    Ok(())
95}
96
97// HTTP API - handle change request
98fn handle_change(
99    conf: &Conf,
100    service_statuses: &mut FqdnToStatuses,
101    action: &APIRPCUpdateAction,
102    chan_outbox_tx: &OutboxSend,
103) -> Result<()> {
104    match action {
105        // Create new service with unique FQDN
106        APIRPCUpdateAction::CreateService {
107            fqdn,
108            healthcheck,
109            ttl,
110            zone,
111            probe_interval_ms,
112        } => {
113            if service_statuses.contains_key(fqdn) {
114                bail!("Service with FQDN {fqdn} already exists")
115            }
116            let probe_target_uri = healthcheck.replace("{}", fqdn);
117            let probe_interval = Duration::from_millis(*probe_interval_ms);
118
119            service_statuses.insert(
120                fqdn.to_string(),
121                ServiceStatus {
122                    dns_ttl: *ttl,
123                    dns_zone: zone.clone(),
124                    endpoint_statuses: HashMap::new(),
125                    probe_interval,
126                    probe_target_url: probe_target_uri,
127                },
128            );
129
130            Ok(())
131        }
132
133        // Delete service by FQDN
134        APIRPCUpdateAction::DeleteService { fqdn } => {
135            if service_statuses.remove(fqdn).is_some() {
136                Ok(())
137            } else {
138                bail!("Service with FQDN {fqdn} not found when deleting");
139            }
140        }
141
142        // Add a new ipaddr to an existing service
143        APIRPCUpdateAction::AddIpAddr { fqdn, ipaddr } => {
144            let service = service_statuses
145                .get_mut(fqdn)
146                .ok_or_else(|| anyhow!("Service with FQDN {fqdn} not found when adding ipaddr"))?;
147
148            let endpoint_ipaddr = ipaddr;
149
150            if service.endpoint_statuses.contains_key(endpoint_ipaddr) {
151                bail!("{ipaddr} is already listed for '{fqdn}'");
152            }
153
154            let cancel_probe = CancellationToken::new();
155            let probe_target_uri = service.probe_target_url.clone();
156            info!("[core] Adding {ipaddr} for {fqdn} and starting probe for {probe_target_uri}");
157
158            let node_to_ens = conf
159                .parsed_nodes
160                .iter()
161                .map(|&node| {
162                    (
163                        node,
164                        Enst {
165                            status: Status::Unknown,
166                            last_update_time: SystemTime::now(),
167                        },
168                    )
169                })
170                .collect();
171
172            let es = EndpointStatus {
173                node_to_ens,
174                computed_status_time: SystemTime::now(),
175                computed_status: Status::Unknown,
176                probe_cancel: cancel_probe.clone(),
177            };
178
179            service.endpoint_statuses.insert(*endpoint_ipaddr, es);
180
181            // spawn dedicated probe worker
182            let local_node = conf.parsed_local_node.unwrap();
183            let prober = probe(
184                service.probe_interval,
185                local_node,
186                *endpoint_ipaddr,
187                probe_target_uri,
188                fqdn.to_string(),
189                chan_outbox_tx.clone(),
190                cancel_probe,
191            );
192            tokio::spawn(prober);
193
194            Ok(())
195        }
196
197        // Delete an existing ipaddr from an existing service
198        APIRPCUpdateAction::DeleteIpAddr { fqdn, ipaddr } => {
199            let svc = service_statuses.get_mut(fqdn).ok_or_else(|| {
200                anyhow!("Service with FQDN {fqdn} not found when deleting ipaddr")
201            })?;
202
203            let endpoint_status = svc
204                .endpoint_statuses
205                .remove(ipaddr)
206                .ok_or_else(|| anyhow!("{ipaddr} not found for '{fqdn}'"))?;
207
208            endpoint_status.probe_cancel.cancel(); // shut down probe
209            Ok(())
210        }
211    }
212}
213
214/// Runs the core reactor. See `core_handle_one_msg`
215pub async fn core(
216    statsd: Statsd,
217    conf: Conf,
218    mut service_statuses: FqdnToStatuses,
219    mut chan_core_rx: CoreRec,
220    chan_outbox_tx: OutboxSend,
221) {
222    info!("[core] started");
223    while let Some(upwrap) = chan_core_rx.recv().await {
224        let res = core_handle_one_msg(
225            &statsd,
226            &conf,
227            upwrap,
228            &mut service_statuses,
229            &chan_outbox_tx,
230        )
231        .await;
232        if let Err(err) = &res {
233            warn!("[core] {err}");
234        }
235    }
236    info!("[core] exiting");
237    drop(chan_core_rx);
238}
239
240/// Responds to the fetch message with the service and endpoint status
241fn core_handle_fetch(
242    statsd: &Statsd,
243    service_statuses: &FqdnToStatuses,
244    chan_statusclone_tx: ServiceStatusCloneSend,
245) {
246    // statsd metric `fetch.cnt` - internal
247    statsd.incr("fetch.cnt");
248
249    let payload: FqdnToStatuses = service_statuses.clone();
250    if chan_statusclone_tx.send(payload).is_err() {
251        error!("[core] failed to send response");
252    }
253}