1use anyhow::anyhow;
6use log::{debug, error, info, warn};
7use statsd::client::Client as Statsd;
8use std::collections::HashMap;
9use std::time::SystemTime;
11
12use crate::*;
13async fn updater_handle_one_update(
20 statsd: &Statsd,
21 conf: &Conf,
22 upwrap: UpWrap,
23 service_statuses: &mut FqdnToStatuses,
24 chan_outbox_tx: &OutboxSend,
25) -> Result<()> {
26 match upwrap {
27 UpWrap::Fetch { chan_dash_tx } => {
29 updater_fetch(statsd, service_statuses, chan_dash_tx);
30 return Ok(());
31 }
32 UpWrap::APIChangeRequestWrap {
34 request,
35 resp_chan_tx,
36 } => {
37 debug!("[updater] processing {request:?}");
39
40 let results = request
41 .actions
42 .iter()
43 .map(
44 |action| match handle_change(conf, service_statuses, action, chan_outbox_tx) {
45 Ok(()) => APIChangeResult::Ok,
46 Err(e) => APIChangeResult::Error {
47 error: e.to_string(),
48 },
49 },
50 )
51 .collect();
52
53 let resp = APIChangeResponse { results };
54 if resp_chan_tx.send(resp).is_err() {
55 error!("[updater] failed to send response");
56 }
57 return Ok(());
58 }
59
60 UpWrap::Update { update } => {
62 statsd.incr("received_update.cnt");
65 let msg = update; debug!("[updater] received a datapoint for {:?}", msg.ipaddr);
68 let endpoint_ipaddr = msg.ipaddr;
69
70 let srv = service_statuses
71 .get_mut(&msg.fqdn)
72 .context(format!("Unknown FQDN in quorum msg '{}'", msg.fqdn))?;
73
74 let end = srv
75 .endpoint_statuses
76 .get_mut(&endpoint_ipaddr)
77 .context("Unknown endpoind ipaddr in quorum msg")?;
78
79 let change = ingest_datapoint(end, &msg);
81 if !change {
82 return Ok(());
83 }
84
85 statsd.incr("status_change.cnt");
88 info!(
89 "[updater] Service '{}' endpoint {} changed to {:?}",
90 &msg.fqdn, endpoint_ipaddr, end.computed_status
91 );
92 handle_endpoint_change(conf, srv, endpoint_ipaddr, &msg.fqdn).await;
93 }
94 }
95 Ok(())
96}
97
98fn handle_change(
100 conf: &Conf,
101 service_statuses: &mut FqdnToStatuses,
102 action: &APIRPCUpdateAction,
103 chan_outbox_tx: &OutboxSend,
104) -> Result<()> {
105 match action {
106 APIRPCUpdateAction::CreateService {
108 fqdn,
109 healthcheck,
110 ttl,
111 zone,
112 probe_interval_ms,
113 } => {
114 if service_statuses.contains_key(fqdn) {
115 bail!("Service with FQDN {fqdn} already exists")
116 }
117 let probe_target_uri = healthcheck.replace("{}", fqdn);
118 let probe_interval = Duration::from_millis(*probe_interval_ms);
119
120 service_statuses.insert(
121 fqdn.to_string(),
122 ServiceStatus {
123 dns_ttl: *ttl,
124 dns_zone: zone.clone(),
125 endpoint_statuses: HashMap::new(),
126 probe_interval,
127 probe_target_url: probe_target_uri,
128 },
129 );
130
131 Ok(())
132 }
133
134 APIRPCUpdateAction::DeleteService { fqdn } => {
136 if service_statuses.remove(fqdn).is_some() {
137 Ok(())
138 } else {
139 bail!("Service with FQDN {fqdn} not found when deleting");
140 }
141 }
142
143 APIRPCUpdateAction::AddIpAddr { fqdn, ipaddr } => {
145 let service = service_statuses
146 .get_mut(fqdn)
147 .ok_or_else(|| anyhow!("Service with FQDN {fqdn} not found when adding ipaddr"))?;
148
149 let endpoint_ipaddr = ipaddr;
150
151 if service.endpoint_statuses.contains_key(endpoint_ipaddr) {
152 bail!("{ipaddr} is already listed for '{fqdn}'");
153 }
154
155 let cancel_probe = CancellationToken::new();
156 let probe_target_uri = service.probe_target_url.clone();
157 info!("[updater] Adding {ipaddr} for {fqdn} and starting probe for {probe_target_uri}");
158
159 let node_to_ens = conf
160 .parsed_nodes
161 .iter()
162 .map(|&node| {
163 (
164 node,
165 Enst {
166 status: Status::Unknown,
167 last_update_time: SystemTime::now(),
168 },
169 )
170 })
171 .collect();
172
173 let es = EndpointStatus {
174 node_to_ens,
175 computed_status_time: SystemTime::now(),
176 computed_status: Status::Unknown,
177 probe_cancel: cancel_probe.clone(),
178 };
179
180 service.endpoint_statuses.insert(*endpoint_ipaddr, es);
181
182 let local_node = conf.parsed_local_node.unwrap();
184 let prober = probe(
185 service.probe_interval,
186 local_node,
187 *endpoint_ipaddr,
188 probe_target_uri,
189 fqdn.to_string(),
190 chan_outbox_tx.clone(),
191 cancel_probe,
192 );
193 tokio::spawn(prober);
194
195 Ok(())
196 }
197
198 APIRPCUpdateAction::DeleteIpAddr { fqdn, ipaddr } => {
200 let svc = service_statuses.get_mut(fqdn).ok_or_else(|| {
201 anyhow!("Service with FQDN {fqdn} not found when deleting ipaddr")
202 })?;
203
204 let endpoint_status = svc
205 .endpoint_statuses
206 .remove(ipaddr)
207 .ok_or_else(|| anyhow!("{ipaddr} not found for '{fqdn}'"))?;
208
209 endpoint_status.probe_cancel.cancel(); Ok(())
211 }
212 }
213}
214
215pub async fn updater(
217 statsd: Statsd,
218 conf: Conf,
219 mut service_statuses: FqdnToStatuses,
220 mut chan_update_rx: UpdateRec,
221 chan_outbox_tx: OutboxSend,
222) {
223 info!("[updater] started");
224 while let Some(upwrap) = chan_update_rx.recv().await {
225 let res = updater_handle_one_update(
226 &statsd,
227 &conf,
228 upwrap,
229 &mut service_statuses,
230 &chan_outbox_tx,
231 )
232 .await;
233 if let Err(err) = &res {
234 warn!("[updater_handle_one_update] {err}");
235 }
236 }
237 info!("[updater] exiting");
238 drop(chan_update_rx);
239}
240
241fn updater_fetch(
243 statsd: &Statsd,
244 service_statuses: &FqdnToStatuses,
245 chan_statusclone_tx: ServiceStatusCloneSend,
246) {
247 statsd.incr("fetch.cnt");
249
250 let payload: FqdnToStatuses = service_statuses.clone();
251 if chan_statusclone_tx.send(payload).is_err() {
252 error!("[updater] failed to send response");
253 }
254}