rrdnsd/
core.rs

1// rrdnsd - monitoring and failover for Round Robin DNS records
2// Copyright 2024-2025 Federico Ceratto <federico@debian.org>
3// Released under AGPLv3
4
5use anyhow::anyhow;
6use log::{debug, error, info, warn};
7use statsd::client::Client as Statsd;
8use std::collections::HashMap;
9// TODO: tokio exit?
10use std::time::SystemTime;
11
12use crate::*;
13/// Receives:
14///  - an incoming datapoint from a probe
15///  - a request to generate the dashboard page
16///  - a Change action from the RPC API
17///
18/// Triggers DNS failover / RR change if needed.
19async fn updater_handle_one_update(
20    statsd: &Statsd,
21    conf: &Conf,
22    upwrap: UpWrap,
23    service_statuses: &mut FqdnToStatuses,
24    chan_outbox_tx: &OutboxSend,
25) -> Result<()> {
26    match upwrap {
27        // Respond to a request from the HTTP service to generate the HTML dashboard
28        UpWrap::Fetch { chan_dash_tx } => {
29            updater_fetch(statsd, service_statuses, chan_dash_tx);
30            return Ok(());
31        }
32        // Handle a RPC API Change request
33        UpWrap::APIChangeRequestWrap {
34            request,
35            resp_chan_tx,
36        } => {
37            // updater_handle_api_change_request(request, resp_chan_tx);
38            debug!("[updater] processing {request:?}");
39
40            let results = request
41                .actions
42                .iter()
43                .map(
44                    |action| match handle_change(conf, service_statuses, action, chan_outbox_tx) {
45                        Ok(()) => APIChangeResult::Ok,
46                        Err(e) => APIChangeResult::Error {
47                            error: e.to_string(),
48                        },
49                    },
50                )
51                .collect();
52
53            let resp = APIChangeResponse { results };
54            if resp_chan_tx.send(resp).is_err() {
55                error!("[updater] failed to send response");
56            }
57            return Ok(());
58        }
59
60        // Receive a monitoring datapoint
61        UpWrap::Update { update } => {
62            // debug!("[updater] doing update")
63            // statsd metric `received_update.cnt` - internal
64            statsd.incr("received_update.cnt");
65            let msg = update; //.context("Empty update")?;
66
67            debug!("[updater] received a datapoint for {:?}", msg.ipaddr);
68            let endpoint_ipaddr = msg.ipaddr;
69
70            let srv = service_statuses
71                .get_mut(&msg.fqdn)
72                .context(format!("Unknown FQDN in quorum msg '{}'", msg.fqdn))?;
73
74            let end = srv
75                .endpoint_statuses
76                .get_mut(&endpoint_ipaddr)
77                .context("Unknown endpoind ipaddr in quorum msg")?;
78
79            // Store received datapoint
80            let change = ingest_datapoint(end, &msg);
81            if !change {
82                return Ok(());
83            }
84
85            // Endopoint status change detected: Update DNS
86            // statsd metric `status_change.cnt` - Endpoint status change count
87            statsd.incr("status_change.cnt");
88            info!(
89                "[updater] Service '{}' endpoint {} changed to {:?}",
90                &msg.fqdn, endpoint_ipaddr, end.computed_status
91            );
92            handle_endpoint_change(conf, srv, endpoint_ipaddr, &msg.fqdn).await;
93        }
94    }
95    Ok(())
96}
97
98// HTTP API - handle change request
99fn handle_change(
100    conf: &Conf,
101    service_statuses: &mut FqdnToStatuses,
102    action: &APIRPCUpdateAction,
103    chan_outbox_tx: &OutboxSend,
104) -> Result<()> {
105    match action {
106        // Create new service with unique FQDN
107        APIRPCUpdateAction::CreateService {
108            fqdn,
109            healthcheck,
110            ttl,
111            zone,
112            probe_interval_ms,
113        } => {
114            if service_statuses.contains_key(fqdn) {
115                bail!("Service with FQDN {fqdn} already exists")
116            }
117            let probe_target_uri = healthcheck.replace("{}", fqdn);
118            let probe_interval = Duration::from_millis(*probe_interval_ms);
119
120            service_statuses.insert(
121                fqdn.to_string(),
122                ServiceStatus {
123                    dns_ttl: *ttl,
124                    dns_zone: zone.clone(),
125                    endpoint_statuses: HashMap::new(),
126                    probe_interval,
127                    probe_target_url: probe_target_uri,
128                },
129            );
130
131            Ok(())
132        }
133
134        // Delete service by FQDN
135        APIRPCUpdateAction::DeleteService { fqdn } => {
136            if service_statuses.remove(fqdn).is_some() {
137                Ok(())
138            } else {
139                bail!("Service with FQDN {fqdn} not found when deleting");
140            }
141        }
142
143        // Add a new ipaddr to an existing service
144        APIRPCUpdateAction::AddIpAddr { fqdn, ipaddr } => {
145            let service = service_statuses
146                .get_mut(fqdn)
147                .ok_or_else(|| anyhow!("Service with FQDN {fqdn} not found when adding ipaddr"))?;
148
149            let endpoint_ipaddr = ipaddr;
150
151            if service.endpoint_statuses.contains_key(endpoint_ipaddr) {
152                bail!("{ipaddr} is already listed for '{fqdn}'");
153            }
154
155            let cancel_probe = CancellationToken::new();
156            let probe_target_uri = service.probe_target_url.clone();
157            info!("[updater] Adding {ipaddr} for {fqdn} and starting probe for {probe_target_uri}");
158
159            let node_to_ens = conf
160                .parsed_nodes
161                .iter()
162                .map(|&node| {
163                    (
164                        node,
165                        Enst {
166                            status: Status::Unknown,
167                            last_update_time: SystemTime::now(),
168                        },
169                    )
170                })
171                .collect();
172
173            let es = EndpointStatus {
174                node_to_ens,
175                computed_status_time: SystemTime::now(),
176                computed_status: Status::Unknown,
177                probe_cancel: cancel_probe.clone(),
178            };
179
180            service.endpoint_statuses.insert(*endpoint_ipaddr, es);
181
182            // spawn dedicated probe worker
183            let local_node = conf.parsed_local_node.unwrap();
184            let prober = probe(
185                service.probe_interval,
186                local_node,
187                *endpoint_ipaddr,
188                probe_target_uri,
189                fqdn.to_string(),
190                chan_outbox_tx.clone(),
191                cancel_probe,
192            );
193            tokio::spawn(prober);
194
195            Ok(())
196        }
197
198        // Delete an existing ipaddr from an existing service
199        APIRPCUpdateAction::DeleteIpAddr { fqdn, ipaddr } => {
200            let svc = service_statuses.get_mut(fqdn).ok_or_else(|| {
201                anyhow!("Service with FQDN {fqdn} not found when deleting ipaddr")
202            })?;
203
204            let endpoint_status = svc
205                .endpoint_statuses
206                .remove(ipaddr)
207                .ok_or_else(|| anyhow!("{ipaddr} not found for '{fqdn}'"))?;
208
209            endpoint_status.probe_cancel.cancel(); // shut down probe
210            Ok(())
211        }
212    }
213}
214
215/// Runs the updater actor. See `updater_handle_one_update`
216pub async fn updater(
217    statsd: Statsd,
218    conf: Conf,
219    mut service_statuses: FqdnToStatuses,
220    mut chan_update_rx: UpdateRec,
221    chan_outbox_tx: OutboxSend,
222) {
223    info!("[updater] started");
224    while let Some(upwrap) = chan_update_rx.recv().await {
225        let res = updater_handle_one_update(
226            &statsd,
227            &conf,
228            upwrap,
229            &mut service_statuses,
230            &chan_outbox_tx,
231        )
232        .await;
233        if let Err(err) = &res {
234            warn!("[updater_handle_one_update] {err}");
235        }
236    }
237    info!("[updater] exiting");
238    drop(chan_update_rx);
239}
240
241/// Updater responds to the fetch message with the service and endpoint status
242fn updater_fetch(
243    statsd: &Statsd,
244    service_statuses: &FqdnToStatuses,
245    chan_statusclone_tx: ServiceStatusCloneSend,
246) {
247    // statsd metric `fetch.cnt` - internal
248    statsd.incr("fetch.cnt");
249
250    let payload: FqdnToStatuses = service_statuses.clone();
251    if chan_statusclone_tx.send(payload).is_err() {
252        error!("[updater] failed to send response");
253    }
254}