bindy/reconcilers/dnszone.rs

// Copyright (c) 2025 Erick Bourgeois, firestoned
// SPDX-License-Identifier: MIT
#![allow(dead_code)]

//! DNS zone reconciliation logic.
//!
//! This module handles the creation and management of DNS zones on BIND9 servers.
//! It supports both primary and secondary zone configurations.

use crate::bind9::RndcKeyData;
use crate::constants::{ANNOTATION_ZONE_OWNER, ANNOTATION_ZONE_PREVIOUS_OWNER};
use crate::crd::{Condition, DNSZone, DNSZoneSpec, DNSZoneStatus};
use anyhow::{anyhow, Context, Result};
use bindcar::{ZONE_TYPE_PRIMARY, ZONE_TYPE_SECONDARY};
use chrono::Utc;
use k8s_openapi::api::core::v1::{Endpoints, Pod, Secret};
use kube::{
    api::{ListParams, Patch, PatchParams},
    client::Client,
    Api, ResourceExt,
};
use serde_json::json;
use std::collections::HashSet;
use tracing::{debug, info, warn};

/// Helper function to extract and validate the cluster reference from a `DNSZoneSpec`.
///
/// Returns the cluster name, whether it comes from `clusterRef` or `clusterProviderRef`.
/// Validates that exactly one is specified (mutual exclusivity).
///
/// This function is public so it can be used by other reconcilers (e.g., the records reconciler).
///
/// # Errors
///
/// Returns an error if:
/// - Both `clusterRef` and `clusterProviderRef` are specified (mutual exclusivity violation)
/// - Neither `clusterRef` nor `clusterProviderRef` is specified (at least one required)
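///
/// # Example
///
/// A minimal sketch; `spec` is assumed to be a `DNSZoneSpec` with only
/// `cluster_ref` set (to `"my-cluster"` here, purely for illustration):
///
/// ```rust,ignore
/// // Exactly one reference is set, so the cluster name is returned.
/// let cluster = get_cluster_ref_from_spec(&spec, "dns-system", "example-zone")?;
/// assert_eq!(cluster, "my-cluster");
/// ```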
pub fn get_cluster_ref_from_spec(
    spec: &DNSZoneSpec,
    namespace: &str,
    name: &str,
) -> Result<String> {
    match (&spec.cluster_ref, &spec.cluster_provider_ref) {
        (Some(cluster_name), None) => Ok(cluster_name.clone()),
        (None, Some(cluster_provider_name)) => Ok(cluster_provider_name.clone()),
        (Some(_), Some(_)) => Err(anyhow!(
            "DNSZone {namespace}/{name} has both clusterRef and clusterProviderRef specified. \
            Exactly one must be specified."
        )),
        (None, None) => Err(anyhow!(
            "DNSZone {namespace}/{name} has neither clusterRef nor clusterProviderRef specified. \
            Exactly one must be specified."
        )),
    }
}

/// Reconciles a `DNSZone` resource.
///
/// Creates or updates DNS zone files on BIND9 instances that match the zone's
/// instance selector. Supports both primary and secondary zone types.
///
/// # Zone Types
///
/// - **Primary**: Authoritative zone with SOA record and local zone file
/// - **Secondary**: Replica zone that transfers from primary servers
///
/// # Arguments
///
/// * `client` - Kubernetes API client for finding matching `Bind9Instances`
/// * `dnszone` - The `DNSZone` resource to reconcile
/// * `zone_manager` - BIND9 manager for creating zone files
///
/// # Returns
///
/// * `Ok(())` - If zone was created/updated successfully
/// * `Err(_)` - If zone creation failed or configuration is invalid
///
/// # Example
///
/// ```rust,no_run
/// use bindy::reconcilers::reconcile_dnszone;
/// use bindy::crd::DNSZone;
/// use bindy::bind9::Bind9Manager;
/// use kube::Client;
///
/// async fn handle_zone(zone: DNSZone) -> anyhow::Result<()> {
///     let client = Client::try_default().await?;
///     let manager = Bind9Manager::new();
///     reconcile_dnszone(client, zone, &manager).await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// Returns an error if Kubernetes API operations fail or BIND9 zone operations fail.
#[allow(clippy::too_many_lines)]
pub async fn reconcile_dnszone(
    client: Client,
    dnszone: DNSZone,
    zone_manager: &crate::bind9::Bind9Manager,
) -> Result<()> {
    let namespace = dnszone.namespace().unwrap_or_default();
    let name = dnszone.name_any();

    info!("Reconciling DNSZone: {}/{}", namespace, name);
    debug!(
        namespace = %namespace,
        name = %name,
        generation = ?dnszone.metadata.generation,
        "Starting DNSZone reconciliation"
    );

    // Create centralized status updater to batch all status changes
    let mut status_updater = crate::reconcilers::status::DNSZoneStatusUpdater::new(&dnszone);

    // Extract spec
    let spec = &dnszone.spec;

    // Guard clause: Validate exactly one cluster reference is provided
    let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
    let is_cluster_provider = spec.cluster_provider_ref.is_some();

    info!(
        "DNSZone {}/{} references cluster '{}' (is_cluster_provider={}, cluster_ref={:?}, cluster_provider_ref={:?})",
        namespace, name, cluster_ref, is_cluster_provider, spec.cluster_ref, spec.cluster_provider_ref
    );

    // Determine if this is the first reconciliation or if spec has changed
    let current_generation = dnszone.metadata.generation;
    let observed_generation = dnszone.status.as_ref().and_then(|s| s.observed_generation);

    let first_reconciliation = observed_generation.is_none();
    let spec_changed =
        crate::reconcilers::should_reconcile(current_generation, observed_generation);

    info!(
        "Reconciling zone {} (first_reconciliation={}, spec_changed={})",
        spec.zone_name, first_reconciliation, spec_changed
    );

    // BIND9 configuration: Always ensure zones exist on all instances
    // This implements true declarative reconciliation - if a pod restarts without
    // persistent storage, the reconciler will detect the missing zone and recreate it.
    // The add_zones() function is idempotent, so this is safe to call every reconciliation.
    //
    // NOTE: We ALWAYS configure zones, not just when spec changes. This ensures:
    // - Zones are recreated if pods restart without persistent volumes
    // - New instances added to the cluster get zones automatically
    // - Drift detection: if someone manually deletes a zone, it's recreated
    // - True Kubernetes declarative reconciliation: actual state continuously matches desired state
    let (primary_count, secondary_count) = {
        debug!("Ensuring BIND9 zone exists on all instances (declarative reconciliation)");

        // Set initial Progressing status (in-memory)
        status_updater.set_condition(
            "Progressing",
            "True",
            "PrimaryReconciling",
            "Configuring zone on primary servers",
        );

        // Get current primary IPs for secondary zone configuration
        let primary_ips =
            match find_all_primary_pod_ips(&client, &namespace, &cluster_ref, is_cluster_provider)
                .await
            {
                Ok(ips) if !ips.is_empty() => {
                    info!(
                        "Found {} primary server(s) for cluster {}: {:?}",
                        ips.len(),
                        cluster_ref,
                        ips
                    );
                    ips
                }
                Ok(_) => {
                    status_updater.set_condition(
                        "Degraded",
                        "True",
                        "PrimaryFailed",
                        &format!("No primary servers found for cluster {cluster_ref}"),
                    );
                    // Apply status before returning error
                    status_updater.apply(&client).await?;
                    return Err(anyhow!(
                        "No primary servers found for cluster {cluster_ref} - cannot configure zones"
                    ));
                }
                Err(e) => {
                    status_updater.set_condition(
                        "Degraded",
                        "True",
                        "PrimaryFailed",
                        &format!("Failed to find primary servers: {e}"),
                    );
                    // Apply status before returning error
                    status_updater.apply(&client).await?;
                    return Err(e);
                }
            };

        // Add/update zone on all primary instances
        let primary_count = match add_dnszone(client.clone(), dnszone.clone(), zone_manager).await {
            Ok(count) => {
                // Update status after successful primary reconciliation (in-memory)
                status_updater.set_condition(
                    "Progressing",
                    "True",
                    "PrimaryReconciled",
                    &format!(
                        "Zone {} configured on {} primary server(s)",
                        spec.zone_name, count
                    ),
                );
                count
            }
            Err(e) => {
                status_updater.set_condition(
                    "Degraded",
                    "True",
                    "PrimaryFailed",
                    &format!("Failed to configure zone on primary servers: {e}"),
                );
                // Apply status before returning error
                status_updater.apply(&client).await?;
                return Err(e);
            }
        };

        // Update to secondary reconciliation phase (in-memory)
        status_updater.set_condition(
            "Progressing",
            "True",
            "SecondaryReconciling",
            "Configuring zone on secondary servers",
        );

        // Add/update zone on all secondary instances with primaries configured
        let secondary_count = match add_dnszone_to_secondaries(
            client.clone(),
            dnszone.clone(),
            zone_manager,
            &primary_ips,
        )
        .await
        {
            Ok(count) => {
                // Update status after successful secondary reconciliation (in-memory)
                if count > 0 {
                    status_updater.set_condition(
                        "Progressing",
                        "True",
                        "SecondaryReconciled",
                        &format!(
                            "Zone {} configured on {} secondary server(s)",
                            spec.zone_name, count
                        ),
                    );
                }
                count
            }
            Err(e) => {
                // Secondary failure is non-fatal - primaries still work
                warn!(
                    "Failed to configure zone on secondary servers: {}. Primary servers are still operational.",
                    e
                );
                status_updater.set_condition(
                    "Degraded",
                    "True",
                    "SecondaryFailed",
                    &format!(
                        "Zone configured on {primary_count} primary server(s) but secondary configuration failed: {e}"
                    ),
                );
                0
            }
        };

        (primary_count, secondary_count)
    };

    // Discover DNS records that match the zone's label selectors (in-memory status update)
    status_updater.set_condition(
        "Progressing",
        "True",
        "RecordsDiscovering",
        "Discovering DNS records via label selectors",
    );

    let record_refs = match reconcile_zone_records(client.clone(), dnszone.clone()).await {
        Ok(refs) => {
            info!(
                "Discovered {} DNS record(s) for zone {} via label selectors",
                refs.len(),
                spec.zone_name
            );
            refs
        }
        Err(e) => {
            // Record discovery failure is non-fatal - the zone itself is still configured
            warn!(
                "Failed to discover DNS records for zone {}: {}. Zone is configured but record discovery failed.",
                spec.zone_name, e
            );
            status_updater.set_condition(
                "Degraded",
                "True",
                "RecordDiscoveryFailed",
                &format!("Zone configured but record discovery failed: {e}"),
            );
            vec![]
        }
    };

    let record_count = record_refs.len();

    // Update DNSZone status with discovered records (in-memory)
    status_updater.set_records(record_refs.clone());

    // Check if all discovered records are ready and trigger zone transfers if needed
    if record_count > 0 {
        let all_records_ready = check_all_records_ready(&client, &namespace, &record_refs).await?;

        if all_records_ready {
            info!(
                "All {} record(s) for zone {} are ready, triggering zone transfers to secondaries",
                record_count, spec.zone_name
            );

            // Trigger zone transfers to all secondaries
            match trigger_zone_transfers(
                &client,
                &namespace,
                &spec.zone_name,
                &cluster_ref,
                is_cluster_provider,
                zone_manager,
            )
            .await
            {
                Ok(transfer_count) => {
                    info!(
                        "Successfully triggered zone transfer for {} to {} secondary instance(s)",
                        spec.zone_name, transfer_count
                    );
                }
                Err(e) => {
                    warn!(
                        "Failed to trigger zone transfers for {}: {}. Zone is configured but secondaries may be out of sync.",
                        spec.zone_name, e
                    );
                    status_updater.set_condition(
                        "Degraded",
                        "True",
                        "TransferFailed",
                        &format!("Zone configured but zone transfer failed: {e}"),
                    );
                }
            }
        } else {
            info!(
                "Not all records for zone {} are ready yet, skipping zone transfer",
                spec.zone_name
            );
        }
    }

    // Re-fetch secondary IPs to store in status (in-memory)
    let secondary_ips =
        find_all_secondary_pod_ips(&client, &namespace, &cluster_ref, is_cluster_provider)
            .await
            .unwrap_or_default();
    status_updater.set_secondary_ips(secondary_ips);

    // Set observed generation (in-memory)
    status_updater.set_observed_generation(current_generation);

    // All reconciliation complete - set Ready status (in-memory)
    status_updater.set_condition(
        "Ready",
        "True",
        "ReconcileSucceeded",
        &format!(
            "Zone {} configured on {} primary and {} secondary server(s), discovered {} DNS record(s) for cluster {}",
            spec.zone_name, primary_count, secondary_count, record_count, cluster_ref
        ),
    );

    // Apply all status changes in a single atomic operation
    status_updater.apply(&client).await?;

    Ok(())
}

/// Adds a DNS zone to all primary instances in the cluster.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `dnszone` - The `DNSZone` resource
/// * `zone_manager` - BIND9 manager for adding the zone
///
/// # Returns
///
/// * `Ok(usize)` - Number of primary endpoints successfully configured
/// * `Err(_)` - If zone addition failed
///
/// # Errors
///
/// Returns an error if BIND9 zone addition fails.
///
/// # Panics
///
/// Panics if the RNDC key is not loaded by the helper function (should never happen in practice).
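///
/// # Example
///
/// A minimal sketch, mirroring the `reconcile_dnszone` example above (`client`
/// and `zone` are assumed to be in scope):
///
/// ```rust,ignore
/// let manager = bindy::bind9::Bind9Manager::new();
/// let primary_count = add_dnszone(client, zone, &manager).await?;
/// println!("zone configured on {primary_count} primary endpoint(s)");
/// ```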
pub async fn add_dnszone(
    client: Client,
    dnszone: DNSZone,
    zone_manager: &crate::bind9::Bind9Manager,
) -> Result<usize> {
    let namespace = dnszone.namespace().unwrap_or_default();
    let name = dnszone.name_any();
    let spec = &dnszone.spec;

    // Extract and validate cluster reference
    let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
    let is_cluster_provider = spec.cluster_provider_ref.is_some();

    info!("Adding DNSZone: {}", name);

    // Find secondary pod IPs for zone transfer configuration
    let secondary_ips =
        find_all_secondary_pod_ips(&client, &namespace, &cluster_ref, is_cluster_provider).await?;

    if secondary_ips.is_empty() {
        warn!(
            "No secondary servers found for cluster {} - zone transfers will not be configured",
            cluster_ref
        );
    } else {
        info!(
            "Found {} secondary server(s) for cluster {} - zone transfers will be configured: {:?}",
            secondary_ips.len(),
            cluster_ref,
            secondary_ips
        );
    }

    // Use the common helper to iterate through all endpoints
    // Load RNDC key (true) since zone addition requires it
    // Use "http" port for HTTP API operations
    let (first_endpoint, total_endpoints) = for_each_primary_endpoint(
        &client,
        &namespace,
        &cluster_ref,
        is_cluster_provider,
        true,   // with_rndc_key = true for zone addition
        "http", // Use HTTP API port for zone addition via bindcar API
        |pod_endpoint, instance_name, rndc_key| {
            let zone_name = spec.zone_name.clone();
            let soa_record = spec.soa_record.clone();
            let name_server_ips = spec.name_server_ips.clone();
            let zone_manager = zone_manager.clone();
            let secondary_ips_clone = secondary_ips.clone();

            async move {
                // SAFETY: RNDC key is guaranteed to be Some when with_rndc_key=true
                // The for_each_primary_endpoint helper loads the key when with_rndc_key=true
                let key_data = rndc_key.expect("RNDC key should be loaded when with_rndc_key=true");

                // Pass secondary IPs for zone transfer configuration
                let secondary_ips_ref = if secondary_ips_clone.is_empty() {
                    None
                } else {
                    Some(secondary_ips_clone.as_slice())
                };

                let was_added = zone_manager
                    .add_zones(
                        &zone_name,
                        ZONE_TYPE_PRIMARY,
                        &pod_endpoint,
                        &key_data,
                        Some(&soa_record),
                        name_server_ips.as_ref(),
                        secondary_ips_ref,
                        None, // primary_ips only for secondary zones
                    )
                    .await
                    .context(format!(
                        "Failed to add zone {zone_name} to endpoint {pod_endpoint} (instance: {instance_name})"
                    ))?;

                if was_added {
                    info!(
                        "Successfully added zone {} to endpoint {} (instance: {})",
                        zone_name, pod_endpoint, instance_name
                    );
                }

                Ok(())
            }
        },
    )
    .await?;

    info!(
        "Successfully added zone {} to {} endpoint(s) for cluster {}",
        spec.zone_name, total_endpoints, cluster_ref
    );

    // Note: We don't need to reload after addzone because:
    // 1. rndc addzone immediately adds the zone to BIND9's running config
    // 2. The zone file will be created automatically when records are added via dynamic updates
    // 3. Reloading would fail if the zone file doesn't exist yet

    // Notify secondaries about the new zone via the first endpoint
    // This triggers zone transfer (AXFR) from primary to secondaries
    if let Some(first_pod_endpoint) = first_endpoint {
        info!(
            "Notifying secondaries about new zone {} for cluster {}",
            spec.zone_name, cluster_ref
        );
        if let Err(e) = zone_manager
            .notify_zone(&spec.zone_name, &first_pod_endpoint)
            .await
        {
            // Don't fail if NOTIFY fails - the zone was successfully created
            // Secondaries will sync via SOA refresh timer
            warn!(
                "Failed to notify secondaries for zone {}: {}. Secondaries will sync via SOA refresh timer.",
                spec.zone_name, e
            );
        }
    } else {
        warn!(
            "No endpoints found for zone {}, cannot notify secondaries",
            spec.zone_name
        );
    }

    Ok(total_endpoints)
}

/// Adds a DNS zone to all secondary instances in the cluster, configuring the given primaries.
///
/// Creates secondary zones on all secondary instances, configuring them to transfer
/// from the provided primary server IPs. If a zone already exists on a secondary,
/// it checks if the primaries list matches and updates it if necessary.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `dnszone` - The `DNSZone` resource
/// * `zone_manager` - BIND9 manager for adding the zone
/// * `primary_ips` - List of primary server IPs to configure in the primaries field
///
/// # Returns
///
/// * `Ok(usize)` - Number of secondary endpoints successfully configured
/// * `Err(_)` - If zone addition failed
///
/// # Errors
///
/// Returns an error if BIND9 zone addition fails on any secondary instance.
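///
/// # Example
///
/// A minimal sketch; the IP list is illustrative and would normally come from
/// `find_all_primary_pod_ips`:
///
/// ```rust,ignore
/// let primary_ips = vec!["10.0.0.5".to_string(), "10.0.0.6".to_string()];
/// let count = add_dnszone_to_secondaries(client, zone, &manager, &primary_ips).await?;
/// println!("zone configured on {count} secondary endpoint(s)");
/// ```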
#[allow(clippy::too_many_lines)]
pub async fn add_dnszone_to_secondaries(
    client: Client,
    dnszone: DNSZone,
    zone_manager: &crate::bind9::Bind9Manager,
    primary_ips: &[String],
) -> Result<usize> {
    let namespace = dnszone.namespace().unwrap_or_default();
    let name = dnszone.name_any();
    let spec = &dnszone.spec;

    // Extract and validate cluster reference
    let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
    let is_cluster_provider = spec.cluster_provider_ref.is_some();

    if primary_ips.is_empty() {
        warn!(
            "No primary IPs provided for secondary zone {} - skipping secondary configuration",
            spec.zone_name
        );
        return Ok(0);
    }

    info!(
        "Adding DNSZone {} to secondary instances with primaries: {:?}",
        name, primary_ips
    );

    // Find all secondary pods
    let secondary_pods =
        find_all_secondary_pods(&client, &namespace, &cluster_ref, is_cluster_provider).await?;

    if secondary_pods.is_empty() {
        info!(
            "No secondary servers found for cluster {} - skipping secondary zone configuration",
            cluster_ref
        );
        return Ok(0);
    }

    info!(
        "Found {} secondary pod(s) for cluster {}",
        secondary_pods.len(),
        cluster_ref
    );

    // Get unique (instance_name, namespace) tuples from secondary pods
    let mut instance_tuples: Vec<(String, String)> = secondary_pods
        .iter()
        .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
        .collect();
    instance_tuples.sort();
    instance_tuples.dedup();

    if instance_tuples.is_empty() {
        return Err(anyhow!(
            "No secondary instances found for cluster {cluster_ref}"
        ));
    }

    let mut total_endpoints = 0;

    // Iterate through each secondary instance and add zone to all its endpoints
    for (instance_name, instance_namespace) in &instance_tuples {
        info!(
            "Processing secondary instance {}/{} for zone {}",
            instance_namespace, instance_name, spec.zone_name
        );

        // Load RNDC key for this specific instance
        // Each instance has its own RNDC secret for security isolation
        let key_data = load_rndc_key(&client, instance_namespace, instance_name).await?;

        // Get all endpoints for this secondary instance
        let endpoints = get_endpoint(&client, instance_namespace, instance_name, "http").await?;

        info!(
            "Found {} endpoint(s) for secondary instance {}",
            endpoints.len(),
            instance_name
        );

        for endpoint in &endpoints {
            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);

            info!(
                "Adding secondary zone {} to endpoint {} (instance: {}) with primaries: {:?}",
                spec.zone_name, pod_endpoint, instance_name, primary_ips
            );

            let was_added = zone_manager
                .add_zones(
                    &spec.zone_name,
                    ZONE_TYPE_SECONDARY,
                    &pod_endpoint,
                    &key_data,
                    None, // No SOA record for secondary zones
                    None, // No name_server_ips for secondary zones
                    None, // No secondary_ips for secondary zones
                    Some(primary_ips),
                )
                .await
                .context(format!(
                    "Failed to add secondary zone {} to endpoint {} (instance: {})",
                    spec.zone_name, pod_endpoint, instance_name
                ))?;

            if was_added {
                info!(
                    "Successfully added secondary zone {} to endpoint {} (instance: {})",
                    spec.zone_name, pod_endpoint, instance_name
                );
            } else {
                info!(
                    "Secondary zone {} already exists on endpoint {} (instance: {})",
                    spec.zone_name, pod_endpoint, instance_name
                );
            }

            total_endpoints += 1;
        }
    }

    info!(
        "Successfully configured secondary zone {} on {} endpoint(s) across {} instance(s) for cluster {}",
        spec.zone_name,
        total_endpoints,
        instance_tuples.len(),
        cluster_ref
    );

    Ok(total_endpoints)
}

/// Reconciles DNS records for a zone by discovering records that match the zone's label selectors.
///
/// This function implements the core of the zone/record ownership model:
/// 1. Discovers records matching the zone's label selectors
/// 2. Tags matched records with the zone ownership annotation and `status.zone` field
/// 3. Untags previously matched records that no longer match
/// 4. Returns references to currently matched records for status tracking
///
/// Record reconcilers use the `bindy.firestoned.io/zone` annotation to determine
/// which zone to update in BIND9. When a record loses the annotation, the record
/// reconciler will delete it from BIND9.
///
/// # Arguments
///
/// * `client` - Kubernetes API client for querying DNS records
/// * `dnszone` - The `DNSZone` resource with label selectors
///
/// # Returns
///
/// * `Ok(Vec<RecordReference>)` - List of currently matched DNS records
/// * `Err(_)` - If record discovery or tagging fails
///
/// # Errors
///
/// Returns an error if Kubernetes API operations fail.
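///
/// # Example
///
/// A minimal sketch of the call made from `reconcile_dnszone` above:
///
/// ```rust,ignore
/// let refs = reconcile_zone_records(client.clone(), dnszone.clone()).await?;
/// info!("zone now owns {} record(s)", refs.len());
/// ```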
#[allow(clippy::too_many_lines)]
async fn reconcile_zone_records(
    client: Client,
    dnszone: DNSZone,
) -> Result<Vec<crate::crd::RecordReference>> {
    let namespace = dnszone.namespace().unwrap_or_default();
    let spec = &dnszone.spec;
    let zone_name = &spec.zone_name;

    // Early return if no label selectors are defined
    let Some(ref records_from) = spec.records_from else {
        info!(
            "No label selectors defined for zone {}, skipping record discovery",
            zone_name
        );
        // No selectors means nothing matches: return an empty list so the caller
        // clears status.records
        return Ok(Vec::new());
    };

    info!(
        "Discovering DNS records for zone {} using {} label selector(s)",
        zone_name,
        records_from.len()
    );

    let mut all_record_refs = Vec::new();

    // Query all record types and filter by label selectors
    for record_source in records_from {
        let selector = &record_source.selector;

        // Discover each record type
        all_record_refs.extend(discover_a_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_aaaa_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_txt_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_cname_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_mx_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_ns_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_srv_records(&client, &namespace, selector).await?);
        all_record_refs.extend(discover_caa_records(&client, &namespace, selector).await?);
    }

    info!(
        "Discovered {} DNS record(s) for zone {}",
        all_record_refs.len(),
        zone_name
    );

    // Get previously matched records from current status
    let previous_records: HashSet<String> = dnszone
        .status
        .as_ref()
        .map(|s| {
            s.records
                .iter()
                .map(|r| format!("{}/{}", r.kind, r.name))
                .collect()
        })
        .unwrap_or_default();

    // Create set of currently matched records
    let current_records: HashSet<String> = all_record_refs
        .iter()
        .map(|r| format!("{}/{}", r.kind, r.name))
        .collect();

    // Tag newly matched records (in current but not in previous)
    for record_ref in &all_record_refs {
        let record_key = format!("{}/{}", record_ref.kind, record_ref.name);
        if !previous_records.contains(&record_key) {
            info!(
                "Newly matched record: {} {}/{}",
                record_ref.kind, namespace, record_ref.name
            );
            tag_record_with_zone(
                &client,
                &namespace,
                &record_ref.kind,
                &record_ref.name,
                zone_name,
            )
            .await?;
        }
    }

    // Untag previously matched records that no longer match or were deleted
    // (in previous but not in current)
    for prev_record_key in &previous_records {
        if !current_records.contains(prev_record_key.as_str()) {
            // Parse kind and name from "Kind/name" format
            if let Some((kind, name)) = prev_record_key.split_once('/') {
                warn!(
                    "Record no longer matches zone {} (unmatched or deleted): {} {}/{}",
                    zone_name, kind, namespace, name
                );

                // Try to untag the record, but don't fail if it was deleted
                // If the record was deleted, the API will return NotFound, which is fine
                if let Err(e) =
                    untag_record_from_zone(&client, &namespace, kind, name, zone_name).await
                {
                    // Check if error is because record was deleted (NotFound)
                    if e.to_string().contains("NotFound") || e.to_string().contains("not found") {
                        info!(
                            "Record {} {}/{} was deleted, removing from zone {} status",
                            kind, namespace, name, zone_name
                        );
                    } else {
                        // Other errors should be logged but not fail the reconciliation
                        warn!(
                            "Failed to untag record {} {}/{} from zone {}: {}",
                            kind, namespace, name, zone_name, e
                        );
                    }
                }
                // Continue regardless - the record will be removed from status.records
                // when we return all_record_refs (which doesn't include this record)
            }
        }
    }

    Ok(all_record_refs)
}

/// Tags a DNS record with the zone ownership annotation and updates its `status.zone` field.
///
/// This function is called when a `DNSZone`'s label selector matches a record.
/// It sets both the `bindy.firestoned.io/zone` annotation and the `status.zone` field.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace of the record
/// * `kind` - Record kind (e.g., `ARecord`, `CNAMERecord`)
/// * `name` - Record name
/// * `zone_fqdn` - Fully qualified domain name of the zone (e.g., `"example.com"`)
///
/// # Returns
///
/// * `Ok(())` - If the record was tagged successfully
/// * `Err(_)` - If tagging failed
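///
/// # Example
///
/// A minimal sketch; the record kind and name are illustrative:
///
/// ```rust,ignore
/// // Tags ARecord default/web as owned by zone "example.com".
/// tag_record_with_zone(&client, "default", "ARecord", "web", "example.com").await?;
/// ```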
async fn tag_record_with_zone(
    client: &Client,
    namespace: &str,
    kind: &str,
    name: &str,
    zone_fqdn: &str,
) -> Result<()> {
    debug!(
        "Tagging {} {}/{} with zone {}",
        kind, namespace, name, zone_fqdn
    );

    // Convert kind to plural resource name (e.g., "ARecord" -> "arecords")
    let plural = format!("{}s", kind.to_lowercase());

    // Create GroupVersionKind for the resource
    let gvk = kube::core::GroupVersionKind {
        group: "bindy.firestoned.io".to_string(),
        version: "v1beta1".to_string(),
        kind: kind.to_string(),
    };

    // Use kube's Discovery API to create ApiResource
    let api_resource = kube::api::ApiResource::from_gvk_with_plural(&gvk, &plural);

    // Create a dynamic API client
    let api = kube::api::Api::<kube::api::DynamicObject>::namespaced_with(
        client.clone(),
        namespace,
        &api_resource,
    );

    // Patch metadata to add annotation
    let annotation_patch = json!({
        "metadata": {
            "annotations": {
                ANNOTATION_ZONE_OWNER: zone_fqdn
            }
        }
    });

    api.patch(
        name,
        &PatchParams::default(),
        &Patch::Merge(&annotation_patch),
    )
    .await
    .with_context(|| format!("Failed to add zone annotation to {kind} {namespace}/{name}"))?;

    // Patch status to set zone field
    let status_patch = json!({
        "status": {
            "zone": zone_fqdn
        }
    });

    api.patch_status(name, &PatchParams::default(), &Patch::Merge(&status_patch))
        .await
        .with_context(|| format!("Failed to set status.zone on {kind} {namespace}/{name}"))?;

    info!(
        "Successfully tagged {} {}/{} with zone {}",
        kind, namespace, name, zone_fqdn
    );

    Ok(())
}

/// Untags a DNS record that no longer matches a zone's selector.
///
/// This function removes the zone ownership annotation and the `status.zone` field,
/// and sets the previous-owner annotation for tracking.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace of the record
/// * `kind` - Record kind (e.g., `ARecord`, `CNAMERecord`)
/// * `name` - Record name
/// * `previous_zone_fqdn` - FQDN of the zone that previously owned this record
///
/// # Returns
///
/// * `Ok(())` - If the record was untagged successfully
/// * `Err(_)` - If untagging failed
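///
/// # Example
///
/// A minimal sketch, the inverse of the tagging example above:
///
/// ```rust,ignore
/// // Removes the zone annotation from ARecord default/web and records
/// // "example.com" as the previous owner.
/// untag_record_from_zone(&client, "default", "ARecord", "web", "example.com").await?;
/// ```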
async fn untag_record_from_zone(
    client: &Client,
    namespace: &str,
    kind: &str,
    name: &str,
    previous_zone_fqdn: &str,
) -> Result<()> {
    debug!(
        "Untagging {} {}/{} from zone {}",
        kind, namespace, name, previous_zone_fqdn
    );

    // Convert kind to plural resource name
    let plural = format!("{}s", kind.to_lowercase());

    // Create GroupVersionKind for the resource
    let gvk = kube::core::GroupVersionKind {
        group: "bindy.firestoned.io".to_string(),
        version: "v1beta1".to_string(),
        kind: kind.to_string(),
    };

    // Use kube's Discovery API to create ApiResource
    let api_resource = kube::api::ApiResource::from_gvk_with_plural(&gvk, &plural);

    // Create a dynamic API client
    let api = kube::api::Api::<kube::api::DynamicObject>::namespaced_with(
        client.clone(),
        namespace,
        &api_resource,
    );

    // Patch metadata to remove zone annotation and add previous-zone annotation
    let annotation_patch = json!({
        "metadata": {
            "annotations": {
                ANNOTATION_ZONE_OWNER: null,
                ANNOTATION_ZONE_PREVIOUS_OWNER: previous_zone_fqdn
            }
        }
    });

    api.patch(
        name,
        &PatchParams::default(),
        &Patch::Merge(&annotation_patch),
    )
    .await
    .with_context(|| format!("Failed to remove zone annotation from {kind} {namespace}/{name}"))?;

    // Patch status to remove zone field
    let status_patch = json!({
        "status": {
            "zone": null
        }
    });

    api.patch_status(name, &PatchParams::default(), &Patch::Merge(&status_patch))
        .await
        .with_context(|| format!("Failed to clear status.zone on {kind} {namespace}/{name}"))?;

    info!(
        "Successfully untagged {} {}/{} from zone {}",
        kind, namespace, name, previous_zone_fqdn
    );

    Ok(())
}

/// Helper function to discover A records matching a label selector.
async fn discover_a_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::ARecord;
    use std::collections::BTreeMap;

    let api: Api<ARecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered A record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "ARecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover AAAA records matching a label selector.
async fn discover_aaaa_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::AAAARecord;
    use std::collections::BTreeMap;

    let api: Api<AAAARecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered AAAA record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "AAAARecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover TXT records matching a label selector.
async fn discover_txt_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::TXTRecord;
    use std::collections::BTreeMap;

    let api: Api<TXTRecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered TXT record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "TXTRecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover CNAME records matching a label selector.
async fn discover_cname_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::CNAMERecord;
    use std::collections::BTreeMap;

    let api: Api<CNAMERecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!(
            "Discovered CNAME record {}/{}",
            namespace,
            record.name_any()
        );

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "CNAMERecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover MX records matching a label selector.
async fn discover_mx_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::MXRecord;
    use std::collections::BTreeMap;

    let api: Api<MXRecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered MX record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "MXRecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover NS records matching a label selector.
async fn discover_ns_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::NSRecord;
    use std::collections::BTreeMap;

    let api: Api<NSRecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered NS record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "NSRecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover SRV records matching a label selector.
async fn discover_srv_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::SRVRecord;
    use std::collections::BTreeMap;

    let api: Api<SRVRecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered SRV record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "SRVRecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Helper function to discover CAA records matching a label selector.
async fn discover_caa_records(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<crate::crd::RecordReference>> {
    use crate::crd::CAARecord;
    use std::collections::BTreeMap;

    let api: Api<CAARecord> = Api::namespaced(client.clone(), namespace);
    let records = api.list(&ListParams::default()).await?;

    let mut record_refs = Vec::new();
    for record in records {
        let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();

        if !selector.matches(&labels) {
            continue;
        }

        debug!("Discovered CAA record {}/{}", namespace, record.name_any());

        record_refs.push(crate::crd::RecordReference {
            api_version: "bindy.firestoned.io/v1beta1".to_string(),
            kind: "CAARecord".to_string(),
            name: record.name_any(),
        });
    }

    Ok(record_refs)
}

/// Deletes a DNS zone and its associated zone files.
///
/// # Arguments
///
/// * `client` - Kubernetes API client used to locate primary and secondary endpoints
/// * `dnszone` - The `DNSZone` resource to delete
/// * `zone_manager` - BIND9 manager for removing zone files
///
/// # Returns
///
/// * `Ok(())` - If zone was deleted successfully
/// * `Err(_)` - If zone deletion failed
///
/// # Errors
///
/// Returns an error if BIND9 zone deletion fails.
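///
/// # Example
///
/// A minimal sketch of a finalizer path (`client` and `zone` are assumed to be
/// in scope, as in the `reconcile_dnszone` example):
///
/// ```rust,ignore
/// let manager = bindy::bind9::Bind9Manager::new();
/// delete_dnszone(client, zone, &manager).await?;
/// ```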
pub async fn delete_dnszone(
    client: Client,
    dnszone: DNSZone,
    zone_manager: &crate::bind9::Bind9Manager,
) -> Result<()> {
    let namespace = dnszone.namespace().unwrap_or_default();
    let name = dnszone.name_any();
    let spec = &dnszone.spec;

    // Extract and validate cluster reference
    let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
    let is_cluster_provider = spec.cluster_provider_ref.is_some();

    info!("Deleting DNSZone: {}", name);

    // Use the common helper to iterate through all endpoints
    // Don't load RNDC key (false) since zone deletion doesn't require it
    // Use "http" port for HTTP API operations
    let (_first_endpoint, total_endpoints) = for_each_primary_endpoint(
        &client,
        &namespace,
        &cluster_ref,
        is_cluster_provider,
        false,  // with_rndc_key = false for zone deletion
        "http", // Use HTTP API port for zone deletion via bindcar API
        |pod_endpoint, instance_name, _rndc_key| {
            let zone_name = spec.zone_name.clone();
            let zone_manager = zone_manager.clone();

            async move {
                info!(
                    "Deleting zone {} from endpoint {} (instance: {})",
                    zone_name, pod_endpoint, instance_name
                );

                // Attempt to delete zone - if it fails (zone not found, endpoint unreachable, etc.),
                // log a warning but don't fail the deletion. This ensures DNSZones can be deleted
                // even if BIND9 instances are unavailable or the zone was already removed.
                if let Err(e) = zone_manager.delete_zone(&zone_name, &pod_endpoint).await {
                    warn!(
                        "Failed to delete zone {} from endpoint {} (instance: {}): {}. Continuing with deletion anyway.",
                        zone_name, pod_endpoint, instance_name, e
                    );
                } else {
                    debug!(
                        "Successfully deleted zone {} from endpoint {} (instance: {})",
                        zone_name, pod_endpoint, instance_name
                    );
                }

                Ok(())
            }
        },
    )
    .await?;

    info!(
        "Successfully deleted zone {} from {} primary endpoint(s) for cluster {}",
        spec.zone_name, total_endpoints, cluster_ref
    );

    // Delete from all secondary instances
    let secondary_pods =
        find_all_secondary_pods(&client, &namespace, &cluster_ref, is_cluster_provider).await?;

    if !secondary_pods.is_empty() {
        // Get unique (instance_name, namespace) tuples
        let mut instance_tuples: Vec<(String, String)> = secondary_pods
            .iter()
            .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
            .collect();
        instance_tuples.sort();
        instance_tuples.dedup();

        let mut secondary_endpoints_deleted = 0;

        for (instance_name, instance_namespace) in &instance_tuples {
            let endpoints =
                get_endpoint(&client, instance_namespace, instance_name, "http").await?;

            for endpoint in &endpoints {
                let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);

                info!(
                    "Deleting zone {} from secondary endpoint {} (instance: {})",
                    spec.zone_name, pod_endpoint, instance_name
                );

                // Attempt to delete zone - if it fails, log a warning but don't fail the deletion
                if let Err(e) = zone_manager
                    .delete_zone(&spec.zone_name, &pod_endpoint)
                    .await
                {
                    warn!(
                        "Failed to delete zone {} from secondary endpoint {} (instance: {}): {}. Continuing with deletion anyway.",
                        spec.zone_name, pod_endpoint, instance_name, e
                    );
                } else {
                    debug!(
                        "Successfully deleted zone {} from secondary endpoint {} (instance: {})",
                        spec.zone_name, pod_endpoint, instance_name
                    );
                    secondary_endpoints_deleted += 1;
                }
            }
        }

        info!(
            "Successfully deleted zone {} from {} secondary endpoint(s) for cluster {}",
            spec.zone_name, secondary_endpoints_deleted, cluster_ref
        );
    }

    // Note: We don't need to reload after delzone because:
    // 1. rndc delzone immediately removes the zone from BIND9's running config
    // 2. BIND9 will clean up the zone file and journal files automatically

    Ok(())
}

/// Find `Bind9Instance` resources matching a label selector
async fn find_matching_instances(
    client: &Client,
    namespace: &str,
    selector: &crate::crd::LabelSelector,
) -> Result<Vec<String>> {
    use crate::crd::Bind9Instance;

    let api: Api<Bind9Instance> = Api::namespaced(client.clone(), namespace);

    // Build label selector string
    let label_selector = build_label_selector(selector);

    let params = kube::api::ListParams::default();
    let params = if let Some(selector_str) = label_selector {
        params.labels(&selector_str)
    } else {
        params
    };

    let instances = api.list(&params).await?;

    let instance_names: Vec<String> = instances
        .items
        .iter()
        .map(kube::ResourceExt::name_any)
        .collect();

    Ok(instance_names)
}

/// Build a Kubernetes label selector string from our `LabelSelector`
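///
/// # Example
///
/// A minimal sketch; the selector is assumed to carry
/// `matchLabels: {app: bind9, tier: dns}`:
///
/// ```rust,ignore
/// // Pairs are joined as "key=value" separated by commas, e.g.
/// // Some("app=bind9,tier=dns") (pair order follows the map's iteration order).
/// let selector_str = build_label_selector(&selector);
/// ```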
pub(crate) fn build_label_selector(selector: &crate::crd::LabelSelector) -> Option<String> {
    let mut parts = Vec::new();

    // Add match labels
    if let Some(labels) = &selector.match_labels {
        for (key, value) in labels {
            parts.push(format!("{key}={value}"));
        }
    }

    if parts.is_empty() {
        None
    } else {
        Some(parts.join(","))
    }
}

/// Helper struct for pod information
#[derive(Clone)]
pub struct PodInfo {
    /// Pod name (`metadata.name`)
    pub name: String,
    /// Pod IP address (`status.podIP`)
    pub ip: String,
    /// Name of the `Bind9Instance` that owns this pod
    pub instance_name: String,
    /// Namespace the pod runs in
    pub namespace: String,
}

/// Find ALL PRIMARY pods for the given `Bind9Cluster` or `ClusterBind9Provider`
///
/// Returns all running pods for PRIMARY instances in the cluster to ensure zone changes
/// are applied to all primary replicas consistently.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace to search in (ignored if `is_cluster_provider` is true)
/// * `cluster_name` - Name of the `Bind9Cluster` or `ClusterBind9Provider`
/// * `is_cluster_provider` - If true, searches all namespaces; if false, searches only the specified namespace
///
/// # Returns
///
/// A vector of `PodInfo` containing all running PRIMARY pods
///
/// # Errors
///
/// Returns an error if Kubernetes API operations fail
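///
/// # Example
///
/// A minimal sketch; the cluster name is illustrative:
///
/// ```rust,ignore
/// // is_cluster_provider = false limits the search to namespace "dns-system".
/// let pods = find_all_primary_pods(&client, "dns-system", "my-cluster", false).await?;
/// for pod in &pods {
///     info!("primary pod {} at {}", pod.name, pod.ip);
/// }
/// ```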
pub async fn find_all_primary_pods(
    client: &Client,
    namespace: &str,
    cluster_name: &str,
    is_cluster_provider: bool,
) -> Result<Vec<PodInfo>> {
    use crate::crd::{Bind9Instance, ServerRole};

    // First, find all Bind9Instance resources that belong to this cluster and have role=primary
    let instance_api: Api<Bind9Instance> = if is_cluster_provider {
        Api::all(client.clone())
    } else {
        Api::namespaced(client.clone(), namespace)
    };
    let instances = instance_api.list(&ListParams::default()).await?;

    // Store tuples of (instance_name, instance_namespace)
    let mut primary_instances: Vec<(String, String)> = Vec::new();
    for instance in instances.items {
        if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Primary {
            if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
                primary_instances.push((name, ns));
            }
        }
    }

    if primary_instances.is_empty() {
        let search_scope = if is_cluster_provider {
            "all namespaces".to_string()
        } else {
            format!("namespace {namespace}")
        };
        return Err(anyhow!(
            "No PRIMARY Bind9Instance resources found for cluster {cluster_name} in {search_scope}"
        ));
    }

    info!(
        "Found {} PRIMARY instance(s) for cluster {}: {:?}",
        primary_instances.len(),
        cluster_name,
        primary_instances
    );

    let mut all_pod_infos = Vec::new();

    for (instance_name, instance_namespace) in &primary_instances {
        // Now find all pods for this primary instance in its namespace
        let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
        // List pods with label selector matching the instance
        let label_selector = format!("app=bind9,instance={instance_name}");
        let lp = ListParams::default().labels(&label_selector);

        let pods = pod_api.list(&lp).await?;

        debug!(
            "Found {} pod(s) for PRIMARY instance {}",
            pods.items.len(),
            instance_name
        );

        for pod in &pods.items {
            let pod_name = pod
                .metadata
                .name
                .as_ref()
                .ok_or_else(|| anyhow!("Pod has no name"))?
                .clone();

            // Get pod IP
            let pod_ip = pod
                .status
                .as_ref()
                .and_then(|s| s.pod_ip.as_ref())
                .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
                .clone();

            // Check if pod is running
            let phase = pod
                .status
                .as_ref()
                .and_then(|s| s.phase.as_ref())
                .map(String::as_str);

            if phase == Some("Running") {
                all_pod_infos.push(PodInfo {
                    name: pod_name.clone(),
                    ip: pod_ip.clone(),
                    instance_name: instance_name.clone(),
                    namespace: instance_namespace.clone(),
                });
                debug!(
                    "Found running pod {} with IP {} in namespace {}",
                    pod_name, pod_ip, instance_namespace
                );
            } else {
                debug!(
                    "Skipping pod {} (phase: {:?}, not running)",
                    pod_name, phase
                );
            }
        }
    }

    if all_pod_infos.is_empty() {
        let search_scope = if is_cluster_provider {
            "all namespaces".to_string()
        } else {
            format!("namespace {namespace}")
        };
        return Err(anyhow!(
            "No running PRIMARY pods found for cluster {cluster_name} in {search_scope}"
        ));
    }

    info!(
        "Found {} running PRIMARY pod(s) across {} instance(s) for cluster {}",
        all_pod_infos.len(),
        primary_instances.len(),
        cluster_name
    );

    Ok(all_pod_infos)
}

/// Find all SECONDARY pod IPs for a given cluster or global cluster.
///
/// This is a helper function that calls `find_all_secondary_pods` and extracts only the IPs.
///
/// Returns IP addresses of all running secondary pods in the cluster.
/// These IPs are used for configuring also-notify and allow-transfer on primary zones.
async fn find_all_secondary_pod_ips(
    client: &Client,
    namespace: &str,
    cluster_name: &str,
    is_cluster_provider: bool,
) -> Result<Vec<String>> {
    info!("Finding SECONDARY pod IPs for cluster {}", cluster_name);

    let secondary_pods =
        find_all_secondary_pods(client, namespace, cluster_name, is_cluster_provider).await?;

    let secondary_ips: Vec<String> = secondary_pods.iter().map(|pod| pod.ip.clone()).collect();

    info!(
        "Found {} running SECONDARY pod IP(s) for cluster {}: {:?}",
        secondary_ips.len(),
        cluster_name,
        secondary_ips
    );

    Ok(secondary_ips)
}

/// Find all PRIMARY pod IPs for a given cluster or global cluster.
///
/// Returns IP addresses of all running primary pods in the cluster.
/// These IPs are used for configuring primaries on secondary zones.
async fn find_all_primary_pod_ips(
    client: &Client,
    namespace: &str,
    cluster_name: &str,
    is_cluster_provider: bool,
) -> Result<Vec<String>> {
    info!("Finding PRIMARY pod IPs for cluster {}", cluster_name);

    let primary_pods =
        find_all_primary_pods(client, namespace, cluster_name, is_cluster_provider).await?;

    let primary_ips: Vec<String> = primary_pods.iter().map(|pod| pod.ip.clone()).collect();

    info!(
        "Found {} running PRIMARY pod IP(s) for cluster {}: {:?}",
        primary_ips.len(),
        cluster_name,
        primary_ips
    );

    Ok(primary_ips)
}

/// Find all SECONDARY pods for a given cluster or global cluster.
///
/// Returns structured pod information including IP, name, instance name, and namespace.
/// Similar to `find_all_primary_pods` but for secondary instances.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace to search in (ignored if `is_cluster_provider` is true)
/// * `cluster_name` - Name of the `Bind9Cluster` or `ClusterBind9Provider`
/// * `is_cluster_provider` - If true, searches all namespaces; if false, searches only the specified namespace
///
/// # Returns
///
/// A vector of `PodInfo` containing all running SECONDARY pods
async fn find_all_secondary_pods(
    client: &Client,
    namespace: &str,
    cluster_name: &str,
    is_cluster_provider: bool,
) -> Result<Vec<PodInfo>> {
    use crate::crd::{Bind9Instance, ServerRole};

    // Find all Bind9Instance resources with role=SECONDARY for this cluster
    let instance_api: Api<Bind9Instance> = if is_cluster_provider {
        Api::all(client.clone())
    } else {
        Api::namespaced(client.clone(), namespace)
    };
    let instances = instance_api.list(&ListParams::default()).await?;

    // Store tuples of (instance_name, instance_namespace)
    let mut secondary_instances: Vec<(String, String)> = Vec::new();
    for instance in instances.items {
        if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Secondary
        {
            if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
                secondary_instances.push((name, ns));
            }
        }
    }

    if secondary_instances.is_empty() {
        info!("No SECONDARY instances found for cluster {cluster_name}");
        return Ok(Vec::new());
    }

    info!(
        "Found {} SECONDARY instance(s) for cluster {}: {:?}",
        secondary_instances.len(),
        cluster_name,
        secondary_instances
    );

    let mut all_pod_infos = Vec::new();

    for (instance_name, instance_namespace) in &secondary_instances {
        // Find all pods for this secondary instance in its namespace
        let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
        let label_selector = format!("app=bind9,instance={instance_name}");
        let lp = ListParams::default().labels(&label_selector);

        let pods = pod_api.list(&lp).await?;

        debug!(
            "Found {} pod(s) for SECONDARY instance {}",
            pods.items.len(),
            instance_name
        );

        for pod in &pods.items {
            let pod_name = pod
                .metadata
                .name
                .as_ref()
                .ok_or_else(|| anyhow!("Pod has no name"))?
                .clone();

            // Get pod IP
            let pod_ip = pod
                .status
                .as_ref()
                .and_then(|s| s.pod_ip.as_ref())
                .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
                .clone();

            // Check if pod is running
            let phase = pod
                .status
                .as_ref()
                .and_then(|s| s.phase.as_ref())
                .map(String::as_str);

            if phase == Some("Running") {
                all_pod_infos.push(PodInfo {
                    name: pod_name.clone(),
                    ip: pod_ip.clone(),
                    instance_name: instance_name.clone(),
                    namespace: instance_namespace.clone(),
                });
                debug!(
                    "Found running secondary pod {} with IP {} in namespace {}",
                    pod_name, pod_ip, instance_namespace
                );
            } else {
                debug!(
                    "Skipping secondary pod {} (phase: {:?}, not running)",
                    pod_name, phase
                );
            }
        }
    }

    info!(
        "Found {} running SECONDARY pod(s) across {} instance(s) for cluster {}",
        all_pod_infos.len(),
        secondary_instances.len(),
        cluster_name
    );

    Ok(all_pod_infos)
}

/// Load RNDC key from the instance's secret
///
/// The secret is expected to be named `<instance_name>-rndc-key` and to live in the
/// instance's namespace.
async fn load_rndc_key(
    client: &Client,
    namespace: &str,
    instance_name: &str,
) -> Result<RndcKeyData> {
    let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
    let secret_name = format!("{instance_name}-rndc-key");

    let secret = secret_api.get(&secret_name).await.context(format!(
        "Failed to get RNDC secret {secret_name} in namespace {namespace}"
    ))?;

    let data = secret
        .data
        .as_ref()
        .ok_or_else(|| anyhow!("Secret {secret_name} has no data"))?;

    // Convert ByteString to Vec<u8>
    let mut converted_data = std::collections::BTreeMap::new();
    for (key, value) in data {
        converted_data.insert(key.clone(), value.0.clone());
    }

    crate::bind9::Bind9Manager::parse_rndc_secret_data(&converted_data)
}

/// Check if the operator is running inside a Kubernetes cluster
///
/// Detects the environment by checking for the presence of the Kubernetes service account token,
/// which is automatically mounted in all pods running in the cluster.
///
/// Returns `true` if running in-cluster, `false` if running locally (e.g., via kubectl proxy)
fn is_running_in_cluster() -> bool {
    std::path::Path::new("/var/run/secrets/kubernetes.io/serviceaccount/token").exists()
}

/// Update a single condition on the `DNSZone` status
///
/// This is a lightweight status update that only modifies the conditions field.
/// Use this for intermediate status updates during reconciliation.
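///
/// Illustrative call site (condition values are examples only):
///
/// ```rust,ignore
/// update_condition(
///     &client,
///     &dnszone,
///     "Ready",
///     "False",
///     "Reconciling",
///     "Creating zone on primary instances",
/// )
/// .await?;
/// ```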
async fn update_condition(
    client: &Client,
    dnszone: &DNSZone,
    condition_type: &str,
    status: &str,
    reason: &str,
    message: &str,
) -> Result<()> {
    let api: Api<DNSZone> =
        Api::namespaced(client.clone(), &dnszone.namespace().unwrap_or_default());

    let condition = Condition {
        r#type: condition_type.to_string(),
        status: status.to_string(),
        last_transition_time: Some(Utc::now().to_rfc3339()),
        reason: Some(reason.to_string()),
        message: Some(message.to_string()),
    };

    // Preserve existing status fields, only update conditions
    let current_status = dnszone.status.as_ref();
    let new_status = DNSZoneStatus {
        conditions: vec![condition],
        observed_generation: current_status
            .and_then(|s| s.observed_generation)
            .or(dnszone.metadata.generation),
        record_count: current_status.and_then(|s| s.record_count),
        secondary_ips: current_status.and_then(|s| s.secondary_ips.clone()),
        records: current_status
            .map(|s| s.records.clone())
            .unwrap_or_default(),
    };

    let patch = json!({
        "status": new_status
    });

    api.patch_status(
        &dnszone.name_any(),
        &PatchParams::default(),
        &Patch::Merge(&patch),
    )
    .await?;

    debug!(
        "Updated DNSZone {}/{} condition: type={}, status={}, reason={}",
        dnszone.namespace().unwrap_or_default(),
        dnszone.name_any(),
        condition_type,
        status,
        reason
    );

    Ok(())
}

/// Update `DNSZone` status including secondary IPs
async fn update_status_with_secondaries(
    client: &Client,
    dnszone: &DNSZone,
    condition_type: &str,
    status: &str,
    reason: &str,
    message: &str,
    secondary_ips: Vec<String>,
) -> Result<()> {
    let api: Api<DNSZone> =
        Api::namespaced(client.clone(), &dnszone.namespace().unwrap_or_default());

    let condition = Condition {
        r#type: condition_type.to_string(),
        status: status.to_string(),
        last_transition_time: Some(Utc::now().to_rfc3339()),
        reason: Some(reason.to_string()),
        message: Some(message.to_string()),
    };

    let new_status = DNSZoneStatus {
        conditions: vec![condition],
        observed_generation: dnszone.metadata.generation,
        record_count: dnszone.status.as_ref().and_then(|s| s.record_count),
        secondary_ips: if secondary_ips.is_empty() {
            None
        } else {
            Some(secondary_ips)
        },
        records: dnszone
            .status
            .as_ref()
            .map(|s| s.records.clone())
            .unwrap_or_default(),
    };

    let patch = json!({
        "status": new_status
    });

    api.patch_status(
        &dnszone.name_any(),
        &PatchParams::default(),
        &Patch::Merge(&patch),
    )
    .await?;

    info!(
        "Updated DNSZone {}/{} status: {}={}",
        dnszone.namespace().unwrap_or_default(),
        dnszone.name_any(),
        condition_type,
        status
    );

    Ok(())
}

/// Update the `DNSZone` status with a single condition.
///
/// Compares against the current first condition and skips the patch entirely
/// when nothing has changed, to avoid no-op status writes.
async fn update_status(
    client: &Client,
    dnszone: &DNSZone,
    condition_type: &str,
    status: &str,
    message: &str,
) -> Result<()> {
    let api: Api<DNSZone> =
        Api::namespaced(client.clone(), &dnszone.namespace().unwrap_or_default());

    // Check if status has actually changed
    let current_status = &dnszone.status;
    let status_changed = if let Some(current) = current_status {
        if let Some(current_condition) = current.conditions.first() {
            // Check if condition changed
            current_condition.r#type != condition_type
                || current_condition.status != status
                || current_condition.message.as_deref() != Some(message)
        } else {
            // No conditions exist, need to update
            true
        }
    } else {
        // No status exists, need to update
        true
    };

    // Only update if status has changed
    if !status_changed {
        debug!(
            namespace = %dnszone.namespace().unwrap_or_default(),
            name = %dnszone.name_any(),
            "Status unchanged, skipping update"
        );
        return Ok(());
    }

    debug!(
        condition_type = %condition_type,
        status = %status,
        message = %message,
        "Preparing status update"
    );

    let condition = Condition {
        r#type: condition_type.to_string(),
        status: status.to_string(),
        reason: Some(condition_type.to_string()),
        message: Some(message.to_string()),
        last_transition_time: Some(Utc::now().to_rfc3339()),
    };

    let new_status = DNSZoneStatus {
        conditions: vec![condition],
        observed_generation: dnszone.metadata.generation,
        record_count: None,
        secondary_ips: dnszone
            .status
            .as_ref()
            .and_then(|s| s.secondary_ips.clone()),
        records: dnszone
            .status
            .as_ref()
            .map(|s| s.records.clone())
            .unwrap_or_default(),
    };

    info!(
        "Updating DNSZone {}/{} status",
        dnszone.namespace().unwrap_or_default(),
        dnszone.name_any()
    );

    let patch = json!({ "status": new_status });
    api.patch_status(
        &dnszone.name_any(),
        &PatchParams::default(),
        &Patch::Merge(patch),
    )
    .await?;

    Ok(())
}

/// Structure representing an endpoint (pod IP and port)
#[derive(Debug, Clone)]
pub struct EndpointAddress {
    /// IP address of the pod
    pub ip: String,
    /// Container port number
    pub port: i32,
}

/// Execute an operation on all endpoints of all primary instances in a cluster.
///
/// This helper function handles the common pattern of:
/// 1. Finding all primary pods for a cluster
/// 2. Collecting unique instance names
/// 3. Optionally loading the RNDC key for each instance
/// 4. Getting endpoints for each instance
/// 5. Executing a provided operation on each endpoint
///
/// # Arguments
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace of the cluster
/// * `cluster_ref` - Name of the `Bind9Cluster` or `ClusterBind9Provider`
/// * `is_cluster_provider` - If true, searches all namespaces for instances
/// * `with_rndc_key` - Whether to load each instance's RNDC key
/// * `port_name` - Name of the endpoint port to connect to (e.g., "http")
/// * `operation` - Async closure to execute for each endpoint
///   - Arguments: `(pod_endpoint: String, instance_name: String, rndc_key: Option<RndcKeyData>)`
///   - Returns: `Result<()>`
///
/// # Returns
/// Returns `Ok((first_endpoint, total_count))` where:
/// - `first_endpoint` - Optional first endpoint encountered (useful for NOTIFY operations)
/// - `total_count` - Total number of endpoints processed
///
/// # Errors
/// Returns error if:
/// - No primary pods found for the cluster
/// - Failed to load an RNDC key (if requested)
/// - Failed to get endpoints for any instance
/// - The operation closure returns an error for any endpoint
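///
/// # Example
///
/// A minimal sketch of the closure contract (the body is a stand-in for any
/// per-endpoint operation; names are illustrative):
///
/// ```rust,ignore
/// let (first, count) = for_each_primary_endpoint(
///     &client,
///     "dns",
///     "my-cluster",
///     false, // is_cluster_provider
///     true,  // with_rndc_key
///     "http",
///     |endpoint, instance, rndc_key| async move {
///         info!(
///             "would configure {endpoint} (instance {instance}, key loaded: {})",
///             rndc_key.is_some()
///         );
///         Ok(())
///     },
/// )
/// .await?;
/// ```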
pub async fn for_each_primary_endpoint<F, Fut>(
    client: &Client,
    namespace: &str,
    cluster_ref: &str,
    is_cluster_provider: bool,
    with_rndc_key: bool,
    port_name: &str,
    operation: F,
) -> Result<(Option<String>, usize)>
where
    F: Fn(String, String, Option<RndcKeyData>) -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    // Find all PRIMARY pods to get the unique instance names
    let primary_pods =
        find_all_primary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;

    info!(
        "Found {} PRIMARY pod(s) for cluster {}",
        primary_pods.len(),
        cluster_ref
    );

    // Collect unique (instance_name, namespace) tuples from the primary pods
    // Each instance may have multiple pods (replicas)
    let mut instance_tuples: Vec<(String, String)> = primary_pods
        .iter()
        .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
        .collect();
    instance_tuples.sort();
    instance_tuples.dedup();

    info!(
        "Found {} primary instance(s) for cluster {}: {:?}",
        instance_tuples.len(),
        cluster_ref,
        instance_tuples
    );

    let mut first_endpoint: Option<String> = None;
    let mut total_endpoints = 0;

    // Loop through each primary instance and get its endpoints
    // Important: With EmptyDir storage (per-pod, non-shared), each primary pod maintains its own
    // zone files. We need to process ALL pods across ALL instances.
    for (instance_name, instance_namespace) in &instance_tuples {
        info!(
            "Getting endpoints for instance {}/{} in cluster {}",
            instance_namespace, instance_name, cluster_ref
        );

        // Load RNDC key for this specific instance if requested
        // Each instance has its own RNDC secret for security isolation
        let key_data = if with_rndc_key {
            Some(load_rndc_key(client, instance_namespace, instance_name).await?)
        } else {
            None
        };

        // Get all endpoints for this instance's service
        // The Endpoints API gives us pod IPs with their container ports (not service ports)
        let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;

        info!(
            "Found {} endpoint(s) for instance {}",
            endpoints.len(),
            instance_name
        );

        for endpoint in &endpoints {
            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);

            // Save the first endpoint
            if first_endpoint.is_none() {
                first_endpoint = Some(pod_endpoint.clone());
            }

            // Execute the operation on this endpoint with this instance's RNDC key
            operation(pod_endpoint, instance_name.clone(), key_data.clone()).await?;

            total_endpoints += 1;
        }
    }

    Ok((first_endpoint, total_endpoints))
}

/// Execute an operation on all SECONDARY endpoints for a cluster.
///
/// Similar to `for_each_primary_endpoint`, but operates on SECONDARY instances.
/// Useful for triggering zone transfers or other secondary-specific operations.
/// For a concrete call site, see `trigger_zone_transfers` below.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace to search for instances
/// * `cluster_ref` - Cluster reference name
/// * `is_cluster_provider` - Whether this is a cluster provider (cluster-scoped)
/// * `with_rndc_key` - Whether to load and pass RNDC keys for each instance
/// * `port_name` - Port name to use for endpoints (e.g., "rndc-api", "dns-tcp")
/// * `operation` - Async closure to execute for each endpoint
///
/// # Returns
///
/// * `Ok((first_endpoint, total_endpoints))` - First endpoint found and total count
///
/// # Errors
///
/// Returns an error if:
/// - Failed to find secondary pods
/// - Failed to load RNDC keys
/// - Failed to get service endpoints
/// - The operation closure returns an error for any endpoint
pub async fn for_each_secondary_endpoint<F, Fut>(
    client: &Client,
    namespace: &str,
    cluster_ref: &str,
    is_cluster_provider: bool,
    with_rndc_key: bool,
    port_name: &str,
    operation: F,
) -> Result<(Option<String>, usize)>
where
    F: Fn(String, String, Option<RndcKeyData>) -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    // Find all SECONDARY pods to get the unique instance names
    let secondary_pods =
        find_all_secondary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;

    info!(
        "Found {} SECONDARY pod(s) for cluster {}",
        secondary_pods.len(),
        cluster_ref
    );

    // Collect unique (instance_name, namespace) tuples from the secondary pods
    // Each instance may have multiple pods (replicas)
    let mut instance_tuples: Vec<(String, String)> = secondary_pods
        .iter()
        .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
        .collect();
    instance_tuples.sort();
    instance_tuples.dedup();

    info!(
        "Found {} secondary instance(s) for cluster {}: {:?}",
        instance_tuples.len(),
        cluster_ref,
        instance_tuples
    );

    let mut first_endpoint: Option<String> = None;
    let mut total_endpoints = 0;

    // Loop through each secondary instance and get its endpoints
    for (instance_name, instance_namespace) in &instance_tuples {
        info!(
            "Getting endpoints for secondary instance {}/{} in cluster {}",
            instance_namespace, instance_name, cluster_ref
        );

        // Load RNDC key for this specific instance if requested
        // Each instance has its own RNDC secret for security isolation
        let key_data = if with_rndc_key {
            Some(load_rndc_key(client, instance_namespace, instance_name).await?)
        } else {
            None
        };

        // Get all endpoints for this instance's service
        // The Endpoints API gives us pod IPs with their container ports (not service ports)
        let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;

        info!(
            "Found {} endpoint(s) for secondary instance {}",
            endpoints.len(),
            instance_name
        );

        for endpoint in &endpoints {
            let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);

            // Save the first endpoint
            if first_endpoint.is_none() {
                first_endpoint = Some(pod_endpoint.clone());
            }

            // Execute the operation on this endpoint with this instance's RNDC key
            operation(pod_endpoint, instance_name.clone(), key_data.clone()).await?;

            total_endpoints += 1;
        }
    }

    Ok((first_endpoint, total_endpoints))
}

/// Get all endpoints for a service with a specific port name
///
/// Looks up the Kubernetes Endpoints object associated with a service and returns
/// all pod IP addresses with their corresponding container ports.
///
/// When connecting directly to pod IPs, you must use the container port from the endpoints,
/// not the service port.
///
/// # Arguments
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace of the service/endpoints
/// * `service_name` - Name of the service (endpoints have the same name)
/// * `port_name` - Name of the port to look up (e.g., "http", "dns-tcp")
///
/// # Returns
/// Vector of `EndpointAddress` containing pod IP and container port pairs
///
/// # Errors
/// Returns error if:
/// - Endpoints object doesn't exist
/// - Port name not found in any endpoint subset
/// - No ready addresses found
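///
/// # Example
///
/// A minimal sketch (assumes a service named `my-instance` in namespace `dns`
/// that exposes a port named `"http"`):
///
/// ```rust,ignore
/// let endpoints = get_endpoint(&client, "dns", "my-instance", "http").await?;
/// for ep in &endpoints {
///     println!("pod endpoint: {}:{}", ep.ip, ep.port);
/// }
/// ```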
pub async fn get_endpoint(
    client: &Client,
    namespace: &str,
    service_name: &str,
    port_name: &str,
) -> Result<Vec<EndpointAddress>> {
    let endpoints_api: Api<Endpoints> = Api::namespaced(client.clone(), namespace);
    let endpoints = endpoints_api.get(service_name).await.context(format!(
        "Failed to get endpoints for service {service_name}"
    ))?;

    let mut result = Vec::new();

    // Endpoints are organized into subsets. Each subset has:
    // - addresses: List of ready pod IPs
    // - ports: List of container ports
    if let Some(subsets) = endpoints.subsets {
        for subset in subsets {
            // Find the port in this subset
            if let Some(ports) = subset.ports {
                if let Some(endpoint_port) = ports
                    .iter()
                    .find(|p| p.name.as_ref().is_some_and(|name| name == port_name))
                {
                    let port = endpoint_port.port;

                    // Get all ready addresses for this subset
                    if let Some(addresses) = subset.addresses {
                        for addr in addresses {
                            result.push(EndpointAddress {
                                ip: addr.ip.clone(),
                                port,
                            });
                        }
                    }
                }
            }
        }
    }

    if result.is_empty() {
        return Err(anyhow!(
            "No ready endpoints found for service {service_name} with port '{port_name}'"
        ));
    }

    Ok(result)
}

/// Check if all discovered records are ready.
///
/// Queries each record in the list and checks if it has a "Ready" condition with status="True".
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace containing the records
/// * `record_refs` - List of record references from `DNSZone.status.records`
///
/// # Returns
///
/// * `Ok(true)` - All records are ready
/// * `Ok(false)` - Some records are not ready
/// * `Err(_)` - API error
async fn check_all_records_ready(
    client: &Client,
    namespace: &str,
    record_refs: &[crate::crd::RecordReference],
) -> Result<bool> {
    use crate::crd::{
        AAAARecord, ARecord, CAARecord, CNAMERecord, MXRecord, NSRecord, SRVRecord, TXTRecord,
    };

    for record_ref in record_refs {
        let is_ready = match record_ref.kind.as_str() {
            "ARecord" => {
                let api: Api<ARecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "AAAARecord" => {
                let api: Api<AAAARecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "TXTRecord" => {
                let api: Api<TXTRecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "CNAMERecord" => {
                let api: Api<CNAMERecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "MXRecord" => {
                let api: Api<MXRecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "NSRecord" => {
                let api: Api<NSRecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "SRVRecord" => {
                let api: Api<SRVRecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            "CAARecord" => {
                let api: Api<CAARecord> = Api::namespaced(client.clone(), namespace);
                check_record_ready(&api, &record_ref.name).await?
            }
            _ => {
                warn!(
                    "Unknown record kind: {}, treating as not ready",
                    record_ref.kind
                );
                false
            }
        };

        if !is_ready {
            debug!(
                "Record {}/{} (kind: {}) is not ready yet",
                namespace, record_ref.name, record_ref.kind
            );
            return Ok(false);
        }
    }

    Ok(true)
}

/// Check if a specific record is ready by examining its status conditions.
async fn check_record_ready<T>(api: &Api<T>, name: &str) -> Result<bool>
where
    T: kube::Resource<DynamicType = ()>
        + Clone
        + serde::de::DeserializeOwned
        + serde::Serialize
        + std::fmt::Debug
        + Send
        + Sync,
    <T as kube::Resource>::DynamicType: Default,
{
    let record = match api.get(name).await {
        Ok(r) => r,
        Err(e) => {
            warn!("Failed to get record {}: {}", name, e);
            return Ok(false);
        }
    };

    // Use serde_json to access the status field dynamically
    let record_json = serde_json::to_value(&record)?;
    let status = record_json.get("status");

    if let Some(status_obj) = status {
        if let Some(conditions) = status_obj.get("conditions").and_then(|c| c.as_array()) {
            for condition in conditions {
                if let (Some(type_val), Some(status_val)) = (
                    condition.get("type").and_then(|t| t.as_str()),
                    condition.get("status").and_then(|s| s.as_str()),
                ) {
                    if type_val == "Ready" && status_val == "True" {
                        return Ok(true);
                    }
                }
            }
        }
    }

    Ok(false)
}

/// Find all `DNSZones` that have selected a given record.
///
/// This function is used by the watch mapper to determine which `DNSZones` should be
/// reconciled when a DNS record changes. Rather than re-evaluating label selectors,
/// it checks each `DNSZone`'s `status.records` list to see if the record is present.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `record_namespace` - Namespace of the record
/// * `record_kind` - Kind of the record (e.g., `"ARecord"`, `"TXTRecord"`)
/// * `record_name` - Name of the record resource
///
/// # Returns
///
/// A vector of tuples containing `(zone_name, zone_namespace)` for all `DNSZones` that have
/// selected this record.
///
/// # Errors
///
/// Returns an error if Kubernetes API operations fail.
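///
/// # Example
///
/// A minimal sketch (names are illustrative):
///
/// ```rust,ignore
/// let zones =
///     find_zones_selecting_record(&client, "default", "ARecord", "www-example").await?;
/// for (zone_name, zone_namespace) in zones {
///     info!("requeueing DNSZone {zone_namespace}/{zone_name}");
/// }
/// ```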
pub async fn find_zones_selecting_record(
    client: &Client,
    record_namespace: &str,
    record_kind: &str,
    record_name: &str,
) -> Result<Vec<(String, String)>> {
    let api: Api<DNSZone> = Api::namespaced(client.clone(), record_namespace);
    let zones = api.list(&ListParams::default()).await?;

    let mut selecting_zones = vec![];

    for zone in zones {
        let Some(ref status) = zone.status else {
            continue;
        };

        // Check if this record is in the zone's status.records list
        let is_selected = status
            .records
            .iter()
            .any(|r| r.kind == record_kind && r.name == record_name);

        if is_selected {
            let zone_name = zone.name_any();
            let zone_namespace = zone.namespace().unwrap_or_default();
            selecting_zones.push((zone_name, zone_namespace));
        }
    }

    Ok(selecting_zones)
}

/// Trigger zone transfers to all secondary instances.
///
/// Uses the `rndc retransfer` command to initiate zone transfers from primaries to secondaries.
///
/// # Arguments
///
/// * `client` - Kubernetes API client
/// * `namespace` - Namespace containing the BIND9 instances
/// * `zone_name` - Name of the zone to transfer
/// * `cluster_ref` - Cluster reference
/// * `is_cluster_provider` - Whether this is a cluster provider reference
/// * `zone_manager` - BIND9 manager for zone operations
///
/// # Returns
///
/// * `Ok(usize)` - Number of secondary endpoints on which a transfer was initiated
///   (0 if no secondaries were found; a warning is logged in that case)
/// * `Err(_)` - If any transfer request fails
async fn trigger_zone_transfers(
    client: &Client,
    namespace: &str,
    zone_name: &str,
    cluster_ref: &str,
    is_cluster_provider: bool,
    zone_manager: &crate::bind9::Bind9Manager,
) -> Result<usize> {
    let (_first_endpoint, total_endpoints) = for_each_secondary_endpoint(
        client,
        namespace,
        cluster_ref,
        is_cluster_provider,
        false,  // with_rndc_key = false (not needed for retransfer)
        "http", // Use bindcar HTTP API port for zone operations
        |secondary_endpoint, instance_name, _rndc_key| {
            let zone_name = zone_name.to_string();
            let zone_manager = zone_manager.clone();

            async move {
                zone_manager
                    .retransfer_zone(&zone_name, &secondary_endpoint)
                    .await
                    .with_context(|| format!(
                        "Failed to trigger zone transfer for {zone_name} on secondary {secondary_endpoint} (instance: {instance_name})"
                    ))?;

                info!(
                    "Triggered zone transfer for {zone_name} on secondary {secondary_endpoint} (instance: {instance_name})"
                );

                Ok(())
            }
        },
    )
    .await?;

    if total_endpoints == 0 {
        warn!(
            "No secondary instances found for zone {} in cluster {}",
            zone_name, cluster_ref
        );
    }

    Ok(total_endpoints)
}