1use anyhow::anyhow;
6use log::{debug, error, info, warn};
7use statsd::client::Client as Statsd;
8use std::collections::HashMap;
9use std::time::SystemTime;
11
12use crate::*;
13async fn core_handle_one_msg(
20 statsd: &Statsd,
21 conf: &Conf,
22 coremsg: CoreMsg,
23 service_statuses: &mut FqdnToStatuses,
24 chan_outbox_tx: &OutboxSend,
25) -> Result<()> {
26 match coremsg {
27 CoreMsg::Fetch { chan_dash_tx } => {
29 core_handle_fetch(statsd, service_statuses, chan_dash_tx);
30 return Ok(());
31 }
32 CoreMsg::APIChangeRequestWrap {
34 request,
35 resp_chan_tx,
36 } => {
37 debug!("[core] processing {request:?}");
38
39 let results = request
40 .actions
41 .iter()
42 .map(
43 |action| match handle_change(conf, service_statuses, action, chan_outbox_tx) {
44 Ok(()) => APIChangeResult::Ok,
45 Err(e) => APIChangeResult::Error {
46 error: e.to_string(),
47 },
48 },
49 )
50 .collect();
51
52 let resp = APIChangeResponse { results };
53 if resp_chan_tx.send(resp).is_err() {
54 error!("[core] failed to send response");
55 }
56 return Ok(());
57 }
58
59 CoreMsg::Datapoint { datapoint } => {
61 statsd.incr("received_update.cnt");
64 let msg = datapoint; debug!("[core] received a datapoint for {:?}", msg.ipaddr);
67 let endpoint_ipaddr = msg.ipaddr;
68
69 let srv = service_statuses
70 .get_mut(&msg.fqdn)
71 .context(format!("Unknown FQDN in quorum msg '{}'", msg.fqdn))?;
72
73 let end = srv
74 .endpoint_statuses
75 .get_mut(&endpoint_ipaddr)
76 .context("Unknown endpoind ipaddr in quorum msg")?;
77
78 let change = ingest_datapoint(end, &msg);
80 if !change {
81 return Ok(());
82 }
83
84 statsd.incr("status_change.cnt");
87 info!(
88 "[core] Service '{}' endpoint {} changed to {:?}",
89 &msg.fqdn, endpoint_ipaddr, end.computed_status
90 );
91 let _ = handle_endpoint_change(conf, srv, endpoint_ipaddr, &msg.fqdn).await;
92 }
93 }
94 Ok(())
95}
96
97fn handle_change(
99 conf: &Conf,
100 service_statuses: &mut FqdnToStatuses,
101 action: &APIRPCUpdateAction,
102 chan_outbox_tx: &OutboxSend,
103) -> Result<()> {
104 match action {
105 APIRPCUpdateAction::CreateService {
107 fqdn,
108 healthcheck,
109 ttl,
110 zone,
111 probe_interval_ms,
112 } => {
113 if service_statuses.contains_key(fqdn) {
114 bail!("Service with FQDN {fqdn} already exists")
115 }
116 let probe_target_uri = healthcheck.replace("{}", fqdn);
117 let probe_interval = Duration::from_millis(*probe_interval_ms);
118
119 service_statuses.insert(
120 fqdn.to_string(),
121 ServiceStatus {
122 dns_ttl: *ttl,
123 dns_zone: zone.clone(),
124 endpoint_statuses: HashMap::new(),
125 probe_interval,
126 probe_target_url: probe_target_uri,
127 },
128 );
129
130 Ok(())
131 }
132
133 APIRPCUpdateAction::DeleteService { fqdn } => {
135 if service_statuses.remove(fqdn).is_some() {
136 Ok(())
137 } else {
138 bail!("Service with FQDN {fqdn} not found when deleting");
139 }
140 }
141
142 APIRPCUpdateAction::AddIpAddr { fqdn, ipaddr } => {
144 let service = service_statuses
145 .get_mut(fqdn)
146 .ok_or_else(|| anyhow!("Service with FQDN {fqdn} not found when adding ipaddr"))?;
147
148 let endpoint_ipaddr = ipaddr;
149
150 if service.endpoint_statuses.contains_key(endpoint_ipaddr) {
151 bail!("{ipaddr} is already listed for '{fqdn}'");
152 }
153
154 let cancel_probe = CancellationToken::new();
155 let probe_target_uri = service.probe_target_url.clone();
156 info!("[core] Adding {ipaddr} for {fqdn} and starting probe for {probe_target_uri}");
157
158 let node_to_ens = conf
159 .parsed_nodes
160 .iter()
161 .map(|&node| {
162 (
163 node,
164 Enst {
165 status: Status::Unknown,
166 last_update_time: SystemTime::now(),
167 },
168 )
169 })
170 .collect();
171
172 let es = EndpointStatus {
173 node_to_ens,
174 computed_status_time: SystemTime::now(),
175 computed_status: Status::Unknown,
176 probe_cancel: cancel_probe.clone(),
177 };
178
179 service.endpoint_statuses.insert(*endpoint_ipaddr, es);
180
181 let local_node = conf.parsed_local_node.unwrap();
183 let prober = probe(
184 service.probe_interval,
185 local_node,
186 *endpoint_ipaddr,
187 probe_target_uri,
188 fqdn.to_string(),
189 chan_outbox_tx.clone(),
190 cancel_probe,
191 );
192 tokio::spawn(prober);
193
194 Ok(())
195 }
196
197 APIRPCUpdateAction::DeleteIpAddr { fqdn, ipaddr } => {
199 let svc = service_statuses.get_mut(fqdn).ok_or_else(|| {
200 anyhow!("Service with FQDN {fqdn} not found when deleting ipaddr")
201 })?;
202
203 let endpoint_status = svc
204 .endpoint_statuses
205 .remove(ipaddr)
206 .ok_or_else(|| anyhow!("{ipaddr} not found for '{fqdn}'"))?;
207
208 endpoint_status.probe_cancel.cancel(); Ok(())
210 }
211 }
212}
213
214pub async fn core(
216 statsd: Statsd,
217 conf: Conf,
218 mut service_statuses: FqdnToStatuses,
219 mut chan_core_rx: CoreRec,
220 chan_outbox_tx: OutboxSend,
221) {
222 info!("[core] started");
223 while let Some(upwrap) = chan_core_rx.recv().await {
224 let res = core_handle_one_msg(
225 &statsd,
226 &conf,
227 upwrap,
228 &mut service_statuses,
229 &chan_outbox_tx,
230 )
231 .await;
232 if let Err(err) = &res {
233 warn!("[core] {err}");
234 }
235 }
236 info!("[core] exiting");
237 drop(chan_core_rx);
238}
239
240fn core_handle_fetch(
242 statsd: &Statsd,
243 service_statuses: &FqdnToStatuses,
244 chan_statusclone_tx: ServiceStatusCloneSend,
245) {
246 statsd.incr("fetch.cnt");
248
249 let payload: FqdnToStatuses = service_statuses.clone();
250 if chan_statusclone_tx.send(payload).is_err() {
251 error!("[core] failed to send response");
252 }
253}