1#![allow(unused_imports)] use anyhow::{Context as AnyhowContext, Result};
12use kube::{
13 api::{ListParams, Patch, PatchParams},
14 Api, Client, ResourceExt,
15};
16use serde_json::json;
17use std::collections::HashSet;
18use tracing::{debug, info, warn};
19
20use crate::crd::DNSZone;
21use crate::reconcilers::pagination::list_all_paginated;
22
23#[allow(clippy::too_many_lines)]
49pub async fn reconcile_zone_records(
50 client: Client,
51 dnszone: DNSZone,
52) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
53 let namespace = dnszone.namespace().unwrap_or_default();
54 let spec = &dnszone.spec;
55 let zone_name = &spec.zone_name;
56
57 let Some(ref records_from) = spec.records_from else {
59 info!(
60 "No label selectors defined for zone {}, skipping record discovery",
61 zone_name
62 );
63 return Ok(Vec::new());
65 };
66
67 info!(
68 "Discovering DNS records for zone {} using {} label selector(s)",
69 zone_name,
70 records_from.len()
71 );
72
73 let mut all_record_refs = Vec::new();
74
75 for record_source in records_from {
77 let selector = &record_source.selector;
78
79 all_record_refs.extend(discover_a_records(&client, &namespace, selector, zone_name).await?);
81 all_record_refs
82 .extend(discover_aaaa_records(&client, &namespace, selector, zone_name).await?);
83 all_record_refs
84 .extend(discover_txt_records(&client, &namespace, selector, zone_name).await?);
85 all_record_refs
86 .extend(discover_cname_records(&client, &namespace, selector, zone_name).await?);
87 all_record_refs
88 .extend(discover_mx_records(&client, &namespace, selector, zone_name).await?);
89 all_record_refs
90 .extend(discover_ns_records(&client, &namespace, selector, zone_name).await?);
91 all_record_refs
92 .extend(discover_srv_records(&client, &namespace, selector, zone_name).await?);
93 all_record_refs
94 .extend(discover_caa_records(&client, &namespace, selector, zone_name).await?);
95 }
96
97 info!(
98 "Discovered {} DNS record(s) for zone {}",
99 all_record_refs.len(),
100 zone_name
101 );
102
103 let previous_records: HashSet<String> = dnszone
105 .status
106 .as_ref()
107 .map(|s| {
108 s.records
109 .iter()
110 .map(|r| format!("{}/{}", r.kind, r.name))
111 .collect()
112 })
113 .unwrap_or_default();
114
115 let current_records: HashSet<String> = all_record_refs
117 .iter()
118 .map(|r| format!("{}/{}", r.kind, r.name))
119 .collect();
120
121 for record_ref in &all_record_refs {
126 let record_key = format!("{}/{}", record_ref.kind, record_ref.name);
127 let is_new = !previous_records.contains(&record_key);
128
129 if is_new {
130 info!(
131 "Newly matched record: {} {}/{}",
132 record_ref.kind, namespace, record_ref.name
133 );
134 } else {
135 debug!(
136 "Re-tagging existing record to ensure status.zoneRef: {} {}/{}",
137 record_ref.kind, namespace, record_ref.name
138 );
139 }
140
141 tag_record_with_zone(
142 &client,
143 &namespace,
144 &record_ref.kind,
145 &record_ref.name,
146 zone_name,
147 &dnszone,
148 )
149 .await?;
150 }
151
152 for prev_record_key in &previous_records {
155 if !current_records.contains(prev_record_key.as_str()) {
156 if let Some((kind, name)) = prev_record_key.split_once('/') {
158 warn!(
159 "Record no longer matches zone {} (unmatched or deleted): {} {}/{}",
160 zone_name, kind, namespace, name
161 );
162
163 if let Err(e) =
166 untag_record_from_zone(&client, &namespace, kind, name, zone_name).await
167 {
168 if e.to_string().contains("NotFound") || e.to_string().contains("not found") {
170 info!(
171 "Record {} {}/{} was deleted, removing from zone {} status",
172 kind, namespace, name, zone_name
173 );
174 } else {
175 warn!(
177 "Failed to untag record {} {}/{} from zone {}: {}",
178 kind, namespace, name, zone_name, e
179 );
180 }
181 }
182 }
185 }
186 }
187
188 if let Some(status) = &dnszone.status {
191 let existing_timestamps: std::collections::HashMap<String, _> = status
192 .records
193 .iter()
194 .filter_map(|r| {
195 r.last_reconciled_at
196 .as_ref()
197 .map(|timestamp| (format!("{}/{}", r.kind, r.name), timestamp.clone()))
198 })
199 .collect();
200
201 for record_ref in &mut all_record_refs {
203 let key = format!("{}/{}", record_ref.kind, record_ref.name);
204 if let Some(existing_timestamp) = existing_timestamps.get(&key) {
205 record_ref.last_reconciled_at = Some(existing_timestamp.clone());
206 }
207 }
208 }
209
210 Ok(all_record_refs)
211}
212
213async fn tag_record_with_zone(
232 client: &Client,
233 namespace: &str,
234 kind: &str,
235 name: &str,
236 zone_fqdn: &str,
237 dnszone: &DNSZone,
238) -> Result<()> {
239 debug!(
240 "Tagging {} {}/{} with zone {}",
241 kind, namespace, name, zone_fqdn
242 );
243
244 let plural = format!("{}s", kind.to_lowercase());
246
247 let gvk = kube::core::GroupVersionKind {
249 group: "bindy.firestoned.io".to_string(),
250 version: "v1beta1".to_string(),
251 kind: kind.to_string(),
252 };
253
254 let api_resource = kube::api::ApiResource::from_gvk_with_plural(&gvk, &plural);
256
257 let api = kube::api::Api::<kube::api::DynamicObject>::namespaced_with(
259 client.clone(),
260 namespace,
261 &api_resource,
262 );
263
264 let zone_ref = crate::crd::ZoneReference {
266 api_version: crate::constants::API_GROUP_VERSION.to_string(),
267 kind: crate::constants::KIND_DNS_ZONE.to_string(),
268 name: dnszone.name_any(),
269 namespace: dnszone.namespace().unwrap_or_default(),
270 zone_name: zone_fqdn.to_string(),
271 last_reconciled_at: None, };
273
274 let status_patch = json!({
276 "status": {
277 "zone": zone_fqdn,
278 "zoneRef": zone_ref
279 }
280 });
281
282 api.patch_status(name, &PatchParams::default(), &Patch::Merge(&status_patch))
283 .await
284 .with_context(|| {
285 format!("Failed to set status.zone and status.zoneRef on {kind} {namespace}/{name}")
286 })?;
287
288 info!(
289 "Successfully tagged {} {}/{} with zone {} (set status.zoneRef)",
290 kind, namespace, name, zone_fqdn
291 );
292
293 Ok(())
294}
295
296async fn untag_record_from_zone(
318 client: &Client,
319 namespace: &str,
320 kind: &str,
321 name: &str,
322 previous_zone_fqdn: &str,
323) -> Result<()> {
324 debug!(
325 "Untagging {} {}/{} from zone {} (clearing status.zoneRef)",
326 kind, namespace, name, previous_zone_fqdn
327 );
328
329 let plural = format!("{}s", kind.to_lowercase());
331
332 let gvk = kube::core::GroupVersionKind {
334 group: "bindy.firestoned.io".to_string(),
335 version: "v1beta1".to_string(),
336 kind: kind.to_string(),
337 };
338
339 let api_resource = kube::api::ApiResource::from_gvk_with_plural(&gvk, &plural);
341
342 let api = kube::api::Api::<kube::api::DynamicObject>::namespaced_with(
344 client.clone(),
345 namespace,
346 &api_resource,
347 );
348
349 let status_patch = json!({
351 "status": {
352 "zoneRef": null,
353 "zone": null }
355 });
356
357 api.patch_status(name, &PatchParams::default(), &Patch::Merge(&status_patch))
358 .await
359 .with_context(|| format!("Failed to clear status.zoneRef on {kind} {namespace}/{name}"))?;
360
361 info!(
362 "Successfully untagged {} {}/{} from zone {} (cleared status.zoneRef)",
363 kind, namespace, name, previous_zone_fqdn
364 );
365
366 Ok(())
367}
368
369trait DiscoverableRecord:
374 kube::Resource<DynamicType = (), Scope = k8s_openapi::NamespaceResourceScope>
375 + Clone
376 + std::fmt::Debug
377 + serde::de::DeserializeOwned
378 + kube::ResourceExt
379{
380 fn dns_record_kind() -> crate::crd::DNSRecordKind;
382
383 fn spec_name(&self) -> &str;
385
386 fn record_status(&self) -> Option<&crate::crd::RecordStatus>;
388}
389
390impl DiscoverableRecord for crate::crd::ARecord {
393 fn dns_record_kind() -> crate::crd::DNSRecordKind {
394 crate::crd::DNSRecordKind::A
395 }
396
397 fn spec_name(&self) -> &str {
398 &self.spec.name
399 }
400
401 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
402 self.status.as_ref()
403 }
404}
405
406impl DiscoverableRecord for crate::crd::AAAARecord {
407 fn dns_record_kind() -> crate::crd::DNSRecordKind {
408 crate::crd::DNSRecordKind::AAAA
409 }
410
411 fn spec_name(&self) -> &str {
412 &self.spec.name
413 }
414
415 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
416 self.status.as_ref()
417 }
418}
419
420impl DiscoverableRecord for crate::crd::TXTRecord {
421 fn dns_record_kind() -> crate::crd::DNSRecordKind {
422 crate::crd::DNSRecordKind::TXT
423 }
424
425 fn spec_name(&self) -> &str {
426 &self.spec.name
427 }
428
429 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
430 self.status.as_ref()
431 }
432}
433
434impl DiscoverableRecord for crate::crd::CNAMERecord {
435 fn dns_record_kind() -> crate::crd::DNSRecordKind {
436 crate::crd::DNSRecordKind::CNAME
437 }
438
439 fn spec_name(&self) -> &str {
440 &self.spec.name
441 }
442
443 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
444 self.status.as_ref()
445 }
446}
447
448impl DiscoverableRecord for crate::crd::MXRecord {
449 fn dns_record_kind() -> crate::crd::DNSRecordKind {
450 crate::crd::DNSRecordKind::MX
451 }
452
453 fn spec_name(&self) -> &str {
454 &self.spec.name
455 }
456
457 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
458 self.status.as_ref()
459 }
460}
461
462impl DiscoverableRecord for crate::crd::NSRecord {
463 fn dns_record_kind() -> crate::crd::DNSRecordKind {
464 crate::crd::DNSRecordKind::NS
465 }
466
467 fn spec_name(&self) -> &str {
468 &self.spec.name
469 }
470
471 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
472 self.status.as_ref()
473 }
474}
475
476impl DiscoverableRecord for crate::crd::SRVRecord {
477 fn dns_record_kind() -> crate::crd::DNSRecordKind {
478 crate::crd::DNSRecordKind::SRV
479 }
480
481 fn spec_name(&self) -> &str {
482 &self.spec.name
483 }
484
485 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
486 self.status.as_ref()
487 }
488}
489
490impl DiscoverableRecord for crate::crd::CAARecord {
491 fn dns_record_kind() -> crate::crd::DNSRecordKind {
492 crate::crd::DNSRecordKind::CAA
493 }
494
495 fn spec_name(&self) -> &str {
496 &self.spec.name
497 }
498
499 fn record_status(&self) -> Option<&crate::crd::RecordStatus> {
500 self.status.as_ref()
501 }
502}
503
504async fn discover_records_generic<T>(
528 client: &Client,
529 namespace: &str,
530 selector: &crate::crd::LabelSelector,
531 _zone_name: &str,
532) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>>
533where
534 T: DiscoverableRecord,
535{
536 use std::collections::BTreeMap;
537
538 let api: kube::Api<T> = kube::Api::namespaced(client.clone(), namespace);
539 let records = list_all_paginated(&api, kube::api::ListParams::default()).await?;
540
541 let mut record_refs = Vec::new();
542 for record in records {
543 let labels: BTreeMap<String, String> = record.meta().labels.clone().unwrap_or_default();
544
545 if !selector.matches(&labels) {
546 continue;
547 }
548
549 debug!(
550 "Discovered {} record {}/{}",
551 T::dns_record_kind().as_str(),
552 namespace,
553 record.name_any()
554 );
555
556 let last_reconciled_at = record
558 .record_status()
559 .and_then(|s| s.last_updated.as_ref())
560 .and_then(|ts| {
561 ts.parse::<k8s_openapi::jiff::Timestamp>()
563 .ok()
564 .map(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time)
565 });
566
567 record_refs.push(crate::crd::RecordReferenceWithTimestamp {
568 api_version: "bindy.firestoned.io/v1beta1".to_string(),
569 kind: T::dns_record_kind().as_str().to_string(),
570 name: record.name_any(),
571 namespace: namespace.to_string(),
572 record_name: Some(record.spec_name().to_string()),
573 last_reconciled_at,
574 });
575 }
576
577 Ok(record_refs)
578}
579
580async fn discover_a_records(
582 client: &Client,
583 namespace: &str,
584 selector: &crate::crd::LabelSelector,
585 zone_name: &str,
586) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
587 discover_records_generic::<crate::crd::ARecord>(client, namespace, selector, zone_name).await
588}
589
590async fn discover_aaaa_records(
592 client: &Client,
593 namespace: &str,
594 selector: &crate::crd::LabelSelector,
595 zone_name: &str,
596) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
597 discover_records_generic::<crate::crd::AAAARecord>(client, namespace, selector, zone_name).await
598}
599
600async fn discover_txt_records(
602 client: &Client,
603 namespace: &str,
604 selector: &crate::crd::LabelSelector,
605 zone_name: &str,
606) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
607 discover_records_generic::<crate::crd::TXTRecord>(client, namespace, selector, zone_name).await
608}
609
610async fn discover_cname_records(
612 client: &Client,
613 namespace: &str,
614 selector: &crate::crd::LabelSelector,
615 zone_name: &str,
616) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
617 discover_records_generic::<crate::crd::CNAMERecord>(client, namespace, selector, zone_name)
618 .await
619}
620
621async fn discover_mx_records(
623 client: &Client,
624 namespace: &str,
625 selector: &crate::crd::LabelSelector,
626 zone_name: &str,
627) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
628 discover_records_generic::<crate::crd::MXRecord>(client, namespace, selector, zone_name).await
629}
630
631async fn discover_ns_records(
633 client: &Client,
634 namespace: &str,
635 selector: &crate::crd::LabelSelector,
636 zone_name: &str,
637) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
638 discover_records_generic::<crate::crd::NSRecord>(client, namespace, selector, zone_name).await
639}
640
641async fn discover_srv_records(
643 client: &Client,
644 namespace: &str,
645 selector: &crate::crd::LabelSelector,
646 zone_name: &str,
647) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
648 discover_records_generic::<crate::crd::SRVRecord>(client, namespace, selector, zone_name).await
649}
650
651async fn discover_caa_records(
653 client: &Client,
654 namespace: &str,
655 selector: &crate::crd::LabelSelector,
656 zone_name: &str,
657) -> Result<Vec<crate::crd::RecordReferenceWithTimestamp>> {
658 discover_records_generic::<crate::crd::CAARecord>(client, namespace, selector, zone_name).await
659}
660
661pub async fn check_all_records_ready(
679 client: &Client,
680 namespace: &str,
681 record_refs: &[crate::crd::RecordReferenceWithTimestamp],
682) -> Result<bool> {
683 use crate::crd::{
684 AAAARecord, ARecord, CAARecord, CNAMERecord, DNSRecordKind, MXRecord, NSRecord, SRVRecord,
685 TXTRecord,
686 };
687
688 for record_ref in record_refs {
689 let kind = DNSRecordKind::from(record_ref.kind.as_str());
690 let is_ready = match kind {
691 DNSRecordKind::A => {
692 let api: Api<ARecord> = Api::namespaced(client.clone(), namespace);
693 check_record_ready(&api, &record_ref.name).await?
694 }
695 DNSRecordKind::AAAA => {
696 let api: Api<AAAARecord> = Api::namespaced(client.clone(), namespace);
697 check_record_ready(&api, &record_ref.name).await?
698 }
699 DNSRecordKind::TXT => {
700 let api: Api<TXTRecord> = Api::namespaced(client.clone(), namespace);
701 check_record_ready(&api, &record_ref.name).await?
702 }
703 DNSRecordKind::CNAME => {
704 let api: Api<CNAMERecord> = Api::namespaced(client.clone(), namespace);
705 check_record_ready(&api, &record_ref.name).await?
706 }
707 DNSRecordKind::MX => {
708 let api: Api<MXRecord> = Api::namespaced(client.clone(), namespace);
709 check_record_ready(&api, &record_ref.name).await?
710 }
711 DNSRecordKind::NS => {
712 let api: Api<NSRecord> = Api::namespaced(client.clone(), namespace);
713 check_record_ready(&api, &record_ref.name).await?
714 }
715 DNSRecordKind::SRV => {
716 let api: Api<SRVRecord> = Api::namespaced(client.clone(), namespace);
717 check_record_ready(&api, &record_ref.name).await?
718 }
719 DNSRecordKind::CAA => {
720 let api: Api<CAARecord> = Api::namespaced(client.clone(), namespace);
721 check_record_ready(&api, &record_ref.name).await?
722 }
723 };
724
725 if !is_ready {
726 debug!(
727 "Record {}/{} (kind: {}) is not ready yet",
728 namespace, record_ref.name, record_ref.kind
729 );
730 return Ok(false);
731 }
732 }
733
734 Ok(true)
735}
736
737async fn check_record_ready<T>(api: &Api<T>, name: &str) -> Result<bool>
739where
740 T: kube::Resource<DynamicType = ()>
741 + Clone
742 + serde::de::DeserializeOwned
743 + serde::Serialize
744 + std::fmt::Debug
745 + Send
746 + Sync,
747 <T as kube::Resource>::DynamicType: Default,
748{
749 let record = match api.get(name).await {
750 Ok(r) => r,
751 Err(e) => {
752 warn!("Failed to get record {}: {}", name, e);
753 return Ok(false);
754 }
755 };
756
757 let record_json = serde_json::to_value(&record)?;
759 let status = record_json.get("status");
760
761 if let Some(status_obj) = status {
762 if let Some(conditions) = status_obj.get("conditions").and_then(|c| c.as_array()) {
763 for condition in conditions {
764 if let (Some(type_val), Some(status_val)) = (
765 condition.get("type").and_then(|t| t.as_str()),
766 condition.get("status").and_then(|s| s.as_str()),
767 ) {
768 if type_val == "Ready" && status_val == "True" {
769 return Ok(true);
770 }
771 }
772 }
773 }
774 }
775
776 Ok(false)
777}
778
779pub async fn find_zones_selecting_record(
801 client: &Client,
802 record_namespace: &str,
803 record_kind: &str,
804 record_name: &str,
805) -> Result<Vec<(String, String)>> {
806 let api: Api<DNSZone> = Api::namespaced(client.clone(), record_namespace);
807 let zones = list_all_paginated(&api, ListParams::default()).await?;
808
809 let mut selecting_zones = vec![];
810
811 for zone in zones {
812 let Some(ref status) = zone.status else {
813 continue;
814 };
815
816 let is_selected = status
818 .records
819 .iter()
820 .any(|r| r.kind == record_kind && r.name == record_name);
821
822 if is_selected {
823 let zone_name = zone.name_any();
824 let zone_namespace = zone.namespace().unwrap_or_default();
825 selecting_zones.push((zone_name, zone_namespace));
826 }
827 }
828
829 Ok(selecting_zones)
830}
831pub async fn trigger_record_reconciliation(
852 client: &Client,
853 namespace: &str,
854 zone_name: &str,
855) -> Result<()> {
856 use crate::crd::{
857 AAAARecord, ARecord, CAARecord, CNAMERecord, MXRecord, NSRecord, SRVRecord, TXTRecord,
858 };
859
860 debug!(
861 "Triggering record reconciliation for zone {} in namespace {}",
862 zone_name, namespace
863 );
864
865 macro_rules! count_records {
869 ($record_type:ty, $type_name:expr) => {{
870 let api: Api<$record_type> = Api::namespaced(client.clone(), namespace);
871 let lp = ListParams::default();
872
873 match api.list(&lp).await {
874 Ok(records) => {
875 let matching_count = records
876 .items
877 .iter()
878 .filter(|r| {
879 r.status
881 .as_ref()
882 .and_then(|s| s.zone_ref.as_ref())
883 .map(|zr| zr.zone_name == zone_name)
884 .unwrap_or(false)
885 })
886 .count();
887
888 debug!(
889 "Found {} {} record(s) for zone {} (event-driven watches will trigger reconciliation)",
890 matching_count,
891 $type_name,
892 zone_name
893 );
894 }
895 Err(e) => {
896 warn!(
897 "Failed to list {} records in namespace {}: {}",
898 $type_name, namespace, e
899 );
900 }
901 }
902 }};
903 }
904
905 count_records!(ARecord, "A");
907 count_records!(AAAARecord, "AAAA");
908 count_records!(TXTRecord, "TXT");
909 count_records!(CNAMERecord, "CNAME");
910 count_records!(MXRecord, "MX");
911 count_records!(NSRecord, "NS");
912 count_records!(SRVRecord, "SRV");
913 count_records!(CAARecord, "CAA");
914
915 Ok(())
916}
917
918pub async fn discover_and_update_records(
940 client: &kube::Client,
941 dnszone: &crate::crd::DNSZone,
942 status_updater: &mut crate::reconcilers::status::DNSZoneStatusUpdater,
943) -> Result<(Vec<crate::crd::RecordReferenceWithTimestamp>, usize)> {
944 let spec = &dnszone.spec;
945
946 status_updater.set_condition(
948 "Progressing",
949 "True",
950 "RecordsDiscovering",
951 "Discovering DNS records via label selectors",
952 );
953
954 let record_refs = match reconcile_zone_records(client.clone(), dnszone.clone()).await {
956 Ok(refs) => {
957 info!(
958 "Discovered {} DNS record(s) for zone {} via label selectors",
959 refs.len(),
960 spec.zone_name
961 );
962 refs
963 }
964 Err(e) => {
965 warn!(
967 "Failed to discover DNS records for zone {}: {}. Zone is configured but record discovery failed.",
968 spec.zone_name, e
969 );
970 status_updater.set_condition(
971 "Degraded",
972 "True",
973 "RecordDiscoveryFailed",
974 &format!("Zone configured but record discovery failed: {e}"),
975 );
976 vec![]
977 }
978 };
979
980 let records_count = record_refs.len();
981
982 status_updater.set_records(&record_refs);
984
985 Ok((record_refs, records_count))
986}
987
988#[cfg(test)]
989#[path = "discovery_tests.rs"]
990mod discovery_tests;