use anyhow::{Context, Result};
use axum::extract::{Json, State};
use chrono::Utc;
use futures::future::join_all;
use itertools::Itertools;
use log::{debug, error, info, warn, LevelFilter};
use reqwest::redirect::Policy;
use reqwest::Client as Reqw;
use sailfish::TemplateOnce;
use serde::{Deserialize, Serialize};
use statsd::client::Client as Statsd;
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::{IpAddr, SocketAddr};
use std::process; use std::str;
use std::time::Instant;
use std::time::SystemTime;
use systemd_journal_logger::JournalLog;
use tokio::process::Command;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time;
use tokio::time::{sleep, Duration};
// Sub-module providing DNS provider API clients (used below as `api_clients::Dynu`).
mod api_clients;
// Package version injected by Cargo at compile time; shown on the dashboard.
const VERSION: &str = env!("CARGO_PKG_VERSION");
// Data handed to the `dashboard.stpl` template when rendering the status dashboard.
#[derive(TemplateOnce)]
#[template(path = "dashboard.stpl")]
struct DashTemplate {
// One row per (service FQDN, endpoint IP address) pair.
service_tbl: Vec<DashServiceRow>,
// Render time, formatted as `%Y-%m-%d %H:%M:%S` from `Utc::now()`.
tstamp: String,
// Version of this program (`VERSION`).
myversion: String,
}
/// Health state of an endpoint, either as reported by one node or as computed from all reports.
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Copy)]
enum Status {
/// No verdict: the initial state, or neither Up nor Down reached the quorum.
Unknown,
/// The healthcheck request failed.
Down,
/// The healthcheck request succeeded.
Up,
}
/// Status of one endpoint as last reported by a single node.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
struct Enst {
/// Status carried by the most recent datapoint from that node.
status: Status,
/// Timestamp carried by that datapoint (creation time until the first datapoint arrives).
last_update_time: SystemTime,
}
/// Probe result exchanged between nodes: serialized as JSON and POSTed to each node's `/qmsg`.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct QuorumMsg {
/// Message format version; the prober sets it to 1.
format_version: u8,
/// FQDN of the probed service.
fqdn: String,
/// IP address of the probed endpoint.
ipaddr: IpAddr,
/// Outcome of the probe.
status: Status,
/// Time at which the sender built the message.
tstamp: SystemTime,
/// Name of the sending node (its `local_node` setting).
node_name: String,
}
/// Maps a node name to the status that node last reported for one endpoint.
type NodeToEnst = HashMap<String, Enst>;
/// Everything known about one endpoint (IP address) of a service.
#[derive(Debug, Clone)]
struct EndpointStatus {
/// IP address of the endpoint.
ipaddr: IpAddr,
/// Latest report from each configured node.
node_to_ens: NodeToEnst,
/// Status derived from `node_to_ens` by `compute_status`.
computed_status: Status,
/// When `computed_status` was last recomputed.
computed_status_time: SystemTime,
}
/// DNS parameters and per-endpoint state of one service (one FQDN).
#[derive(Debug)]
struct ServiceStatus {
/// TTL used for the DNS records of this service (from `ServiceConf::ttl`).
dns_ttl: u32,
/// DNS zone the service's FQDN belongs to (from `ServiceConf::zone`).
dns_zone: String,
/// State of every endpoint, keyed by endpoint IP address.
endpoint_statuses: HashMap<IpAddr, EndpointStatus>,
}
/// Maps a service FQDN to its status record.
type FqdnToStatuses = HashMap<String, ServiceStatus>;
/// One dashboard row: a single endpoint of a single service.
#[derive(Debug)]
struct DashServiceRow {
/// Symbol for the endpoint's computed status.
computed_status: String,
/// FQDN of the service.
fqdn: String,
/// Endpoint IP address, formatted as text.
ipaddr: String,
/// One `(status symbol, opacity)` pair per node, ordered by node name.
node_statuses: Vec<(String, f32)>,
}
/// Kind of request sent to the updater task.
#[derive(Debug)]
enum UpWrapKind {
/// Render the dashboard and send it back on the response channel.
Fetch,
/// Ingest a datapoint received from a node.
Update,
}
/// A datapoint for the updater: the payload of a `QuorumMsg` without its format version.
#[derive(Debug)]
struct Update {
/// FQDN of the probed service.
fqdn: String,
/// IP address of the probed endpoint.
ipaddr: IpAddr,
/// Status reported by the node.
status: Status,
/// Timestamp carried by the quorum message.
tstamp: SystemTime,
/// Name of the node that sent the datapoint.
node_name: String,
}
/// Envelope for every request sent to the updater task.
#[derive(Debug)]
struct UpWrap {
/// What the updater is asked to do.
kind: UpWrapKind,
/// The datapoint; set by the HTTP `/qmsg` handler for `Update` requests.
update: Option<Update>,
/// Channel for the rendered dashboard; set by the HTTP `/dash` handler for `Fetch` requests.
chan_response: Option<Sender<String>>,
}
/// Sender for the rendered dashboard HTML.
type DataSend = Sender<String>;
/// Sender half of the channel feeding the updater task.
type UpdateSend = Sender<UpWrap>;
/// Receiver half of the channel feeding the updater task.
type UpdateRec = Receiver<UpWrap>;
/// Sender half of the channel carrying probe results to the outbox task.
type OutboxSend = Sender<QuorumMsg>;
/// Receiver half of the channel carrying probe results to the outbox task.
type OutboxRec = Receiver<QuorumMsg>;
/// Unwraps a `Result`: evaluates to the `Ok` value; on `Err` logs the error with `error!`
/// (using its `Display` form) and returns `()` from the enclosing function.
macro_rules! unwrap_or_return {
($res:expr) => {
match $res {
Ok(val) => val,
Err(e) => {
error!("error: {}", e);
return;
}
}
};
}
/// Unwraps a `Result`: evaluates to the `Ok` value; on `Err` logs the error with `error!`
/// (using its `Debug` form) and terminates the whole process with exit code 1.
macro_rules! exp {
($res:expr) => {
match $res {
Ok(val) => val,
Err(e) => {
error!("{:?}", e);
process::exit(1);
}
}
};
}
/// Waits for one delivery of the Unix signal `sig`, then terminates the process with code 0.
///
/// If the signal listener cannot be created, `exp!` logs the error and exits with code 1.
async fn handle_sig(sig: SignalKind) {
    let mut stream = exp!(signal(sig));
    stream.recv().await;
    info!("[main] exiting");
    process::exit(0);
}
/// Reports the milliseconds elapsed since `t0` to statsd as timer metric `name`.
fn timer(statsd: &Statsd, name: &str, t0: Instant) {
    let elapsed_ms = t0.elapsed().as_millis();
    statsd.timer(name, elapsed_ms as f64);
}
/// Configuration of one monitored service, as read from the JSON config file.
#[derive(Deserialize, Debug, Clone)]
struct ServiceConf {
/// FQDN of the service; leading and trailing dots are trimmed while loading.
fqdn: String,
/// Healthcheck URL; every `{}` in it is replaced with `fqdn`.
healthcheck: String,
/// IP addresses of the service's endpoints, as text.
ipaddrs: Vec<String>,
/// TTL for the DNS records.
ttl: u32,
/// DNS zone containing `fqdn`; leading and trailing dots are trimmed while loading.
zone: String,
}
/// Top-level configuration, as read from the JSON config file.
#[derive(Deserialize, Debug, Clone)]
struct Conf {
/// Configuration format version; only 1 is accepted.
conf_version: u8,
/// `ipaddr:port` this node listens on; can be overridden with the `LOCAL_NODE` envvar.
local_node: String,
/// `ipaddr:port` of every node that receives quorum messages.
nodes: Vec<String>,
/// Interval between two probes of the same endpoint, in milliseconds.
probe_interval_ms: u64,
/// Services to monitor.
services: Vec<ServiceConf>,
/// How DNS is updated: `"dynu"` uses the API client, anything else goes through `run_dns_update`.
update_method: String,
/// `ipaddr:port` of the resolvers that receive DNS updates.
update_resolvers: Vec<String>,
/// Credentials handed to the `dynu` API client.
update_credentials: String,
/// When true, an endpoint going Down is not removed from DNS if `fail_open` triggers.
enable_fail_open: bool,
}
/// Returns the value of environment variable `name`, or `fallback` when it is not set.
///
/// An override is logged at info level. A value that is not valid Unicode makes the
/// process exit with code 1 (via `exp!`).
fn env_var(name: &str, fallback: &str) -> String {
    let Some(raw) = env::var_os(name) else {
        return fallback.to_owned();
    };
    let v = exp!(raw.into_string());
    info!("[main] config overridden by envvar {name}: {v}");
    v
}
/// Returns `s` with every leading and trailing `.` removed; inner dots are kept.
fn trim_dots(s: &str) -> String {
    s.trim_matches('.').to_owned()
}
/// Parses the JSON configuration text `jc` and validates it.
///
/// Validation steps, in order:
/// * `conf_version` must be 1;
/// * `local_node` (after the optional `LOCAL_NODE` envvar override), every entry of `nodes`
///   and every entry of `update_resolvers` must parse as `ipaddr:port`;
/// * for every service, after trimming dots, the FQDN must be the zone itself or a name
///   below it.
///
/// # Errors
/// Returns a static description of the first problem found; details are logged.
fn prepare_conf(jc: String) -> Result<Conf, &'static str> {
    /// True when `fqdn` is `zone` itself or ends with `.zone`.
    ///
    /// A plain suffix test would accept `badexample.org` for zone `example.org`, so the
    /// match has to fall on a label boundary. An empty zone keeps matching every name.
    fn fqdn_in_zone(fqdn: &str, zone: &str) -> bool {
        if zone.is_empty() || fqdn == zone {
            return true;
        }
        fqdn.strip_suffix(zone)
            .map_or(false, |prefix| prefix.ends_with('.'))
    }

    let mut conf: Conf = serde_json::from_str(&jc).map_err(|e| {
        error!("{:?}", e);
        "Unable to parse configuration"
    })?;
    debug!("[main] conf loaded");
    if conf.conf_version != 1 {
        return Err("Invalid configuration format version");
    }
    conf.local_node = env_var("LOCAL_NODE", &conf.local_node);
    parse_ipaddr_port(&conf.local_node)?;
    for addr in conf.nodes.iter().chain(&conf.update_resolvers) {
        parse_ipaddr_port(addr)?;
    }
    for sv in &mut conf.services {
        sv.fqdn = trim_dots(&sv.fqdn);
        sv.zone = trim_dots(&sv.zone);
        if !fqdn_in_zone(&sv.fqdn, &sv.zone) {
            error!("Invalid fqdn '{}' not matching zone '{}'", sv.fqdn, sv.zone);
            return Err("Invalid fqdn");
        }
    }
    Ok(conf)
}
/// Reads the configuration file and hands its content to `prepare_conf`.
///
/// The path is `/etc/rrdnsd.json` unless the `CONF` envvar overrides it.
///
/// # Errors
/// Returns a static message when the file cannot be read (the I/O error is logged) or
/// whatever `prepare_conf` reports.
fn load_conf() -> Result<Conf, &'static str> {
    let conf_fn = env_var("CONF", "/etc/rrdnsd.json");
    info!("[load_conf] reading {}", conf_fn);
    match fs::read_to_string(conf_fn) {
        Ok(jc) => prepare_conf(jc),
        Err(e) => {
            error!("{:?}", e);
            Err("Unable to read config file")
        }
    }
}
async fn outbox_sender(mut chan_outbox_rx: OutboxRec, nodes: Vec<String>) {
let client_ = Reqw::builder()
.user_agent("rrdnsd")
.connect_timeout(Duration::new(5, 0))
.timeout(Duration::new(5, 0))
.tcp_keepalive(Duration::new(15, 0))
.build();
let client = unwrap_or_return!(client_);
while let Some(msg) = chan_outbox_rx.recv().await {
for node in &nodes {
let client = client.clone();
let msg = msg.clone();
let url = format!("http://{node}/qmsg");
let node = node.clone();
tokio::spawn(async move {
if let Err(e) = client.post(&url).json(&msg).send().await {
info!("Error sending msg to {}: {:?}", node, e)
}
});
}
}
info!("[outbox] exiting");
drop(chan_outbox_rx);
}
/// Probes one endpoint of one service forever, once per `interval`.
///
/// Every round sends a GET request for `url` with `fqdn` pinned to `endpoint_ipaddr`, then
/// pushes a `QuorumMsg` carrying the outcome (for endpoint `ipaddr`, signed with
/// `local_node`) into `chan_outbox_tx`. Timing and success/failure counters go to statsd.
///
/// Returns when the outbox channel is closed, or immediately if the statsd or HTTP client
/// cannot be created (the error is logged).
async fn probe(
    interval: Duration,
    local_node: String,
    endpoint_ipaddr: IpAddr,
    url: String,
    fqdn: String,
    ipaddr: IpAddr,
    chan_outbox_tx: OutboxSend,
) {
    let statsd = unwrap_or_return!(Statsd::new("127.0.0.1:8125", "rrdnsd"));
    // Pin the FQDN to the endpoint under test instead of resolving it through DNS.
    // NOTE(review): per the reqwest docs the port of a `resolve` override is ignored and
    // the URL's port is used - confirm against the reqwest version in use.
    let endpoint_saddr = SocketAddr::new(endpoint_ipaddr, 0);
    let client_ = Reqw::builder()
        .user_agent("rrdnsd")
        .connect_timeout(Duration::new(5, 0))
        .timeout(Duration::new(5, 0))
        .tcp_keepalive(Duration::new(15, 0))
        .redirect(Policy::none())
        .resolve(&fqdn, endpoint_saddr)
        .build();
    let client = unwrap_or_return!(client_);
    debug!("[probe] starting");
    let mut ticker = time::interval(interval);
    loop {
        // t0 marks the start of the whole round (including the wait for the tick),
        // t1 marks the start of the actual probing work.
        let t0 = Instant::now();
        ticker.tick().await;
        let t1 = Instant::now();
        debug!("[probe] probing {} at endpoint {}", &url, endpoint_ipaddr);
        // NOTE(review): `send()` returning Ok only means an HTTP response arrived; 4xx/5xx
        // responses are still counted as Up - confirm whether the status should be checked.
        let success = client.get(&url).send().await.is_ok();
        // Time the request only: measuring from t0 would add the idle wait for the tick.
        timer(&statsd, "probe.duration", t1);
        if success {
            statsd.incr("probe.success.cnt")
        } else {
            statsd.incr("probe.failure.cnt")
        };
        let msg = QuorumMsg {
            format_version: 1,
            fqdn: fqdn.clone(),
            ipaddr,
            status: if success { Status::Up } else { Status::Down },
            tstamp: SystemTime::now(),
            node_name: local_node.clone(),
        };
        if chan_outbox_tx.send(msg).await.is_err() {
            // The receiver is gone: nobody is left to consume probe results.
            info!("[probe] send error");
            drop(chan_outbox_tx);
            return;
        }
        let loop_elapsed = t0.elapsed();
        let probing = t1.elapsed();
        statsd.timer("probe_time_msec", probing.as_millis() as f64);
        // Fraction of the round spent working rather than waiting for the next tick.
        let lf = probing.as_secs_f64() / loop_elapsed.as_secs_f64();
        statsd.histogram("probe.load_factor", lf);
    }
}
/// Parses `n` as `ipaddr:port`.
///
/// Accepted forms are `1.2.3.4:53` for IPv4 and `[2001:db8::1]:53` for IPv6 (the address
/// must be bracketed, as in URLs and socket addresses). The port is split off at the LAST
/// colon, so trailing garbage such as `1.2.3.4:5:6` is rejected instead of ignored.
///
/// # Errors
/// * `"Invalid ipaddr:port format"` when there is no `:` separator;
/// * `"Invalid IP address"` when the address part does not parse;
/// * `"Invalid port number"` when the port is not a valid `u16`.
fn parse_ipaddr_port(n: &str) -> Result<(IpAddr, u16), &'static str> {
    let (ipa_str, port_str) = n.rsplit_once(':').ok_or("Invalid ipaddr:port format")?;
    let bracketed = ipa_str
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'));
    let ipa = match bracketed {
        // Brackets are only meaningful around an IPv6 address.
        Some(v6) => v6.parse::<std::net::Ipv6Addr>().map(IpAddr::V6),
        // Without brackets an IPv6 address would be ambiguous with the port separator.
        None => ipa_str.parse::<std::net::Ipv4Addr>().map(IpAddr::V4),
    }
    .map_err(|_| "Invalid IP address")?;
    let port = port_str.parse::<u16>().map_err(|_| "Invalid port number")?;
    Ok((ipa, port))
}
/// Builds the command script fed to `nsupdate`/`knsupdate` for one endpoint change.
///
/// The script targets the resolver at `resolver_ipaddr` port `resolver_port`, selects
/// `zone`, sets the default `ttl` and then, depending on `status`:
/// * `Up`      - adds an `A`/`AAAA` record for `fqdn` pointing at `endpoint_ipaddr`;
/// * `Down`    - deletes that record;
/// * `Unknown` - emits `exit` in place of an update.
///
/// The record type follows the address family of `endpoint_ipaddr`.
fn create_nsupdate_blob(
    resolver_ipaddr: IpAddr,
    resolver_port: u16,
    fqdn: &str,
    zone: &str,
    ttl: u32,
    endpoint_ipaddr: IpAddr,
    status: Status,
) -> String {
    let record_type = match endpoint_ipaddr {
        IpAddr::V4(_) => "A",
        IpAddr::V6(_) => "AAAA",
    };
    let operation = match status {
        Status::Up => format!("add {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
        Status::Down => format!("delete {fqdn}. {ttl} {record_type} {endpoint_ipaddr}"),
        Status::Unknown => "exit".to_string(),
    };
    format!(
        "server {resolver_ipaddr} {resolver_port}\nzone {zone}\nttl {ttl}\n{operation}\nsend\nexit\n"
    )
}
async fn run_dns_update<'a>(
update_method: &str,
resolvers: &Vec<String>,
srv: &ServiceStatus,
fqdn: &str,
endpoint_ipaddr: &IpAddr,
) -> Result<(), &'static str> {
let endpoint_that_changed = srv
.endpoint_statuses
.get(endpoint_ipaddr)
.ok_or("Endpoint not found")?;
let status = endpoint_that_changed.computed_status;
let endpoint_ipaddr = endpoint_that_changed.ipaddr;
let nsupdate_fn = &env_var("NSUPDATE_FN", "/tmp/nsupdate.conf");
let cmd = match update_method {
"nsupdate" => "/usr/bin/nsupdate",
"knsupdate" => "/usr/bin/knsupdate",
"script" => "TODO",
_ => "TODO",
};
for resolver in resolvers {
let (r_ipaddr, r_port) = parse_ipaddr_port(resolver)?;
let nsupdate_blob = create_nsupdate_blob(
r_ipaddr,
r_port,
fqdn,
&srv.dns_zone,
srv.dns_ttl,
endpoint_ipaddr,
status,
);
info!(
"[dns] calling {} {} with commands:\n{}",
cmd, nsupdate_fn, nsupdate_blob
);
if fs::write(nsupdate_fn, nsupdate_blob).is_err() {
error!("Failed to write {nsupdate_fn}");
return Err("");
};
let out = Command::new(cmd).arg(nsupdate_fn).output().await;
match out {
Ok(_) => info!("[dns] success"),
Err(_) => info!("[dns] failure"),
};
}
Ok(())
}
/// Renders the HTML dashboard from `service_statuses` and sends it on `chan_dash_tx`.
///
/// Rows are ordered by FQDN, then by endpoint address; the per-node cells of a row are
/// ordered by node name. Each node cell carries an opacity that fades with the age of the
/// node's last report. If rendering fails the text `Failed to render HTML` is sent instead.
/// `_probe_interval` is currently unused.
async fn updater_fetch(
    statsd: &Statsd,
    _probe_interval: Duration,
    service_statuses: &FqdnToStatuses,
    chan_dash_tx: DataSend,
) {
    /// Symbol shown on the dashboard for a status.
    fn circle(s: Status) -> String {
        match s {
            Status::Up => "🟢",
            Status::Down => "🔴",
            // U+25EF LARGE CIRCLE; the literal was previously stored mis-decoded.
            Status::Unknown => "◯",
        }
        .to_string()
    }

    /// Opacity of a node cell whose last report is `delay_s` seconds old: fades linearly
    /// during the first 10 seconds, then stays at 0.25.
    fn opacity(delay_s: f32) -> f32 {
        if delay_s < 10.0 {
            1.0 - delay_s / 15.0
        } else {
            0.25
        }
    }

    statsd.incr("fetch.cnt");
    let mut tpl = DashTemplate {
        service_tbl: vec![],
        tstamp: Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
        myversion: VERSION.to_string(),
    };
    for (fqdn, srvs) in service_statuses.iter().sorted_by_key(|item| item.0) {
        for (srv_ipaddr, ep) in srvs.endpoint_statuses.iter().sorted_by_key(|item| item.0) {
            let node_statuses = ep
                .node_to_ens
                .iter()
                .sorted_by_key(|item| item.0)
                .map(|(_, ens)| {
                    // A timestamp in the future counts as "just now".
                    let delay_s = ens
                        .last_update_time
                        .elapsed()
                        .unwrap_or_default()
                        .as_secs_f32();
                    (circle(ens.status), opacity(delay_s))
                })
                .collect();
            tpl.service_tbl.push(DashServiceRow {
                computed_status: circle(ep.computed_status),
                fqdn: fqdn.to_string(),
                ipaddr: srv_ipaddr.to_string(),
                node_statuses,
            });
        }
    }
    let resp = tpl
        .render_once()
        .unwrap_or_else(|_| "Failed to render HTML".to_owned());
    if chan_dash_tx.send(resp).await.is_err() {
        debug!("[updater] exiting");
    };
}
/// Returns the number of votes needed for a decision among `n` voters: half of `n`,
/// rounded up.
///
/// Note that for an even `n` this is exactly half, not a strict majority (`quorum(4) == 2`).
/// Computed without `n + 1` so that `n == u16::MAX` cannot overflow.
#[must_use]
fn quorum(n: u16) -> u16 {
    n / 2 + n % 2
}
/// Derives the status of an endpoint from the per-node reports in `endp_statuses`.
///
/// With `q = quorum(number of nodes)`:
/// * no nodes at all            -> `Unknown`;
/// * at least `q` nodes say Up  -> `Up` (Up wins when both sides reach the quorum);
/// * at least `q` nodes say Down -> `Down`;
/// * otherwise                  -> `Unknown`.
#[must_use]
fn compute_status(endp_statuses: &NodeToEnst) -> Status {
    if endp_statuses.is_empty() {
        return Status::Unknown;
    }
    let q = quorum(endp_statuses.len() as u16);
    let (up_cnt, down_cnt) =
        endp_statuses
            .values()
            .fold((0u16, 0u16), |(up, down), ens| match ens.status {
                Status::Up => (up + 1, down),
                Status::Down => (up, down + 1),
                Status::Unknown => (up, down),
            });
    if up_cnt >= q {
        Status::Up
    } else if down_cnt >= q {
        Status::Down
    } else {
        Status::Unknown
    }
}
/// Records the datapoint `msg` in `end` and recomputes the endpoint's status.
///
/// Returns `true` only when the computed status changed AND the previous computed status
/// was not `Unknown` (so the very first verdict for an endpoint is not reported as a
/// change). A datapoint from a node missing from `end.node_to_ens` is logged and ignored.
async fn ingest_datapoint(end: &mut EndpointStatus, msg: &Update) -> bool {
    let Some(node_state) = end.node_to_ens.get_mut(&msg.node_name) else {
        warn!("Received datapoint from unknown node {}", msg.node_name);
        return false;
    };
    node_state.status = msg.status;
    node_state.last_update_time = msg.tstamp;

    let previous = end.computed_status;
    end.computed_status = compute_status(&end.node_to_ens);
    end.computed_status_time = SystemTime::now();

    previous != Status::Unknown && previous != end.computed_status
}
/// Decides whether the service is degraded enough that endpoints should stay in DNS.
///
/// With `q = quorum(number of endpoints)`, returns `true` when at least `q` endpoints are
/// `Unknown`, or at least `q` are `Down`, or at most `q` are `Up`. The decision and the
/// counters are logged.
#[must_use]
fn fail_open(estatuses: &HashMap<IpAddr, EndpointStatus>) -> bool {
    let q = quorum(estatuses.len() as u16);
    let (up, down, unknown) = estatuses.values().fold(
        (0u16, 0u16, 0u16),
        |(up, down, unknown), endpoint| match endpoint.computed_status {
            Status::Up => (up + 1, down, unknown),
            Status::Down => (up, down + 1, unknown),
            Status::Unknown => (up, down, unknown + 1),
        },
    );
    let msg = format!("fail-open for: up {up} down {down} unknown {unknown} q {q}");
    if unknown >= q {
        info!("{msg}: too many endpoints in unknown state");
        return true;
    }
    if down >= q {
        info!("{msg}: too many endpoints in down state");
        return true;
    }
    if up <= q {
        info!("{msg}: not enough endpoints in up state");
        return true;
    }
    debug!("No {msg}");
    false
}
async fn handle_endpoint_change(
conf: &Conf,
srv: &ServiceStatus,
endpoint_ipaddr: IpAddr,
fqdn: &str,
) {
let endpoint_that_changed = match srv.endpoint_statuses.get(&endpoint_ipaddr) {
Some(v) => v,
None => return,
};
let status = endpoint_that_changed.computed_status;
if (status == Status::Down) & conf.enable_fail_open & fail_open(&srv.endpoint_statuses) {
return;
}
if conf.update_method == "dynu" {
let c = api_clients::Dynu::new(conf.update_credentials.to_string());
let fqdn = fqdn.to_owned();
let zone = srv.dns_zone.clone();
tokio::spawn(async move {
let fqdn = &fqdn.clone();
match status {
Status::Up => c.add_record(fqdn, &zone, endpoint_ipaddr).await,
Status::Down => c.delete_record(fqdn, &zone, endpoint_ipaddr).await,
_ => (),
}
});
} else {
let _ = run_dns_update(
&conf.update_method,
&conf.update_resolvers,
srv,
fqdn,
&endpoint_ipaddr,
)
.await;
}
}
/// Handles one request received by the updater task.
///
/// * `Fetch`  - renders the dashboard and sends it on the request's response channel.
/// * `Update` - ingests the datapoint and, when the endpoint's computed status changed,
///   triggers the DNS update.
///
/// # Errors
/// Returns an error (instead of panicking) when a `Fetch` request carries no response
/// channel, when an `Update` request carries no datapoint, or when the datapoint refers to
/// an FQDN or endpoint address that is not configured.
async fn updater_handle_one_update(
    statsd: &Statsd,
    conf: &Conf,
    probe_interval: Duration,
    upwrap: UpWrap,
    service_statuses: &mut FqdnToStatuses,
) -> Result<()> {
    let update = match upwrap.kind {
        UpWrapKind::Fetch => {
            let chan_response = upwrap
                .chan_response
                .context("Fetch request without a response channel")?;
            updater_fetch(statsd, probe_interval, service_statuses, chan_response).await;
            return Ok(());
        }
        UpWrapKind::Update => upwrap.update,
    };
    statsd.incr("received_update.cnt");
    let msg = update.context("Empty update")?;
    debug!("[updater] received a datapoint for {:?}", msg.ipaddr);
    let endpoint_ipaddr = msg.ipaddr;
    let srv = service_statuses
        .get_mut(&msg.fqdn)
        .with_context(|| format!("Unknown FQDN in quorum msg '{}'", msg.fqdn))?;
    let end = srv
        .endpoint_statuses
        .get_mut(&endpoint_ipaddr)
        .context("Unknown endpoind ipaddr in quorum msg")?;
    let change = ingest_datapoint(end, &msg).await;
    if !change {
        return Ok(());
    }
    statsd.incr("status_change.cnt");
    info!(
        "[updater] Service '{}' endpoint {} changed to {:?}",
        &msg.fqdn, endpoint_ipaddr, end.computed_status
    );
    handle_endpoint_change(conf, srv, endpoint_ipaddr, &msg.fqdn).await;
    Ok(())
}
/// Updater task: owns `service_statuses` and processes requests from `chan_update_rx`
/// one at a time until the channel is closed.
///
/// A request that fails is logged as a warning and does not stop the task.
async fn updater(
    statsd: Statsd,
    conf: Conf,
    mut service_statuses: FqdnToStatuses,
    mut chan_update_rx: UpdateRec,
    probe_interval: Duration,
) {
    info!("[updater] started");
    loop {
        let Some(upwrap) = chan_update_rx.recv().await else {
            break;
        };
        let outcome = updater_handle_one_update(
            &statsd,
            &conf,
            probe_interval,
            upwrap,
            &mut service_statuses,
        )
        .await;
        if let Err(err) = &outcome {
            warn!("[updater_handle_one_update] {}", err);
        }
    }
    info!("[updater] exiting");
    drop(chan_update_rx);
}
/// HTTP handler for `GET /dash`: asks the updater task for the rendered dashboard.
///
/// A `Fetch` request with a one-shot response channel is sent to the updater; the page is
/// whatever comes back on that channel, or the text `error` when nothing does.
async fn handle_http_get_dashboard(
    State(chan_update_tx): State<UpdateSend>,
) -> axum::response::Html<String> {
    let (response_tx, mut response_rx) = mpsc::channel(1);
    let request = UpWrap {
        kind: UpWrapKind::Fetch,
        update: None,
        chan_response: Some(response_tx),
    };
    // A failed send drops the request (and its sender), so `recv` below yields `None`.
    let _ = chan_update_tx.send(request).await;
    let body = response_rx
        .recv()
        .await
        .unwrap_or_else(|| "error".to_owned());
    axum::response::Html(body)
}
/// HTTP handler for `GET /health`: always answers with the body `ok`.
async fn handle_http_get_health() -> &'static str {
"ok"
}
async fn handle_http_post_qmsg(
State(chan_update_tx): State<UpdateSend>,
Json(msg): Json<QuorumMsg>,
) {
let wu = UpWrap {
kind: UpWrapKind::Update,
chan_response: None,
update: Some(Update {
fqdn: msg.fqdn.to_string(),
ipaddr: msg.ipaddr,
status: msg.status,
tstamp: msg.tstamp,
node_name: msg.node_name.to_string(),
}),
};
if chan_update_tx.send(wu).await.is_err() {
info!("[http] exiting");
};
}
fn init_service_statuses(conf: &Conf) -> Result<HashMap<String, ServiceStatus>> {
let mut service_statuses: FqdnToStatuses = HashMap::new();
for service in &conf.services {
let srv = service_statuses
.entry(service.fqdn.clone())
.or_insert(ServiceStatus {
dns_ttl: service.ttl,
dns_zone: service.zone.clone(),
endpoint_statuses: HashMap::new(),
});
for ipaddr_str in &service.ipaddrs {
let endpoint_ipaddr = ipaddr_str
.parse()
.context("Unable to parse endpoint ipaddr")?;
let mut node_to_ens = HashMap::new();
for node_name in conf.nodes.iter().cloned() {
node_to_ens.insert(
node_name,
Enst {
status: Status::Unknown,
last_update_time: SystemTime::now(),
},
);
}
srv.endpoint_statuses
.entry(endpoint_ipaddr)
.or_insert(EndpointStatus {
ipaddr: endpoint_ipaddr,
node_to_ens,
computed_status_time: SystemTime::now(),
computed_status: Status::Unknown,
});
}
}
Ok(service_statuses)
}
#[tokio::main]
async fn main() -> Result<()> {
JournalLog::new()
.context("Failed to create JournalLog")?
.install()
.context("Failed to setup journald logging")?;
let statsd = Statsd::new("127.0.0.1:8125", "rrdnsd").unwrap();
log::set_max_level(LevelFilter::Info);
info!("[main] starting");
let mut tasks = Vec::new();
for s in [SignalKind::interrupt(), SignalKind::hangup()] {
tasks.push(tokio::spawn(handle_sig(s)));
}
let conf = load_conf().unwrap_or_else(|err| {
error!("{}", err);
process::exit(1)
});
let probe_interval = Duration::from_millis(conf.probe_interval_ms);
let service_statuses = init_service_statuses(&conf)?;
let (chan_update_tx, chan_update_rx): (UpdateSend, UpdateRec) = mpsc::channel(10);
let (chan_outbox_tx, chan_outbox_rx): (OutboxSend, OutboxRec) = mpsc::channel(100);
let sender = outbox_sender(chan_outbox_rx, conf.nodes.clone());
tasks.push(tokio::spawn(sender));
let updater_t = updater(
statsd,
conf.clone(),
service_statuses,
chan_update_rx,
probe_interval,
);
tasks.push(tokio::spawn(updater_t));
let stagger_interval = {
let mut cnt = 0;
for service in &conf.services {
for _ in &service.ipaddrs {
cnt += 1;
}
}
if cnt == 0 {
error!("No endpoints configured");
process::exit(1);
}
probe_interval / cnt
};
debug!(
"Starting probes with stagger interval {:?}",
stagger_interval
);
for service in &conf.services {
for ipaddr_str in &service.ipaddrs {
let endpoint_ipaddr = ipaddr_str.parse().context("Unable to parse ipaddr")?;
let url = service.healthcheck.replace("{}", &service.fqdn.to_string());
debug!(
"[main] Spawing probe for {} {} {}",
service.fqdn, service.zone, &url
);
let fqdn = service.fqdn.clone();
let prober = probe(
probe_interval,
conf.local_node.clone(),
endpoint_ipaddr,
url,
fqdn,
endpoint_ipaddr,
chan_outbox_tx.clone(),
);
tasks.push(tokio::spawn(prober));
sleep(stagger_interval).await;
}
}
let http_router = axum::Router::new()
.route("/dash", axum::routing::get(handle_http_get_dashboard))
.route("/health", axum::routing::get(handle_http_get_health))
.route("/qmsg", axum::routing::post(handle_http_post_qmsg))
.with_state(chan_update_tx);
let http_listener = tokio::net::TcpListener::bind(conf.local_node)
.await
.unwrap();
axum::serve(http_listener, http_router).await.unwrap();
join_all(tasks).await;
Ok(())
}
// Unit tests for the pure helpers of this file (configuration parsing, nsupdate script
// generation, quorum arithmetic and status computation).
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
use super::*;
// Leading and trailing dots are removed, inner dots are kept.
#[test]
fn test_trim_dots() {
assert_eq!(trim_dots(".a.b."), "a.b");
}
// An empty document is not a valid configuration.
#[test]
fn test_prepare_conf_empty() {
assert!(prepare_conf("".into()).is_err());
}
// The example configuration shipped next to the sources must load.
#[test]
fn test_prepare_conf() {
let cnf = include_str!("../rrdnsd.example.json").to_owned();
let c = prepare_conf(cnf);
assert!(&c.is_ok());
let c = c.unwrap();
assert_eq!(c.nodes.len(), 1);
assert_eq!(c.update_resolvers.len(), 1);
assert_eq!(c.conf_version, 1);
}
// A plain IPv4 `ipaddr:port` pair is split into its two parts.
#[test]
fn test_parse_ipaddr_port() {
let v = parse_ipaddr_port("1.2.3.4:5");
assert!(v.is_ok());
assert_eq!(v.unwrap(), ("1.2.3.4".parse::<IpAddr>().unwrap(), 5));
}
// An Up endpoint with an IPv4 address produces an `add ... A ...` script.
#[test]
fn test_create_nsupdate_blob() {
let b = create_nsupdate_blob(
"1.2.3.4".parse().unwrap(),
5,
"my.example.org",
"example.org",
77,
"7.8.9.0".parse().unwrap(),
Status::Up,
);
let exp = "\
server 1.2.3.4 5\
\nzone example.org\
\nttl 77\
\nadd my.example.org. 77 A 7.8.9.0\
\nsend\
\nexit\n";
assert_eq!(b, exp);
}
// Fail-open decisions for a service with four endpoints: one that is always Up plus the
// three statuses given in `inp`.
#[test]
fn test_fail_open() {
fn create_est(inp: [Status; 3]) -> HashMap<IpAddr, EndpointStatus> {
let mut est = HashMap::new();
let ipa = "1.2.3.4".parse().unwrap();
let mut st = EndpointStatus {
ipaddr: ipa,
node_to_ens: HashMap::new(),
computed_status: Status::Up,
computed_status_time: SystemTime::now(),
};
est.insert(ipa, st.clone());
for (n, flag) in inp.iter().enumerate() {
let ipa = format!("1.2.3.{n}").parse().unwrap();
st.computed_status = *flag;
est.insert(ipa, st.clone());
}
est
}
let inp = [Status::Up, Status::Up, Status::Up];
assert!(!fail_open(&create_est(inp)));
let inp = [Status::Up, Status::Up, Status::Down];
assert!(!fail_open(&create_est(inp)));
let inp = [Status::Up, Status::Down, Status::Down];
assert!(fail_open(&create_est(inp)));
let inp = [Status::Down, Status::Down, Status::Down];
assert!(fail_open(&create_est(inp)));
let inp = [Status::Down, Status::Up, Status::Unknown];
assert!(fail_open(&create_est(inp)));
let inp = [Status::Up, Status::Unknown, Status::Unknown];
assert!(fail_open(&create_est(inp)));
}
// The quorum is half of the voters, rounded up.
#[test]
fn test_quorum() {
assert_eq!(quorum(2), 1);
assert_eq!(quorum(3), 2);
assert_eq!(quorum(4), 2);
assert_eq!(quorum(5), 3);
}
// Computed status as nodes are added one by one: none, Up, Up+Down, Up+Down+Down.
#[test]
fn test_compute_status() {
let mut h: NodeToEnst = HashMap::new();
assert_eq!(compute_status(&h), Status::Unknown);
h.insert(
"1".to_string(),
Enst {
status: Status::Up,
last_update_time: SystemTime::now(),
},
);
assert_eq!(compute_status(&h), Status::Up);
h.insert(
"2".to_string(),
Enst {
status: Status::Down,
last_update_time: SystemTime::now(),
},
);
assert_eq!(compute_status(&h), Status::Up);
h.insert(
"3".to_string(),
Enst {
status: Status::Down,
last_update_time: SystemTime::now(),
},
);
assert_eq!(compute_status(&h), Status::Down);
}
// Same check as `test_create_nsupdate_blob`, with the expected script written as a
// multi-line literal.
#[test]
fn t_create_nsupdate_blob() {
let nsupdate_blob = create_nsupdate_blob(
"127.0.0.1".parse::<IpAddr>().unwrap(),
999,
"mail.foo.com",
"foo.com",
1234,
"1.2.3.4".parse::<IpAddr>().unwrap(),
Status::Up,
);
let exp = "\
server 127.0.0.1 999
zone foo.com
ttl 1234
add mail.foo.com. 1234 A 1.2.3.4
send
exit
";
assert_eq!(nsupdate_blob, exp);
}
// The status table built from the example configuration starts with Unknown endpoints.
#[test]
fn test_init_service_statuses() {
let cnf = include_str!("../rrdnsd.example.json").to_owned();
let conf = prepare_conf(cnf).unwrap();
let st = init_service_statuses(&conf).unwrap();
let ipa = IpAddr::V4("127.0.0.2".parse::<Ipv4Addr>().unwrap());
let sest = &st["rrdnsd.test"];
assert_eq!(sest.dns_ttl, 5);
assert_eq!(sest.dns_zone, "rrdnsd.test".to_string());
assert_eq!(sest.endpoint_statuses.len(), 2);
let es = &sest.endpoint_statuses[&ipa];
assert_eq!(es.computed_status, Status::Unknown);
}
}