bindy/reconcilers/dnszone/
secondary.rs

1// Copyright (c) 2025 Erick Bourgeois, firestoned
2// SPDX-License-Identifier: MIT
3
4//! Secondary zone instance operations.
5//!
6//! This module handles all operations specific to SECONDARY BIND9 instances,
7//! including:
8//! - Filtering instance references to only secondary instances
9//! - Finding secondary pods across instances
10//! - Collecting secondary pod IPs
11//! - Executing operations on all secondary endpoints
12
13use anyhow::{anyhow, Result};
14use k8s_openapi::api::core::v1::Pod;
15use kube::{api::ListParams, Api, Client};
16use tracing::{debug, error, info, warn};
17
18use super::helpers::{get_endpoint, load_rndc_key};
19use super::types::PodInfo;
20use crate::bind9::RndcKeyData;
21
22/// Filters a list of instance references to only SECONDARY instances.
23///
24/// # Arguments
25///
26/// * `client` - Kubernetes API client
27/// * `instance_refs` - Instance references to filter
28///
29/// # Returns
30///
31/// Vector of instance references that have role=Secondary
32///
33/// # Errors
34///
35/// Returns an error if Kubernetes API calls fail.
36pub async fn filter_secondary_instances(
37    client: &Client,
38    instance_refs: &[crate::crd::InstanceReference],
39) -> Result<Vec<crate::crd::InstanceReference>> {
40    use crate::crd::{Bind9Instance, ServerRole};
41
42    let mut secondary_refs = Vec::new();
43
44    for instance_ref in instance_refs {
45        let instance_api: Api<Bind9Instance> =
46            Api::namespaced(client.clone(), &instance_ref.namespace);
47
48        match instance_api.get(&instance_ref.name).await {
49            Ok(instance) => {
50                if instance.spec.role == ServerRole::Secondary {
51                    secondary_refs.push(instance_ref.clone());
52                }
53            }
54            Err(e) => {
55                warn!(
56                    "Failed to get instance {}/{}: {}. Skipping.",
57                    instance_ref.namespace, instance_ref.name, e
58                );
59            }
60        }
61    }
62
63    Ok(secondary_refs)
64}
65
66/// Finds all pod IPs from a list of instance references, filtering by role.
67///
68/// Queries each `Bind9Instance` resource to determine its role, then collects
69/// pod IPs only from secondary instances. This is event-driven as it reacts
70/// to the current state of `Bind9Instance` resources rather than caching.
71///
72/// # Arguments
73///
74/// * `client` - Kubernetes API client
75/// * `instance_refs` - Instance references to query
76///
77/// # Returns
78///
79/// Vector of pod IP addresses from secondary instances only
80///
81/// # Errors
82///
83/// Returns an error if Kubernetes API calls fail.
84pub async fn find_secondary_pod_ips_from_instances(
85    client: &Client,
86    instance_refs: &[crate::crd::InstanceReference],
87) -> Result<Vec<String>> {
88    use crate::crd::{Bind9Instance, ServerRole};
89    use k8s_openapi::api::core::v1::Pod;
90
91    let mut secondary_ips = Vec::new();
92
93    for instance_ref in instance_refs {
94        // Query the Bind9Instance resource to check its role
95        let instance_api: Api<Bind9Instance> =
96            Api::namespaced(client.clone(), &instance_ref.namespace);
97
98        let instance = match instance_api.get(&instance_ref.name).await {
99            Ok(inst) => inst,
100            Err(e) => {
101                warn!(
102                    "Failed to get Bind9Instance {}/{}: {}. Skipping.",
103                    instance_ref.namespace, instance_ref.name, e
104                );
105                continue;
106            }
107        };
108
109        // Only collect IPs from secondary instances
110        if instance.spec.role != ServerRole::Secondary {
111            debug!(
112                "Skipping instance {}/{} - role is {:?}, not Secondary",
113                instance_ref.namespace, instance_ref.name, instance.spec.role
114            );
115            continue;
116        }
117
118        // Find pods for this secondary instance
119        let pod_api: Api<Pod> = Api::namespaced(client.clone(), &instance_ref.namespace);
120        let label_selector = format!("app=bind9,instance={}", instance_ref.name);
121        let lp = ListParams::default().labels(&label_selector);
122
123        match pod_api.list(&lp).await {
124            Ok(pods) => {
125                for pod in pods.items {
126                    if let Some(pod_ip) = pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) {
127                        // Check if pod is running
128                        let phase = pod
129                            .status
130                            .as_ref()
131                            .and_then(|s| s.phase.as_ref())
132                            .map_or("Unknown", std::string::String::as_str);
133
134                        if phase == "Running" {
135                            secondary_ips.push(pod_ip.clone());
136                        } else {
137                            debug!(
138                                "Skipping pod {} in phase {} for instance {}/{}",
139                                pod.metadata.name.as_ref().unwrap_or(&"unknown".to_string()),
140                                phase,
141                                instance_ref.namespace,
142                                instance_ref.name
143                            );
144                        }
145                    }
146                }
147            }
148            Err(e) => {
149                warn!(
150                    "Failed to list pods for instance {}/{}: {}. Skipping.",
151                    instance_ref.namespace, instance_ref.name, e
152                );
153            }
154        }
155    }
156
157    Ok(secondary_ips)
158}
159
160async fn find_all_secondary_pods(
161    client: &Client,
162    namespace: &str,
163    cluster_name: &str,
164    is_cluster_provider: bool,
165) -> Result<Vec<PodInfo>> {
166    use crate::crd::{Bind9Instance, ServerRole};
167
168    // Find all Bind9Instance resources with role=SECONDARY for this cluster
169    let instance_api: Api<Bind9Instance> = if is_cluster_provider {
170        Api::all(client.clone())
171    } else {
172        Api::namespaced(client.clone(), namespace)
173    };
174    let instances = instance_api.list(&ListParams::default()).await?;
175
176    // Store tuples of (instance_name, instance_namespace)
177    let mut secondary_instances: Vec<(String, String)> = Vec::new();
178    for instance in instances.items {
179        if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Secondary
180        {
181            if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
182                secondary_instances.push((name, ns));
183            }
184        }
185    }
186
187    if secondary_instances.is_empty() {
188        info!("No SECONDARY instances found for cluster {cluster_name}");
189        return Ok(Vec::new());
190    }
191
192    info!(
193        "Found {} SECONDARY instance(s) for cluster {}: {:?}",
194        secondary_instances.len(),
195        cluster_name,
196        secondary_instances
197    );
198
199    let mut all_pod_infos = Vec::new();
200
201    for (instance_name, instance_namespace) in &secondary_instances {
202        // Find all pods for this secondary instance in its namespace
203        let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
204        let label_selector = format!("app=bind9,instance={instance_name}");
205        let lp = ListParams::default().labels(&label_selector);
206
207        let pods = pod_api.list(&lp).await?;
208
209        debug!(
210            "Found {} pod(s) for SECONDARY instance {}",
211            pods.items.len(),
212            instance_name
213        );
214
215        for pod in &pods.items {
216            let pod_name = pod
217                .metadata
218                .name
219                .as_ref()
220                .ok_or_else(|| anyhow!("Pod has no name"))?
221                .clone();
222
223            // Get pod IP
224            let pod_ip = pod
225                .status
226                .as_ref()
227                .and_then(|s| s.pod_ip.as_ref())
228                .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
229                .clone();
230
231            // Check if pod is running
232            let phase = pod
233                .status
234                .as_ref()
235                .and_then(|s| s.phase.as_ref())
236                .map(String::as_str);
237
238            if phase == Some("Running") {
239                all_pod_infos.push(PodInfo {
240                    name: pod_name.clone(),
241                    ip: pod_ip.clone(),
242                    instance_name: instance_name.clone(),
243                    namespace: instance_namespace.clone(),
244                });
245                debug!(
246                    "Found running secondary pod {} with IP {} in namespace {}",
247                    pod_name, pod_ip, instance_namespace
248                );
249            } else {
250                debug!(
251                    "Skipping secondary pod {} (phase: {:?}, not running)",
252                    pod_name, phase
253                );
254            }
255        }
256    }
257
258    info!(
259        "Found {} running SECONDARY pod(s) across {} instance(s) for cluster {}",
260        all_pod_infos.len(),
261        secondary_instances.len(),
262        cluster_name
263    );
264
265    Ok(all_pod_infos)
266}
267
268/// Update `lastReconciledAt` timestamp for a zone in `Bind9Instance.status.selectedZones[]`.
269///
270/// This function implements the critical Phase 2 completion step: after successfully
271/// configuring a zone on an instance, we update the instance's status to signal that
272/// the zone is now reconciled and doesn't need reconfiguration on future reconciliations.
273///
274/// This prevents infinite reconciliation loops by ensuring the `DNSZone` watch mapper
275/// only triggers reconciliation when `lastReconciledAt == None`.
276///
277/// # Arguments
278///
279/// * `client` - Kubernetes API client
280/// * `instance_name` - Name of the `Bind9Instance`
281/// * `instance_namespace` - Namespace of the `Bind9Instance`
282/// * `zone_name` - Name of the `DNSZone` resource
283/// * `zone_namespace` - Namespace of the `DNSZone` resource
284///
285/// Execute an operation on all SECONDARY endpoints for a cluster.
286///
287/// Similar to `for_each_primary_endpoint`, but operates on SECONDARY instances.
288/// Useful for triggering zone transfers or other secondary-specific operations.
289///
290/// # Arguments
291///
292/// * `client` - Kubernetes API client
293/// * `namespace` - Namespace to search for instances
294/// * `cluster_ref` - Cluster reference name
295/// * `is_cluster_provider` - Whether this is a cluster provider (cluster-scoped)
296/// * `with_rndc_key` - Whether to load and pass RNDC keys for each instance
297/// * `port_name` - Port name to use for endpoints (e.g., "rndc-api", "dns-tcp")
298/// * `operation` - Async closure to execute for each endpoint
299///
300/// # Returns
301///
302/// * `Ok((first_endpoint, total_endpoints))` - First endpoint found and total count
303///
304/// # Errors
305///
306/// Returns an error if:
307/// - Failed to find secondary pods
308/// - Failed to load RNDC keys
309/// - Failed to get service endpoints
310/// - The operation closure returns an error for any endpoint
311pub async fn for_each_secondary_endpoint<F, Fut>(
312    client: &Client,
313    namespace: &str,
314    cluster_ref: &str,
315    is_cluster_provider: bool,
316    with_rndc_key: bool,
317    port_name: &str,
318    operation: F,
319) -> Result<(Option<String>, usize)>
320where
321    F: Fn(String, String, Option<RndcKeyData>) -> Fut,
322    Fut: std::future::Future<Output = Result<()>>,
323{
324    // Find all SECONDARY pods to get the unique instance names
325    let secondary_pods =
326        find_all_secondary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;
327
328    info!(
329        "Found {} SECONDARY pod(s) for cluster {}",
330        secondary_pods.len(),
331        cluster_ref
332    );
333
334    // Collect unique (instance_name, namespace) tuples from the secondary pods
335    // Each instance may have multiple pods (replicas)
336    let mut instance_tuples: Vec<(String, String)> = secondary_pods
337        .iter()
338        .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
339        .collect();
340    instance_tuples.sort();
341    instance_tuples.dedup();
342
343    info!(
344        "Found {} secondary instance(s) for cluster {}: {:?}",
345        instance_tuples.len(),
346        cluster_ref,
347        instance_tuples
348    );
349
350    let mut first_endpoint: Option<String> = None;
351    let mut total_endpoints = 0;
352    let mut errors: Vec<String> = Vec::new();
353
354    // Loop through each secondary instance and get its endpoints
355    for (instance_name, instance_namespace) in &instance_tuples {
356        info!(
357            "Getting endpoints for secondary instance {}/{} in cluster {}",
358            instance_namespace, instance_name, cluster_ref
359        );
360
361        // Load RNDC key for this specific instance if requested
362        // Each instance has its own RNDC secret for security isolation
363        let key_data = if with_rndc_key {
364            Some(load_rndc_key(client, instance_namespace, instance_name).await?)
365        } else {
366            None
367        };
368
369        // Get all endpoints for this instance's service
370        // The Endpoints API gives us pod IPs with their container ports (not service ports)
371        let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;
372
373        info!(
374            "Found {} endpoint(s) for secondary instance {}",
375            endpoints.len(),
376            instance_name
377        );
378
379        for endpoint in &endpoints {
380            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
381
382            // Save the first endpoint
383            if first_endpoint.is_none() {
384                first_endpoint = Some(pod_endpoint.clone());
385            }
386
387            // Execute the operation on this endpoint with this instance's RNDC key
388            // Continue processing remaining endpoints even if this one fails
389            if let Err(e) = operation(
390                pod_endpoint.clone(),
391                instance_name.clone(),
392                key_data.clone(),
393            )
394            .await
395            {
396                error!(
397                    "Failed operation on secondary endpoint {} (instance {}): {}",
398                    pod_endpoint, instance_name, e
399                );
400                errors.push(format!(
401                    "endpoint {pod_endpoint} (instance {instance_name}): {e}"
402                ));
403            } else {
404                total_endpoints += 1;
405            }
406        }
407    }
408
409    // If any operations failed, return an error with all failures listed
410    if !errors.is_empty() {
411        return Err(anyhow::anyhow!(
412            "Failed to process {} secondary endpoint(s): {}",
413            errors.len(),
414            errors.join("; ")
415        ));
416    }
417
418    Ok((first_endpoint, total_endpoints))
419}
420
421#[cfg(test)]
422#[path = "secondary_tests.rs"]
423mod secondary_tests;