bindy/reconcilers/dnszone/
primary.rs

1// Copyright (c) 2025 Erick Bourgeois, firestoned
2// SPDX-License-Identifier: MIT
3
4//! Primary zone instance operations.
5//!
6//! This module handles all operations specific to PRIMARY BIND9 instances,
7//! including:
8//! - Filtering instance references to only primary instances
9//! - Finding primary pods across instances
10//! - Collecting primary pod IPs
11//! - Executing operations on all primary endpoints
12
13use anyhow::{anyhow, Result};
14use k8s_openapi::api::core::v1::Pod;
15use kube::{api::ListParams, Api, Client};
16use tracing::{debug, error, info, warn};
17
18use super::helpers::{get_endpoint, load_rndc_key};
19use super::types::PodInfo;
20use crate::bind9::RndcKeyData;
21
22/// Filters a list of instance references to only PRIMARY instances.
23///
24/// # Arguments
25///
26/// * `client` - Kubernetes API client
27/// * `instance_refs` - Instance references to filter
28///
29/// # Returns
30///
31/// Vector of instance references that have role=Primary
32///
33/// # Errors
34///
35/// Returns an error if Kubernetes API calls fail.
36pub async fn filter_primary_instances(
37    client: &Client,
38    instance_refs: &[crate::crd::InstanceReference],
39) -> Result<Vec<crate::crd::InstanceReference>> {
40    use crate::crd::{Bind9Instance, ServerRole};
41
42    let mut primary_refs = Vec::new();
43
44    for instance_ref in instance_refs {
45        let instance_api: Api<Bind9Instance> =
46            Api::namespaced(client.clone(), &instance_ref.namespace);
47
48        match instance_api.get(&instance_ref.name).await {
49            Ok(instance) => {
50                if instance.spec.role == ServerRole::Primary {
51                    primary_refs.push(instance_ref.clone());
52                }
53            }
54            Err(e) => {
55                warn!(
56                    "Failed to get instance {}/{}: {}. Skipping.",
57                    instance_ref.namespace, instance_ref.name, e
58                );
59            }
60        }
61    }
62
63    Ok(primary_refs)
64}
65
66/// Find all PRIMARY pods for a given cluster or cluster provider.
67///
68/// Returns pod information including name, IP, instance name, and namespace
69/// for all running PRIMARY pods in the cluster.
70///
71/// # Arguments
72///
73/// * `client` - Kubernetes API client
74/// * `namespace` - Namespace to search in (if not cluster provider)
75/// * `cluster_name` - Name of the cluster
76/// * `is_cluster_provider` - Whether to search across all namespaces
77///
78/// # Returns
79///
80/// Vector of PodInfo for all running PRIMARY pods
81///
82/// # Errors
83///
84/// Returns an error if Kubernetes API operations fail
85pub async fn find_all_primary_pods(
86    client: &Client,
87    namespace: &str,
88    cluster_name: &str,
89    is_cluster_provider: bool,
90) -> Result<Vec<PodInfo>> {
91    use crate::crd::{Bind9Instance, ServerRole};
92
93    // First, find all Bind9Instance resources that belong to this cluster and have role=primary
94    let instance_api: Api<Bind9Instance> = if is_cluster_provider {
95        Api::all(client.clone())
96    } else {
97        Api::namespaced(client.clone(), namespace)
98    };
99    let instances = instance_api.list(&ListParams::default()).await?;
100
101    // Store tuples of (instance_name, instance_namespace)
102    let mut primary_instances: Vec<(String, String)> = Vec::new();
103    for instance in instances.items {
104        if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Primary {
105            if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
106                primary_instances.push((name, ns));
107            }
108        }
109    }
110
111    if primary_instances.is_empty() {
112        let search_scope = if is_cluster_provider {
113            "all namespaces".to_string()
114        } else {
115            format!("namespace {namespace}")
116        };
117        return Err(anyhow!(
118            "No PRIMARY Bind9Instance resources found for cluster {cluster_name} in {search_scope}"
119        ));
120    }
121
122    info!(
123        "Found {} PRIMARY instance(s) for cluster {}: {:?}",
124        primary_instances.len(),
125        cluster_name,
126        primary_instances
127    );
128
129    let mut all_pod_infos = Vec::new();
130
131    for (instance_name, instance_namespace) in &primary_instances {
132        // Now find all pods for this primary instance in its namespace
133        let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
134        // List pods with label selector matching the instance
135        let label_selector = format!("app=bind9,instance={instance_name}");
136        let lp = ListParams::default().labels(&label_selector);
137
138        let pods = pod_api.list(&lp).await?;
139
140        debug!(
141            "Found {} pod(s) for PRIMARY instance {}",
142            pods.items.len(),
143            instance_name
144        );
145
146        for pod in &pods.items {
147            let pod_name = pod
148                .metadata
149                .name
150                .as_ref()
151                .ok_or_else(|| anyhow!("Pod has no name"))?
152                .clone();
153
154            // Get pod IP
155            let pod_ip = pod
156                .status
157                .as_ref()
158                .and_then(|s| s.pod_ip.as_ref())
159                .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
160                .clone();
161
162            // Check if pod is running
163            let phase = pod
164                .status
165                .as_ref()
166                .and_then(|s| s.phase.as_ref())
167                .map(String::as_str);
168
169            if phase == Some("Running") {
170                all_pod_infos.push(PodInfo {
171                    name: pod_name.clone(),
172                    ip: pod_ip.clone(),
173                    instance_name: instance_name.clone(),
174                    namespace: instance_namespace.clone(),
175                });
176                debug!(
177                    "Found running pod {} with IP {} in namespace {}",
178                    pod_name, pod_ip, instance_namespace
179                );
180            } else {
181                debug!(
182                    "Skipping pod {} (phase: {:?}, not running)",
183                    pod_name, phase
184                );
185            }
186        }
187    }
188
189    if all_pod_infos.is_empty() {
190        return Err(anyhow!(
191            "No running PRIMARY pods found for cluster {cluster_name} in namespace {namespace}"
192        ));
193    }
194
195    info!(
196        "Found {} running PRIMARY pod(s) across {} instance(s) for cluster {}",
197        all_pod_infos.len(),
198        primary_instances.len(),
199        cluster_name
200    );
201
202    Ok(all_pod_infos)
203}
204
205/// Find primary server IPs from a list of instance references.
206///
207/// This is the NEW instance-based approach that replaces cluster-based lookup.
208/// It filters the instance refs to only PRIMARY instances, then gets their pod IPs.
209///
210/// # Arguments
211///
212/// * `client` - Kubernetes API client
213/// * `instance_refs` - List of instance references to search
214///
215/// # Returns
216///
217/// A vector of IP addresses for all running PRIMARY pods across all primary instances
218///
219/// # Errors
220///
221/// Returns an error if Kubernetes API calls fail or no primary pods are found
222pub async fn find_primary_ips_from_instances(
223    client: &Client,
224    instance_refs: &[crate::crd::InstanceReference],
225) -> Result<Vec<String>> {
226    use crate::crd::{Bind9Instance, ServerRole};
227    use k8s_openapi::api::core::v1::Pod;
228
229    info!(
230        "Finding PRIMARY pod IPs from {} instance reference(s)",
231        instance_refs.len()
232    );
233
234    let mut primary_ips = Vec::new();
235
236    for instance_ref in instance_refs {
237        // Get the Bind9Instance to check its role
238        let instance_api: Api<Bind9Instance> =
239            Api::namespaced(client.clone(), &instance_ref.namespace);
240
241        let instance = match instance_api.get(&instance_ref.name).await {
242            Ok(inst) => inst,
243            Err(e) => {
244                warn!(
245                    "Failed to get instance {}/{}: {}",
246                    instance_ref.namespace, instance_ref.name, e
247                );
248                continue;
249            }
250        };
251
252        // Skip if not a PRIMARY instance
253        if instance.spec.role != ServerRole::Primary {
254            continue;
255        }
256
257        // Get running pod IPs for this primary instance
258        let pod_api: Api<Pod> = Api::namespaced(client.clone(), &instance_ref.namespace);
259        let label_selector = format!("app=bind9,instance={}", instance_ref.name);
260        let lp = ListParams::default().labels(&label_selector);
261
262        match pod_api.list(&lp).await {
263            Ok(pods) => {
264                for pod in pods.items {
265                    if let Some(pod_ip) = pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()) {
266                        // Check if pod is running
267                        let phase = pod
268                            .status
269                            .as_ref()
270                            .and_then(|s| s.phase.as_ref())
271                            .map_or("Unknown", std::string::String::as_str);
272
273                        if phase == "Running" {
274                            primary_ips.push(pod_ip.clone());
275                            debug!(
276                                "Added IP {} from running PRIMARY pod {} (instance {}/{})",
277                                pod_ip,
278                                pod.metadata.name.as_ref().unwrap_or(&"unknown".to_string()),
279                                instance_ref.namespace,
280                                instance_ref.name
281                            );
282                        }
283                    }
284                }
285            }
286            Err(e) => {
287                warn!(
288                    "Failed to list pods for PRIMARY instance {}/{}: {}",
289                    instance_ref.namespace, instance_ref.name, e
290                );
291            }
292        }
293    }
294
295    info!(
296        "Found total of {} PRIMARY pod IP(s) across all instances: {:?}",
297        primary_ips.len(),
298        primary_ips
299    );
300
301    Ok(primary_ips)
302}
303/// Execute an operation on all endpoints of all primary instances in a cluster.
304///
305/// This helper function handles the common pattern of:
306/// 1. Finding all primary pods for a cluster
307/// 2. Collecting unique instance names
308/// 3. Optionally loading RNDC key from each instance
309/// 4. Getting endpoints for each instance
310/// 5. Executing a provided operation on each endpoint
311///
312/// # Arguments
313///
314/// * `client` - Kubernetes API client
315/// * `namespace` - Namespace of the cluster
316/// * `cluster_ref` - Name of the `Bind9Cluster` or `ClusterBind9Provider`
317/// * `is_cluster_provider` - Whether this is a cluster provider (cluster-scoped)
318/// * `with_rndc_key` - Whether to load RNDC key from each instance
319/// * `port_name` - Port name to use for endpoints (e.g., "rndc-api", "dns-tcp")
320/// * `operation` - Async closure to execute for each endpoint
321///   - Arguments: `(pod_endpoint: String, instance_name: String, rndc_key: Option<RndcKeyData>)`
322///   - Returns: `Result<()>`
323///
324/// # Returns
325///
326/// Returns `Ok((first_endpoint, total_count))` where:
327/// - `first_endpoint` - Optional first endpoint encountered (useful for NOTIFY operations)
328/// - `total_count` - Total number of endpoints processed successfully
329///
330/// # Errors
331///
332/// Returns error if:
333/// - No primary pods found for the cluster
334/// - Failed to load RNDC key (if requested)
335/// - Failed to get endpoints for any instance
336/// - The operation closure returns an error for any endpoint
337pub async fn for_each_primary_endpoint<F, Fut>(
338    client: &Client,
339    namespace: &str,
340    cluster_ref: &str,
341    is_cluster_provider: bool,
342    with_rndc_key: bool,
343    port_name: &str,
344    operation: F,
345) -> Result<(Option<String>, usize)>
346where
347    F: Fn(String, String, Option<RndcKeyData>) -> Fut,
348    Fut: std::future::Future<Output = Result<()>>,
349{
350    // Find all PRIMARY pods to get the unique instance names
351    let primary_pods =
352        find_all_primary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;
353
354    info!(
355        "Found {} PRIMARY pod(s) for cluster {}",
356        primary_pods.len(),
357        cluster_ref
358    );
359
360    // Collect unique (instance_name, namespace) tuples from the primary pods
361    // Each instance may have multiple pods (replicas)
362    let mut instance_tuples: Vec<(String, String)> = primary_pods
363        .iter()
364        .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
365        .collect();
366    instance_tuples.sort();
367    instance_tuples.dedup();
368
369    info!(
370        "Found {} primary instance(s) for cluster {}: {:?}",
371        instance_tuples.len(),
372        cluster_ref,
373        instance_tuples
374    );
375
376    let mut first_endpoint: Option<String> = None;
377    let mut total_endpoints = 0;
378    let mut errors: Vec<String> = Vec::new();
379
380    // Loop through each primary instance and get its endpoints
381    // Important: With EmptyDir storage (per-pod, non-shared), each primary pod maintains its own
382    // zone files. We need to process ALL pods across ALL instances.
383    for (instance_name, instance_namespace) in &instance_tuples {
384        info!(
385            "Getting endpoints for instance {}/{} in cluster {}",
386            instance_namespace, instance_name, cluster_ref
387        );
388
389        // Load RNDC key for this specific instance if requested
390        // Each instance has its own RNDC secret for security isolation
391        let key_data = if with_rndc_key {
392            Some(load_rndc_key(client, instance_namespace, instance_name).await?)
393        } else {
394            None
395        };
396
397        // Get all endpoints for this instance's service
398        // The Endpoints API gives us pod IPs with their container ports (not service ports)
399        let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;
400
401        info!(
402            "Found {} endpoint(s) for instance {}",
403            endpoints.len(),
404            instance_name
405        );
406
407        for endpoint in &endpoints {
408            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
409
410            // Save the first endpoint
411            if first_endpoint.is_none() {
412                first_endpoint = Some(pod_endpoint.clone());
413            }
414
415            // Execute the operation on this endpoint with this instance's RNDC key
416            // Continue processing remaining endpoints even if this one fails
417            if let Err(e) = operation(
418                pod_endpoint.clone(),
419                instance_name.clone(),
420                key_data.clone(),
421            )
422            .await
423            {
424                error!(
425                    "Failed operation on endpoint {} (instance {}): {}",
426                    pod_endpoint, instance_name, e
427                );
428                errors.push(format!(
429                    "endpoint {pod_endpoint} (instance {instance_name}): {e}"
430                ));
431            } else {
432                total_endpoints += 1;
433            }
434        }
435    }
436
437    // If any operations failed, return an error with all failures listed
438    if !errors.is_empty() {
439        return Err(anyhow::anyhow!(
440            "Failed to process {} endpoint(s): {}",
441            errors.len(),
442            errors.join("; ")
443        ));
444    }
445
446    Ok((first_endpoint, total_endpoints))
447}
448
449#[cfg(test)]
450#[path = "primary_tests.rs"]
451mod primary_tests;