1#![allow(dead_code)]
3use crate::bind9::RndcKeyData;
11use crate::constants::{ANNOTATION_ZONE_OWNER, ANNOTATION_ZONE_PREVIOUS_OWNER};
12use crate::crd::{Condition, DNSZone, DNSZoneSpec, DNSZoneStatus};
13use anyhow::{anyhow, Context, Result};
14use bindcar::{ZONE_TYPE_PRIMARY, ZONE_TYPE_SECONDARY};
15use chrono::Utc;
16use k8s_openapi::api::core::v1::{Endpoints, Pod, Secret};
17use kube::{
18 api::{ListParams, Patch, PatchParams},
19 client::Client,
20 Api, ResourceExt,
21};
22use serde_json::json;
23use std::collections::HashSet;
24use tracing::{debug, info, warn};
25
26pub fn get_cluster_ref_from_spec(
39 spec: &DNSZoneSpec,
40 namespace: &str,
41 name: &str,
42) -> Result<String> {
43 match (&spec.cluster_ref, &spec.cluster_provider_ref) {
44 (Some(ref cluster_name), None) => Ok(cluster_name.clone()),
45 (None, Some(ref cluster_provider_name)) => Ok(cluster_provider_name.clone()),
46 (Some(_), Some(_)) => Err(anyhow!(
47 "DNSZone {namespace}/{name} has both clusterRef and clusterProviderRef specified. \
48 Only one must be specified."
49 )),
50 (None, None) => Err(anyhow!(
51 "DNSZone {namespace}/{name} has neither clusterRef nor clusterProviderRef specified. \
52 Exactly one must be specified."
53 )),
54 }
55}
56
57#[allow(clippy::too_many_lines)]
98pub async fn reconcile_dnszone(
99 client: Client,
100 dnszone: DNSZone,
101 zone_manager: &crate::bind9::Bind9Manager,
102) -> Result<()> {
103 let namespace = dnszone.namespace().unwrap_or_default();
104 let name = dnszone.name_any();
105
106 info!("Reconciling DNSZone: {}/{}", namespace, name);
107 debug!(
108 namespace = %namespace,
109 name = %name,
110 generation = ?dnszone.metadata.generation,
111 "Starting DNSZone reconciliation"
112 );
113
114 let mut status_updater = crate::reconcilers::status::DNSZoneStatusUpdater::new(&dnszone);
116
117 let spec = &dnszone.spec;
119
120 let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
122 let is_cluster_provider = spec.cluster_provider_ref.is_some();
123
124 info!(
125 "DNSZone {}/{} references cluster '{}' (is_cluster_provider={}, cluster_ref={:?}, cluster_provider_ref={:?})",
126 namespace, name, cluster_ref, is_cluster_provider, spec.cluster_ref, spec.cluster_provider_ref
127 );
128
129 let current_generation = dnszone.metadata.generation;
131 let observed_generation = dnszone.status.as_ref().and_then(|s| s.observed_generation);
132
133 let first_reconciliation = observed_generation.is_none();
134 let spec_changed =
135 crate::reconcilers::should_reconcile(current_generation, observed_generation);
136
137 info!(
138 "Reconciling zone {} (first_reconciliation={}, spec_changed={})",
139 spec.zone_name, first_reconciliation, spec_changed
140 );
141
142 let (primary_count, secondary_count) = {
153 debug!("Ensuring BIND9 zone exists on all instances (declarative reconciliation)");
154
155 status_updater.set_condition(
157 "Progressing",
158 "True",
159 "PrimaryReconciling",
160 "Configuring zone on primary servers",
161 );
162
163 let primary_ips =
165 match find_all_primary_pod_ips(&client, &namespace, &cluster_ref, is_cluster_provider)
166 .await
167 {
168 Ok(ips) if !ips.is_empty() => {
169 info!(
170 "Found {} primary server(s) for cluster {}: {:?}",
171 ips.len(),
172 cluster_ref,
173 ips
174 );
175 ips
176 }
177 Ok(_) => {
178 status_updater.set_condition(
179 "Degraded",
180 "True",
181 "PrimaryFailed",
182 &format!("No primary servers found for cluster {cluster_ref}"),
183 );
184 status_updater.apply(&client).await?;
186 return Err(anyhow!(
187 "No primary servers found for cluster {cluster_ref} - cannot configure zones"
188 ));
189 }
190 Err(e) => {
191 status_updater.set_condition(
192 "Degraded",
193 "True",
194 "PrimaryFailed",
195 &format!("Failed to find primary servers: {e}"),
196 );
197 status_updater.apply(&client).await?;
199 return Err(e);
200 }
201 };
202
203 let primary_count = match add_dnszone(client.clone(), dnszone.clone(), zone_manager).await {
205 Ok(count) => {
206 status_updater.set_condition(
208 "Progressing",
209 "True",
210 "PrimaryReconciled",
211 &format!(
212 "Zone {} configured on {} primary server(s)",
213 spec.zone_name, count
214 ),
215 );
216 count
217 }
218 Err(e) => {
219 status_updater.set_condition(
220 "Degraded",
221 "True",
222 "PrimaryFailed",
223 &format!("Failed to configure zone on primary servers: {e}"),
224 );
225 status_updater.apply(&client).await?;
227 return Err(e);
228 }
229 };
230
231 status_updater.set_condition(
233 "Progressing",
234 "True",
235 "SecondaryReconciling",
236 "Configuring zone on secondary servers",
237 );
238
239 let secondary_count = match add_dnszone_to_secondaries(
241 client.clone(),
242 dnszone.clone(),
243 zone_manager,
244 &primary_ips,
245 )
246 .await
247 {
248 Ok(count) => {
249 if count > 0 {
251 status_updater.set_condition(
252 "Progressing",
253 "True",
254 "SecondaryReconciled",
255 &format!(
256 "Zone {} configured on {} secondary server(s)",
257 spec.zone_name, count
258 ),
259 );
260 }
261 count
262 }
263 Err(e) => {
264 warn!(
266 "Failed to configure zone on secondary servers: {}. Primary servers are still operational.",
267 e
268 );
269 status_updater.set_condition(
270 "Degraded",
271 "True",
272 "SecondaryFailed",
273 &format!(
274 "Zone configured on {primary_count} primary server(s) but secondary configuration failed: {e}"
275 ),
276 );
277 0
278 }
279 };
280
281 (primary_count, secondary_count)
282 };
283
284 status_updater.set_condition(
286 "Progressing",
287 "True",
288 "RecordsDiscovering",
289 "Discovering DNS records via label selectors",
290 );
291
292 let record_refs = match reconcile_zone_records(client.clone(), dnszone.clone()).await {
293 Ok(refs) => {
294 info!(
295 "Discovered {} DNS record(s) for zone {} via label selectors",
296 refs.len(),
297 spec.zone_name
298 );
299 refs
300 }
301 Err(e) => {
302 warn!(
304 "Failed to discover DNS records for zone {}: {}. Zone is configured but record discovery failed.",
305 spec.zone_name, e
306 );
307 status_updater.set_condition(
308 "Degraded",
309 "True",
310 "RecordDiscoveryFailed",
311 &format!("Zone configured but record discovery failed: {e}"),
312 );
313 vec![]
314 }
315 };
316
317 let record_count = record_refs.len();
318
319 status_updater.set_records(record_refs.clone());
321
322 if record_count > 0 {
324 let all_records_ready = check_all_records_ready(&client, &namespace, &record_refs).await?;
325
326 if all_records_ready {
327 info!(
328 "All {} record(s) for zone {} are ready, triggering zone transfers to secondaries",
329 record_count, spec.zone_name
330 );
331
332 match trigger_zone_transfers(
334 &client,
335 &namespace,
336 &spec.zone_name,
337 &cluster_ref,
338 is_cluster_provider,
339 zone_manager,
340 )
341 .await
342 {
343 Ok(transfer_count) => {
344 info!(
345 "Successfully triggered zone transfer for {} to {} secondary instance(s)",
346 spec.zone_name, transfer_count
347 );
348 }
349 Err(e) => {
350 warn!(
351 "Failed to trigger zone transfers for {}: {}. Zone is configured but secondaries may be out of sync.",
352 spec.zone_name, e
353 );
354 status_updater.set_condition(
355 "Degraded",
356 "True",
357 "TransferFailed",
358 &format!("Zone configured but zone transfer failed: {e}"),
359 );
360 }
361 }
362 } else {
363 info!(
364 "Not all records for zone {} are ready yet, skipping zone transfer",
365 spec.zone_name
366 );
367 }
368 }
369
370 let secondary_ips =
372 find_all_secondary_pod_ips(&client, &namespace, &cluster_ref, is_cluster_provider)
373 .await
374 .unwrap_or_default();
375 status_updater.set_secondary_ips(secondary_ips);
376
377 status_updater.set_observed_generation(current_generation);
379
380 status_updater.set_condition(
382 "Ready",
383 "True",
384 "ReconcileSucceeded",
385 &format!(
386 "Zone {} configured on {} primary and {} secondary server(s), discovered {} DNS record(s) for cluster {}",
387 spec.zone_name, primary_count, secondary_count, record_count, cluster_ref
388 ),
389 );
390
391 status_updater.apply(&client).await?;
393
394 Ok(())
395}
396
397pub async fn add_dnszone(
418 client: Client,
419 dnszone: DNSZone,
420 zone_manager: &crate::bind9::Bind9Manager,
421) -> Result<usize> {
422 let namespace = dnszone.namespace().unwrap_or_default();
423 let name = dnszone.name_any();
424 let spec = &dnszone.spec;
425
426 let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
428 let is_cluster_provider = spec.cluster_provider_ref.is_some();
429
430 info!("Adding DNSZone: {}", name);
431
432 let secondary_ips =
434 find_all_secondary_pod_ips(&client, &namespace, &cluster_ref, is_cluster_provider).await?;
435
436 if secondary_ips.is_empty() {
437 warn!(
438 "No secondary servers found for cluster {} - zone transfers will not be configured",
439 cluster_ref
440 );
441 } else {
442 info!(
443 "Found {} secondary server(s) for cluster {} - zone transfers will be configured: {:?}",
444 secondary_ips.len(),
445 cluster_ref,
446 secondary_ips
447 );
448 }
449
450 let (first_endpoint, total_endpoints) = for_each_primary_endpoint(
454 &client,
455 &namespace,
456 &cluster_ref,
457 is_cluster_provider,
458 true, "http", |pod_endpoint, instance_name, rndc_key| {
461 let zone_name = spec.zone_name.clone();
462 let soa_record = spec.soa_record.clone();
463 let name_server_ips = spec.name_server_ips.clone();
464 let zone_manager = zone_manager.clone();
465 let secondary_ips_clone = secondary_ips.clone();
466
467 async move {
468 let key_data = rndc_key.expect("RNDC key should be loaded when with_rndc_key=true");
471
472 let secondary_ips_ref = if secondary_ips_clone.is_empty() {
474 None
475 } else {
476 Some(secondary_ips_clone.as_slice())
477 };
478
479 let was_added = zone_manager
480 .add_zones(
481 &zone_name,
482 ZONE_TYPE_PRIMARY,
483 &pod_endpoint,
484 &key_data,
485 Some(&soa_record),
486 name_server_ips.as_ref(),
487 secondary_ips_ref,
488 None, )
490 .await
491 .context(format!(
492 "Failed to add zone {zone_name} to endpoint {pod_endpoint} (instance: {instance_name})"
493 ))?;
494
495 if was_added {
496 info!(
497 "Successfully added zone {} to endpoint {} (instance: {})",
498 zone_name, pod_endpoint, instance_name
499 );
500 }
501
502 Ok(())
503 }
504 },
505 )
506 .await?;
507
508 info!(
509 "Successfully added zone {} to {} endpoint(s) for cluster {}",
510 spec.zone_name, total_endpoints, cluster_ref
511 );
512
513 if let Some(first_pod_endpoint) = first_endpoint {
521 info!(
522 "Notifying secondaries about new zone {} for cluster {}",
523 spec.zone_name, cluster_ref
524 );
525 if let Err(e) = zone_manager
526 .notify_zone(&spec.zone_name, &first_pod_endpoint)
527 .await
528 {
529 warn!(
532 "Failed to notify secondaries for zone {}: {}. Secondaries will sync via SOA refresh timer.",
533 spec.zone_name, e
534 );
535 }
536 } else {
537 warn!(
538 "No endpoints found for zone {}, cannot notify secondaries",
539 spec.zone_name
540 );
541 }
542
543 Ok(total_endpoints)
544}
545
546#[allow(clippy::too_many_lines)]
568pub async fn add_dnszone_to_secondaries(
569 client: Client,
570 dnszone: DNSZone,
571 zone_manager: &crate::bind9::Bind9Manager,
572 primary_ips: &[String],
573) -> Result<usize> {
574 let namespace = dnszone.namespace().unwrap_or_default();
575 let name = dnszone.name_any();
576 let spec = &dnszone.spec;
577
578 let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
580 let is_cluster_provider = spec.cluster_provider_ref.is_some();
581
582 if primary_ips.is_empty() {
583 warn!(
584 "No primary IPs provided for secondary zone {} - skipping secondary configuration",
585 spec.zone_name
586 );
587 return Ok(0);
588 }
589
590 info!(
591 "Adding DNSZone {} to secondary instances with primaries: {:?}",
592 name, primary_ips
593 );
594
595 let secondary_pods =
597 find_all_secondary_pods(&client, &namespace, &cluster_ref, is_cluster_provider).await?;
598
599 if secondary_pods.is_empty() {
600 info!(
601 "No secondary servers found for cluster {} - skipping secondary zone configuration",
602 cluster_ref
603 );
604 return Ok(0);
605 }
606
607 info!(
608 "Found {} secondary pod(s) for cluster {}",
609 secondary_pods.len(),
610 cluster_ref
611 );
612
613 let mut instance_tuples: Vec<(String, String)> = secondary_pods
615 .iter()
616 .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
617 .collect();
618 instance_tuples.sort();
619 instance_tuples.dedup();
620
621 if instance_tuples.is_empty() {
622 return Err(anyhow!(
623 "No secondary instances found for cluster {cluster_ref}"
624 ));
625 }
626
627 let mut total_endpoints = 0;
628
629 for (instance_name, instance_namespace) in &instance_tuples {
631 info!(
632 "Processing secondary instance {}/{} for zone {}",
633 instance_namespace, instance_name, spec.zone_name
634 );
635
636 let key_data = load_rndc_key(&client, instance_namespace, instance_name).await?;
639
640 let endpoints = get_endpoint(&client, instance_namespace, instance_name, "http").await?;
642
643 info!(
644 "Found {} endpoint(s) for secondary instance {}",
645 endpoints.len(),
646 instance_name
647 );
648
649 for endpoint in &endpoints {
650 let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
651
652 info!(
653 "Adding secondary zone {} to endpoint {} (instance: {}) with primaries: {:?}",
654 spec.zone_name, pod_endpoint, instance_name, primary_ips
655 );
656
657 let was_added = zone_manager
658 .add_zones(
659 &spec.zone_name,
660 ZONE_TYPE_SECONDARY,
661 &pod_endpoint,
662 &key_data,
663 None, None, None, Some(primary_ips),
667 )
668 .await
669 .context(format!(
670 "Failed to add secondary zone {} to endpoint {} (instance: {})",
671 spec.zone_name, pod_endpoint, instance_name
672 ))?;
673
674 if was_added {
675 info!(
676 "Successfully added secondary zone {} to endpoint {} (instance: {})",
677 spec.zone_name, pod_endpoint, instance_name
678 );
679 } else {
680 info!(
681 "Secondary zone {} already exists on endpoint {} (instance: {})",
682 spec.zone_name, pod_endpoint, instance_name
683 );
684 }
685
686 total_endpoints += 1;
687 }
688 }
689
690 info!(
691 "Successfully configured secondary zone {} on {} endpoint(s) across {} instance(s) for cluster {}",
692 spec.zone_name,
693 total_endpoints,
694 instance_tuples.len(),
695 cluster_ref
696 );
697
698 Ok(total_endpoints)
699}
700
701#[allow(clippy::too_many_lines)]
727async fn reconcile_zone_records(
728 client: Client,
729 dnszone: DNSZone,
730) -> Result<Vec<crate::crd::RecordReference>> {
731 let namespace = dnszone.namespace().unwrap_or_default();
732 let spec = &dnszone.spec;
733 let zone_name = &spec.zone_name;
734
735 let Some(ref records_from) = spec.records_from else {
737 info!(
738 "No label selectors defined for zone {}, skipping record discovery",
739 zone_name
740 );
741 return Ok(Vec::new());
743 };
744
745 info!(
746 "Discovering DNS records for zone {} using {} label selector(s)",
747 zone_name,
748 records_from.len()
749 );
750
751 let mut all_record_refs = Vec::new();
752
753 for record_source in records_from {
755 let selector = &record_source.selector;
756
757 all_record_refs.extend(discover_a_records(&client, &namespace, selector).await?);
759 all_record_refs.extend(discover_aaaa_records(&client, &namespace, selector).await?);
760 all_record_refs.extend(discover_txt_records(&client, &namespace, selector).await?);
761 all_record_refs.extend(discover_cname_records(&client, &namespace, selector).await?);
762 all_record_refs.extend(discover_mx_records(&client, &namespace, selector).await?);
763 all_record_refs.extend(discover_ns_records(&client, &namespace, selector).await?);
764 all_record_refs.extend(discover_srv_records(&client, &namespace, selector).await?);
765 all_record_refs.extend(discover_caa_records(&client, &namespace, selector).await?);
766 }
767
768 info!(
769 "Discovered {} DNS record(s) for zone {}",
770 all_record_refs.len(),
771 zone_name
772 );
773
774 let previous_records: HashSet<String> = dnszone
776 .status
777 .as_ref()
778 .map(|s| {
779 s.records
780 .iter()
781 .map(|r| format!("{}/{}", r.kind, r.name))
782 .collect()
783 })
784 .unwrap_or_default();
785
786 let current_records: HashSet<String> = all_record_refs
788 .iter()
789 .map(|r| format!("{}/{}", r.kind, r.name))
790 .collect();
791
792 for record_ref in &all_record_refs {
794 let record_key = format!("{}/{}", record_ref.kind, record_ref.name);
795 if !previous_records.contains(&record_key) {
796 info!(
797 "Newly matched record: {} {}/{}",
798 record_ref.kind, namespace, record_ref.name
799 );
800 tag_record_with_zone(
801 &client,
802 &namespace,
803 &record_ref.kind,
804 &record_ref.name,
805 zone_name,
806 )
807 .await?;
808 }
809 }
810
811 for prev_record_key in &previous_records {
814 if !current_records.contains(prev_record_key.as_str()) {
815 if let Some((kind, name)) = prev_record_key.split_once('/') {
817 warn!(
818 "Record no longer matches zone {} (unmatched or deleted): {} {}/{}",
819 zone_name, kind, namespace, name
820 );
821
822 if let Err(e) =
825 untag_record_from_zone(&client, &namespace, kind, name, zone_name).await
826 {
827 if e.to_string().contains("NotFound") || e.to_string().contains("not found") {
829 info!(
830 "Record {} {}/{} was deleted, removing from zone {} status",
831 kind, namespace, name, zone_name
832 );
833 } else {
834 warn!(
836 "Failed to untag record {} {}/{} from zone {}: {}",
837 kind, namespace, name, zone_name, e
838 );
839 }
840 }
841 }
844 }
845 }
846
847 Ok(all_record_refs)
848}
849
850async fn tag_record_with_zone(
868 client: &Client,
869 namespace: &str,
870 kind: &str,
871 name: &str,
872 zone_fqdn: &str,
873) -> Result<()> {
874 debug!(
875 "Tagging {} {}/{} with zone {}",
876 kind, namespace, name, zone_fqdn
877 );
878
879 let plural = format!("{}s", kind.to_lowercase());
881
882 let gvk = kube::core::GroupVersionKind {
884 group: "bindy.firestoned.io".to_string(),
885 version: "v1beta1".to_string(),
886 kind: kind.to_string(),
887 };
888
889 let api_resource = kube::api::ApiResource::from_gvk_with_plural(&gvk, &plural);
891
892 let api = kube::api::Api::<kube::api::DynamicObject>::namespaced_with(
894 client.clone(),
895 namespace,
896 &api_resource,
897 );
898
899 let annotation_patch = json!({
901 "metadata": {
902 "annotations": {
903 ANNOTATION_ZONE_OWNER: zone_fqdn
904 }
905 }
906 });
907
908 api.patch(
909 name,
910 &PatchParams::default(),
911 &Patch::Merge(&annotation_patch),
912 )
913 .await
914 .with_context(|| format!("Failed to add zone annotation to {kind} {namespace}/{name}"))?;
915
916 let status_patch = json!({
918 "status": {
919 "zone": zone_fqdn
920 }
921 });
922
923 api.patch_status(name, &PatchParams::default(), &Patch::Merge(&status_patch))
924 .await
925 .with_context(|| format!("Failed to set status.zone on {kind} {namespace}/{name}"))?;
926
927 info!(
928 "Successfully tagged {} {}/{} with zone {}",
929 kind, namespace, name, zone_fqdn
930 );
931
932 Ok(())
933}
934
935async fn untag_record_from_zone(
953 client: &Client,
954 namespace: &str,
955 kind: &str,
956 name: &str,
957 previous_zone_fqdn: &str,
958) -> Result<()> {
959 debug!(
960 "Untagging {} {}/{} from zone {}",
961 kind, namespace, name, previous_zone_fqdn
962 );
963
964 let plural = format!("{}s", kind.to_lowercase());
966
967 let gvk = kube::core::GroupVersionKind {
969 group: "bindy.firestoned.io".to_string(),
970 version: "v1beta1".to_string(),
971 kind: kind.to_string(),
972 };
973
974 let api_resource = kube::api::ApiResource::from_gvk_with_plural(&gvk, &plural);
976
977 let api = kube::api::Api::<kube::api::DynamicObject>::namespaced_with(
979 client.clone(),
980 namespace,
981 &api_resource,
982 );
983
984 let annotation_patch = json!({
986 "metadata": {
987 "annotations": {
988 ANNOTATION_ZONE_OWNER: null,
989 ANNOTATION_ZONE_PREVIOUS_OWNER: previous_zone_fqdn
990 }
991 }
992 });
993
994 api.patch(
995 name,
996 &PatchParams::default(),
997 &Patch::Merge(&annotation_patch),
998 )
999 .await
1000 .with_context(|| format!("Failed to remove zone annotation from {kind} {namespace}/{name}"))?;
1001
1002 let status_patch = json!({
1004 "status": {
1005 "zone": null
1006 }
1007 });
1008
1009 api.patch_status(name, &PatchParams::default(), &Patch::Merge(&status_patch))
1010 .await
1011 .with_context(|| format!("Failed to clear status.zone on {kind} {namespace}/{name}"))?;
1012
1013 info!(
1014 "Successfully untagged {} {}/{} from zone {}",
1015 kind, namespace, name, previous_zone_fqdn
1016 );
1017
1018 Ok(())
1019}
1020
1021async fn discover_a_records(
1023 client: &Client,
1024 namespace: &str,
1025 selector: &crate::crd::LabelSelector,
1026) -> Result<Vec<crate::crd::RecordReference>> {
1027 use crate::crd::ARecord;
1028 use std::collections::BTreeMap;
1029
1030 let api: Api<ARecord> = Api::namespaced(client.clone(), namespace);
1031 let records = api.list(&ListParams::default()).await?;
1032
1033 let mut record_refs = Vec::new();
1034 for record in records {
1035 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1036
1037 if !selector.matches(&labels) {
1038 continue;
1039 }
1040
1041 debug!("Discovered A record {}/{}", namespace, record.name_any());
1042
1043 record_refs.push(crate::crd::RecordReference {
1044 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1045 kind: "ARecord".to_string(),
1046 name: record.name_any(),
1047 });
1048 }
1049
1050 Ok(record_refs)
1051}
1052
1053async fn discover_aaaa_records(
1055 client: &Client,
1056 namespace: &str,
1057 selector: &crate::crd::LabelSelector,
1058) -> Result<Vec<crate::crd::RecordReference>> {
1059 use crate::crd::AAAARecord;
1060 use std::collections::BTreeMap;
1061
1062 let api: Api<AAAARecord> = Api::namespaced(client.clone(), namespace);
1063 let records = api.list(&ListParams::default()).await?;
1064
1065 let mut record_refs = Vec::new();
1066 for record in records {
1067 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1068
1069 if !selector.matches(&labels) {
1070 continue;
1071 }
1072
1073 debug!("Discovered AAAA record {}/{}", namespace, record.name_any());
1074
1075 record_refs.push(crate::crd::RecordReference {
1076 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1077 kind: "AAAARecord".to_string(),
1078 name: record.name_any(),
1079 });
1080 }
1081
1082 Ok(record_refs)
1083}
1084
1085async fn discover_txt_records(
1087 client: &Client,
1088 namespace: &str,
1089 selector: &crate::crd::LabelSelector,
1090) -> Result<Vec<crate::crd::RecordReference>> {
1091 use crate::crd::TXTRecord;
1092 use std::collections::BTreeMap;
1093
1094 let api: Api<TXTRecord> = Api::namespaced(client.clone(), namespace);
1095 let records = api.list(&ListParams::default()).await?;
1096
1097 let mut record_refs = Vec::new();
1098 for record in records {
1099 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1100
1101 if !selector.matches(&labels) {
1102 continue;
1103 }
1104
1105 debug!("Discovered TXT record {}/{}", namespace, record.name_any());
1106
1107 record_refs.push(crate::crd::RecordReference {
1108 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1109 kind: "TXTRecord".to_string(),
1110 name: record.name_any(),
1111 });
1112 }
1113
1114 Ok(record_refs)
1115}
1116
1117async fn discover_cname_records(
1119 client: &Client,
1120 namespace: &str,
1121 selector: &crate::crd::LabelSelector,
1122) -> Result<Vec<crate::crd::RecordReference>> {
1123 use crate::crd::CNAMERecord;
1124 use std::collections::BTreeMap;
1125
1126 let api: Api<CNAMERecord> = Api::namespaced(client.clone(), namespace);
1127 let records = api.list(&ListParams::default()).await?;
1128
1129 let mut record_refs = Vec::new();
1130 for record in records {
1131 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1132
1133 if !selector.matches(&labels) {
1134 continue;
1135 }
1136
1137 debug!(
1138 "Discovered CNAME record {}/{}",
1139 namespace,
1140 record.name_any()
1141 );
1142
1143 record_refs.push(crate::crd::RecordReference {
1144 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1145 kind: "CNAMERecord".to_string(),
1146 name: record.name_any(),
1147 });
1148 }
1149
1150 Ok(record_refs)
1151}
1152
1153async fn discover_mx_records(
1155 client: &Client,
1156 namespace: &str,
1157 selector: &crate::crd::LabelSelector,
1158) -> Result<Vec<crate::crd::RecordReference>> {
1159 use crate::crd::MXRecord;
1160 use std::collections::BTreeMap;
1161
1162 let api: Api<MXRecord> = Api::namespaced(client.clone(), namespace);
1163 let records = api.list(&ListParams::default()).await?;
1164
1165 let mut record_refs = Vec::new();
1166 for record in records {
1167 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1168
1169 if !selector.matches(&labels) {
1170 continue;
1171 }
1172
1173 debug!("Discovered MX record {}/{}", namespace, record.name_any());
1174
1175 record_refs.push(crate::crd::RecordReference {
1176 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1177 kind: "MXRecord".to_string(),
1178 name: record.name_any(),
1179 });
1180 }
1181
1182 Ok(record_refs)
1183}
1184
1185async fn discover_ns_records(
1187 client: &Client,
1188 namespace: &str,
1189 selector: &crate::crd::LabelSelector,
1190) -> Result<Vec<crate::crd::RecordReference>> {
1191 use crate::crd::NSRecord;
1192 use std::collections::BTreeMap;
1193
1194 let api: Api<NSRecord> = Api::namespaced(client.clone(), namespace);
1195 let records = api.list(&ListParams::default()).await?;
1196
1197 let mut record_refs = Vec::new();
1198 for record in records {
1199 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1200
1201 if !selector.matches(&labels) {
1202 continue;
1203 }
1204
1205 debug!("Discovered NS record {}/{}", namespace, record.name_any());
1206
1207 record_refs.push(crate::crd::RecordReference {
1208 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1209 kind: "NSRecord".to_string(),
1210 name: record.name_any(),
1211 });
1212 }
1213
1214 Ok(record_refs)
1215}
1216
1217async fn discover_srv_records(
1219 client: &Client,
1220 namespace: &str,
1221 selector: &crate::crd::LabelSelector,
1222) -> Result<Vec<crate::crd::RecordReference>> {
1223 use crate::crd::SRVRecord;
1224 use std::collections::BTreeMap;
1225
1226 let api: Api<SRVRecord> = Api::namespaced(client.clone(), namespace);
1227 let records = api.list(&ListParams::default()).await?;
1228
1229 let mut record_refs = Vec::new();
1230 for record in records {
1231 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1232
1233 if !selector.matches(&labels) {
1234 continue;
1235 }
1236
1237 debug!("Discovered SRV record {}/{}", namespace, record.name_any());
1238
1239 record_refs.push(crate::crd::RecordReference {
1240 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1241 kind: "SRVRecord".to_string(),
1242 name: record.name_any(),
1243 });
1244 }
1245
1246 Ok(record_refs)
1247}
1248
1249async fn discover_caa_records(
1251 client: &Client,
1252 namespace: &str,
1253 selector: &crate::crd::LabelSelector,
1254) -> Result<Vec<crate::crd::RecordReference>> {
1255 use crate::crd::CAARecord;
1256 use std::collections::BTreeMap;
1257
1258 let api: Api<CAARecord> = Api::namespaced(client.clone(), namespace);
1259 let records = api.list(&ListParams::default()).await?;
1260
1261 let mut record_refs = Vec::new();
1262 for record in records {
1263 let labels: BTreeMap<String, String> = record.metadata.labels.clone().unwrap_or_default();
1264
1265 if !selector.matches(&labels) {
1266 continue;
1267 }
1268
1269 debug!("Discovered CAA record {}/{}", namespace, record.name_any());
1270
1271 record_refs.push(crate::crd::RecordReference {
1272 api_version: "bindy.firestoned.io/v1beta1".to_string(),
1273 kind: "CAARecord".to_string(),
1274 name: record.name_any(),
1275 });
1276 }
1277
1278 Ok(record_refs)
1279}
1280
1281pub async fn delete_dnszone(
1298 client: Client,
1299 dnszone: DNSZone,
1300 zone_manager: &crate::bind9::Bind9Manager,
1301) -> Result<()> {
1302 let namespace = dnszone.namespace().unwrap_or_default();
1303 let name = dnszone.name_any();
1304 let spec = &dnszone.spec;
1305
1306 let cluster_ref = get_cluster_ref_from_spec(spec, &namespace, &name)?;
1308 let is_cluster_provider = spec.cluster_provider_ref.is_some();
1309
1310 info!("Deleting DNSZone: {}", name);
1311
1312 let (_first_endpoint, total_endpoints) = for_each_primary_endpoint(
1316 &client,
1317 &namespace,
1318 &cluster_ref,
1319 is_cluster_provider,
1320 false, "http", |pod_endpoint, instance_name, _rndc_key| {
1323 let zone_name = spec.zone_name.clone();
1324 let zone_manager = zone_manager.clone();
1325
1326 async move {
1327 info!(
1328 "Deleting zone {} from endpoint {} (instance: {})",
1329 zone_name, pod_endpoint, instance_name
1330 );
1331
1332 if let Err(e) = zone_manager.delete_zone(&zone_name, &pod_endpoint).await {
1336 warn!(
1337 "Failed to delete zone {} from endpoint {} (instance: {}): {}. Continuing with deletion anyway.",
1338 zone_name, pod_endpoint, instance_name, e
1339 );
1340 } else {
1341 debug!(
1342 "Successfully deleted zone {} from endpoint {} (instance: {})",
1343 zone_name, pod_endpoint, instance_name
1344 );
1345 }
1346
1347 Ok(())
1348 }
1349 },
1350 )
1351 .await?;
1352
1353 info!(
1354 "Successfully deleted zone {} from {} primary endpoint(s) for cluster {}",
1355 spec.zone_name, total_endpoints, cluster_ref
1356 );
1357
1358 let secondary_pods =
1360 find_all_secondary_pods(&client, &namespace, &cluster_ref, is_cluster_provider).await?;
1361
1362 if !secondary_pods.is_empty() {
1363 let mut instance_tuples: Vec<(String, String)> = secondary_pods
1365 .iter()
1366 .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
1367 .collect();
1368 instance_tuples.sort();
1369 instance_tuples.dedup();
1370
1371 let mut secondary_endpoints_deleted = 0;
1372
1373 for (instance_name, instance_namespace) in &instance_tuples {
1374 let endpoints =
1375 get_endpoint(&client, instance_namespace, instance_name, "http").await?;
1376
1377 for endpoint in &endpoints {
1378 let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
1379
1380 info!(
1381 "Deleting zone {} from secondary endpoint {} (instance: {})",
1382 spec.zone_name, pod_endpoint, instance_name
1383 );
1384
1385 if let Err(e) = zone_manager
1387 .delete_zone(&spec.zone_name, &pod_endpoint)
1388 .await
1389 {
1390 warn!(
1391 "Failed to delete zone {} from secondary endpoint {} (instance: {}): {}. Continuing with deletion anyway.",
1392 spec.zone_name, pod_endpoint, instance_name, e
1393 );
1394 } else {
1395 debug!(
1396 "Successfully deleted zone {} from secondary endpoint {} (instance: {})",
1397 spec.zone_name, pod_endpoint, instance_name
1398 );
1399 secondary_endpoints_deleted += 1;
1400 }
1401 }
1402 }
1403
1404 info!(
1405 "Successfully deleted zone {} from {} secondary endpoint(s) for cluster {}",
1406 spec.zone_name, secondary_endpoints_deleted, cluster_ref
1407 );
1408 }
1409
1410 Ok(())
1415}
1416
1417async fn find_matching_instances(
1419 client: &Client,
1420 namespace: &str,
1421 selector: &crate::crd::LabelSelector,
1422) -> Result<Vec<String>> {
1423 use crate::crd::Bind9Instance;
1424
1425 let api: Api<Bind9Instance> = Api::namespaced(client.clone(), namespace);
1426
1427 let label_selector = build_label_selector(selector);
1429
1430 let params = kube::api::ListParams::default();
1431 let params = if let Some(selector_str) = label_selector {
1432 params.labels(&selector_str)
1433 } else {
1434 params
1435 };
1436
1437 let instances = api.list(¶ms).await?;
1438
1439 let instance_names: Vec<String> = instances
1440 .items
1441 .iter()
1442 .map(kube::ResourceExt::name_any)
1443 .collect();
1444
1445 Ok(instance_names)
1446}
1447
1448pub(crate) fn build_label_selector(selector: &crate::crd::LabelSelector) -> Option<String> {
1450 let mut parts = Vec::new();
1451
1452 if let Some(labels) = &selector.match_labels {
1454 for (key, value) in labels {
1455 parts.push(format!("{key}={value}"));
1456 }
1457 }
1458
1459 if parts.is_empty() {
1460 None
1461 } else {
1462 Some(parts.join(","))
1463 }
1464}
1465
1466#[derive(Clone)]
1468pub struct PodInfo {
1469 pub name: String,
1470 pub ip: String,
1471 pub instance_name: String,
1472 pub namespace: String,
1473}
1474
1475pub async fn find_all_primary_pods(
1495 client: &Client,
1496 namespace: &str,
1497 cluster_name: &str,
1498 is_cluster_provider: bool,
1499) -> Result<Vec<PodInfo>> {
1500 use crate::crd::{Bind9Instance, ServerRole};
1501
1502 let instance_api: Api<Bind9Instance> = if is_cluster_provider {
1504 Api::all(client.clone())
1505 } else {
1506 Api::namespaced(client.clone(), namespace)
1507 };
1508 let instances = instance_api.list(&ListParams::default()).await?;
1509
1510 let mut primary_instances: Vec<(String, String)> = Vec::new();
1512 for instance in instances.items {
1513 if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Primary {
1514 if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
1515 primary_instances.push((name, ns));
1516 }
1517 }
1518 }
1519
1520 if primary_instances.is_empty() {
1521 let search_scope = if is_cluster_provider {
1522 "all namespaces".to_string()
1523 } else {
1524 format!("namespace {namespace}")
1525 };
1526 return Err(anyhow!(
1527 "No PRIMARY Bind9Instance resources found for cluster {cluster_name} in {search_scope}"
1528 ));
1529 }
1530
1531 info!(
1532 "Found {} PRIMARY instance(s) for cluster {}: {:?}",
1533 primary_instances.len(),
1534 cluster_name,
1535 primary_instances
1536 );
1537
1538 let mut all_pod_infos = Vec::new();
1539
1540 for (instance_name, instance_namespace) in &primary_instances {
1541 let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
1543 let label_selector = format!("app=bind9,instance={instance_name}");
1545 let lp = ListParams::default().labels(&label_selector);
1546
1547 let pods = pod_api.list(&lp).await?;
1548
1549 debug!(
1550 "Found {} pod(s) for PRIMARY instance {}",
1551 pods.items.len(),
1552 instance_name
1553 );
1554
1555 for pod in &pods.items {
1556 let pod_name = pod
1557 .metadata
1558 .name
1559 .as_ref()
1560 .ok_or_else(|| anyhow!("Pod has no name"))?
1561 .clone();
1562
1563 let pod_ip = pod
1565 .status
1566 .as_ref()
1567 .and_then(|s| s.pod_ip.as_ref())
1568 .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
1569 .clone();
1570
1571 let phase = pod
1573 .status
1574 .as_ref()
1575 .and_then(|s| s.phase.as_ref())
1576 .map(String::as_str);
1577
1578 if phase == Some("Running") {
1579 all_pod_infos.push(PodInfo {
1580 name: pod_name.clone(),
1581 ip: pod_ip.clone(),
1582 instance_name: instance_name.clone(),
1583 namespace: instance_namespace.clone(),
1584 });
1585 debug!(
1586 "Found running pod {} with IP {} in namespace {}",
1587 pod_name, pod_ip, instance_namespace
1588 );
1589 } else {
1590 debug!(
1591 "Skipping pod {} (phase: {:?}, not running)",
1592 pod_name, phase
1593 );
1594 }
1595 }
1596 }
1597
1598 if all_pod_infos.is_empty() {
1599 return Err(anyhow!(
1600 "No running PRIMARY pods found for cluster {cluster_name} in namespace {namespace}"
1601 ));
1602 }
1603
1604 info!(
1605 "Found {} running PRIMARY pod(s) across {} instance(s) for cluster {}",
1606 all_pod_infos.len(),
1607 primary_instances.len(),
1608 cluster_name
1609 );
1610
1611 Ok(all_pod_infos)
1612}
1613
1614async fn find_all_secondary_pod_ips(
1621 client: &Client,
1622 namespace: &str,
1623 cluster_name: &str,
1624 is_cluster_provider: bool,
1625) -> Result<Vec<String>> {
1626 info!("Finding SECONDARY pod IPs for cluster {}", cluster_name);
1627
1628 let secondary_pods =
1629 find_all_secondary_pods(client, namespace, cluster_name, is_cluster_provider).await?;
1630
1631 let secondary_ips: Vec<String> = secondary_pods.iter().map(|pod| pod.ip.clone()).collect();
1632
1633 info!(
1634 "Found {} running SECONDARY pod IP(s) for cluster {}: {:?}",
1635 secondary_ips.len(),
1636 cluster_name,
1637 secondary_ips
1638 );
1639
1640 Ok(secondary_ips)
1641}
1642
1643async fn find_all_primary_pod_ips(
1648 client: &Client,
1649 namespace: &str,
1650 cluster_name: &str,
1651 is_cluster_provider: bool,
1652) -> Result<Vec<String>> {
1653 info!("Finding PRIMARY pod IPs for cluster {}", cluster_name);
1654
1655 let primary_pods =
1656 find_all_primary_pods(client, namespace, cluster_name, is_cluster_provider).await?;
1657
1658 let primary_ips: Vec<String> = primary_pods.iter().map(|pod| pod.ip.clone()).collect();
1659
1660 info!(
1661 "Found {} running PRIMARY pod IP(s) for cluster {}: {:?}",
1662 primary_ips.len(),
1663 cluster_name,
1664 primary_ips
1665 );
1666
1667 Ok(primary_ips)
1668}
1669
1670async fn find_all_secondary_pods(
1686 client: &Client,
1687 namespace: &str,
1688 cluster_name: &str,
1689 is_cluster_provider: bool,
1690) -> Result<Vec<PodInfo>> {
1691 use crate::crd::{Bind9Instance, ServerRole};
1692
1693 let instance_api: Api<Bind9Instance> = if is_cluster_provider {
1695 Api::all(client.clone())
1696 } else {
1697 Api::namespaced(client.clone(), namespace)
1698 };
1699 let instances = instance_api.list(&ListParams::default()).await?;
1700
1701 let mut secondary_instances: Vec<(String, String)> = Vec::new();
1703 for instance in instances.items {
1704 if instance.spec.cluster_ref == cluster_name && instance.spec.role == ServerRole::Secondary
1705 {
1706 if let (Some(name), Some(ns)) = (instance.metadata.name, instance.metadata.namespace) {
1707 secondary_instances.push((name, ns));
1708 }
1709 }
1710 }
1711
1712 if secondary_instances.is_empty() {
1713 info!("No SECONDARY instances found for cluster {cluster_name}");
1714 return Ok(Vec::new());
1715 }
1716
1717 info!(
1718 "Found {} SECONDARY instance(s) for cluster {}: {:?}",
1719 secondary_instances.len(),
1720 cluster_name,
1721 secondary_instances
1722 );
1723
1724 let mut all_pod_infos = Vec::new();
1725
1726 for (instance_name, instance_namespace) in &secondary_instances {
1727 let pod_api: Api<Pod> = Api::namespaced(client.clone(), instance_namespace);
1729 let label_selector = format!("app=bind9,instance={instance_name}");
1730 let lp = ListParams::default().labels(&label_selector);
1731
1732 let pods = pod_api.list(&lp).await?;
1733
1734 debug!(
1735 "Found {} pod(s) for SECONDARY instance {}",
1736 pods.items.len(),
1737 instance_name
1738 );
1739
1740 for pod in &pods.items {
1741 let pod_name = pod
1742 .metadata
1743 .name
1744 .as_ref()
1745 .ok_or_else(|| anyhow!("Pod has no name"))?
1746 .clone();
1747
1748 let pod_ip = pod
1750 .status
1751 .as_ref()
1752 .and_then(|s| s.pod_ip.as_ref())
1753 .ok_or_else(|| anyhow!("Pod {pod_name} has no IP address"))?
1754 .clone();
1755
1756 let phase = pod
1758 .status
1759 .as_ref()
1760 .and_then(|s| s.phase.as_ref())
1761 .map(String::as_str);
1762
1763 if phase == Some("Running") {
1764 all_pod_infos.push(PodInfo {
1765 name: pod_name.clone(),
1766 ip: pod_ip.clone(),
1767 instance_name: instance_name.clone(),
1768 namespace: instance_namespace.clone(),
1769 });
1770 debug!(
1771 "Found running secondary pod {} with IP {} in namespace {}",
1772 pod_name, pod_ip, instance_namespace
1773 );
1774 } else {
1775 debug!(
1776 "Skipping secondary pod {} (phase: {:?}, not running)",
1777 pod_name, phase
1778 );
1779 }
1780 }
1781 }
1782
1783 info!(
1784 "Found {} running SECONDARY pod(s) across {} instance(s) for cluster {}",
1785 all_pod_infos.len(),
1786 secondary_instances.len(),
1787 cluster_name
1788 );
1789
1790 Ok(all_pod_infos)
1791}
1792
1793async fn load_rndc_key(
1795 client: &Client,
1796 namespace: &str,
1797 instance_name: &str,
1798) -> Result<RndcKeyData> {
1799 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
1800 let secret_name = format!("{instance_name}-rndc-key");
1801
1802 let secret = secret_api.get(&secret_name).await.context(format!(
1803 "Failed to get RNDC secret {secret_name} in namespace {namespace}"
1804 ))?;
1805
1806 let data = secret
1807 .data
1808 .as_ref()
1809 .ok_or_else(|| anyhow!("Secret {secret_name} has no data"))?;
1810
1811 let mut converted_data = std::collections::BTreeMap::new();
1813 for (key, value) in data {
1814 converted_data.insert(key.clone(), value.0.clone());
1815 }
1816
1817 crate::bind9::Bind9Manager::parse_rndc_secret_data(&converted_data)
1818}
1819
1820fn is_running_in_cluster() -> bool {
1827 std::path::Path::new("/var/run/secrets/kubernetes.io/serviceaccount/token").exists()
1828}
1829
1830async fn update_condition(
1835 client: &Client,
1836 dnszone: &DNSZone,
1837 condition_type: &str,
1838 status: &str,
1839 reason: &str,
1840 message: &str,
1841) -> Result<()> {
1842 let api: Api<DNSZone> =
1843 Api::namespaced(client.clone(), &dnszone.namespace().unwrap_or_default());
1844
1845 let condition = Condition {
1846 r#type: condition_type.to_string(),
1847 status: status.to_string(),
1848 last_transition_time: Some(Utc::now().to_rfc3339()),
1849 reason: Some(reason.to_string()),
1850 message: Some(message.to_string()),
1851 };
1852
1853 let current_status = dnszone.status.as_ref();
1855 let new_status = DNSZoneStatus {
1856 conditions: vec![condition],
1857 observed_generation: current_status
1858 .and_then(|s| s.observed_generation)
1859 .or(dnszone.metadata.generation),
1860 record_count: current_status.and_then(|s| s.record_count),
1861 secondary_ips: current_status.and_then(|s| s.secondary_ips.clone()),
1862 records: current_status
1863 .map(|s| s.records.clone())
1864 .unwrap_or_default(),
1865 };
1866
1867 let patch = json!({
1868 "status": new_status
1869 });
1870
1871 api.patch_status(
1872 &dnszone.name_any(),
1873 &PatchParams::default(),
1874 &Patch::Merge(&patch),
1875 )
1876 .await?;
1877
1878 debug!(
1879 "Updated DNSZone {}/{} condition: type={}, status={}, reason={}",
1880 dnszone.namespace().unwrap_or_default(),
1881 dnszone.name_any(),
1882 condition_type,
1883 status,
1884 reason
1885 );
1886
1887 Ok(())
1888}
1889
1890async fn update_status_with_secondaries(
1892 client: &Client,
1893 dnszone: &DNSZone,
1894 condition_type: &str,
1895 status: &str,
1896 reason: &str,
1897 message: &str,
1898 secondary_ips: Vec<String>,
1899) -> Result<()> {
1900 let api: Api<DNSZone> =
1901 Api::namespaced(client.clone(), &dnszone.namespace().unwrap_or_default());
1902
1903 let condition = Condition {
1904 r#type: condition_type.to_string(),
1905 status: status.to_string(),
1906 last_transition_time: Some(Utc::now().to_rfc3339()),
1907 reason: Some(reason.to_string()),
1908 message: Some(message.to_string()),
1909 };
1910
1911 let new_status = DNSZoneStatus {
1912 conditions: vec![condition],
1913 observed_generation: dnszone.metadata.generation,
1914 record_count: dnszone.status.as_ref().and_then(|s| s.record_count),
1915 secondary_ips: if secondary_ips.is_empty() {
1916 None
1917 } else {
1918 Some(secondary_ips)
1919 },
1920 records: dnszone
1921 .status
1922 .as_ref()
1923 .map(|s| s.records.clone())
1924 .unwrap_or_default(),
1925 };
1926
1927 let patch = json!({
1928 "status": new_status
1929 });
1930
1931 api.patch_status(
1932 &dnszone.name_any(),
1933 &PatchParams::default(),
1934 &Patch::Merge(&patch),
1935 )
1936 .await?;
1937
1938 info!(
1939 "Updated DNSZone {}/{} status: {}={}",
1940 dnszone.namespace().unwrap_or_default(),
1941 dnszone.name_any(),
1942 condition_type,
1943 status
1944 );
1945
1946 Ok(())
1947}
1948
1949async fn update_status(
1950 client: &Client,
1951 dnszone: &DNSZone,
1952 condition_type: &str,
1953 status: &str,
1954 message: &str,
1955) -> Result<()> {
1956 let api: Api<DNSZone> =
1957 Api::namespaced(client.clone(), &dnszone.namespace().unwrap_or_default());
1958
1959 let current_status = &dnszone.status;
1961 let status_changed = if let Some(current) = current_status {
1962 if let Some(current_condition) = current.conditions.first() {
1963 current_condition.r#type != condition_type
1965 || current_condition.status != status
1966 || current_condition.message.as_deref() != Some(message)
1967 } else {
1968 true
1970 }
1971 } else {
1972 true
1974 };
1975
1976 if !status_changed {
1978 debug!(
1979 namespace = %dnszone.namespace().unwrap_or_default(),
1980 name = %dnszone.name_any(),
1981 "Status unchanged, skipping update"
1982 );
1983 info!(
1984 "DNSZone {}/{} status unchanged, skipping update",
1985 dnszone.namespace().unwrap_or_default(),
1986 dnszone.name_any()
1987 );
1988 return Ok(());
1989 }
1990
1991 debug!(
1992 condition_type = %condition_type,
1993 status = %status,
1994 message = %message,
1995 "Preparing status update"
1996 );
1997
1998 let condition = Condition {
1999 r#type: condition_type.to_string(),
2000 status: status.to_string(),
2001 reason: Some(condition_type.to_string()),
2002 message: Some(message.to_string()),
2003 last_transition_time: Some(Utc::now().to_rfc3339()),
2004 };
2005
2006 let new_status = DNSZoneStatus {
2007 conditions: vec![condition],
2008 observed_generation: dnszone.metadata.generation,
2009 record_count: None,
2010 secondary_ips: dnszone
2011 .status
2012 .as_ref()
2013 .and_then(|s| s.secondary_ips.clone()),
2014 records: dnszone
2015 .status
2016 .as_ref()
2017 .map(|s| s.records.clone())
2018 .unwrap_or_default(),
2019 };
2020
2021 info!(
2022 "Updating DNSZone {}/{} status",
2023 dnszone.namespace().unwrap_or_default(),
2024 dnszone.name_any()
2025 );
2026
2027 let patch = json!({ "status": new_status });
2028 api.patch_status(
2029 &dnszone.name_any(),
2030 &PatchParams::default(),
2031 &Patch::Merge(patch),
2032 )
2033 .await?;
2034
2035 Ok(())
2036}
2037
2038#[derive(Debug, Clone)]
2040pub struct EndpointAddress {
2041 pub ip: String,
2043 pub port: i32,
2045}
2046
2047pub async fn for_each_primary_endpoint<F, Fut>(
2077 client: &Client,
2078 namespace: &str,
2079 cluster_ref: &str,
2080 is_cluster_provider: bool,
2081 with_rndc_key: bool,
2082 port_name: &str,
2083 operation: F,
2084) -> Result<(Option<String>, usize)>
2085where
2086 F: Fn(String, String, Option<RndcKeyData>) -> Fut,
2087 Fut: std::future::Future<Output = Result<()>>,
2088{
2089 let primary_pods =
2091 find_all_primary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;
2092
2093 info!(
2094 "Found {} PRIMARY pod(s) for cluster {}",
2095 primary_pods.len(),
2096 cluster_ref
2097 );
2098
2099 let mut instance_tuples: Vec<(String, String)> = primary_pods
2102 .iter()
2103 .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
2104 .collect();
2105 instance_tuples.sort();
2106 instance_tuples.dedup();
2107
2108 info!(
2109 "Found {} primary instance(s) for cluster {}: {:?}",
2110 instance_tuples.len(),
2111 cluster_ref,
2112 instance_tuples
2113 );
2114
2115 let mut first_endpoint: Option<String> = None;
2116 let mut total_endpoints = 0;
2117
2118 for (instance_name, instance_namespace) in &instance_tuples {
2122 info!(
2123 "Getting endpoints for instance {}/{} in cluster {}",
2124 instance_namespace, instance_name, cluster_ref
2125 );
2126
2127 let key_data = if with_rndc_key {
2130 Some(load_rndc_key(client, instance_namespace, instance_name).await?)
2131 } else {
2132 None
2133 };
2134
2135 let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;
2138
2139 info!(
2140 "Found {} endpoint(s) for instance {}",
2141 endpoints.len(),
2142 instance_name
2143 );
2144
2145 for endpoint in &endpoints {
2146 let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
2147
2148 if first_endpoint.is_none() {
2150 first_endpoint = Some(pod_endpoint.clone());
2151 }
2152
2153 operation(pod_endpoint, instance_name.clone(), key_data.clone()).await?;
2155
2156 total_endpoints += 1;
2157 }
2158 }
2159
2160 Ok((first_endpoint, total_endpoints))
2161}
2162
2163pub async fn for_each_secondary_endpoint<F, Fut>(
2190 client: &Client,
2191 namespace: &str,
2192 cluster_ref: &str,
2193 is_cluster_provider: bool,
2194 with_rndc_key: bool,
2195 port_name: &str,
2196 operation: F,
2197) -> Result<(Option<String>, usize)>
2198where
2199 F: Fn(String, String, Option<RndcKeyData>) -> Fut,
2200 Fut: std::future::Future<Output = Result<()>>,
2201{
2202 let secondary_pods =
2204 find_all_secondary_pods(client, namespace, cluster_ref, is_cluster_provider).await?;
2205
2206 info!(
2207 "Found {} SECONDARY pod(s) for cluster {}",
2208 secondary_pods.len(),
2209 cluster_ref
2210 );
2211
2212 let mut instance_tuples: Vec<(String, String)> = secondary_pods
2215 .iter()
2216 .map(|pod| (pod.instance_name.clone(), pod.namespace.clone()))
2217 .collect();
2218 instance_tuples.sort();
2219 instance_tuples.dedup();
2220
2221 info!(
2222 "Found {} secondary instance(s) for cluster {}: {:?}",
2223 instance_tuples.len(),
2224 cluster_ref,
2225 instance_tuples
2226 );
2227
2228 let mut first_endpoint: Option<String> = None;
2229 let mut total_endpoints = 0;
2230
2231 for (instance_name, instance_namespace) in &instance_tuples {
2233 info!(
2234 "Getting endpoints for secondary instance {}/{} in cluster {}",
2235 instance_namespace, instance_name, cluster_ref
2236 );
2237
2238 let key_data = if with_rndc_key {
2241 Some(load_rndc_key(client, instance_namespace, instance_name).await?)
2242 } else {
2243 None
2244 };
2245
2246 let endpoints = get_endpoint(client, instance_namespace, instance_name, port_name).await?;
2249
2250 info!(
2251 "Found {} endpoint(s) for secondary instance {}",
2252 endpoints.len(),
2253 instance_name
2254 );
2255
2256 for endpoint in &endpoints {
2257 let pod_endpoint = format!("{}:{}", endpoint.ip, endpoint.port);
2258
2259 if first_endpoint.is_none() {
2261 first_endpoint = Some(pod_endpoint.clone());
2262 }
2263
2264 operation(pod_endpoint, instance_name.clone(), key_data.clone()).await?;
2266
2267 total_endpoints += 1;
2268 }
2269 }
2270
2271 Ok((first_endpoint, total_endpoints))
2272}
2273
2274pub async fn get_endpoint(
2297 client: &Client,
2298 namespace: &str,
2299 service_name: &str,
2300 port_name: &str,
2301) -> Result<Vec<EndpointAddress>> {
2302 let endpoints_api: Api<Endpoints> = Api::namespaced(client.clone(), namespace);
2303 let endpoints = endpoints_api.get(service_name).await.context(format!(
2304 "Failed to get endpoints for service {service_name}"
2305 ))?;
2306
2307 let mut result = Vec::new();
2308
2309 if let Some(subsets) = endpoints.subsets {
2313 for subset in subsets {
2314 if let Some(ports) = subset.ports {
2316 if let Some(endpoint_port) = ports
2317 .iter()
2318 .find(|p| p.name.as_ref().is_some_and(|name| name == port_name))
2319 {
2320 let port = endpoint_port.port;
2321
2322 if let Some(addresses) = subset.addresses {
2324 for addr in addresses {
2325 result.push(EndpointAddress {
2326 ip: addr.ip.clone(),
2327 port,
2328 });
2329 }
2330 }
2331 }
2332 }
2333 }
2334 }
2335
2336 if result.is_empty() {
2337 return Err(anyhow!(
2338 "No ready endpoints found for service {service_name} with port '{port_name}'"
2339 ));
2340 }
2341
2342 Ok(result)
2343}
2344
2345async fn check_all_records_ready(
2361 client: &Client,
2362 namespace: &str,
2363 record_refs: &[crate::crd::RecordReference],
2364) -> Result<bool> {
2365 use crate::crd::{
2366 AAAARecord, ARecord, CAARecord, CNAMERecord, MXRecord, NSRecord, SRVRecord, TXTRecord,
2367 };
2368
2369 for record_ref in record_refs {
2370 let is_ready = match record_ref.kind.as_str() {
2371 "ARecord" => {
2372 let api: Api<ARecord> = Api::namespaced(client.clone(), namespace);
2373 check_record_ready(&api, &record_ref.name).await?
2374 }
2375 "AAAARecord" => {
2376 let api: Api<AAAARecord> = Api::namespaced(client.clone(), namespace);
2377 check_record_ready(&api, &record_ref.name).await?
2378 }
2379 "TXTRecord" => {
2380 let api: Api<TXTRecord> = Api::namespaced(client.clone(), namespace);
2381 check_record_ready(&api, &record_ref.name).await?
2382 }
2383 "CNAMERecord" => {
2384 let api: Api<CNAMERecord> = Api::namespaced(client.clone(), namespace);
2385 check_record_ready(&api, &record_ref.name).await?
2386 }
2387 "MXRecord" => {
2388 let api: Api<MXRecord> = Api::namespaced(client.clone(), namespace);
2389 check_record_ready(&api, &record_ref.name).await?
2390 }
2391 "NSRecord" => {
2392 let api: Api<NSRecord> = Api::namespaced(client.clone(), namespace);
2393 check_record_ready(&api, &record_ref.name).await?
2394 }
2395 "SRVRecord" => {
2396 let api: Api<SRVRecord> = Api::namespaced(client.clone(), namespace);
2397 check_record_ready(&api, &record_ref.name).await?
2398 }
2399 "CAARecord" => {
2400 let api: Api<CAARecord> = Api::namespaced(client.clone(), namespace);
2401 check_record_ready(&api, &record_ref.name).await?
2402 }
2403 _ => {
2404 warn!(
2405 "Unknown record kind: {}, skipping readiness check",
2406 record_ref.kind
2407 );
2408 false
2409 }
2410 };
2411
2412 if !is_ready {
2413 debug!(
2414 "Record {}/{} (kind: {}) is not ready yet",
2415 namespace, record_ref.name, record_ref.kind
2416 );
2417 return Ok(false);
2418 }
2419 }
2420
2421 Ok(true)
2422}
2423
2424async fn check_record_ready<T>(api: &Api<T>, name: &str) -> Result<bool>
2426where
2427 T: kube::Resource<DynamicType = ()>
2428 + Clone
2429 + serde::de::DeserializeOwned
2430 + serde::Serialize
2431 + std::fmt::Debug
2432 + Send
2433 + Sync,
2434 <T as kube::Resource>::DynamicType: Default,
2435{
2436 let record = match api.get(name).await {
2437 Ok(r) => r,
2438 Err(e) => {
2439 warn!("Failed to get record {}: {}", name, e);
2440 return Ok(false);
2441 }
2442 };
2443
2444 let record_json = serde_json::to_value(&record)?;
2446 let status = record_json.get("status");
2447
2448 if let Some(status_obj) = status {
2449 if let Some(conditions) = status_obj.get("conditions").and_then(|c| c.as_array()) {
2450 for condition in conditions {
2451 if let (Some(type_val), Some(status_val)) = (
2452 condition.get("type").and_then(|t| t.as_str()),
2453 condition.get("status").and_then(|s| s.as_str()),
2454 ) {
2455 if type_val == "Ready" && status_val == "True" {
2456 return Ok(true);
2457 }
2458 }
2459 }
2460 }
2461 }
2462
2463 Ok(false)
2464}
2465
2466pub async fn find_zones_selecting_record(
2488 client: &Client,
2489 record_namespace: &str,
2490 record_kind: &str,
2491 record_name: &str,
2492) -> Result<Vec<(String, String)>> {
2493 let api: Api<DNSZone> = Api::namespaced(client.clone(), record_namespace);
2494 let zones = api.list(&ListParams::default()).await?;
2495
2496 let mut selecting_zones = vec![];
2497
2498 for zone in zones {
2499 let Some(ref status) = zone.status else {
2500 continue;
2501 };
2502
2503 let is_selected = status
2505 .records
2506 .iter()
2507 .any(|r| r.kind == record_kind && r.name == record_name);
2508
2509 if is_selected {
2510 let zone_name = zone.name_any();
2511 let zone_namespace = zone.namespace().unwrap_or_default();
2512 selecting_zones.push((zone_name, zone_namespace));
2513 }
2514 }
2515
2516 Ok(selecting_zones)
2517}
2518
2519async fn trigger_zone_transfers(
2537 client: &Client,
2538 namespace: &str,
2539 zone_name: &str,
2540 cluster_ref: &str,
2541 is_cluster_provider: bool,
2542 zone_manager: &crate::bind9::Bind9Manager,
2543) -> Result<usize> {
2544 let (_first_endpoint, total_endpoints) = for_each_secondary_endpoint(
2545 client,
2546 namespace,
2547 cluster_ref,
2548 is_cluster_provider,
2549 false, "http", |secondary_endpoint, instance_name, _rndc_key| {
2552 let zone_name = zone_name.to_string();
2553 let zone_manager = zone_manager.clone();
2554
2555 async move {
2556 zone_manager
2557 .retransfer_zone(&zone_name, &secondary_endpoint)
2558 .await
2559 .with_context(|| format!(
2560 "Failed to trigger zone transfer for {zone_name} on secondary {secondary_endpoint} (instance: {instance_name})"
2561 ))?;
2562
2563 info!(
2564 "Triggered zone transfer for {zone_name} on secondary {secondary_endpoint} (instance: {instance_name})"
2565 );
2566
2567 Ok(())
2568 }
2569 },
2570 )
2571 .await?;
2572
2573 if total_endpoints == 0 {
2574 warn!(
2575 "No secondary instances found for zone {} in cluster {}",
2576 zone_name, cluster_ref
2577 );
2578 }
2579
2580 Ok(total_endpoints)
2581}