bindy/reconcilers/dnszone/
secondary.rs1use anyhow::{anyhow, Result};
14use k8s_openapi::api::core::v1::Pod;
15use kube::{api::ListParams, Api, Client};
16use tracing::{debug, error, info, warn};
17
18use super::helpers::{get_endpoint, load_rndc_key};
19use super::types::PodInfo;
20use crate::bind9::RndcKeyData;
21
22pub async fn filter_secondary_instances(
37 client: &Client,
38 instance_refs: &[crate::crd::InstanceReference],
39) -> Result<Vec<crate::crd::InstanceReference>> {
40 use crate::crd::{Bind9Instance, ServerRole};
41
42 let mut secondary_refs = Vec::new();
43
44 for instance_ref in instance_refs {
45 let instance_api: Api<Bind9Instance> =
46 Api::namespaced(client.clone(), &instance_ref.namespace);
47
48 match instance_api.get(&instance_ref.name).await {
49 Ok(instance) => {
50 if instance.spec.role == ServerRole::Secondary {
51 secondary_refs.push(instance_ref.clone());
52 }
53 }
54 Err(e) => {
55 warn!(
56 "Failed to get instance {}/{}: {}. Skipping.",
57 instance_ref.namespace, instance_ref.name, e
58 );
59 }
60 }
61 }
62
63 Ok(secondary_refs)
64}
65
66pub async fn find_secondary_pod_ips_from_instances(
85 client: &Client,
86 instance_refs: &[crate::crd::InstanceReference],
87) -> Result<Vec<String>> {
88 use crate::crd::{Bind9Instance, ServerRole};
89 use k8s_openapi::api::core::v1::Pod;
90
91 let mut secondary_ips = Vec::new();
92
93 for instance_ref in instance_refs {
94 let instance_api: Api<Bind9Instance> =
96 Api::namespaced(client.clone(), &instance_ref.namespace);
97
98 let instance = match instance_api.get(&instance_ref.name).await {
99 Ok(inst) => inst,
100 Err(e) => {
101 warn!(
102 "Failed to get Bind9Instance {}/{}: {}. Skipping.",
103 instance_ref.namespace, instance_ref.name, e
104 );
105 continue;
106 }
107 };
108
109 if instance.spec.role != ServerRole::Secondary {
111 debug!(
112 "Skipping instance {}/{} - role is {:?}, not Secondary",
113 instance_ref.namespace, instance_ref.name, instance.spec.role
114 );
115 continue;
116 }
117
118 let pod_api: Api<Pod> = Api::namespaced(client.clone(), &instance_ref.namespace);
120 let label_selector = format!("app=bind9,instance={}", instance_ref.name);
121 let lp = ListParams::default().labels(&label_selector);
122
123 match pod_api.list(&lp).await {
124 Ok(pods) => {
125 for pod in pods.items {
126 if let Some(pod_ip) = pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) {
127 let phase = pod
129 .status
130 .as_ref()
131 .and_then(|s| s.phase.as_ref())
132 .map_or("Unknown", std::string::String::as_str);
133
134 if phase == "Running" {
135 secondary_ips.push(pod_ip.clone());
136 } else {
137 debug!(
138 "Skipping pod {} in phase {} for instance {}/{}",
139 pod.metadata.name.as_ref().unwrap_or(&"unknown".to_string()),
140 phase,
141 instance_ref.namespace,
142 instance_ref.name
143 );
144 }
145 }
146 }
147 }
148 Err(e) => {
149 warn!(
150 "Failed to list pods for instance {}/{}: {}. Skipping.",
151 instance_ref.namespace, instance_ref.name, e
152 );
153 }
154 }
155 }
156
157 Ok(secondary_ips)
158}
159
160async fn find_all_secondary_pods(
161 client: &Client,
162 namespace: &str,
163 cluster_name: &str,
164 is_cluster_provider: bool,
165) -> Result<Vec<PodInfo>> {
166 use crate::crd::{Bind9Instance, ServerRole};
167
168 let instance_api: Api<Bind9Instance> = if is_cluster_provider {
170 Api::all(client.clone())
171 } else {
172 Api::namespaced(client.clone(), namespace)
173 };
174 let instances = instance_api.list(&ListParams::default()).await?;
175
176 let mut secondary_instances: Vec<(String, String)> = Vec::new();
178 for instance in instances.items {
179 if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Secondary
180 {
181 if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
182 secondary_instances.push((name, ns));
183 }
184 }
185 }
186
187 if secondary_instances.is_empty() {
188 info!("No SECONDARY instances found for cluster {cluster_name}");
189 return Ok(Vec::new());
190 }
191
192 info!(
193 "Found {} SECONDARY instance(s) for cluster {}: {:?}",
194 secondary_instances.len(),
195 cluster_name,
196 secondary_instances
197 );
198
199 let mut all_pod_infos = Vec::new();
200
201 for (instance_name, instance_namespace) in &secondary_instances {
202 let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
204 let label_selector = format!("app=bind9,instance={instance_name}");
205 let lp = ListParams::default().labels(&label_selector);
206
207 let pods = pod_api.list(&lp).await?;
208
209 debug!(
210 "Found {} pod(s) for SECONDARY instance {}",
211 pods.items.len(),
212 instance_name
213 );
214
215 for pod in &pods.items {
216 let pod_name = pod
217 .metadata
218 .name
219 .as_ref()
220 .ok_or_else(|| anyhow!("Pod has no name"))?
221 .clone();
222
223 let pod_ip = pod
225 .status
226 .as_ref()
227 .and_then(|s| s.pod_ip.as_ref())
228 .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
229 .clone();
230
231 let phase = pod
233 .status
234 .as_ref()
235 .and_then(|s| s.phase.as_ref())
236 .map(String::as_str);
237
238 if phase == Some("Running") {
239 all_pod_infos.push(PodInfo {
240 name: pod_name.clone(),
241 ip: pod_ip.clone(),
242 instance_name: instance_name.clone(),
243 namespace: instance_namespace.clone(),
244 });
245 debug!(
246 "Found running secondary pod {} with IP {} in namespace {}",
247 pod_name, pod_ip, instance_namespace
248 );
249 } else {
250 debug!(
251 "Skipping secondary pod {} (phase: {:?}, not running)",
252 pod_name, phase
253 );
254 }
255 }
256 }
257
258 info!(
259 "Found {} running SECONDARY pod(s) across {} instance(s) for cluster {}",
260 all_pod_infos.len(),
261 secondary_instances.len(),
262 cluster_name
263 );
264
265 Ok(all_pod_infos)
266}
267
268pub async fn for_each_secondary_endpoint<F, Fut>(
312 client: &Client,
313 namespace: &str,
314 cluster_ref: &str,
315 is_cluster_provider: bool,
316 with_rndc_key: bool,
317 port_name: &str,
318 operation: F,
319) -> Result<(Option<String>, usize)>
320where
321 F: Fn(String, String, Option<RndcKeyData>) -> Fut,
322 Fut: std::future::Future<Output = Result<()>>,
323{
324 let secondary_pods =
326 find_all_secondary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;
327
328 info!(
329 "Found {} SECONDARY pod(s) for cluster {}",
330 secondary_pods.len(),
331 cluster_ref
332 );
333
334 let mut instance_tuples: Vec<(String, String)> = secondary_pods
337 .iter()
338 .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
339 .collect();
340 instance_tuples.sort();
341 instance_tuples.dedup();
342
343 info!(
344 "Found {} secondary instance(s) for cluster {}: {:?}",
345 instance_tuples.len(),
346 cluster_ref,
347 instance_tuples
348 );
349
350 let mut first_endpoint: Option<String> = None;
351 let mut total_endpoints = 0;
352 let mut errors: Vec<String> = Vec::new();
353
354 for (instance_name, instance_namespace) in &instance_tuples {
356 info!(
357 "Getting endpoints for secondary instance {}/{} in cluster {}",
358 instance_namespace, instance_name, cluster_ref
359 );
360
361 let key_data = if with_rndc_key {
364 Some(load_rndc_key(client, instance_namespace, instance_name).await?)
365 } else {
366 None
367 };
368
369 let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;
372
373 info!(
374 "Found {} endpoint(s) for secondary instance {}",
375 endpoints.len(),
376 instance_name
377 );
378
379 for endpoint in &endpoints {
380 let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
381
382 if first_endpoint.is_none() {
384 first_endpoint = Some(pod_endpoint.clone());
385 }
386
387 if let Err(e) = operation(
390 pod_endpoint.clone(),
391 instance_name.clone(),
392 key_data.clone(),
393 )
394 .await
395 {
396 error!(
397 "Failed operation on secondary endpoint {} (instance {}): {}",
398 pod_endpoint, instance_name, e
399 );
400 errors.push(format!(
401 "endpoint {pod_endpoint} (instance {instance_name}): {e}"
402 ));
403 } else {
404 total_endpoints += 1;
405 }
406 }
407 }
408
409 if !errors.is_empty() {
411 return Err(anyhow::anyhow!(
412 "Failed to process {} secondary endpoint(s): {}",
413 errors.len(),
414 errors.join("; ")
415 ));
416 }
417
418 Ok((first_endpoint, total_endpoints))
419}
420
421#[cfg(test)]
422#[path = "secondary_tests.rs"]
423mod secondary_tests;