// bindy/reconcilers/dnszone/helpers.rs

1// Copyright (c) 2025 Erick Bourgeois, firestoned
2// SPDX-License-Identifier: MIT
3
4//! Helper functions for DNS zone reconciliation.
5//!
6//! This module contains the validation and change detection helper functions
7//! extracted from the main reconcile_dnszone() function to improve maintainability.
8
9use crate::crd::{DNSZone, InstanceReference};
10use anyhow::{anyhow, Context as AnyhowContext, Result};
11use k8s_openapi::api::core::v1::{Endpoints, Secret};
12use kube::{Api, Client};
13use std::collections::{HashMap, HashSet};
14use tracing::{error, info};
15
16use super::types::{DuplicateZoneInfo, EndpointAddress};
17use crate::bind9::RndcKeyData;
18
19/// Re-fetch a DNSZone to get the latest status.
20///
21/// The `dnszone` parameter from the watch event might have stale status from the cache.
22/// We need the latest `status.bind9Instances` which may have been updated by the
23/// Bind9Instance reconciler.
24///
25/// # Arguments
26/// * `client` - Kubernetes client
27/// * `namespace` - Namespace of the DNSZone
28/// * `name` - Name of the DNSZone
29///
30/// # Returns
31/// The freshly fetched DNSZone with current status
32///
33/// # Errors
34/// Returns an error if the Kubernetes API call fails
35pub async fn refetch_zone(client: &Client, namespace: &str, name: &str) -> Result<DNSZone> {
36    let zones_api: kube::Api<DNSZone> = kube::Api::namespaced(client.clone(), namespace);
37    let zone = zones_api.get(name).await?;
38    Ok(zone)
39}
40
41/// Handle duplicate zone conflicts by setting Ready=False and stopping reconciliation.
42///
43/// When a duplicate zone is detected, this function:
44/// 1. Logs a warning with details about the conflict
45/// 2. Updates the status with Ready=False and DuplicateZone condition
46/// 3. Applies the status to the API server
47///
48/// # Arguments
49/// * `client` - Kubernetes client
50/// * `namespace` - Namespace of the conflicting DNSZone
51/// * `name` - Name of the conflicting DNSZone
52/// * `duplicate_info` - Information about the duplicate zone conflict
53/// * `status_updater` - Status updater to apply the condition
54///
55/// # Errors
56/// Returns an error if the status update fails
57pub async fn handle_duplicate_zone(
58    client: &Client,
59    namespace: &str,
60    name: &str,
61    duplicate_info: &DuplicateZoneInfo,
62    status_updater: &mut crate::reconcilers::status::DNSZoneStatusUpdater,
63) -> Result<()> {
64    tracing::warn!(
65        "Duplicate zone detected: {}/{} cannot claim '{}' because it is already configured by: {:?}",
66        namespace, name, duplicate_info.zone_name, duplicate_info.conflicting_zones
67    );
68
69    // Build list of conflicting zones in namespace/name format
70    let conflicting_zone_refs: Vec<String> = duplicate_info
71        .conflicting_zones
72        .iter()
73        .map(|z| format!("{}/{}", z.namespace, z.name))
74        .collect();
75
76    // Set Ready=False with DuplicateZone reason
77    status_updater.set_duplicate_zone_condition(&duplicate_info.zone_name, &conflicting_zone_refs);
78
79    // Apply status and stop processing
80    status_updater.apply(client).await?;
81
82    Ok(())
83}
84
85/// Detect if the zone spec has changed since last reconciliation.
86///
87/// Compares current generation with observed generation to determine
88/// if this is first reconciliation or if spec changed.
89///
90/// # Arguments
91///
92/// * `zone` - The DNSZone resource
93///
94/// # Returns
95///
96/// Tuple of (first_reconciliation, spec_changed)
97#[must_use]
98pub fn detect_spec_changes(zone: &DNSZone) -> (bool, bool) {
99    let current_generation = zone.metadata.generation;
100    let observed_generation = zone.status.as_ref().and_then(|s| s.observed_generation);
101
102    let first_reconciliation = observed_generation.is_none();
103    let spec_changed =
104        crate::reconcilers::should_reconcile(current_generation, observed_generation);
105
106    (first_reconciliation, spec_changed)
107}
108
109/// Detect if the instance list changed between watch event and re-fetch.
110///
111/// This is critical for detecting when:
112/// 1. New instances are added to `status.bind9Instances` (via `bind9InstancesFrom` selectors)
113/// 2. Instance `lastReconciledAt` timestamps are cleared (e.g., instance deleted, needs reconfiguration)
114///
115/// NOTE: `InstanceReference` `PartialEq` ignores `lastReconciledAt`, so we must check timestamps separately!
116///
117/// # Arguments
118///
119/// * `namespace` - Namespace for logging
120/// * `name` - Zone name for logging
121/// * `watch_instances` - Instances from the watch event that triggered reconciliation
122/// * `current_instances` - Instances after re-fetching (current state)
123///
124/// # Returns
125///
126/// `true` if instances changed (list or timestamps), `false` otherwise
127pub fn detect_instance_changes(
128    namespace: &str,
129    name: &str,
130    watch_instances: Option<&Vec<InstanceReference>>,
131    current_instances: &[InstanceReference],
132) -> bool {
133    let Some(watch_instances) = watch_instances else {
134        // No instances in watch event, first reconciliation or error
135        return true;
136    };
137
138    // Get the instance names from the watch event (what triggered us)
139    let watch_instance_names: HashSet<_> = watch_instances.iter().map(|r| &r.name).collect();
140
141    // Get the instance names after re-fetching (current state)
142    let current_instance_names: HashSet<_> = current_instances.iter().map(|r| &r.name).collect();
143
144    // Check if instance list changed (added/removed instances)
145    let list_changed = watch_instance_names != current_instance_names;
146
147    if list_changed {
148        info!(
149            "Instance list changed during reconciliation for zone {}/{}: watch_event={:?}, current={:?}",
150            namespace, name, watch_instance_names, current_instance_names
151        );
152        return true;
153    }
154
155    // List is the same, but check if any lastReconciledAt timestamps changed
156    // Use InstanceReference as HashMap key (uses its Hash impl which hashes identity fields)
157    let watch_timestamps: HashMap<&InstanceReference, Option<&str>> = watch_instances
158        .iter()
159        .map(|inst| (inst, inst.last_reconciled_at.as_deref()))
160        .collect();
161
162    let current_timestamps: HashMap<&InstanceReference, Option<&str>> = current_instances
163        .iter()
164        .map(|inst| (inst, inst.last_reconciled_at.as_deref()))
165        .collect();
166
167    let timestamps_changed = watch_timestamps.iter().any(|(inst_ref, watch_ts)| {
168        current_timestamps
169            .get(inst_ref)
170            .is_some_and(|current_ts| current_ts != watch_ts)
171    });
172
173    if timestamps_changed {
174        info!(
175            "Instance lastReconciledAt timestamps changed for zone {}/{}",
176            namespace, name
177        );
178    }
179
180    timestamps_changed
181}
182
//
// ============================================================
// Endpoint and Instance Utilities
// ============================================================
//

/// Execute an operation on all endpoints for a list of instance references.
///
/// This is the event-driven instance-based approach that operates on instances
/// discovered via spec.bind9InstancesFrom selectors.
///
/// Error semantics (important for callers):
/// - A failure to load an RNDC key or to list endpoints for ANY instance aborts
///   the whole loop immediately via `?` — instances later in the list are not
///   processed.
/// - A failure of `operation` on an individual endpoint is collected and logged,
///   and processing continues; an error is returned only if EVERY operation failed.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `instance_refs` - List of instance references to process
/// * `with_rndc_key` - Whether to load and pass RNDC keys for each instance
/// * `port_name` - Port name to use for endpoints (e.g., "rndc-api", "dns-tcp")
/// * `operation` - Async closure to execute for each endpoint; receives
///   `(pod_endpoint "ip:port", instance name, optional RNDC key)`
///
/// # Returns
///
/// * `Ok((first_endpoint, total_endpoints))` - First endpoint found and total count.
///   NOTE: `first_endpoint` is recorded before `operation` runs, so it may refer to
///   an endpoint whose operation failed; `total_endpoints` counts successes only.
///
/// # Errors
///
/// Returns an error if all operations fail or if critical API calls fail.
pub async fn for_each_instance_endpoint<F, Fut>(
    client: &Client,
    instance_refs: &[crate::crd::InstanceReference],
    with_rndc_key: bool,
    port_name: &str,
    operation: F,
) -> Result<(Option<String>, usize)>
where
    F: Fn(String, String, Option<RndcKeyData>) -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    // First "ip:port" seen across all instances (success or failure).
    let mut first_endpoint: Option<String> = None;
    // Count of endpoints whose operation succeeded.
    let mut total_endpoints = 0;
    // Human-readable descriptions of per-endpoint operation failures.
    let mut errors: Vec<String> = Vec::new();

    for instance_ref in instance_refs {
        info!(
            "Processing endpoints for instance {}/{}",
            instance_ref.namespace, instance_ref.name
        );

        // Load RNDC key for this specific instance if requested.
        // NOTE: the `?` here aborts the entire loop on failure (no aggregation).
        let key_data = if with_rndc_key {
            Some(load_rndc_key(client, &instance_ref.namespace, &instance_ref.name).await?)
        } else {
            None
        };

        // Get all endpoints for this instance's service.
        // NOTE: also aborts the entire loop on failure (including "no ready endpoints").
        let endpoints = get_endpoint(
            client,
            &instance_ref.namespace,
            &instance_ref.name,
            port_name,
        )
        .await?;

        info!(
            "Found {} endpoint(s) for instance {}/{}",
            endpoints.len(),
            instance_ref.namespace,
            instance_ref.name
        );

        for endpoint in &endpoints {
            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);

            // Save the first endpoint (before attempting the operation — see docs).
            if first_endpoint.is_none() {
                first_endpoint = Some(pod_endpoint.clone());
            }

            // Execute the operation on this endpoint; failures are aggregated,
            // not propagated, so remaining endpoints still get a chance.
            if let Err(e) = operation(
                pod_endpoint.clone(),
                instance_ref.name.clone(),
                key_data.clone(),
            )
            .await
            {
                error!(
                    "Failed operation on endpoint {} (instance {}/{}): {}",
                    pod_endpoint, instance_ref.namespace, instance_ref.name, e
                );
                errors.push(format!(
                    "endpoint {pod_endpoint} (instance {}/{}): {e}",
                    instance_ref.namespace, instance_ref.name
                ));
            } else {
                total_endpoints += 1;
            }
        }
    }

    // If ALL operations failed, return an error
    if total_endpoints == 0 && !errors.is_empty() {
        return Err(anyhow!(
            "All operations failed. Errors: {}",
            errors.join("; ")
        ));
    }

    Ok((first_endpoint, total_endpoints))
}
293
294/// Load RNDC key from the instance's secret.
295///
296/// # Arguments
297///
298/// * `client` - Kubernetes API client
299/// * `namespace` - Namespace of the instance
300/// * `instance_name` - Name of the instance
301///
302/// # Returns
303///
304/// Parsed RNDC key data
305///
306/// # Errors
307///
308/// Returns an error if the secret is not found or cannot be parsed
309pub async fn load_rndc_key(
310    client: &Client,
311    namespace: &str,
312    instance_name: &str,
313) -> Result<RndcKeyData> {
314    let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
315    let secret_name = format!("{instance_name}-rndc-key");
316
317    let secret = secret_api.get(&secret_name).await.context(format!(
318        "Failed to get RNDC secret {secret_name} in namespace {namespace}"
319    ))?;
320
321    let data = secret
322        .data
323        .as_ref()
324        .ok_or_else(|| anyhow!("Secret {secret_name} has no data"))?;
325
326    // Convert ByteString to Vec<u8>
327    let mut converted_data = std::collections::BTreeMap::new();
328    for (key, value) in data {
329        converted_data.insert(key.clone(), value.0.clone());
330    }
331
332    crate::bind9::Bind9Manager::parse_rndc_secret_data(&converted_data)
333}
334
335/// Get all ready endpoints for a service.
336///
337/// Queries the Kubernetes Endpoints API to find all ready pod IPs and ports
338/// for a given service. The port_name must match the name field in the
339/// service's port specification.
340///
341/// # Arguments
342///
343/// * `client` - Kubernetes API client
344/// * `namespace` - Namespace of the service
345/// * `service_name` - Name of the service (usually same as instance name)
346/// * `port_name` - Name of the port to query (e.g., "rndc-api", "dns-tcp")
347///
348/// # Returns
349///
350/// Vector of endpoint addresses with IP and port
351///
352/// # Errors
353///
354/// Returns an error if:
355/// - Failed to get endpoints from API
356/// - No ready addresses found
357pub async fn get_endpoint(
358    client: &Client,
359    namespace: &str,
360    service_name: &str,
361    port_name: &str,
362) -> Result<Vec<EndpointAddress>> {
363    let endpoints_api: Api<Endpoints> = Api::namespaced(client.clone(), namespace);
364    let endpoints = endpoints_api.get(service_name).await.context(format!(
365        "Failed to get endpoints for service {service_name}"
366    ))?;
367
368    let mut result = Vec::new();
369
370    // Endpoints are organized into subsets. Each subset has:
371    // - addresses: List of ready pod IPs
372    // - ports: List of container ports
373    if let Some(subsets) = endpoints.subsets {
374        for subset in subsets {
375            // Find the port in this subset
376            if let Some(ports) = subset.ports {
377                if let Some(endpoint_port) = ports
378                    .iter()
379                    .find(|p| p.name.as_ref().is_some_and(|name| name == port_name))
380                {
381                    let port = endpoint_port.port;
382
383                    // Get all ready addresses for this subset
384                    if let Some(addresses) = subset.addresses {
385                        for addr in addresses {
386                            result.push(EndpointAddress {
387                                ip: addr.ip.clone(),
388                                port,
389                            });
390                        }
391                    }
392                }
393            }
394        }
395    }
396
397    if result.is_empty() {
398        return Err(anyhow!(
399            "No ready endpoints found for service {service_name} with port '{port_name}'"
400        ));
401    }
402
403    Ok(result)
404}