// bindy/reconcilers/dnszone/helpers.rs
1// Copyright (c) 2025 Erick Bourgeois, firestoned
2// SPDX-License-Identifier: MIT
3
4//! Helper functions for DNS zone reconciliation.
5//!
6//! This module contains the validation and change detection helper functions
7//! extracted from the main reconcile_dnszone() function to improve maintainability.
8
9use crate::crd::{DNSZone, InstanceReference};
10use anyhow::{anyhow, Context as AnyhowContext, Result};
11use k8s_openapi::api::core::v1::{Endpoints, Secret};
12use kube::{Api, Client};
13use std::collections::{HashMap, HashSet};
14use tracing::{error, info};
15
16use super::types::{DuplicateZoneInfo, EndpointAddress};
17use crate::bind9::RndcKeyData;
18
19/// Re-fetch a DNSZone to get the latest status.
20///
21/// The `dnszone` parameter from the watch event might have stale status from the cache.
22/// We need the latest `status.bind9Instances` which may have been updated by the
23/// Bind9Instance reconciler.
24///
25/// # Arguments
26/// * `client` - Kubernetes client
27/// * `namespace` - Namespace of the DNSZone
28/// * `name` - Name of the DNSZone
29///
30/// # Returns
31/// The freshly fetched DNSZone with current status
32///
33/// # Errors
34/// Returns an error if the Kubernetes API call fails
35pub async fn refetch_zone(client: &Client, namespace: &str, name: &str) -> Result<DNSZone> {
36 let zones_api: kube::Api<DNSZone> = kube::Api::namespaced(client.clone(), namespace);
37 let zone = zones_api.get(name).await?;
38 Ok(zone)
39}
40
41/// Handle duplicate zone conflicts by setting Ready=False and stopping reconciliation.
42///
43/// When a duplicate zone is detected, this function:
44/// 1. Logs a warning with details about the conflict
45/// 2. Updates the status with Ready=False and DuplicateZone condition
46/// 3. Applies the status to the API server
47///
48/// # Arguments
49/// * `client` - Kubernetes client
50/// * `namespace` - Namespace of the conflicting DNSZone
51/// * `name` - Name of the conflicting DNSZone
52/// * `duplicate_info` - Information about the duplicate zone conflict
53/// * `status_updater` - Status updater to apply the condition
54///
55/// # Errors
56/// Returns an error if the status update fails
57pub async fn handle_duplicate_zone(
58 client: &Client,
59 namespace: &str,
60 name: &str,
61 duplicate_info: &DuplicateZoneInfo,
62 status_updater: &mut crate::reconcilers::status::DNSZoneStatusUpdater,
63) -> Result<()> {
64 tracing::warn!(
65 "Duplicate zone detected: {}/{} cannot claim '{}' because it is already configured by: {:?}",
66 namespace, name, duplicate_info.zone_name, duplicate_info.conflicting_zones
67 );
68
69 // Build list of conflicting zones in namespace/name format
70 let conflicting_zone_refs: Vec<String> = duplicate_info
71 .conflicting_zones
72 .iter()
73 .map(|z| format!("{}/{}", z.namespace, z.name))
74 .collect();
75
76 // Set Ready=False with DuplicateZone reason
77 status_updater.set_duplicate_zone_condition(&duplicate_info.zone_name, &conflicting_zone_refs);
78
79 // Apply status and stop processing
80 status_updater.apply(client).await?;
81
82 Ok(())
83}
84
85/// Detect if the zone spec has changed since last reconciliation.
86///
87/// Compares current generation with observed generation to determine
88/// if this is first reconciliation or if spec changed.
89///
90/// # Arguments
91///
92/// * `zone` - The DNSZone resource
93///
94/// # Returns
95///
96/// Tuple of (first_reconciliation, spec_changed)
97#[must_use]
98pub fn detect_spec_changes(zone: &DNSZone) -> (bool, bool) {
99 let current_generation = zone.metadata.generation;
100 let observed_generation = zone.status.as_ref().and_then(|s| s.observed_generation);
101
102 let first_reconciliation = observed_generation.is_none();
103 let spec_changed =
104 crate::reconcilers::should_reconcile(current_generation, observed_generation);
105
106 (first_reconciliation, spec_changed)
107}
108
109/// Detect if the instance list changed between watch event and re-fetch.
110///
111/// This is critical for detecting when:
112/// 1. New instances are added to `status.bind9Instances` (via `bind9InstancesFrom` selectors)
113/// 2. Instance `lastReconciledAt` timestamps are cleared (e.g., instance deleted, needs reconfiguration)
114///
115/// NOTE: `InstanceReference` `PartialEq` ignores `lastReconciledAt`, so we must check timestamps separately!
116///
117/// # Arguments
118///
119/// * `namespace` - Namespace for logging
120/// * `name` - Zone name for logging
121/// * `watch_instances` - Instances from the watch event that triggered reconciliation
122/// * `current_instances` - Instances after re-fetching (current state)
123///
124/// # Returns
125///
126/// `true` if instances changed (list or timestamps), `false` otherwise
127pub fn detect_instance_changes(
128 namespace: &str,
129 name: &str,
130 watch_instances: Option<&Vec<InstanceReference>>,
131 current_instances: &[InstanceReference],
132) -> bool {
133 let Some(watch_instances) = watch_instances else {
134 // No instances in watch event, first reconciliation or error
135 return true;
136 };
137
138 // Get the instance names from the watch event (what triggered us)
139 let watch_instance_names: HashSet<_> = watch_instances.iter().map(|r| &r.name).collect();
140
141 // Get the instance names after re-fetching (current state)
142 let current_instance_names: HashSet<_> = current_instances.iter().map(|r| &r.name).collect();
143
144 // Check if instance list changed (added/removed instances)
145 let list_changed = watch_instance_names != current_instance_names;
146
147 if list_changed {
148 info!(
149 "Instance list changed during reconciliation for zone {}/{}: watch_event={:?}, current={:?}",
150 namespace, name, watch_instance_names, current_instance_names
151 );
152 return true;
153 }
154
155 // List is the same, but check if any lastReconciledAt timestamps changed
156 // Use InstanceReference as HashMap key (uses its Hash impl which hashes identity fields)
157 let watch_timestamps: HashMap<&InstanceReference, Option<&str>> = watch_instances
158 .iter()
159 .map(|inst| (inst, inst.last_reconciled_at.as_deref()))
160 .collect();
161
162 let current_timestamps: HashMap<&InstanceReference, Option<&str>> = current_instances
163 .iter()
164 .map(|inst| (inst, inst.last_reconciled_at.as_deref()))
165 .collect();
166
167 let timestamps_changed = watch_timestamps.iter().any(|(inst_ref, watch_ts)| {
168 current_timestamps
169 .get(inst_ref)
170 .is_some_and(|current_ts| current_ts != watch_ts)
171 });
172
173 if timestamps_changed {
174 info!(
175 "Instance lastReconciledAt timestamps changed for zone {}/{}",
176 namespace, name
177 );
178 }
179
180 timestamps_changed
181}
182
183//
184// ============================================================
185// Endpoint and Instance Utilities
186// ============================================================
187//
188
/// Execute an operation on all endpoints for a list of instance references.
///
/// This is the event-driven instance-based approach that operates on instances
/// discovered via spec.bind9InstancesFrom selectors.
///
/// Failure semantics (important for callers):
/// * A failure to load an RNDC key or to list a service's endpoints aborts the
///   whole traversal via `?` — remaining instances are not processed.
/// * A failure of `operation` on an individual endpoint is logged and
///   accumulated; processing continues with the next endpoint.
/// * An error is returned only when every attempted operation failed.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `instance_refs` - List of instance references to process
/// * `with_rndc_key` - Whether to load and pass RNDC keys for each instance
/// * `port_name` - Port name to use for endpoints (e.g., "rndc-api", "dns-tcp")
/// * `operation` - Async closure to execute for each endpoint; receives
///   `(pod_endpoint "ip:port", instance name, optional RNDC key)`
///
/// # Returns
///
/// * `Ok((first_endpoint, total_endpoints))` - The first endpoint encountered
///   (recorded before its operation runs, so it may name an endpoint whose
///   operation later failed) and the count of endpoints on which `operation`
///   succeeded (NOT the total number of endpoints seen).
///
/// # Errors
///
/// Returns an error if all operations fail or if critical API calls fail.
pub async fn for_each_instance_endpoint<F, Fut>(
    client: &Client,
    instance_refs: &[crate::crd::InstanceReference],
    with_rndc_key: bool,
    port_name: &str,
    operation: F,
) -> Result<(Option<String>, usize)>
where
    F: Fn(String, String, Option<RndcKeyData>) -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    let mut first_endpoint: Option<String> = None;
    // Counts only endpoints whose operation succeeded.
    let mut total_endpoints = 0;
    // Per-endpoint operation failures, reported only if nothing succeeds.
    let mut errors: Vec<String> = Vec::new();

    for instance_ref in instance_refs {
        info!(
            "Processing endpoints for instance {}/{}",
            instance_ref.namespace, instance_ref.name
        );

        // Load RNDC key for this specific instance if requested.
        // NOTE: the `?` aborts the entire traversal on a key-load failure,
        // unlike per-endpoint operation errors which are accumulated below.
        let key_data = if with_rndc_key {
            Some(load_rndc_key(client, &instance_ref.namespace, &instance_ref.name).await?)
        } else {
            None
        };

        // Get all endpoints for this instance's service. Also aborts the
        // traversal on error (including the "no ready endpoints" case from
        // `get_endpoint`).
        let endpoints = get_endpoint(
            client,
            &instance_ref.namespace,
            &instance_ref.name,
            port_name,
        )
        .await?;

        info!(
            "Found {} endpoint(s) for instance {}/{}",
            endpoints.len(),
            instance_ref.namespace,
            instance_ref.name
        );

        for endpoint in &endpoints {
            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);

            // Save the first endpoint (recorded BEFORE the operation runs,
            // so it is "first seen", not "first successful").
            if first_endpoint.is_none() {
                first_endpoint = Some(pod_endpoint.clone());
            }

            // Execute the operation on this endpoint
            if let Err(e) = operation(
                pod_endpoint.clone(),
                instance_ref.name.clone(),
                key_data.clone(),
            )
            .await
            {
                error!(
                    "Failed operation on endpoint {} (instance {}/{}): {}",
                    pod_endpoint, instance_ref.namespace, instance_ref.name, e
                );
                errors.push(format!(
                    "endpoint {pod_endpoint} (instance {}/{}): {e}",
                    instance_ref.namespace, instance_ref.name
                ));
            } else {
                total_endpoints += 1;
            }
        }
    }

    // If ALL operations failed, return an error
    if total_endpoints == 0 && !errors.is_empty() {
        return Err(anyhow!(
            "All operations failed. Errors: {}",
            errors.join("; ")
        ));
    }

    Ok((first_endpoint, total_endpoints))
}
293
294/// Load RNDC key from the instance's secret.
295///
296/// # Arguments
297///
298/// * `client` - Kubernetes API client
299/// * `namespace` - Namespace of the instance
300/// * `instance_name` - Name of the instance
301///
302/// # Returns
303///
304/// Parsed RNDC key data
305///
306/// # Errors
307///
308/// Returns an error if the secret is not found or cannot be parsed
309pub async fn load_rndc_key(
310 client: &Client,
311 namespace: &str,
312 instance_name: &str,
313) -> Result<RndcKeyData> {
314 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
315 let secret_name = format!("{instance_name}-rndc-key");
316
317 let secret = secret_api.get(&secret_name).await.context(format!(
318 "Failed to get RNDC secret {secret_name} in namespace {namespace}"
319 ))?;
320
321 let data = secret
322 .data
323 .as_ref()
324 .ok_or_else(|| anyhow!("Secret {secret_name} has no data"))?;
325
326 // Convert ByteString to Vec<u8>
327 let mut converted_data = std::collections::BTreeMap::new();
328 for (key, value) in data {
329 converted_data.insert(key.clone(), value.0.clone());
330 }
331
332 crate::bind9::Bind9Manager::parse_rndc_secret_data(&converted_data)
333}
334
335/// Get all ready endpoints for a service.
336///
337/// Queries the Kubernetes Endpoints API to find all ready pod IPs and ports
338/// for a given service. The port_name must match the name field in the
339/// service's port specification.
340///
341/// # Arguments
342///
343/// * `client` - Kubernetes API client
344/// * `namespace` - Namespace of the service
345/// * `service_name` - Name of the service (usually same as instance name)
346/// * `port_name` - Name of the port to query (e.g., "rndc-api", "dns-tcp")
347///
348/// # Returns
349///
350/// Vector of endpoint addresses with IP and port
351///
352/// # Errors
353///
354/// Returns an error if:
355/// - Failed to get endpoints from API
356/// - No ready addresses found
357pub async fn get_endpoint(
358 client: &Client,
359 namespace: &str,
360 service_name: &str,
361 port_name: &str,
362) -> Result<Vec<EndpointAddress>> {
363 let endpoints_api: Api<Endpoints> = Api::namespaced(client.clone(), namespace);
364 let endpoints = endpoints_api.get(service_name).await.context(format!(
365 "Failed to get endpoints for service {service_name}"
366 ))?;
367
368 let mut result = Vec::new();
369
370 // Endpoints are organized into subsets. Each subset has:
371 // - addresses: List of ready pod IPs
372 // - ports: List of container ports
373 if let Some(subsets) = endpoints.subsets {
374 for subset in subsets {
375 // Find the port in this subset
376 if let Some(ports) = subset.ports {
377 if let Some(endpoint_port) = ports
378 .iter()
379 .find(|p| p.name.as_ref().is_some_and(|name| name == port_name))
380 {
381 let port = endpoint_port.port;
382
383 // Get all ready addresses for this subset
384 if let Some(addresses) = subset.addresses {
385 for addr in addresses {
386 result.push(EndpointAddress {
387 ip: addr.ip.clone(),
388 port,
389 });
390 }
391 }
392 }
393 }
394 }
395 }
396
397 if result.is_empty() {
398 return Err(anyhow!(
399 "No ready endpoints found for service {service_name} with port '{port_name}'"
400 ));
401 }
402
403 Ok(result)
404}