1use anyhow::{bail, Context, Result};
6use axum::extract::{Json, State};
7use futures::future::join_all;
8use log::{debug, error, info, warn, LevelFilter};
10use reqwest::redirect::Policy;
11use reqwest::Client as Reqw;
12use serde::{Deserialize, Serialize};
13use statsd::client::Client as Statsd;
14use std::collections::BTreeMap;
15use std::collections::HashMap;
16use std::fs;
17use std::net::{IpAddr, SocketAddr};
18use std::process; use std::str;
20use std::time::Instant;
21use std::time::SystemTime;
22use systemd_journal_logger::JournalLog;
23use tokio::signal::unix::{signal, SignalKind};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::time;
27use tokio::time::Duration;
28use tokio_util::sync::CancellationToken;
29
30mod core;
31mod dash;
32mod providers;
33
34use crate::providers::SetProvider;
35
36#[cfg(test)]
37mod tests;
38
/// Crate version string, embedded at compile time from `CARGO_PKG_VERSION`.
const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Health status of an endpoint.
///
/// Probes report `Up` when their HTTP request returns `Ok` and `Down` otherwise.
/// `Unknown` is what `compute_status` yields when no node reports exist, or when
/// neither `Up` nor `Down` reaches quorum.
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, Copy)]
pub enum Status {
    Unknown,
    Down,
    Up,
}

/// Status of one endpoint as last reported by a single node.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
struct Enst {
    status: Status,
    /// Timestamp carried by the reporting node's datapoint (`Datapoint::tstamp`),
    /// not the local time of reception.
    last_update_time: SystemTime,
}

/// Probe result exchanged between nodes (`POST /qmsg`).
#[derive(Debug, Serialize, Deserialize, Clone)]
struct QuorumMsg {
    /// Wire format version; probes currently always send `1`.
    format_version: u8,
    /// Service the probed endpoint belongs to.
    fqdn: String,
    /// Endpoint IP address that was probed.
    ipaddr: IpAddr,
    status: Status,
    /// Creation time of the message on the sending node.
    tstamp: SystemTime,
    /// Address of the node that ran the probe.
    node_addr: NodeAddr,
}

/// IP address + port of a cluster node or DNS resolver, parsed from `"ip:port"`
/// by `parse_ipaddr_port`.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Serialize, Deserialize)]
struct NodeAddr {
    ipa: IpAddr,
    port: u16,
}

/// Per-node view of one endpoint's status, keyed by the reporting node.
type NodeToEnst = HashMap<NodeAddr, Enst>;
89
/// Aggregated state of one endpoint (IP address) of a service.
#[derive(Debug, Clone)]
struct EndpointStatus {
    /// Latest report from each node for this endpoint.
    node_to_ens: NodeToEnst,
    /// Status derived from `node_to_ens` by `compute_status`.
    computed_status: Status,
    /// Local time at which `computed_status` was last recomputed.
    computed_status_time: SystemTime,
    /// Cancellation token for this endpoint's probe task.
    probe_cancel: tokio_util::sync::CancellationToken,
}

impl Drop for EndpointStatus {
    /// Cancels `probe_cancel` when the status entry is dropped; the probe loop
    /// exits once it observes the cancellation.
    ///
    /// NOTE(review): `EndpointStatus` derives `Clone`, and cloned
    /// `CancellationToken`s share cancellation state. Dropping *any* clone
    /// (e.g. a snapshot handed to the dashboard or to the list API) therefore
    /// likely also stops the probe. Confirm how `core` produces snapshots.
    fn drop(&mut self) {
        self.probe_cancel.cancel();
    }
}

/// Endpoint states of one service, keyed by endpoint IP address.
type EndpointStatuses = HashMap<IpAddr, EndpointStatus>;
107
/// Runtime state and settings of one monitored service (one FQDN).
#[derive(Debug, Clone)]
struct ServiceStatus {
    /// TTL written into nsupdate records for this service.
    dns_ttl: u32,
    /// DNS zone the service FQDN belongs to.
    dns_zone: String,
    /// State of every endpoint of the service.
    endpoint_statuses: EndpointStatuses,
    /// Time between two probes of an endpoint.
    probe_interval: Duration,
    /// URL requested by the probes.
    probe_target_url: String,
}

/// All monitored services, keyed by FQDN.
type FqdnToStatuses = HashMap<String, ServiceStatus>;
118
/// Messages consumed by the core task.
#[derive(Debug)]
enum CoreMsg {
    /// A probe result received from some node.
    Datapoint {
        datapoint: Datapoint,
    },
    /// Request for a snapshot of all service states, answered on `chan_dash_tx`.
    Fetch {
        chan_dash_tx: ServiceStatusCloneSend,
    },
    /// An API change request; the outcome is sent back on `resp_chan_tx`.
    APIChangeRequestWrap {
        request: APIChangeRequest,
        resp_chan_tx: tokio::sync::oneshot::Sender<APIChangeResponse>,
    },
}

/// A probe result as handed to the core task (built from a received
/// `QuorumMsg` in `handle_http_post_qmsg`).
#[derive(Debug)]
struct Datapoint {
    fqdn: String,
    ipaddr: IpAddr,
    status: Status,
    tstamp: SystemTime,
    node_addr: NodeAddr,
}

/// One-shot reply channel carrying a snapshot of all service states.
type ServiceStatusCloneSend = tokio::sync::oneshot::Sender<FqdnToStatuses>;
/// Sending half of the core task's input channel.
type CoreSend = Sender<CoreMsg>;
/// Receiving half of the core task's input channel.
type CoreRec = Receiver<CoreMsg>;
/// Sending half of the outbox (messages broadcast to every node).
type OutboxSend = Sender<QuorumMsg>;
/// Receiving half of the outbox.
type OutboxRec = Receiver<QuorumMsg>;
/// State shared with every HTTP handler.
#[derive(Debug, Clone)]
struct Shared {
    chan_core_tx: CoreSend,
}
/// Axum extractor giving handlers access to [`Shared`].
type SharedState = axum::extract::State<Shared>;
157
/// Evaluates to the `Ok` value of a `Result`; on `Err`, logs the error and
/// returns `()` from the enclosing function.
macro_rules! unwrap_or_return {
    ($res:expr) => {
        match $res {
            Err(err) => {
                error!("error: {}", err);
                return;
            }
            Ok(inner) => inner,
        }
    };
}

/// Evaluates to the `Ok` value of a `Result`; on `Err`, logs the error in
/// `Debug` form and terminates the process with exit status 1.
macro_rules! exp {
    ($res:expr) => {
        match $res {
            Err(err) => {
                error!("{:?}", err);
                process::exit(1);
            }
            Ok(inner) => inner,
        }
    };
}
182
183async fn handle_sig(sig: SignalKind) {
187 exp!(signal(sig)).recv().await;
188 info!("[main] exiting");
189 process::exit(0);
190}
191
192fn timer(statsd: &Statsd, name: &str, t0: Instant) {
194 let delta = t0.elapsed().as_millis() as f64;
195 statsd.timer(name, delta);
196}
197
/// Configuration of one monitored service.
#[derive(Deserialize, Debug, Clone)]
struct ServiceConf {
    /// Service name; leading/trailing dots are stripped by `prepare_conf`.
    fqdn: String,
    /// Health-check target, passed to the core as `CreateService.healthcheck`.
    healthcheck: String,
    /// Endpoint IP addresses as strings; parsed in `setup_and_spawn_probes`.
    ipaddrs: Vec<String>,
    /// DNS TTL of the service records.
    ttl: u32,
    /// DNS zone; `prepare_conf` requires `fqdn` to be inside it.
    zone: String,
    /// Per-service probe interval; falls back to `Conf::probe_interval_ms`, then 1000 ms.
    probe_interval_ms: Option<u64>,
}

/// Settings of the HTTP management API.
#[derive(Debug, Deserialize, Clone)]
struct APIConfig {
    /// When true, `create_router` also registers the `/api/v0/...` routes.
    enabled: bool,
}

/// How DNS records are updated when an endpoint changes status.
/// Variant names are matched in lowercase (`rename_all = "lowercase"`).
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
enum ResolverUpdateMethod {
    /// Dynu provider (`providers::Dynu`).
    Dynu,
    /// Dynv6 provider (`providers::Dynv6`).
    Dynv6,
    /// Run `/usr/bin/nsupdate` against each of `update_resolvers`.
    Nsupdate,
    /// Run `/usr/bin/knsupdate` against each of `update_resolvers`.
    Knsupdate,
    /// Run the executable at `path` (see `run_update_command`).
    Script { path: String },
}

/// Top-level configuration, loaded from JSON by `load_conf` / `prepare_conf`.
#[derive(Deserialize, Debug, Clone)]
struct Conf {
    /// Configuration format version; only `1` is accepted.
    conf_version: u8,
    /// This node's `ip:port`; also the HTTP listen address.
    /// Overridden by the `LOCAL_NODE` environment variable when set.
    local_node: String,
    /// `local_node` parsed by `prepare_conf`.
    #[serde(skip_deserializing)]
    parsed_local_node: Option<NodeAddr>,
    /// `ip:port` of every cluster node; probe results are posted to each.
    nodes: Vec<String>,
    /// Default probe interval for services that do not set their own.
    probe_interval_ms: Option<u64>,
    /// `nodes` parsed by `prepare_conf`.
    #[serde(skip_deserializing)]
    parsed_nodes: Vec<NodeAddr>,
    services: Vec<ServiceConf>,
    update_method: ResolverUpdateMethod,
    /// `ip:port` of the DNS servers targeted by nsupdate / knsupdate.
    update_resolvers: Vec<String>,
    /// Credentials passed to the Dynu / Dynv6 providers.
    update_credentials: String,
    /// When true, a `Down` change is not applied if `fail_open` reports that
    /// too many endpoints are unhealthy.
    enable_fail_open: bool,
    api: APIConfig,
}
248
/// Returns an owned copy of `s` with every leading and trailing `.` removed.
fn trim_dots(s: &str) -> String {
    String::from(s.trim_matches('.'))
}
252
253fn prepare_conf(jc: &str, local_node: Option<String>) -> Result<Conf> {
254 let mut conf: Conf = serde_json::from_str(jc).context("Unable to parse configuration")?;
255 debug!("[main] conf loaded");
256 if conf.conf_version != 1 {
257 bail!("Invalid configuration format version");
258 }
259 if let Some(ln) = local_node {
260 conf.local_node = ln;
261 }
262 conf.parsed_local_node = Some(parse_ipaddr_port(&conf.local_node)?);
263 conf.parsed_nodes = conf
264 .nodes
265 .iter()
266 .map(|n| parse_ipaddr_port(n).expect("Unable to parse {n}"))
267 .collect();
268 for n in &conf.update_resolvers {
269 parse_ipaddr_port(n)?;
270 }
271 for sv in &mut conf.services {
272 sv.fqdn = trim_dots(&sv.fqdn);
273 sv.zone = trim_dots(&sv.zone);
274 if sv.fqdn.ends_with(&sv.zone) {
275 continue;
276 }
277 bail!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
278 }
279 Ok(conf)
280}
281
282fn load_conf(conf_fn: &str, local_node: Option<String>) -> Result<Conf> {
284 info!("[load_conf] reading {conf_fn}");
285 let jc = fs::read_to_string(conf_fn).context("Unable to read config file")?;
286 prepare_conf(&jc, local_node)
287}
288
289async fn outbox_sender(mut chan_outbox_rx: OutboxRec, nodes: Vec<String>) {
291 let client_ = Reqw::builder()
293 .user_agent("rrdnsd")
294 .connect_timeout(Duration::new(5, 0))
295 .timeout(Duration::new(5, 0))
296 .tcp_keepalive(Duration::new(15, 0))
297 .build();
298 let client = unwrap_or_return!(client_);
299
300 while let Some(msg) = chan_outbox_rx.recv().await {
301 for node in &nodes {
302 let client = client.clone();
303 let msg = msg.clone();
304 let url = format!("http://{node}/qmsg");
305 let node = node.clone();
306 tokio::spawn(async move {
307 if let Err(e) = client.post(&url).json(&msg).send().await {
308 info!("Error sending msg to {node}: {e:?}");
309 }
310 });
312 }
313 }
314 info!("[outbox] exiting");
315 drop(chan_outbox_rx);
316}
317
318async fn probe(
322 interval: Duration,
323 local_node: NodeAddr,
324 endpoint_ipaddr: IpAddr,
325 target_uri: String,
326 fqdn: String,
327 chan_outbox_tx: OutboxSend,
328 cancel_probe: CancellationToken,
329) {
330 let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
334
335 let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
338
339 info!("[probe] creating probe for uri {target_uri} at ipaddr {endpoint_saddr}");
341 let client_ = Reqw::builder()
342 .user_agent("rrdnsd")
343 .connect_timeout(Duration::new(5, 0))
344 .timeout(Duration::new(5, 0))
345 .tcp_keepalive(Duration::new(15, 0))
346 .redirect(Policy::none())
347 .resolve(&fqdn, endpoint_saddr)
348 .build();
349 let client = unwrap_or_return!(client_);
350
351 debug!("[probe] starting");
352 let mut ticker = time::interval(interval);
353 loop {
354 if cancel_probe.is_cancelled() {
355 info!("[probe] exited probe for uri {target_uri} at ipaddr {endpoint_saddr}");
356 return;
357 }
358 let t0 = Instant::now();
359 ticker.tick().await;
360 let t1 = Instant::now();
361
362 debug!("[probe] probing {target_uri} at endpoint {endpoint_ipaddr}");
363 let resp = client.get(&target_uri).send().await;
364 let success = resp.is_ok();
365 timer(&statsd, "probe.duration", t0);
367 if success {
368 statsd.incr("probe.success.cnt");
370 } else {
371 statsd.incr("probe.failure.cnt");
373 }
374 let msg = QuorumMsg {
375 format_version: 1,
376 fqdn: fqdn.clone(),
377 ipaddr: endpoint_ipaddr,
378 status: if success { Status::Up } else { Status::Down },
379 tstamp: SystemTime::now(),
380 node_addr: local_node,
381 };
382 if chan_outbox_tx.send(msg).await.is_err() {
386 info!("[probe] send error");
387 drop(chan_outbox_tx);
388 return;
389 }
390
391 let loop_elapsed = t0.elapsed();
392 let probing = t1.elapsed();
393 statsd.timer("probe_time_msec", probing.as_millis() as f64);
395 let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
396 statsd.histogram("probe.load_factor", lf);
398 }
399}
400
401fn parse_ipaddr_port(n: &str) -> Result<NodeAddr> {
403 let (ipa_str, port_str) = n.rsplit_once(':').context("Invalid ipaddr:port format")?;
404 let ipa = ipa_str.parse::<IpAddr>().context("Invalid IP address")?;
405 let port = port_str.parse::<u16>().context("Invalid port number")?;
406 Ok(NodeAddr { ipa, port })
407}
408
409fn create_nsupdate_blob(
411 resolver_addr: NodeAddr,
412 fqdn: &str,
413 zone: &str,
414 ttl: u32,
415 endpoint_ipaddr: &IpAddr,
416 status: Status,
417) -> String {
418 let record_type = if endpoint_ipaddr.is_ipv4() {
419 "A"
420 } else {
421 "AAAA"
422 };
423
424 let operation = match status {
425 Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
426 Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
427 Status::Unknown => "exit".to_string(),
428 };
429 format!(
430 "server {ipa} {port}\nzone {zone}\nttl {ttl}\n{op}\nsend\nexit\n",
431 ipa = resolver_addr.ipa,
432 port = resolver_addr.port,
433 zone = zone,
434 ttl = ttl,
435 op = operation,
436 )
437}
438
439async fn run_nsupdates(
441 update_method: &ResolverUpdateMethod,
442 resolvers: &Vec<String>,
443 srv: &ServiceStatus,
444 fqdn: &str,
445 endpoint_ipaddr: &IpAddr,
446) -> Result<()> {
447 let endpoint_that_changed = srv
449 .endpoint_statuses
450 .get(endpoint_ipaddr)
451 .context("Endpoint not found")?;
452 let status = endpoint_that_changed.computed_status;
453
454 let cmd = match update_method {
455 ResolverUpdateMethod::Nsupdate => "/usr/bin/nsupdate",
456 ResolverUpdateMethod::Knsupdate => "/usr/bin/knsupdate",
457 _ => bail!("Incorrect function call"),
458 };
459
460 for resolver in resolvers {
461 let nsupdate_blob = create_nsupdate_blob(
462 parse_ipaddr_port(resolver)?,
463 fqdn,
464 &srv.dns_zone,
465 srv.dns_ttl,
466 endpoint_ipaddr,
467 status,
468 );
469 run_nsupdate_once(cmd, nsupdate_blob).await?;
470 }
471 Ok(())
472}
473
474#[cfg(not(test))]
476async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
477 use tokio::process::Command;
478 let tmpfile = tempfile::NamedTempFile::new().context("Failed to create tmpfile")?;
479 let conf_fn = tmpfile.path();
480 fs::write(conf_fn, &conf).context(format!("Failed to write {}", conf_fn.display()))?;
481 info!(
482 "[dns] calling {} {} with commands:\n{}",
483 cmd,
484 conf_fn.display(),
485 conf
486 );
487 let out = Command::new(cmd).arg(conf_fn).output().await;
488 match out {
489 Ok(_) => info!("[dns] success"),
490 Err(_) => warn!("[dns] failure"),
491 }
492 Ok(())
493}
494
#[cfg(test)]
#[allow(clippy::unused_async)]
/// Test stand-in for the real `run_nsupdate_once`: logs the command and the
/// generated nsupdate script instead of executing anything.
async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
    info!("--cmd--{}--conf--\n{}--", cmd, conf);
    Ok(())
}
501
/// Half of `n`, rounded up (`ceil(n / 2)`), computed without overflow.
///
/// For even `n`, both Up and Down can reach this count at the same time;
/// `compute_status` resolves that tie in favour of Up.
#[must_use]
const fn quorum(n: u16) -> u16 {
    n / 2 + n % 2
}
509
510#[must_use]
512#[allow(clippy::cast_possible_truncation)]
513#[allow(clippy::match_same_arms)]
514fn compute_status(endp_statuses: &NodeToEnst) -> Status {
515 if endp_statuses.is_empty() {
516 return Status::Unknown;
517 }
518 let q = quorum(endp_statuses.len() as u16);
519 let up_cnt = endp_statuses
520 .values()
521 .filter(|ens| ens.status == Status::Up)
522 .count() as u16;
523 let down_cnt = endp_statuses
524 .values()
525 .filter(|ens| ens.status == Status::Down)
526 .count() as u16;
527
528 match (up_cnt >= q, down_cnt >= q) {
529 (true, false) => Status::Up,
530 (false, true) => Status::Down,
531 (true, true) => Status::Up, (false, false) => Status::Unknown, }
534}
535
536fn ingest_datapoint(end: &mut EndpointStatus, msg: &Datapoint) -> bool {
538 let Some(s) = end.node_to_ens.get_mut(&msg.node_addr) else {
539 warn!("Received datapoint from unknown node {:?}", &msg.node_addr);
540 return false;
541 };
542
543 s.status = msg.status;
544 s.last_update_time = msg.tstamp;
545
546 let prev_computed_status = end.computed_status;
548 end.computed_status = compute_status(&end.node_to_ens);
549 end.computed_status_time = SystemTime::now();
550
551 if prev_computed_status == Status::Unknown {
552 return false;
553 }
554 if prev_computed_status == end.computed_status {
555 return false;
556 }
557 true
558}
559
560#[must_use]
563fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
564 let up = estatuses
565 .values()
566 .filter(|s| s.computed_status == Status::Up)
567 .count();
568 let down = estatuses
569 .values()
570 .filter(|s| s.computed_status == Status::Down)
571 .count();
572 let unknown = estatuses
573 .values()
574 .filter(|s| s.computed_status == Status::Unknown)
575 .count();
576
577 let th = estatuses.len().div_ceil(2);
579 let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {th}");
580 {
581 if unknown >= th {
582 info!("{msg}: too many endpoints in unknown state");
583 true
584 } else if down >= th {
585 info!("{msg}: too many endpoints in down state");
586 true
587 } else if up <= th {
588 info!("{msg}: not enough endpoints in up state");
589 true
590 } else {
591 debug!("No {msg}");
592 false
593 }
594 }
595}
596
597async fn run_update_command(
599 script_path: &str,
600 endpoint_ipaddr: IpAddr,
601 fqdn: &str,
602 dns_zone: &str,
603 status: Status,
604) {
605 let args = [
606 &endpoint_ipaddr.to_string(),
607 fqdn,
608 dns_zone,
609 &format!("{status:?}"),
610 ];
611 info!("[script] running with {:?}", &args);
612
613 let mut cmd = tokio::process::Command::new(script_path);
614 cmd.args(args);
615 let Ok(output) = cmd.output().await else {
616 error!("[script] failed to execute '{script_path}'");
617 return;
618 };
619 if !output.status.success() {
620 error!("[script] failed with {}", output.status);
621 }
622 let stderr = String::from_utf8_lossy(&output.stderr);
623 let stdout = String::from_utf8_lossy(&output.stdout);
624 if !stderr.is_empty() {
625 error!("[script] stderr: {stderr}");
626 }
627 if !stdout.is_empty() {
628 info!("[script] output: {stdout}");
629 }
630}
631
632async fn handle_endpoint_change(
634 conf: &Conf,
635 srv: &ServiceStatus,
636 endpoint_ipaddr: IpAddr,
637 fqdn: &str,
638) -> Result<()> {
639 let Some(endpoint_that_changed) = srv.endpoint_statuses.get(&endpoint_ipaddr) else {
640 return Ok(());
641 };
642 let status = endpoint_that_changed.computed_status;
643 if status == Status::Unknown {
644 return Ok(());
645 }
646 if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
647 return Ok(());
650 }
651
652 let zone = &srv.dns_zone;
653 match &conf.update_method {
654 ResolverUpdateMethod::Dynu => match status {
656 Status::Up => {
657 providers::Dynu::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
658 .await?;
659 }
660 Status::Down => {
661 providers::Dynu::delete_record(
662 &conf.update_credentials,
663 fqdn,
664 zone,
665 endpoint_ipaddr,
666 )
667 .await?;
668 }
669 Status::Unknown => {}
670 },
671 ResolverUpdateMethod::Dynv6 => match status {
672 Status::Up => {
673 providers::Dynv6::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
674 .await?;
675 }
676 Status::Down => {
677 providers::Dynv6::delete_record(
678 &conf.update_credentials,
679 fqdn,
680 zone,
681 endpoint_ipaddr,
682 )
683 .await?;
684 }
685 Status::Unknown => {}
686 },
687
688 ResolverUpdateMethod::Nsupdate | ResolverUpdateMethod::Knsupdate => {
689 run_nsupdates(
690 &conf.update_method,
691 &conf.update_resolvers,
692 srv,
693 fqdn,
694 &endpoint_ipaddr,
695 )
696 .await?;
697 }
698 ResolverUpdateMethod::Script { path } => {
699 run_update_command(path, endpoint_ipaddr, fqdn, &srv.dns_zone, status).await;
700 }
701 }
702 Ok(())
703}
704
705async fn fetch_status(chan_core_tx: CoreSend) -> Result<FqdnToStatuses> {
708 let (chan_dash_tx, chan_dash_rx) = tokio::sync::oneshot::channel();
710 let wu = CoreMsg::Fetch { chan_dash_tx };
711 chan_core_tx.send(wu).await?;
712 Ok(chan_dash_rx.await?)
713}
714
715async fn handle_http_get_dashboard(State(shared): SharedState) -> axum::response::Html<String> {
716 let resp = fetch_status(shared.chan_core_tx).await;
717 match resp {
718 Err(e) => {
719 error!("dashboard: failed to fetch: {e}");
720 axum::response::Html("Error".into())
721 }
722 Ok(r) => {
723 let dash = dash::render_html_dashboard(&r);
724 axum::response::Html(dash)
725 }
726 }
727}
728
/// `GET /health`: liveness check that always answers with the body `ok`.
async fn handle_http_get_health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
732
733async fn handle_http_post_qmsg(State(shared): SharedState, Json(msg): Json<QuorumMsg>) {
734 let wu = CoreMsg::Datapoint {
736 datapoint: Datapoint {
737 fqdn: msg.fqdn.to_string(),
738 ipaddr: msg.ipaddr,
739 status: msg.status,
740 tstamp: msg.tstamp,
741 node_addr: msg.node_addr,
742 },
743 };
744 if shared.chan_core_tx.send(wu).await.is_err() {
745 info!("[http] exiting");
746 };
748}
749
/// One service entry in the `/api/v0/list_endpoints` response.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct APIListItem {
    /// Computed status of each endpoint, keyed by IP address.
    pub ipaddrs: HashMap<IpAddr, Status>,
    dns_ttl: u32,
    dns_zone: String,
    /// Probe interval in milliseconds.
    probe_interval_ms: u128,
    probe_target_url: String,
}

/// `/api/v0/list_endpoints` response: service FQDN -> details, ordered by FQDN.
pub type APIListOut = BTreeMap<String, APIListItem>;
765
766async fn handle_http_api_list(State(shared): SharedState) -> Json<APIListOut> {
768 let resp = fetch_status(shared.chan_core_tx).await.unwrap();
769
770 let out = resp
771 .into_iter()
772 .map(|(fqdn, srvs)| {
773 let ipaddrs = srvs
775 .endpoint_statuses
776 .into_iter()
777 .map(|(srv_ipaddr, ep)| (srv_ipaddr, ep.computed_status))
778 .collect();
779
780 (
781 fqdn,
782 APIListItem {
783 ipaddrs,
784 dns_ttl: srvs.dns_ttl,
785 dns_zone: srvs.dns_zone,
786 probe_interval_ms: srvs.probe_interval.as_millis(),
787 probe_target_url: srvs.probe_target_url,
788 },
789 )
790 })
791 .collect();
792
793 Json(out)
794}
795
/// One change requested through `/api/v0/update_endpoints`.
///
/// Internally tagged by a `method` field whose value is the snake_case
/// variant name (e.g. `"create_service"`). The effect of each action is
/// implemented by the core task (not shown here).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum APIRPCUpdateAction {
    /// Create a service; at start-up the fields are filled from [`ServiceConf`].
    CreateService {
        fqdn: String,
        healthcheck: String,
        ttl: u32,
        zone: String,
        probe_interval_ms: u64,
    },
    /// Delete the service `fqdn`.
    DeleteService {
        fqdn: String,
    },
    /// Add endpoint `ipaddr` to service `fqdn`.
    AddIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
    /// Remove endpoint `ipaddr` from service `fqdn`.
    DeleteIpAddr {
        fqdn: String,
        ipaddr: IpAddr,
    },
}

/// Body of `POST /api/v0/update_endpoints`.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct APIChangeRequest {
    pub actions: Vec<APIRPCUpdateAction>,
}

/// Outcome of a single action, tagged by a lowercase `status` field (`ok` / `error`).
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum APIChangeResult {
    Ok,
    Error { error: String },
}

/// Response to an [`APIChangeRequest`].
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct APIChangeResponse {
    /// Per-action outcomes; callers pair them with the request's actions by position.
    results: Vec<APIChangeResult>,
}
838
839impl APIChangeResponse {
840 pub fn all_ok(&self) -> bool {
841 self.results
842 .iter()
843 .all(|r| matches!(r, APIChangeResult::Ok))
844 }
845}
846
847async fn handle_http_api_update(
849 State(shared): SharedState,
850 axum::Json(req): Json<APIChangeRequest>,
851) -> axum::Json<APIChangeResponse> {
852 let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
853 let wu = CoreMsg::APIChangeRequestWrap {
854 request: req,
855 resp_chan_tx,
856 };
857 debug!("sending API posnt into chan_core_tx");
858 let _ = shared.chan_core_tx.send(wu).await;
859 debug!("waiting from resp_chan_rx");
860 let resp = resp_chan_rx
861 .await
862 .unwrap_or_else(|_| APIChangeResponse { results: vec![] });
863
864 debug!("received a reply from resp_chan_rx");
865 Json(resp)
866}
867
868fn create_router(chan_core_tx: CoreSend, api_enabled: bool) -> axum::Router {
871 let r = axum::Router::new()
872 .route("/dash", axum::routing::get(handle_http_get_dashboard))
873 .route("/health", axum::routing::get(handle_http_get_health))
874 .route("/qmsg", axum::routing::post(handle_http_post_qmsg));
875
876 if api_enabled {
877 r.route(
878 "/api/v0/list_endpoints",
879 axum::routing::get(handle_http_api_list),
880 )
881 .route(
882 "/api/v0/update_endpoints",
883 axum::routing::post(handle_http_api_update),
884 )
885 } else {
886 r
887 }
888 .with_state(Shared { chan_core_tx })
889}
890
891async fn wait_for_shutdown(token: CancellationToken) {
892 token.cancelled().await;
893 debug!("Shutdown token triggered. Axum server exiting.");
894}
895
896async fn run_axum_server(
897 http_listener: tokio::net::TcpListener,
898 http_router: axum::Router,
899 shutdown_token: CancellationToken,
900) {
901 if let Err(e) = axum::serve(http_listener, http_router)
902 .with_graceful_shutdown(wait_for_shutdown(shutdown_token))
903 .await
904 {
905 error!("Axum server failed: {e}");
906 process::exit(1);
907 }
908}
909
/// Loads the configuration and starts every background task.
///
/// Start-up steps:
/// 1. Spawn the outbox sender and the core task.
/// 2. Bind the HTTP listener on `local_node` and spawn the HTTP server.
///    All spawned handles are pushed into `tasks`.
/// 3. Register every configured service and endpoint with the core through
///    API change requests.
///
/// `local_node`, when `Some`, overrides the configured local node address.
/// A configuration that cannot be loaded is logged and terminates the process
/// with status 1 rather than being returned as an error.
///
/// Returns a token whose cancellation shuts the HTTP server down gracefully.
/// The server watches a child token derived from it.
///
/// # Errors
/// Fails if any of these fails:
/// - creating the statsd client;
/// - binding the HTTP listener;
/// - parsing a configured endpoint IP;
/// - getting a reply from the core task.
pub async fn init(
    conf_fn: &str,
    local_node: Option<String>,
    tasks: &mut Vec<tokio::task::JoinHandle<()>>,
) -> Result<CancellationToken> {
    let conf = load_conf(conf_fn, local_node).unwrap_or_else(|err| {
        error!("Failed to load config: {err}");
        process::exit(1)
    });

    let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd")?;
    // Empty at start; presumably the core fills it from the CreateService /
    // AddIpAddr requests sent by `setup_and_spawn_probes` below.
    let service_statuses = HashMap::new();

    let (chan_core_tx, chan_core_rx): (CoreSend, CoreRec) = mpsc::channel(10);
    let (chan_outbox_tx, chan_outbox_rx): (OutboxSend, OutboxRec) = mpsc::channel(100);

    let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
    tasks.push(tokio::spawn(sender));

    let core_t = core::core(
        statsd,
        conf.clone(),
        service_statuses,
        chan_core_rx,
        chan_outbox_tx,
    );
    tasks.push(tokio::spawn(core_t));

    // NOTE(review): presumably lets the core task start before the HTTP
    // server and the probes begin using it. The bounded channel buffers
    // messages anyway, so confirm whether this delay is still needed.
    time::sleep(Duration::from_millis(100)).await;

    let axum_shutdown = CancellationToken::new();
    let shutdown_token_child = axum_shutdown.child_token();

    debug!("creating listener");
    let http_router = create_router(chan_core_tx.clone(), conf.api.enabled);
    let http_listener = tokio::net::TcpListener::bind(conf.local_node.clone()).await?;

    debug!("starting axum");
    let fut = tokio::spawn(run_axum_server(
        http_listener,
        http_router,
        shutdown_token_child,
    ));

    tasks.push(fut);

    debug!("starting probes");
    setup_and_spawn_probes(conf, chan_core_tx).await?;
    debug!("starting probes: done");

    Ok(axum_shutdown)
}
972
973pub async fn run() -> Result<()> {
974 JournalLog::new()
976 .context("Failed to create JournalLog")?
977 .install()
978 .context("Failed to setup journald logging")?;
979
980 log::set_max_level(LevelFilter::Info);
982 info!("[main] starting");
983
984 let mut tasks = Vec::new();
986 for s in [SignalKind::interrupt(), SignalKind::hangup()] {
987 tasks.push(tokio::spawn(handle_sig(s)));
988 }
989
990 let default_conf_fn = "/etc/rrdnsd.json";
991 let conf_fn = std::env::var("CONF").ok();
992 let conf_fn = conf_fn.as_deref().unwrap_or(default_conf_fn);
993
994 let local_node = std::env::var("LOCAL_NODE").ok();
995
996 init(conf_fn, local_node, &mut tasks).await?;
997
998 join_all(tasks).await;
999 Ok(())
1000}
1001
1002async fn setup_and_spawn_probes(conf: Conf, chan_core_tx: Sender<CoreMsg>) -> Result<()> {
1004 for service in &conf.services {
1005 let probe_interval_ms = service
1006 .probe_interval_ms
1007 .or(conf.probe_interval_ms)
1008 .unwrap_or(1000);
1009
1010 let request = APIChangeRequest {
1011 actions: vec![APIRPCUpdateAction::CreateService {
1012 fqdn: service.fqdn.clone(),
1013 healthcheck: service.healthcheck.clone(),
1014 ttl: service.ttl,
1015 zone: service.zone.clone(),
1016 probe_interval_ms,
1017 }],
1018 };
1019 send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1020
1021 for ipaddr_str in &service.ipaddrs {
1022 let request = APIChangeRequest {
1023 actions: vec![APIRPCUpdateAction::AddIpAddr {
1024 fqdn: service.fqdn.clone(),
1025 ipaddr: ipaddr_str.parse()?,
1026 }],
1027 };
1028 send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1029 }
1030 }
1031 Ok(())
1032}
1033
1034async fn send_api_change_request_then_check_resp(
1035 chan_core_tx: &Sender<CoreMsg>,
1036 request: APIChangeRequest,
1037) -> Result<(), anyhow::Error> {
1038 let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
1039 let msg = CoreMsg::APIChangeRequestWrap {
1040 request: request.clone(),
1041 resp_chan_tx,
1042 };
1043 chan_core_tx.send(msg).await?;
1044 let resp = resp_chan_rx.await?;
1045
1046 for (action, result) in request.actions.iter().zip(resp.results.iter()) {
1047 if let APIChangeResult::Error { error } = result {
1048 error!("Action {action:?} failed: {error}");
1049 }
1050 }
1051
1052 Ok(())
1053}