bindy/reconcilers/dnszone/
primary.rs1use anyhow::{anyhow, Result};
14use k8s_openapi::api::core::v1::Pod;
15use kube::{api::ListParams, Api, Client};
16use tracing::{debug, error, info, warn};
17
18use super::helpers::{get_endpoint, load_rndc_key};
19use super::types::PodInfo;
20use crate::bind9::RndcKeyData;
21
22pub async fn filter_primary_instances(
37 client: &Client,
38 instance_refs: &[crate::crd::InstanceReference],
39) -> Result<Vec<crate::crd::InstanceReference>> {
40 use crate::crd::{Bind9Instance, ServerRole};
41
42 let mut primary_refs = Vec::new();
43
44 for instance_ref in instance_refs {
45 let instance_api: Api<Bind9Instance> =
46 Api::namespaced(client.clone(), &instance_ref.namespace);
47
48 match instance_api.get(&instance_ref.name).await {
49 Ok(instance) => {
50 if instance.spec.role == ServerRole::Primary {
51 primary_refs.push(instance_ref.clone());
52 }
53 }
54 Err(e) => {
55 warn!(
56 "Failed to get instance {}/{}: {}. Skipping.",
57 instance_ref.namespace, instance_ref.name, e
58 );
59 }
60 }
61 }
62
63 Ok(primary_refs)
64}
65
66pub async fn find_all_primary_pods(
86 client: &Client,
87 namespace: &str,
88 cluster_name: &str,
89 is_cluster_provider: bool,
90) -> Result<Vec<PodInfo>> {
91 use crate::crd::{Bind9Instance, ServerRole};
92
93 let instance_api: Api<Bind9Instance> = if is_cluster_provider {
95 Api::all(client.clone())
96 } else {
97 Api::namespaced(client.clone(), namespace)
98 };
99 let instances = instance_api.list(&ListParams::default()).await?;
100
101 let mut primary_instances: Vec<(String, String)> = Vec::new();
103 for instance in instances.items {
104 if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Primary {
105 if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
106 primary_instances.push((name, ns));
107 }
108 }
109 }
110
111 if primary_instances.is_empty() {
112 let search_scope = if is_cluster_provider {
113 "all namespaces".to_string()
114 } else {
115 format!("namespace {namespace}")
116 };
117 return Err(anyhow!(
118 "No PRIMARY Bind9Instance resources found for cluster {cluster_name} in {search_scope}"
119 ));
120 }
121
122 info!(
123 "Found {} PRIMARY instance(s) for cluster {}: {:?}",
124 primary_instances.len(),
125 cluster_name,
126 primary_instances
127 );
128
129 let mut all_pod_infos = Vec::new();
130
131 for (instance_name, instance_namespace) in &primary_instances {
132 let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
134 let label_selector = format!("app=bind9,instance={instance_name}");
136 let lp = ListParams::default().labels(&label_selector);
137
138 let pods = pod_api.list(&lp).await?;
139
140 debug!(
141 "Found {} pod(s) for PRIMARY instance {}",
142 pods.items.len(),
143 instance_name
144 );
145
146 for pod in &pods.items {
147 let pod_name = pod
148 .metadata
149 .name
150 .as_ref()
151 .ok_or_else(|| anyhow!("Pod has no name"))?
152 .clone();
153
154 let pod_ip = pod
156 .status
157 .as_ref()
158 .and_then(|s| s.pod_ip.as_ref())
159 .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
160 .clone();
161
162 let phase = pod
164 .status
165 .as_ref()
166 .and_then(|s| s.phase.as_ref())
167 .map(String::as_str);
168
169 if phase == Some("Running") {
170 all_pod_infos.push(PodInfo {
171 name: pod_name.clone(),
172 ip: pod_ip.clone(),
173 instance_name: instance_name.clone(),
174 namespace: instance_namespace.clone(),
175 });
176 debug!(
177 "Found running pod {} with IP {} in namespace {}",
178 pod_name, pod_ip, instance_namespace
179 );
180 } else {
181 debug!(
182 "Skipping pod {} (phase: {:?}, not running)",
183 pod_name, phase
184 );
185 }
186 }
187 }
188
189 if all_pod_infos.is_empty() {
190 return Err(anyhow!(
191 "No running PRIMARY pods found for cluster {cluster_name} in namespace {namespace}"
192 ));
193 }
194
195 info!(
196 "Found {} running PRIMARY pod(s) across {} instance(s) for cluster {}",
197 all_pod_infos.len(),
198 primary_instances.len(),
199 cluster_name
200 );
201
202 Ok(all_pod_infos)
203}
204
205pub async fn find_primary_ips_from_instances(
223 client: &Client,
224 instance_refs: &[crate::crd::InstanceReference],
225) -> Result<Vec<String>> {
226 use crate::crd::{Bind9Instance, ServerRole};
227 use k8s_openapi::api::core::v1::Pod;
228
229 info!(
230 "Finding PRIMARY pod IPs from {} instance reference(s)",
231 instance_refs.len()
232 );
233
234 let mut primary_ips = Vec::new();
235
236 for instance_ref in instance_refs {
237 let instance_api: Api<Bind9Instance> =
239 Api::namespaced(client.clone(), &instance_ref.namespace);
240
241 let instance = match instance_api.get(&instance_ref.name).await {
242 Ok(inst) => inst,
243 Err(e) => {
244 warn!(
245 "Failed to get instance {}/{}: {}",
246 instance_ref.namespace, instance_ref.name, e
247 );
248 continue;
249 }
250 };
251
252 if instance.spec.role != ServerRole::Primary {
254 continue;
255 }
256
257 let pod_api: Api<Pod> = Api::namespaced(client.clone(), &instance_ref.namespace);
259 let label_selector = format!("app=bind9,instance={}", instance_ref.name);
260 let lp = ListParams::default().labels(&label_selector);
261
262 match pod_api.list(&lp).await {
263 Ok(pods) => {
264 for pod in pods.items {
265 if let Some(pod_ip) = pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) {
266 let phase = pod
268 .status
269 .as_ref()
270 .and_then(|s| s.phase.as_ref())
271 .map_or("Unknown", std::string::String::as_str);
272
273 if phase == "Running" {
274 primary_ips.push(pod_ip.clone());
275 debug!(
276 "Added IP {} from running PRIMARY pod {} (instance {}/{})",
277 pod_ip,
278 pod.metadata.name.as_ref().unwrap_or(&"unknown".to_string()),
279 instance_ref.namespace,
280 instance_ref.name
281 );
282 }
283 }
284 }
285 }
286 Err(e) => {
287 warn!(
288 "Failed to list pods for PRIMARY instance {}/{}: {}",
289 instance_ref.namespace, instance_ref.name, e
290 );
291 }
292 }
293 }
294
295 info!(
296 "Found total of {} PRIMARY pod IP(s) across all instances: {:?}",
297 primary_ips.len(),
298 primary_ips
299 );
300
301 Ok(primary_ips)
302}
303pub async fn for_each_primary_endpoint<F, Fut>(
338 client: &Client,
339 namespace: &str,
340 cluster_ref: &str,
341 is_cluster_provider: bool,
342 with_rndc_key: bool,
343 port_name: &str,
344 operation: F,
345) -> Result<(Option<String>, usize)>
346where
347 F: Fn(String, String, Option<RndcKeyData>) -> Fut,
348 Fut: std::future::Future<Output = Result<()>>,
349{
350 let primary_pods =
352 find_all_primary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;
353
354 info!(
355 "Found {} PRIMARY pod(s) for cluster {}",
356 primary_pods.len(),
357 cluster_ref
358 );
359
360 let mut instance_tuples: Vec<(String, String)> = primary_pods
363 .iter()
364 .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
365 .collect();
366 instance_tuples.sort();
367 instance_tuples.dedup();
368
369 info!(
370 "Found {} primary instance(s) for cluster {}: {:?}",
371 instance_tuples.len(),
372 cluster_ref,
373 instance_tuples
374 );
375
376 let mut first_endpoint: Option<String> = None;
377 let mut total_endpoints = 0;
378 let mut errors: Vec<String> = Vec::new();
379
380 for (instance_name, instance_namespace) in &instance_tuples {
384 info!(
385 "Getting endpoints for instance {}/{} in cluster {}",
386 instance_namespace, instance_name, cluster_ref
387 );
388
389 let key_data = if with_rndc_key {
392 Some(load_rndc_key(client, instance_namespace, instance_name).await?)
393 } else {
394 None
395 };
396
397 let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;
400
401 info!(
402 "Found {} endpoint(s) for instance {}",
403 endpoints.len(),
404 instance_name
405 );
406
407 for endpoint in &endpoints {
408 let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
409
410 if first_endpoint.is_none() {
412 first_endpoint = Some(pod_endpoint.clone());
413 }
414
415 if let Err(e) = operation(
418 pod_endpoint.clone(),
419 instance_name.clone(),
420 key_data.clone(),
421 )
422 .await
423 {
424 error!(
425 "Failed operation on endpoint {} (instance {}): {}",
426 pod_endpoint, instance_name, e
427 );
428 errors.push(format!(
429 "endpoint {pod_endpoint} (instance {instance_name}): {e}"
430 ));
431 } else {
432 total_endpoints += 1;
433 }
434 }
435 }
436
437 if !errors.is_empty() {
439 return Err(anyhow::anyhow!(
440 "Failed to process {} endpoint(s): {}",
441 errors.len(),
442 errors.join("; ")
443 ));
444 }
445
446 Ok((first_endpoint, total_endpoints))
447}
448
449#[cfg(test)]
450#[path = "primary_tests.rs"]
451mod primary_tests;