1use anyhow::{bail, Context, Result};
6use axum::extract::{Json, State};
7use futures::future::join_all;
8use log::{debug, error, info, warn, LevelFilter};
10use reqwest::redirect::Policy;
11use reqwest::Client as Reqw;
12use serde::{Deserialize, Serialize};
13use statsd::client::Client as Statsd;
14use std::collections::BTreeMap;
15use std::collections::HashMap;
16use std::fs;
17use std::net::{IpAddr, SocketAddr};
18use std::process; use std::str;
20use std::time::Instant;
21use std::time::SystemTime;
22use systemd_journal_logger::JournalLog;
23use tokio::signal::unix::{signal, SignalKind};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::time;
27use tokio::time::Duration;
28use tokio_util::sync::CancellationToken;
29
30mod api_clients;
31mod core;
32mod dash;
33
34#[cfg(test)]
35mod tests;
36
/// Crate version string, taken from `CARGO_PKG_VERSION` at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
38
/// Health state of an endpoint, either as reported by one node or as computed from all reports.
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, Copy)]
pub enum Status {
    /// No verdict available (no reports, or neither side reaches quorum).
    Unknown,
    /// Endpoint is considered failing.
    Down,
    /// Endpoint is considered healthy.
    Up,
}
45
/// Status of one endpoint as last reported by a single node.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
struct Enst {
    /// Most recent status received from the node.
    status: Status,
    /// Timestamp carried by the report that set `status`.
    last_update_time: SystemTime,
}
51
/// Probe result exchanged between nodes; sent as JSON to each peer's `/qmsg` route.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct QuorumMsg {
    /// Message format version; the probe in this file always emits `1`.
    format_version: u8,
    /// Name of the service the probed endpoint belongs to.
    fqdn: String,
    /// Address of the probed endpoint.
    ipaddr: IpAddr,
    /// Probe outcome observed by the sending node.
    status: Status,
    /// Time at which the sender built the message.
    tstamp: SystemTime,
    /// Address of the node that ran the probe.
    node_addr: NodeAddr,
}
62
/// IP address and port pair, as produced by `parse_ipaddr_port`.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Serialize, Deserialize)]
struct NodeAddr {
    /// IP address part.
    ipa: IpAddr,
    /// Port part.
    port: u16,
}
68
/// Latest report per node for a single endpoint, keyed by the reporting node's address.
type NodeToEnst = HashMap<NodeAddr, Enst>;
87
/// Everything tracked for one endpoint address of a service.
#[derive(Debug, Clone)]
struct EndpointStatus {
    /// Per-node reports used as input to `compute_status`.
    node_to_ens: NodeToEnst,
    /// Verdict derived from `node_to_ens`.
    computed_status: Status,
    /// When `computed_status` was last recomputed.
    computed_status_time: SystemTime,
    /// Token cancelled when this value is dropped; presumably shared with the
    /// endpoint's `probe` task (the wiring is not visible in this file section).
    probe_cancel: tokio_util::sync::CancellationToken,
}
95
/// Cancels `probe_cancel` whenever an `EndpointStatus` is dropped.
///
/// NOTE(review): `EndpointStatus` also derives `Clone`, and cloned
/// `CancellationToken`s share their cancellation state. Dropping a clone
/// (e.g. a snapshot handed to an HTTP handler) therefore looks like it would
/// cancel the original's token too — confirm snapshots are built so that this
/// cannot stop live probes.
impl Drop for EndpointStatus {
    fn drop(&mut self) {
        self.probe_cancel.cancel();
    }
}
103
/// Endpoints of one service, keyed by endpoint IP address.
type EndpointStatuses = HashMap<IpAddr, EndpointStatus>;
105
/// Runtime state and DNS/probe settings of one service.
#[derive(Debug, Clone)]
struct ServiceStatus {
    /// TTL written into generated nsupdate commands.
    dns_ttl: u32,
    /// DNS zone the service's records live in.
    dns_zone: String,
    /// State of every endpoint of the service.
    endpoint_statuses: EndpointStatuses,
    /// Probe period; reported in milliseconds by the list API.
    probe_interval: Duration,
    /// URL used for health probing, as reported by the list API.
    probe_target_url: String,
}
114
/// All known services, keyed by fully qualified domain name.
type FqdnToStatuses = HashMap<String, ServiceStatus>;
116
/// Messages carried by the update channel (`UpdateSend` / `UpdateRec`).
#[derive(Debug)]
enum UpWrap {
    /// A status report, e.g. one received on `/qmsg`.
    Update {
        update: Update,
    },
    /// Request for a snapshot of all service statuses, answered on `chan_dash_tx`.
    Fetch {
        chan_dash_tx: ServiceStatusCloneSend,
    },
    /// Configuration change request, answered on `resp_chan_tx`.
    APIChangeRequestWrap {
        request: APIChangeRequest,
        resp_chan_tx: tokio::sync::oneshot::Sender<APIChangeResponse>,
    },
}
130
/// A single status report; carries the `QuorumMsg` fields except `format_version`.
#[derive(Debug)]
struct Update {
    /// Service the endpoint belongs to.
    fqdn: String,
    /// Endpoint the report is about.
    ipaddr: IpAddr,
    /// Reported status.
    status: Status,
    /// Timestamp of the report.
    tstamp: SystemTime,
    /// Node that produced the report.
    node_addr: NodeAddr,
}
139
/// One-shot sender used to return a status snapshot to a `Fetch` requester.
type ServiceStatusCloneSend = tokio::sync::oneshot::Sender<FqdnToStatuses>;
/// Sending half of the update channel.
type UpdateSend = Sender<UpWrap>;
/// Receiving half of the update channel.
type UpdateRec = Receiver<UpWrap>;
/// Sending half of the outbox channel (messages destined for peer nodes).
type OutboxSend = Sender<QuorumMsg>;
/// Receiving half of the outbox channel.
type OutboxRec = Receiver<QuorumMsg>;
/// State shared with every axum handler.
#[derive(Debug, Clone)]
struct Shared {
    /// Sender handlers use to submit `UpWrap` messages.
    chan_update_tx: UpdateSend,
}
/// axum extractor type for `Shared`.
type SharedState = axum::extract::State<Shared>;
153
/// Evaluates to the `Ok` value of a `Result`; on `Err`, logs the error with
/// `error!` and returns `()` from the enclosing function.
macro_rules! unwrap_or_return {
    ($fallible:expr) => {
        match $fallible {
            Err(cause) => {
                error!("error: {}", cause);
                return;
            }
            Ok(inner) => inner,
        }
    };
}
165
/// Evaluates to the `Ok` value of a `Result`; on `Err`, logs the error with
/// its `Debug` representation and terminates the process with exit code 1.
macro_rules! exp {
    ($fallible:expr) => {
        match $fallible {
            Err(cause) => {
                error!("{:?}", cause);
                process::exit(1);
            }
            Ok(inner) => inner,
        }
    };
}
178
179async fn handle_sig(sig: SignalKind) {
183 exp!(signal(sig)).recv().await;
184 info!("[main] exiting");
185 process::exit(0);
186}
187
188fn timer(statsd: &Statsd, name: &str, t0: Instant) {
190 let delta = t0.elapsed().as_millis() as f64;
191 statsd.timer(name, delta);
192}
193
/// Configuration of one service, as read from the JSON configuration file.
#[derive(Deserialize, Debug, Clone)]
struct ServiceConf {
    /// Fully qualified name of the service; leading/trailing dots are trimmed at load time.
    fqdn: String,
    /// Health-check target passed on when the service is created.
    healthcheck: String,
    /// Endpoint addresses; each string is parsed as an `IpAddr` at startup.
    ipaddrs: Vec<String>,
    /// DNS TTL for the service's records.
    ttl: u32,
    /// DNS zone; `fqdn` must end with it.
    zone: String,
    /// Per-service probe period; falls back to the global value, then to 1000 ms.
    probe_interval_ms: Option<u64>,
}
206
/// Settings for the HTTP management API.
#[derive(Debug, Deserialize, Clone)]
struct APIConfig {
    /// When true, the `/api/v0/...` routes are registered by `create_router`.
    enabled: bool,
}
213
/// How DNS records are updated when an endpoint changes state.
///
/// Variant names are lowercased in the serialized form (`rename_all = "lowercase"`).
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
enum ResolverUpdateMethod {
    /// Use the Dynu API client (`api_clients::Dynu`).
    Dynu,
    /// Run `/usr/bin/nsupdate` against each configured resolver.
    Nsupdate,
    /// Run `/usr/bin/knsupdate` against each configured resolver.
    Knsupdate,
    /// Run the executable at `path` with endpoint address, fqdn, zone and status as arguments.
    Script { path: String },
}
223
/// Top-level daemon configuration, deserialized from JSON and completed by `prepare_conf`.
#[derive(Deserialize, Debug, Clone)]
struct Conf {
    /// Configuration format version; only `1` is accepted.
    conf_version: u8,
    /// This node's `ip:port`; also used as the HTTP listen address.
    local_node: String,
    /// Parsed form of `local_node`; not read from the file, filled in by `prepare_conf`.
    #[serde(skip_deserializing)]
    parsed_local_node: Option<NodeAddr>,
    /// `ip:port` of the nodes that receive quorum messages.
    nodes: Vec<String>,
    /// Default probe period in milliseconds for services that do not set their own.
    probe_interval_ms: Option<u64>,
    /// Parsed form of `nodes`; not read from the file, filled in by `prepare_conf`.
    #[serde(skip_deserializing)]
    parsed_nodes: Vec<NodeAddr>,
    /// Services to monitor.
    services: Vec<ServiceConf>,
    /// Mechanism used to push DNS changes.
    update_method: ResolverUpdateMethod,
    /// `ip:port` of the resolvers targeted by nsupdate/knsupdate.
    update_resolvers: Vec<String>,
    /// Credentials handed to the Dynu API client.
    update_credentials: String,
    /// When true, a Down transition is not propagated if `fail_open` reports true.
    enable_fail_open: bool,
    /// HTTP management API settings.
    api: APIConfig,
}
243
/// Returns `s` with every leading and trailing `.` removed.
fn trim_dots(s: &str) -> String {
    s.trim_matches('.').to_owned()
}
247
248fn prepare_conf(jc: &str, local_node: Option<String>) -> Result<Conf> {
249 let mut conf: Conf = serde_json::from_str(jc).context("Unable to parse configuration")?;
250 debug!("[main] conf loaded");
251 if conf.conf_version != 1 {
252 bail!("Invalid configuration format version");
253 }
254 if let Some(ln) = local_node {
255 conf.local_node = ln;
256 }
257 conf.parsed_local_node = Some(parse_ipaddr_port(&conf.local_node)?);
258 conf.parsed_nodes = conf
259 .nodes
260 .iter()
261 .map(|n| parse_ipaddr_port(n).expect("Unable to parse {n}"))
262 .collect();
263 for n in &conf.update_resolvers {
264 parse_ipaddr_port(n)?;
265 }
266 for sv in &mut conf.services {
267 sv.fqdn = trim_dots(&sv.fqdn);
268 sv.zone = trim_dots(&sv.zone);
269 if sv.fqdn.ends_with(&sv.zone) {
270 continue;
271 }
272 bail!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
273 }
274 Ok(conf)
275}
276
277fn load_conf(conf_fn: &str, local_node: Option<String>) -> Result<Conf> {
279 info!("[load_conf] reading {conf_fn}");
280 let jc = fs::read_to_string(conf_fn).context("Unable to read config file")?;
281 prepare_conf(&jc, local_node)
282}
283
284async fn outbox_sender(mut chan_outbox_rx: OutboxRec, nodes: Vec<String>) {
286 let client_ = Reqw::builder()
288 .user_agent("rrdnsd")
289 .connect_timeout(Duration::new(5, 0))
290 .timeout(Duration::new(5, 0))
291 .tcp_keepalive(Duration::new(15, 0))
292 .build();
293 let client = unwrap_or_return!(client_);
294
295 while let Some(msg) = chan_outbox_rx.recv().await {
296 for node in &nodes {
297 let client = client.clone();
298 let msg = msg.clone();
299 let url = format!("http://{node}/qmsg");
300 let node = node.clone();
301 tokio::spawn(async move {
302 if let Err(e) = client.post(&url).json(&msg).send().await {
303 info!("Error sending msg to {node}: {e:?}");
304 }
305 });
307 }
308 }
309 info!("[outbox] exiting");
310 drop(chan_outbox_rx);
311}
312
313async fn probe(
317 interval: Duration,
318 local_node: NodeAddr,
319 endpoint_ipaddr: IpAddr,
320 target_uri: String,
321 fqdn: String,
322 chan_outbox_tx: OutboxSend,
323 cancel_probe: CancellationToken,
324) {
325 let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
329
330 let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
333
334 info!("[probe] creating probe for uri {target_uri} at ipaddr {endpoint_saddr}");
336 let client_ = Reqw::builder()
337 .user_agent("rrdnsd")
338 .connect_timeout(Duration::new(5, 0))
339 .timeout(Duration::new(5, 0))
340 .tcp_keepalive(Duration::new(15, 0))
341 .redirect(Policy::none())
342 .resolve(&fqdn, endpoint_saddr)
343 .build();
344 let client = unwrap_or_return!(client_);
345
346 debug!("[probe] starting");
347 let mut ticker = time::interval(interval);
348 loop {
349 if cancel_probe.is_cancelled() {
350 info!("[probe] exited probe for uri {target_uri} at ipaddr {endpoint_saddr}");
351 return;
352 }
353 let t0 = Instant::now();
354 ticker.tick().await;
355 let t1 = Instant::now();
356
357 debug!("[probe] probing {target_uri} at endpoint {endpoint_ipaddr}");
358 let resp = client.get(&target_uri).send().await;
359 let success = resp.is_ok();
360 timer(&statsd, "probe.duration", t0);
362 if success {
363 statsd.incr("probe.success.cnt");
365 } else {
366 statsd.incr("probe.failure.cnt");
368 }
369 let msg = QuorumMsg {
370 format_version: 1,
371 fqdn: fqdn.clone(),
372 ipaddr: endpoint_ipaddr,
373 status: if success { Status::Up } else { Status::Down },
374 tstamp: SystemTime::now(),
375 node_addr: local_node,
376 };
377 if chan_outbox_tx.send(msg).await.is_err() {
381 info!("[probe] send error");
382 drop(chan_outbox_tx);
383 return;
384 }
385
386 let loop_elapsed = t0.elapsed();
387 let probing = t1.elapsed();
388 statsd.timer("probe_time_msec", probing.as_millis() as f64);
390 let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
391 statsd.histogram("probe.load_factor", lf);
393 }
394}
395
396fn parse_ipaddr_port(n: &str) -> Result<NodeAddr> {
398 let (ipa_str, port_str) = n.rsplit_once(':').context("Invalid ipaddr:port format")?;
399 let ipa = ipa_str.parse::<IpAddr>().context("Invalid IP address")?;
400 let port = port_str.parse::<u16>().context("Invalid port number")?;
401 Ok(NodeAddr { ipa, port })
402}
403
404fn create_nsupdate_blob(
406 resolver_addr: NodeAddr,
407 fqdn: &str,
408 zone: &str,
409 ttl: u32,
410 endpoint_ipaddr: &IpAddr,
411 status: Status,
412) -> String {
413 let record_type = if endpoint_ipaddr.is_ipv4() {
414 "A"
415 } else {
416 "AAAA"
417 };
418
419 let operation = match status {
420 Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
421 Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
422 Status::Unknown => "exit".to_string(),
423 };
424 format!(
425 "server {ipa} {port}\nzone {zone}\nttl {ttl}\n{op}\nsend\nexit\n",
426 ipa = resolver_addr.ipa,
427 port = resolver_addr.port,
428 zone = zone,
429 ttl = ttl,
430 op = operation,
431 )
432}
433
434async fn run_nsupdates(
436 update_method: &ResolverUpdateMethod,
437 resolvers: &Vec<String>,
438 srv: &ServiceStatus,
439 fqdn: &str,
440 endpoint_ipaddr: &IpAddr,
441) -> Result<()> {
442 let endpoint_that_changed = srv
444 .endpoint_statuses
445 .get(endpoint_ipaddr)
446 .context("Endpoint not found")?;
447 let status = endpoint_that_changed.computed_status;
448
449 let cmd = match update_method {
450 ResolverUpdateMethod::Nsupdate => "/usr/bin/nsupdate",
451 ResolverUpdateMethod::Knsupdate => "/usr/bin/knsupdate",
452 _ => bail!("Incorrect function call"),
453 };
454
455 for resolver in resolvers {
456 let nsupdate_blob = create_nsupdate_blob(
457 parse_ipaddr_port(resolver)?,
458 fqdn,
459 &srv.dns_zone,
460 srv.dns_ttl,
461 endpoint_ipaddr,
462 status,
463 );
464 run_nsupdate_once(cmd, nsupdate_blob).await?;
465 }
466 Ok(())
467}
468
469#[cfg(not(test))]
471async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
472 use tokio::process::Command;
473 let tmpfile = tempfile::NamedTempFile::new().context("Failed to create tmpfile")?;
474 let conf_fn = tmpfile.path();
475 fs::write(conf_fn, &conf).context(format!("Failed to write {}", conf_fn.display()))?;
476 info!(
477 "[dns] calling {} {} with commands:\n{}",
478 cmd,
479 conf_fn.display(),
480 conf
481 );
482 let out = Command::new(cmd).arg(conf_fn).output().await;
483 match out {
484 Ok(_) => info!("[dns] success"),
485 Err(_) => warn!("[dns] failure"),
486 }
487 Ok(())
488}
489
/// Test-build stand-in for `run_nsupdate_once`: logs the command and its
/// script instead of executing anything, and always succeeds.
#[cfg(test)]
#[allow(clippy::unused_async)]
async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
    info!("--cmd--{cmd}--conf--\n{conf}--");
    Ok(())
}
496
/// Number of matching reports needed out of `n`: half of `n`, rounded up.
#[must_use]
const fn quorum(n: u16) -> u16 {
    n / 2 + n % 2
}
504
505#[must_use]
507#[allow(clippy::cast_possible_truncation)]
508#[allow(clippy::match_same_arms)]
509fn compute_status(endp_statuses: &NodeToEnst) -> Status {
510 if endp_statuses.is_empty() {
511 return Status::Unknown;
512 }
513 let q = quorum(endp_statuses.len() as u16);
514 let up_cnt = endp_statuses
515 .values()
516 .filter(|ens| ens.status == Status::Up)
517 .count() as u16;
518 let down_cnt = endp_statuses
519 .values()
520 .filter(|ens| ens.status == Status::Down)
521 .count() as u16;
522
523 match (up_cnt >= q, down_cnt >= q) {
524 (true, false) => Status::Up,
525 (false, true) => Status::Down,
526 (true, true) => Status::Up, (false, false) => Status::Unknown, }
529}
530
531fn ingest_datapoint(end: &mut EndpointStatus, msg: &Update) -> bool {
533 let Some(s) = end.node_to_ens.get_mut(&msg.node_addr) else {
534 warn!("Received datapoint from unknown node {:?}", &msg.node_addr);
535 return false;
536 };
537
538 s.status = msg.status;
539 s.last_update_time = msg.tstamp;
540
541 let prev_computed_status = end.computed_status;
543 end.computed_status = compute_status(&end.node_to_ens);
544 end.computed_status_time = SystemTime::now();
545
546 if prev_computed_status == Status::Unknown {
547 return false;
548 }
549 if prev_computed_status == end.computed_status {
550 return false;
551 }
552 true
553}
554
555#[must_use]
558fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
559 let up = estatuses
560 .values()
561 .filter(|s| s.computed_status == Status::Up)
562 .count();
563 let down = estatuses
564 .values()
565 .filter(|s| s.computed_status == Status::Down)
566 .count();
567 let unknown = estatuses
568 .values()
569 .filter(|s| s.computed_status == Status::Unknown)
570 .count();
571
572 let th = estatuses.len().div_ceil(2);
574 let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {th}");
575 {
576 if unknown >= th {
577 info!("{msg}: too many endpoints in unknown state");
578 true
579 } else if down >= th {
580 info!("{msg}: too many endpoints in down state");
581 true
582 } else if up <= th {
583 info!("{msg}: not enough endpoints in up state");
584 true
585 } else {
586 debug!("No {msg}");
587 false
588 }
589 }
590}
591
592async fn run_update_command(
594 script_path: &str,
595 endpoint_ipaddr: IpAddr,
596 fqdn: &str,
597 dns_zone: &str,
598 status: Status,
599) {
600 let args = [
601 &endpoint_ipaddr.to_string(),
602 fqdn,
603 dns_zone,
604 &format!("{status:?}"),
605 ];
606 info!("[script] running with {:?}", &args);
607
608 let mut cmd = tokio::process::Command::new(script_path);
609 cmd.args(args);
610 let Ok(output) = cmd.output().await else {
611 error!("[script] failed to execute '{script_path}'");
612 return;
613 };
614 if !output.status.success() {
615 error!("[script] failed with {}", output.status);
616 }
617 let stderr = String::from_utf8_lossy(&output.stderr);
618 let stdout = String::from_utf8_lossy(&output.stdout);
619 if !stderr.is_empty() {
620 error!("[script] stderr: {stderr}");
621 }
622 if !stdout.is_empty() {
623 info!("[script] output: {stdout}");
624 }
625}
626
627fn run_dynu_update(
628 conf: &Conf,
629 srv: &ServiceStatus,
630 endpoint_ipaddr: IpAddr,
631 fqdn: &str,
632 status: Status,
633) {
634 let client = api_clients::Dynu::new(conf.update_credentials.to_string());
635 let fqdn = fqdn.to_string();
636 let zone = srv.dns_zone.clone();
637
638 tokio::spawn(async move {
639 match status {
640 Status::Up => client.add_record(&fqdn, &zone, endpoint_ipaddr).await,
641 Status::Down => client.delete_record(&fqdn, &zone, endpoint_ipaddr).await,
642 Status::Unknown => {}
643 }
644 });
645}
646
647async fn handle_endpoint_change(
649 conf: &Conf,
650 srv: &ServiceStatus,
651 endpoint_ipaddr: IpAddr,
652 fqdn: &str,
653) {
654 let Some(endpoint_that_changed) = srv.endpoint_statuses.get(&endpoint_ipaddr) else {
655 return;
656 };
657 let status = endpoint_that_changed.computed_status;
658 if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
659 return;
661 }
662
663 match &conf.update_method {
664 ResolverUpdateMethod::Dynu => {
665 run_dynu_update(conf, srv, endpoint_ipaddr, fqdn, status);
666 }
667 ResolverUpdateMethod::Nsupdate | ResolverUpdateMethod::Knsupdate => {
668 let _ = run_nsupdates(
669 &conf.update_method,
670 &conf.update_resolvers,
671 srv,
672 fqdn,
673 &endpoint_ipaddr,
674 )
675 .await;
676 }
677 ResolverUpdateMethod::Script { path } => {
678 run_update_command(path, endpoint_ipaddr, fqdn, &srv.dns_zone, status).await;
679 }
680 }
681}
682
683async fn fetch_status(chan_update_tx: UpdateSend) -> Result<FqdnToStatuses> {
686 let (chan_dash_tx, chan_dash_rx) = tokio::sync::oneshot::channel();
688 let wu = UpWrap::Fetch { chan_dash_tx };
689 chan_update_tx.send(wu).await?;
690 Ok(chan_dash_rx.await?)
691}
692
693async fn handle_http_get_dashboard(State(shared): SharedState) -> axum::response::Html<String> {
694 let resp = fetch_status(shared.chan_update_tx).await;
695 match resp {
696 Err(e) => {
697 error!("dashboard: failed to fetch: {e}");
698 axum::response::Html("Error".into())
699 }
700 Ok(r) => {
701 let dash = dash::render_html_dashboard(&r);
702 axum::response::Html(dash)
703 }
704 }
705}
706
/// `GET /health`: always answers with the static body `ok`.
async fn handle_http_get_health() -> &'static str {
    "ok"
}
710
711async fn handle_http_post_qmsg(State(shared): SharedState, Json(msg): Json<QuorumMsg>) {
712 let wu = UpWrap::Update {
714 update: Update {
715 fqdn: msg.fqdn.to_string(),
716 ipaddr: msg.ipaddr,
717 status: msg.status,
718 tstamp: msg.tstamp,
719 node_addr: msg.node_addr,
720 },
721 };
722 if shared.chan_update_tx.send(wu).await.is_err() {
723 info!("[http] exiting");
724 };
726}
727
/// One service as reported by the list API.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct APIListItem {
    /// Computed status of every endpoint, keyed by endpoint address.
    pub ipaddrs: HashMap<IpAddr, Status>,
    /// DNS TTL of the service.
    dns_ttl: u32,
    /// DNS zone of the service.
    dns_zone: String,
    /// Probe period in milliseconds.
    probe_interval_ms: u128,
    /// URL used for health probing.
    probe_target_url: String,
}
741
/// Response body of the list API: services keyed (and ordered) by fqdn.
pub type APIListOut = BTreeMap<String, APIListItem>;
743
744async fn handle_http_api_list(State(shared): SharedState) -> Json<APIListOut> {
746 let resp = fetch_status(shared.chan_update_tx).await.unwrap();
747
748 let out = resp
749 .into_iter()
750 .map(|(fqdn, srvs)| {
751 let ipaddrs = srvs
753 .endpoint_statuses
754 .into_iter()
755 .map(|(srv_ipaddr, ep)| (srv_ipaddr, ep.computed_status))
756 .collect();
757
758 (
759 fqdn,
760 APIListItem {
761 ipaddrs,
762 dns_ttl: srvs.dns_ttl,
763 dns_zone: srvs.dns_zone,
764 probe_interval_ms: srvs.probe_interval.as_millis(),
765 probe_target_url: srvs.probe_target_url,
766 },
767 )
768 })
769 .collect();
770
771 Json(out)
772}
773
/// One change requested through the update API.
///
/// Serialized with the variant name, in snake_case, stored under the `method` key.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum APIRPCUpdateAction {
    /// Register a new service.
    CreateService {
        fqdn: String,
        healthcheck: String,
        ttl: u32,
        zone: String,
        probe_interval_ms: u64,
    },
    /// Remove a service.
    DeleteService {
        fqdn: String,
    },
    /// Add an endpoint address to a service.
    AddIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
    /// Remove an endpoint address from a service.
    DeleteIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
}
799
/// Body of an update API request: a list of actions.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct APIChangeRequest {
    /// Requested changes.
    pub actions: Vec<APIRPCUpdateAction>,
}
804
/// Outcome of a single action.
///
/// Serialized with the lowercased variant name stored under the `status` key.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum APIChangeResult {
    /// The action succeeded.
    Ok,
    /// The action failed; `error` describes why.
    Error { error: String },
}
811
/// Body of an update API response.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct APIChangeResponse {
    /// Outcomes of the requested actions; callers in this file pair them with
    /// the actions by position.
    results: Vec<APIChangeResult>,
}
816
817impl APIChangeResponse {
818 pub fn all_ok(&self) -> bool {
819 self.results
820 .iter()
821 .all(|r| matches!(r, APIChangeResult::Ok))
822 }
823}
824
825async fn handle_http_api_update(
827 State(shared): SharedState,
828 axum::Json(req): Json<APIChangeRequest>,
829) -> axum::Json<APIChangeResponse> {
830 let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
831 let wu = UpWrap::APIChangeRequestWrap {
832 request: req,
833 resp_chan_tx,
834 };
835 debug!("sending API posnt into chan_update_tx");
836 let _ = shared.chan_update_tx.send(wu).await;
837 debug!("waiting from resp_chan_rx");
838 let resp = resp_chan_rx
839 .await
840 .unwrap_or_else(|_| APIChangeResponse { results: vec![] });
841
842 debug!("received a reply from resp_chan_rx");
843 Json(resp)
844}
845
846fn create_router(chan_update_tx: UpdateSend, api_enabled: bool) -> axum::Router {
849 let r = axum::Router::new()
850 .route("/dash", axum::routing::get(handle_http_get_dashboard))
851 .route("/health", axum::routing::get(handle_http_get_health))
852 .route("/qmsg", axum::routing::post(handle_http_post_qmsg));
853
854 if api_enabled {
855 r.route(
856 "/api/v0/list_endpoints",
857 axum::routing::get(handle_http_api_list),
858 )
859 .route(
860 "/api/v0/update_endpoints",
861 axum::routing::post(handle_http_api_update),
862 )
863 } else {
864 r
865 }
866 .with_state(Shared { chan_update_tx })
867}
868
869async fn wait_for_shutdown(token: CancellationToken) {
870 token.cancelled().await;
871 debug!("Shutdown token triggered. Axum server exiting.");
872}
873
874async fn run_axum_server(
875 http_listener: tokio::net::TcpListener,
876 http_router: axum::Router,
877 shutdown_token: CancellationToken,
878) {
879 if let Err(e) = axum::serve(http_listener, http_router)
880 .with_graceful_shutdown(wait_for_shutdown(shutdown_token))
881 .await
882 {
883 error!("Axum server failed: {e}");
884 process::exit(1);
885 }
886}
887
888pub async fn init(
890 conf_fn: &str,
891 local_node: Option<String>,
892 tasks: &mut Vec<tokio::task::JoinHandle<()>>,
893) -> Result<CancellationToken> {
894 let conf = load_conf(conf_fn, local_node).unwrap_or_else(|err| {
896 error!("Failed to load config: {err}");
897 process::exit(1)
898 });
899
900 let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd")?;
902 let service_statuses = HashMap::new();
903
904 let (chan_update_tx, chan_update_rx): (UpdateSend, UpdateRec) = mpsc::channel(10);
906 let (chan_outbox_tx, chan_outbox_rx): (OutboxSend, OutboxRec) = mpsc::channel(100);
907
908 let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
910 tasks.push(tokio::spawn(sender));
911
912 let updater_t = core::updater(
914 statsd,
915 conf.clone(),
916 service_statuses,
917 chan_update_rx,
918 chan_outbox_tx,
919 );
920 tasks.push(tokio::spawn(updater_t));
921
922 time::sleep(Duration::from_millis(100)).await;
923
924 let axum_shutdown = CancellationToken::new();
926 let shutdown_token_child = axum_shutdown.child_token();
927
928 debug!("creating listener");
930 let http_router = create_router(chan_update_tx.clone(), conf.api.enabled);
931 let http_listener = tokio::net::TcpListener::bind(conf.local_node.clone()).await?;
932
933 debug!("starting axum");
935 let fut = tokio::spawn(run_axum_server(
936 http_listener,
937 http_router,
938 shutdown_token_child,
939 ));
940
941 tasks.push(fut);
942
943 debug!("starting probes");
945 setup_and_spawn_probes(conf, chan_update_tx).await?;
946 debug!("starting probes: done");
947
948 Ok(axum_shutdown)
949}
950
951pub async fn run() -> Result<()> {
952 JournalLog::new()
954 .context("Failed to create JournalLog")?
955 .install()
956 .context("Failed to setup journald logging")?;
957
958 log::set_max_level(LevelFilter::Info);
960 info!("[main] starting");
961
962 let mut tasks = Vec::new();
964 for s in [SignalKind::interrupt(), SignalKind::hangup()] {
965 tasks.push(tokio::spawn(handle_sig(s)));
966 }
967
968 let default_conf_fn = "/etc/rrdnsd.json";
969 let conf_fn = std::env::var("CONF").ok();
970 let conf_fn = conf_fn.as_deref().unwrap_or(default_conf_fn);
971
972 let local_node = std::env::var("LOCAL_NODE").ok();
973
974 init(conf_fn, local_node, &mut tasks).await?;
975
976 join_all(tasks).await;
977 Ok(())
978}
979
980async fn setup_and_spawn_probes(conf: Conf, chan_update_tx: Sender<UpWrap>) -> Result<()> {
982 for service in &conf.services {
983 let probe_interval_ms = service
984 .probe_interval_ms
985 .or(conf.probe_interval_ms)
986 .unwrap_or(1000);
987
988 let request = APIChangeRequest {
989 actions: vec![APIRPCUpdateAction::CreateService {
990 fqdn: service.fqdn.clone(),
991 healthcheck: service.healthcheck.clone(),
992 ttl: service.ttl,
993 zone: service.zone.clone(),
994 probe_interval_ms,
995 }],
996 };
997 send_api_change_request_then_check_resp(&chan_update_tx, request).await?;
998
999 for ipaddr_str in &service.ipaddrs {
1000 let request = APIChangeRequest {
1001 actions: vec![APIRPCUpdateAction::AddIpAddr {
1002 fqdn: service.fqdn.clone(),
1003 ipaddr: ipaddr_str.parse()?,
1004 }],
1005 };
1006 send_api_change_request_then_check_resp(&chan_update_tx, request).await?;
1007 }
1008 }
1009 Ok(())
1010}
1011
1012async fn send_api_change_request_then_check_resp(
1013 chan_update_tx: &Sender<UpWrap>,
1014 request: APIChangeRequest,
1015) -> Result<(), anyhow::Error> {
1016 let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
1017 let msg = UpWrap::APIChangeRequestWrap {
1018 request: request.clone(),
1019 resp_chan_tx,
1020 };
1021 chan_update_tx.send(msg).await?;
1022 let resp = resp_chan_rx.await?;
1023
1024 for (action, result) in request.actions.iter().zip(resp.results.iter()) {
1025 if let APIChangeResult::Error { error } = result {
1026 error!("Action {action:?} failed: {error}");
1027 }
1028 }
1029
1030 Ok(())
1031}