1use anyhow::{bail, Context, Result};
6use axum::extract::{Json, State};
7use futures::future::join_all;
8use log::{debug, error, info, warn, LevelFilter};
10use reqwest::redirect::Policy;
11use reqwest::Client as Reqw;
12use serde::{Deserialize, Serialize};
13use statsd::client::Client as Statsd;
14use std::collections::BTreeMap;
15use std::collections::HashMap;
16use std::fs;
17use std::net::{IpAddr, SocketAddr};
18use std::process; use std::str;
20use std::time::Instant;
21use std::time::SystemTime;
22use systemd_journal_logger::JournalLog;
23use tokio::signal::unix::{signal, SignalKind};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::time;
27use tokio::time::Duration;
28use tokio_util::sync::CancellationToken;
29
30mod core;
31mod dash;
32mod providers;
33
34use crate::providers::SetProvider;
35
36#[cfg(test)]
37mod tests;
38
39const VERSION: &str = env!("CARGO_PKG_VERSION");
40
41#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, Copy)]
42pub enum Status {
43 Unknown,
44 Down,
45 Up,
46}
47
48#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
49struct Enst {
50 status: Status,
51 last_update_time: SystemTime,
52}
53
54#[derive(Debug, Serialize, Deserialize, Clone)]
57struct QuorumMsg {
58 format_version: u8,
59 fqdn: String,
60 ipaddr: IpAddr,
61 status: Status,
62 tstamp: SystemTime,
63 node_addr: NodeAddr,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Copy, Serialize, Deserialize)]
67struct NodeAddr {
68 ipa: IpAddr,
69 port: u16,
70}
71
72type NodeToEnst = HashMap<NodeAddr, Enst>;
90
91#[derive(Debug, Clone)]
92struct EndpointStatus {
93 node_to_ens: NodeToEnst,
94 computed_status: Status,
95 computed_status_time: SystemTime,
96 probe_cancel: tokio_util::sync::CancellationToken,
97}
98
99type EndpointStatuses = HashMap<IpAddr, EndpointStatus>;
100
101#[derive(Debug, Clone)]
102struct ServiceStatus {
103 dns_ttl: u32,
104 dns_zone: String,
105 endpoint_statuses: EndpointStatuses,
106 probe_interval: Duration,
107 probe_target_url: String,
108}
109
110type FqdnToStatuses = HashMap<String, ServiceStatus>;
111
112#[derive(Debug)]
114enum CoreMsg {
115 Datapoint {
116 datapoint: Datapoint,
117 },
118 Fetch {
119 chan_dash_tx: DashTx,
120 },
121 APIChangeRequestWrap {
122 request: APIChangeRequest,
123 resp_chan_tx: tokio::sync::oneshot::Sender<APIChangeResponse>,
124 },
125}
126
127#[derive(Debug)]
129struct Datapoint {
130 fqdn: String,
131 ipaddr: IpAddr,
132 status: Status,
133 tstamp: SystemTime,
134 node_addr: NodeAddr,
135}
136
137type DashTx = tokio::sync::oneshot::Sender<FqdnToStatuses>; type CoreTx = Sender<CoreMsg>; type CoreRx = Receiver<CoreMsg>; type OutboxTx = Sender<QuorumMsg>; type OutboxRx = Receiver<QuorumMsg>; #[derive(Debug, Clone)]
145struct Shared {
146 chan_core_tx: CoreTx,
147}
148type SharedState = axum::extract::State<Shared>;
149
/// Evaluates a `Result` expression: yields the `Ok` value, or logs the error
/// and returns `()` from the enclosing function.
macro_rules! unwrap_or_return {
    ($result:expr) => {
        match $result {
            Err(err) => {
                error!("error: {}", err);
                return;
            }
            Ok(value) => value,
        }
    };
}
161
/// Evaluates a `Result` expression: yields the `Ok` value, or logs the error
/// with `Debug` formatting and terminates the process with exit code 1.
macro_rules! exp {
    ($result:expr) => {
        match $result {
            Err(err) => {
                error!("{:?}", err);
                process::exit(1);
            }
            Ok(value) => value,
        }
    };
}
174
175async fn handle_sig(sig: SignalKind) {
179 exp!(signal(sig)).recv().await;
180 info!("[main] exiting");
181 process::exit(0);
182}
183
184fn timer(statsd: &Statsd, name: &str, t0: Instant) {
186 let delta = t0.elapsed().as_millis() as f64;
187 statsd.timer(name, delta);
188}
189
190#[derive(Deserialize, Debug, Clone)]
194struct ServiceConf {
195 fqdn: String,
196 healthcheck: String,
197 ipaddrs: Vec<String>,
198 ttl: u32,
199 zone: String,
200 probe_interval_ms: Option<u64>,
201}
202
203#[derive(Debug, Deserialize, Clone)]
206struct APIConfig {
207 enabled: bool,
208}
209
210#[derive(Deserialize, Debug, Clone)]
212#[serde(rename_all = "lowercase")]
213enum ResolverUpdateMethod {
214 Dynu,
215 Dynv6,
216 Nsupdate,
217 Knsupdate,
218 Script { path: String },
219}
220
221#[derive(Deserialize, Debug, Clone)]
223struct Conf {
224 conf_version: u8,
225 local_node: String,
226 #[serde(skip_deserializing)]
227 parsed_local_node: Option<NodeAddr>,
228 nodes: Vec<String>,
229 probe_interval_ms: Option<u64>,
230 #[serde(skip_deserializing)]
231 parsed_nodes: Vec<NodeAddr>,
232 services: Vec<ServiceConf>,
233 update_method: ResolverUpdateMethod,
234 update_resolvers: Vec<String>,
235 update_credentials: String,
237 enable_fail_open: bool,
238 api: APIConfig,
239}
240
/// Returns `s` without any leading or trailing `.` characters.
fn trim_dots(s: &str) -> String {
    s.trim_matches('.').to_owned()
}
244
245fn prepare_conf(jc: &str, local_node: Option<String>) -> Result<Conf> {
246 let mut conf: Conf = serde_json::from_str(jc).context("Unable to parse configuration")?;
247 debug!("[main] conf loaded");
248 if conf.conf_version != 1 {
249 bail!("Invalid configuration format version");
250 }
251 if let Some(ln) = local_node {
252 conf.local_node = ln;
253 }
254 conf.parsed_local_node = Some(parse_ipaddr_port(&conf.local_node)?);
255 conf.parsed_nodes = conf
256 .nodes
257 .iter()
258 .map(|n| parse_ipaddr_port(n).expect("Unable to parse {n}"))
259 .collect();
260 for n in &conf.update_resolvers {
261 parse_ipaddr_port(n)?;
262 }
263 for sv in &mut conf.services {
264 sv.fqdn = trim_dots(&sv.fqdn);
265 sv.zone = trim_dots(&sv.zone);
266 if sv.fqdn.ends_with(&sv.zone) {
267 continue;
268 }
269 bail!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
270 }
271 Ok(conf)
272}
273
274fn load_conf(conf_fn: &str, local_node: Option<String>) -> Result<Conf> {
276 info!("[load_conf] reading {conf_fn}");
277 let jc = fs::read_to_string(conf_fn).context("Unable to read config file")?;
278 prepare_conf(&jc, local_node)
279}
280
281async fn outbox_sender(mut chan_outbox_rx: OutboxRx, nodes: Vec<String>) {
283 let client_ = Reqw::builder()
286 .connect_timeout(Duration::new(5, 0))
287 .pool_max_idle_per_host(9)
288 .tcp_keepalive(Duration::new(15, 0))
289 .timeout(Duration::new(5, 0))
290 .user_agent("rrdnsd")
291 .build();
292 let client = unwrap_or_return!(client_);
293
294 while let Some(msg) = chan_outbox_rx.recv().await {
295 for node in &nodes {
296 let client = client.clone();
297 let msg = msg.clone();
298 let url = format!("http://{node}/qmsg");
299 let node = node.clone();
300 tokio::spawn(async move {
301 if let Err(e) = client.post(&url).json(&msg).send().await {
302 info!("Error sending msg to {node}: {e:?}");
303 }
304 });
306 }
307 }
308 info!("[outbox] exiting");
309 drop(chan_outbox_rx);
310}
311
312async fn probe(
316 interval: Duration,
317 local_node: NodeAddr,
318 endpoint_ipaddr: IpAddr,
319 target_uri: String,
320 fqdn: String,
321 chan_outbox_tx: OutboxTx,
322 cancel_probe: CancellationToken,
323) {
324 let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
328
329 let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
332
333 info!("[probe] creating probe for uri {target_uri} at ipaddr {endpoint_saddr}");
335 let client_ = Reqw::builder()
336 .connect_timeout(Duration::new(5, 0))
337 .pool_max_idle_per_host(2)
338 .redirect(Policy::none())
339 .resolve(&fqdn, endpoint_saddr)
340 .tcp_keepalive(Duration::new(15, 0))
341 .timeout(Duration::new(5, 0))
342 .user_agent("rrdnsd")
343 .build();
344 let client = unwrap_or_return!(client_);
345
346 debug!("[probe] starting");
347 let mut ticker = time::interval(interval);
348 loop {
349 if cancel_probe.is_cancelled() {
350 info!("[probe] exited probe for uri {target_uri} at ipaddr {endpoint_saddr}");
351 return;
352 }
353 let t0 = Instant::now();
354 ticker.tick().await;
355 let t1 = Instant::now();
356
357 debug!("[probe] probing {target_uri} at endpoint {endpoint_ipaddr}");
358 let resp = client.get(&target_uri).send().await;
359 let success = resp.is_ok();
360 timer(&statsd, "probe.duration", t0);
362 if success {
363 statsd.incr("probe.success.cnt");
365 } else {
366 statsd.incr("probe.failure.cnt");
368 }
369 let msg = QuorumMsg {
370 format_version: 1,
371 fqdn: fqdn.clone(),
372 ipaddr: endpoint_ipaddr,
373 status: if success { Status::Up } else { Status::Down },
374 tstamp: SystemTime::now(),
375 node_addr: local_node,
376 };
377 if chan_outbox_tx.send(msg).await.is_err() {
379 info!("[probe] send error");
380 drop(chan_outbox_tx);
381 return;
382 }
383
384 let loop_elapsed = t0.elapsed();
385 let probing = t1.elapsed();
386 statsd.timer("probe_time_msec", probing.as_millis() as f64);
388 let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
389 statsd.histogram("probe.load_factor", lf);
391 }
392}
393
394fn parse_ipaddr_port(n: &str) -> Result<NodeAddr> {
396 let (ipa_str, port_str) = n.rsplit_once(':').context("Invalid ipaddr:port format")?;
397 let ipa = ipa_str.parse::<IpAddr>().context("Invalid IP address")?;
398 let port = port_str.parse::<u16>().context("Invalid port number")?;
399 Ok(NodeAddr { ipa, port })
400}
401
402fn create_nsupdate_blob(
404 resolver_addr: NodeAddr,
405 fqdn: &str,
406 zone: &str,
407 ttl: u32,
408 endpoint_ipaddr: &IpAddr,
409 status: Status,
410) -> String {
411 let record_type = if endpoint_ipaddr.is_ipv4() {
412 "A"
413 } else {
414 "AAAA"
415 };
416
417 let operation = match status {
418 Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
419 Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
420 Status::Unknown => "exit".to_string(),
421 };
422 format!(
423 "server {ipa} {port}\nzone {zone}\nttl {ttl}\n{op}\nsend\nexit\n",
424 ipa = resolver_addr.ipa,
425 port = resolver_addr.port,
426 zone = zone,
427 ttl = ttl,
428 op = operation,
429 )
430}
431
432async fn run_nsupdates(
434 update_method: &ResolverUpdateMethod,
435 resolvers: &Vec<String>,
436 srv: &ServiceStatus,
437 fqdn: &str,
438 endpoint_ipaddr: &IpAddr,
439) -> Result<()> {
440 let endpoint_that_changed = srv
442 .endpoint_statuses
443 .get(endpoint_ipaddr)
444 .context("Endpoint not found")?;
445 let status = endpoint_that_changed.computed_status;
446
447 let cmd = match update_method {
448 ResolverUpdateMethod::Nsupdate => "/usr/bin/nsupdate",
449 ResolverUpdateMethod::Knsupdate => "/usr/bin/knsupdate",
450 _ => bail!("Incorrect function call"),
451 };
452
453 for resolver in resolvers {
454 let nsupdate_blob = create_nsupdate_blob(
455 parse_ipaddr_port(resolver)?,
456 fqdn,
457 &srv.dns_zone,
458 srv.dns_ttl,
459 endpoint_ipaddr,
460 status,
461 );
462 run_nsupdate_once(cmd, nsupdate_blob).await?;
463 }
464 Ok(())
465}
466
467#[cfg(not(test))]
469async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
470 use tokio::process::Command;
471 let tmpfile = tempfile::NamedTempFile::new().context("Failed to create tmpfile")?;
472 let conf_fn = tmpfile.path();
473 fs::write(conf_fn, &conf).context(format!("Failed to write {}", conf_fn.display()))?;
474 info!(
475 "[dns] calling {} {} with commands:\n{}",
476 cmd,
477 conf_fn.display(),
478 conf
479 );
480 let out = Command::new(cmd).arg(conf_fn).output().await;
481 match out {
482 Ok(_) => info!("[dns] success"),
483 Err(_) => warn!("[dns] failure"),
484 }
485 Ok(())
486}
487
/// Test-build stand-in for `run_nsupdate_once`: runs nothing and only logs
/// the command and the script it was given.
#[cfg(test)]
#[allow(clippy::unused_async)]
async fn run_nsupdate_once(cmd: &str, conf: String) -> Result<()> {
    info!("--cmd--{cmd}--conf--\n{conf}--");
    Ok(())
}
494
/// Number of agreeing nodes required out of `n`: half of `n`, rounded up.
#[must_use]
const fn quorum(n: u16) -> u16 {
    n / 2 + n % 2
}
502
503#[must_use]
505#[allow(clippy::cast_possible_truncation)]
506#[allow(clippy::match_same_arms)]
507fn compute_status(endp_statuses: &NodeToEnst) -> Status {
508 if endp_statuses.is_empty() {
509 return Status::Unknown;
510 }
511 let q = quorum(endp_statuses.len() as u16);
512 let up_cnt = endp_statuses
513 .values()
514 .filter(|ens| ens.status == Status::Up)
515 .count() as u16;
516 let down_cnt = endp_statuses
517 .values()
518 .filter(|ens| ens.status == Status::Down)
519 .count() as u16;
520
521 match (up_cnt >= q, down_cnt >= q) {
522 (true, false) => Status::Up,
523 (false, true) => Status::Down,
524 (true, true) => Status::Up, (false, false) => Status::Unknown, }
527}
528
529fn ingest_datapoint(end: &mut EndpointStatus, msg: &Datapoint) -> bool {
531 let Some(s) = end.node_to_ens.get_mut(&msg.node_addr) else {
532 warn!("Received datapoint from unknown node {:?}", &msg.node_addr);
533 return false;
534 };
535
536 s.status = msg.status;
537 s.last_update_time = msg.tstamp;
538
539 let prev_computed_status = end.computed_status;
541 end.computed_status = compute_status(&end.node_to_ens);
542 end.computed_status_time = SystemTime::now();
543
544 if prev_computed_status == Status::Unknown {
545 return false;
546 }
547 if prev_computed_status == end.computed_status {
548 return false;
549 }
550 true
551}
552
553#[must_use]
556fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
557 let up = estatuses
558 .values()
559 .filter(|s| s.computed_status == Status::Up)
560 .count();
561 let down = estatuses
562 .values()
563 .filter(|s| s.computed_status == Status::Down)
564 .count();
565 let unknown = estatuses
566 .values()
567 .filter(|s| s.computed_status == Status::Unknown)
568 .count();
569
570 let th = estatuses.len().div_ceil(2);
572 let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {th}");
573 {
574 if unknown >= th {
575 info!("{msg}: too many endpoints in unknown state");
576 true
577 } else if down >= th {
578 info!("{msg}: too many endpoints in down state");
579 true
580 } else if up <= th {
581 info!("{msg}: not enough endpoints in up state");
582 true
583 } else {
584 debug!("No {msg}");
585 false
586 }
587 }
588}
589
590async fn run_update_command(
592 script_path: &str,
593 endpoint_ipaddr: IpAddr,
594 fqdn: &str,
595 dns_zone: &str,
596 status: Status,
597) {
598 let args = [
599 &endpoint_ipaddr.to_string(),
600 fqdn,
601 dns_zone,
602 &format!("{status:?}"),
603 ];
604 info!("[script] running with {:?}", &args);
605
606 let mut cmd = tokio::process::Command::new(script_path);
607 cmd.args(args);
608 let Ok(output) = cmd.output().await else {
609 error!("[script] failed to execute '{script_path}'");
610 return;
611 };
612 if !output.status.success() {
613 error!("[script] failed with {}", output.status);
614 }
615 let stderr = String::from_utf8_lossy(&output.stderr);
616 let stdout = String::from_utf8_lossy(&output.stdout);
617 if !stderr.is_empty() {
618 error!("[script] stderr: {stderr}");
619 }
620 if !stdout.is_empty() {
621 info!("[script] output: {stdout}");
622 }
623}
624
625async fn handle_endpoint_change(
627 conf: &Conf,
628 srv: &ServiceStatus,
629 endpoint_ipaddr: IpAddr,
630 fqdn: &str,
631) -> Result<()> {
632 let Some(endpoint_that_changed) = srv.endpoint_statuses.get(&endpoint_ipaddr) else {
633 return Ok(());
634 };
635 let status = endpoint_that_changed.computed_status;
636 if status == Status::Unknown {
637 return Ok(());
638 }
639 if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
640 return Ok(());
643 }
644
645 let zone = &srv.dns_zone;
646 match &conf.update_method {
647 ResolverUpdateMethod::Dynu => match status {
649 Status::Up => {
650 providers::Dynu::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
651 .await?;
652 }
653 Status::Down => {
654 providers::Dynu::delete_record(
655 &conf.update_credentials,
656 fqdn,
657 zone,
658 endpoint_ipaddr,
659 )
660 .await?;
661 }
662 Status::Unknown => {}
663 },
664 ResolverUpdateMethod::Dynv6 => match status {
665 Status::Up => {
666 providers::Dynv6::add_record(&conf.update_credentials, fqdn, zone, endpoint_ipaddr)
667 .await?;
668 }
669 Status::Down => {
670 providers::Dynv6::delete_record(
671 &conf.update_credentials,
672 fqdn,
673 zone,
674 endpoint_ipaddr,
675 )
676 .await?;
677 }
678 Status::Unknown => {}
679 },
680
681 ResolverUpdateMethod::Nsupdate | ResolverUpdateMethod::Knsupdate => {
682 run_nsupdates(
683 &conf.update_method,
684 &conf.update_resolvers,
685 srv,
686 fqdn,
687 &endpoint_ipaddr,
688 )
689 .await?;
690 }
691 ResolverUpdateMethod::Script { path } => {
692 run_update_command(path, endpoint_ipaddr, fqdn, &srv.dns_zone, status).await;
693 }
694 }
695 Ok(())
696}
697
698async fn fetch_status(chan_core_tx: CoreTx) -> Result<FqdnToStatuses> {
701 let (chan_dash_tx, chan_dash_rx) = tokio::sync::oneshot::channel();
703 let wu = CoreMsg::Fetch { chan_dash_tx };
704 chan_core_tx.send(wu).await?;
705 Ok(chan_dash_rx.await?)
706}
707
708async fn handle_http_get_dashboard(State(shared): SharedState) -> axum::response::Html<String> {
709 let resp = fetch_status(shared.chan_core_tx).await;
710 match resp {
711 Err(e) => {
712 error!("dashboard: failed to fetch: {e}");
713 axum::response::Html("Error".into())
714 }
715 Ok(r) => {
716 let dash = dash::render_html_dashboard(&r);
717 axum::response::Html(dash)
718 }
719 }
720}
721
/// `GET /health`: always answers with the fixed body `ok`.
async fn handle_http_get_health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
725
726async fn handle_http_post_qmsg(State(shared): SharedState, Json(msg): Json<QuorumMsg>) {
727 let wu = CoreMsg::Datapoint {
729 datapoint: Datapoint {
730 fqdn: msg.fqdn.to_string(),
731 ipaddr: msg.ipaddr,
732 status: msg.status,
733 tstamp: msg.tstamp,
734 node_addr: msg.node_addr,
735 },
736 };
737 if shared.chan_core_tx.send(wu).await.is_err() {
738 info!("[http] exiting");
739 };
741}
742
743#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
749pub struct APIListItem {
750 pub ipaddrs: HashMap<IpAddr, Status>,
751 dns_ttl: u32,
752 dns_zone: String,
753 probe_interval_ms: u128,
754 probe_target_url: String,
755}
756
757pub type APIListOut = BTreeMap<String, APIListItem>;
758
759async fn handle_http_api_list(State(shared): SharedState) -> Json<APIListOut> {
761 let resp = fetch_status(shared.chan_core_tx).await.unwrap();
762
763 let out = resp
764 .into_iter()
765 .map(|(fqdn, srvs)| {
766 let ipaddrs = srvs
768 .endpoint_statuses
769 .into_iter()
770 .map(|(srv_ipaddr, ep)| (srv_ipaddr, ep.computed_status))
771 .collect();
772
773 (
774 fqdn,
775 APIListItem {
776 ipaddrs,
777 dns_ttl: srvs.dns_ttl,
778 dns_zone: srvs.dns_zone,
779 probe_interval_ms: srvs.probe_interval.as_millis(),
780 probe_target_url: srvs.probe_target_url,
781 },
782 )
783 })
784 .collect();
785
786 Json(out)
787}
788
789#[derive(Debug, Serialize, Deserialize, Clone)]
793#[serde(tag = "method", rename_all = "snake_case")]
794pub enum APIRPCUpdateAction {
795 CreateService {
796 fqdn: String,
797 healthcheck: String,
798 ttl: u32,
799 zone: String,
800 probe_interval_ms: u64,
801 },
802 DeleteService {
803 fqdn: String,
804 },
805 AddIpAddr {
806 fqdn: String,
807 ipaddr: IpAddr,
808 },
809 DeleteIpAddr {
810 fqdn: String,
811 ipaddr: IpAddr,
812 },
813}
814
815#[derive(Deserialize, Serialize, Debug, Clone)]
816pub struct APIChangeRequest {
817 pub actions: Vec<APIRPCUpdateAction>,
818}
819
820#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
821#[serde(tag = "status", rename_all = "lowercase")]
822pub enum APIChangeResult {
823 Ok,
824 Error { error: String },
825}
826
827#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
828pub struct APIChangeResponse {
829 results: Vec<APIChangeResult>,
830}
831
832impl APIChangeResponse {
833 pub fn all_ok(&self) -> bool {
834 self.results
835 .iter()
836 .all(|r| matches!(r, APIChangeResult::Ok))
837 }
838}
839
840async fn handle_http_api_update(
842 State(shared): SharedState,
843 axum::Json(req): Json<APIChangeRequest>,
844) -> axum::Json<APIChangeResponse> {
845 let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
846 let wu = CoreMsg::APIChangeRequestWrap {
847 request: req,
848 resp_chan_tx,
849 };
850 debug!("sending API change request into chan_core_tx");
851 let _ = shared.chan_core_tx.send(wu).await;
852 debug!("waiting from resp_chan_rx");
853 let resp = resp_chan_rx
854 .await
855 .unwrap_or_else(|_| APIChangeResponse { results: vec![] });
856
857 debug!("received a reply from resp_chan_rx");
858 Json(resp)
859}
860
861fn create_router(chan_core_tx: CoreTx, api_enabled: bool) -> axum::Router {
864 let r = axum::Router::new()
865 .route("/dash", axum::routing::get(handle_http_get_dashboard))
866 .route("/health", axum::routing::get(handle_http_get_health))
867 .route("/qmsg", axum::routing::post(handle_http_post_qmsg));
868
869 if api_enabled {
870 r.route(
871 "/api/v0/list_endpoints",
872 axum::routing::get(handle_http_api_list),
873 )
874 .route(
875 "/api/v0/update_endpoints",
876 axum::routing::post(handle_http_api_update),
877 )
878 } else {
879 r
880 }
881 .with_state(Shared { chan_core_tx })
882}
883
884async fn wait_for_shutdown(token: CancellationToken) {
885 token.cancelled().await;
886 debug!("Shutdown token triggered. Axum server exiting.");
887}
888
889async fn run_axum_server(
890 http_listener: tokio::net::TcpListener,
891 http_router: axum::Router,
892 shutdown_token: CancellationToken,
893) {
894 if let Err(e) = axum::serve(http_listener, http_router)
895 .with_graceful_shutdown(wait_for_shutdown(shutdown_token))
896 .await
897 {
898 error!("Axum server failed: {e}");
899 process::exit(1);
900 }
901}
902
903pub async fn init(
905 conf_fn: &str,
906 local_node: Option<String>,
907 tasks: &mut Vec<tokio::task::JoinHandle<()>>,
908) -> Result<CancellationToken> {
909 let conf = load_conf(conf_fn, local_node).unwrap_or_else(|err| {
911 error!("Failed to load config: {err}");
912 process::exit(1)
913 });
914
915 let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd")?;
917 let service_statuses = HashMap::new();
918
919 let (chan_core_tx, chan_core_rx): (CoreTx, CoreRx) = mpsc::channel(10);
921 let (chan_outbox_tx, chan_outbox_rx): (OutboxTx, OutboxRx) = mpsc::channel(100);
922
923 let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
925 tasks.push(tokio::spawn(sender));
926
927 let core_t = core::core(
929 statsd,
930 conf.clone(),
931 service_statuses,
932 chan_core_rx,
933 chan_outbox_tx,
934 );
935 tasks.push(tokio::spawn(core_t));
936
937 time::sleep(Duration::from_millis(100)).await;
938
939 let axum_shutdown = CancellationToken::new();
941 let shutdown_token_child = axum_shutdown.child_token();
942
943 debug!("creating listener");
945 let http_router = create_router(chan_core_tx.clone(), conf.api.enabled);
946 let http_listener = tokio::net::TcpListener::bind(conf.local_node.clone()).await?;
947
948 debug!("starting axum");
950 let fut = tokio::spawn(run_axum_server(
951 http_listener,
952 http_router,
953 shutdown_token_child,
954 ));
955
956 tasks.push(fut);
957
958 debug!("starting probes");
960 setup_and_spawn_probes(conf, chan_core_tx).await?;
961 debug!("starting probes: done");
962
963 Ok(axum_shutdown)
964}
965
966pub async fn run() -> Result<()> {
967 JournalLog::new()
969 .context("Failed to create JournalLog")?
970 .install()
971 .context("Failed to setup journald logging")?;
972
973 log::set_max_level(LevelFilter::Info);
975 info!("[main] starting");
976
977 let mut tasks = Vec::new();
979 for s in [SignalKind::interrupt(), SignalKind::hangup()] {
980 tasks.push(tokio::spawn(handle_sig(s)));
981 }
982
983 let default_conf_fn = "/etc/rrdnsd.json";
984 let conf_fn = std::env::var("CONF").ok();
985 let conf_fn = conf_fn.as_deref().unwrap_or(default_conf_fn);
986
987 let local_node = std::env::var("LOCAL_NODE").ok();
988
989 init(conf_fn, local_node, &mut tasks).await?;
990
991 join_all(tasks).await;
992 Ok(())
993}
994
995async fn setup_and_spawn_probes(conf: Conf, chan_core_tx: Sender<CoreMsg>) -> Result<()> {
997 for service in &conf.services {
998 let probe_interval_ms = service
999 .probe_interval_ms
1000 .or(conf.probe_interval_ms)
1001 .unwrap_or(1000);
1002
1003 let request = APIChangeRequest {
1004 actions: vec![APIRPCUpdateAction::CreateService {
1005 fqdn: service.fqdn.clone(),
1006 healthcheck: service.healthcheck.clone(),
1007 ttl: service.ttl,
1008 zone: service.zone.clone(),
1009 probe_interval_ms,
1010 }],
1011 };
1012 send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1013
1014 for ipaddr_str in &service.ipaddrs {
1015 let request = APIChangeRequest {
1016 actions: vec![APIRPCUpdateAction::AddIpAddr {
1017 fqdn: service.fqdn.clone(),
1018 ipaddr: ipaddr_str.parse()?,
1019 }],
1020 };
1021 send_api_change_request_then_check_resp(&chan_core_tx, request).await?;
1022 }
1023 }
1024 Ok(())
1025}
1026
1027async fn send_api_change_request_then_check_resp(
1028 chan_core_tx: &Sender<CoreMsg>,
1029 request: APIChangeRequest,
1030) -> Result<(), anyhow::Error> {
1031 let (resp_chan_tx, resp_chan_rx) = tokio::sync::oneshot::channel();
1032 let msg = CoreMsg::APIChangeRequestWrap {
1033 request: request.clone(),
1034 resp_chan_tx,
1035 };
1036 chan_core_tx.send(msg).await?;
1037 let resp = resp_chan_rx.await?;
1038
1039 for (action, result) in request.actions.iter().zip(resp.results.iter()) {
1040 if let APIChangeResult::Error { error } = result {
1041 error!("Action {action:?} failed: {error}");
1042 }
1043 }
1044
1045 Ok(())
1046}