1use crate::crd::{ARecord, ARecordSpec, DNSZone};
25use anyhow::{anyhow, Result};
26use k8s_openapi::api::core::v1::{Secret, Service};
27use kube::api::{DeleteParams, ListParams, Patch, PatchParams};
28use kube::config::{KubeConfigOptions, Kubeconfig};
29
30#[derive(Debug, thiserror::Error)]
33#[error(transparent)]
34pub struct ScoutError(#[from] anyhow::Error);
35use futures::StreamExt;
36use k8s_openapi::api::networking::v1::Ingress;
37use kube::{
38 runtime::{
39 controller::Action, reflector, watcher, watcher::Config as WatcherConfig, Controller,
40 },
41 Api, Client, Error as KubeError, ResourceExt,
42};
43use std::{collections::BTreeMap, sync::Arc, time::Duration};
44use tracing::{debug, error, info, warn};
45
46#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
55#[serde(rename_all = "camelCase")]
56pub struct HTTPRouteSpec {
57 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub hostnames: Option<Vec<String>>,
60}
61
62#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
64pub struct HTTPRoute {
65 #[serde(rename = "apiVersion")]
66 pub api_version: String,
67 pub kind: String,
68 pub metadata: kube::api::ObjectMeta,
69 #[serde(default)]
70 pub spec: Option<HTTPRouteSpec>,
71}
72
73#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
75#[serde(rename_all = "camelCase")]
76pub struct TLSRouteSpec {
77 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub hostnames: Option<Vec<String>>,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub rules: Option<Vec<serde_json::Value>>,
83}
84
85#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
87pub struct TLSRoute {
88 #[serde(rename = "apiVersion")]
89 pub api_version: String,
90 pub kind: String,
91 pub metadata: kube::api::ObjectMeta,
92 #[serde(default)]
93 pub spec: Option<TLSRouteSpec>,
94}
95
96impl k8s_openapi::Metadata for HTTPRoute {
98 type Ty = kube::api::ObjectMeta;
99 fn metadata(&self) -> &kube::api::ObjectMeta {
100 &self.metadata
101 }
102 fn metadata_mut(&mut self) -> &mut kube::api::ObjectMeta {
103 &mut self.metadata
104 }
105}
106
107impl k8s_openapi::Metadata for TLSRoute {
108 type Ty = kube::api::ObjectMeta;
109 fn metadata(&self) -> &kube::api::ObjectMeta {
110 &self.metadata
111 }
112 fn metadata_mut(&mut self) -> &mut kube::api::ObjectMeta {
113 &mut self.metadata
114 }
115}
116
117impl k8s_openapi::Resource for HTTPRoute {
119 const API_VERSION: &'static str = "gateway.networking.k8s.io/v1";
120 const GROUP: &'static str = "gateway.networking.k8s.io";
121 const KIND: &'static str = "HTTPRoute";
122 const VERSION: &'static str = "v1";
123 const URL_PATH_SEGMENT: &'static str = "httproutes";
124 type Scope = k8s_openapi::NamespaceResourceScope;
125}
126
127impl k8s_openapi::Resource for TLSRoute {
128 const API_VERSION: &'static str = "gateway.networking.k8s.io/v1alpha2";
129 const GROUP: &'static str = "gateway.networking.k8s.io";
130 const KIND: &'static str = "TLSRoute";
131 const VERSION: &'static str = "v1alpha2";
132 const URL_PATH_SEGMENT: &'static str = "tlsroutes";
133 type Scope = k8s_openapi::NamespaceResourceScope;
134}
135
136pub const ANNOTATION_RECORD_KIND: &str = "bindy.firestoned.io/recordKind";
143
144pub const RECORD_KIND_ARECORD: &str = "ARecord";
146
147pub const ANNOTATION_ZONE: &str = "bindy.firestoned.io/zone";
149
150pub const ANNOTATION_SCOUT_ENABLED: &str = "bindy.firestoned.io/scout-enabled";
154
155pub const ANNOTATION_IP: &str = "bindy.firestoned.io/ip";
163
164pub const ANNOTATION_TTL: &str = "bindy.firestoned.io/ttl";
167
168pub const ANNOTATION_RECORD_NAME: &str = "bindy.firestoned.io/record-name";
177
178pub const FINALIZER_SCOUT: &str = "bindy.firestoned.io/arecord-finalizer";
180
181pub const LABEL_MANAGED_BY: &str = "bindy.firestoned.io/managed-by";
183
184pub const LABEL_MANAGED_BY_SCOUT: &str = "scout";
186
187pub const LABEL_SOURCE_CLUSTER: &str = "bindy.firestoned.io/source-cluster";
189
190pub const LABEL_SOURCE_NAMESPACE: &str = "bindy.firestoned.io/source-namespace";
192
193pub const LABEL_SOURCE_NAME: &str = "bindy.firestoned.io/source-name";
196
197pub const LABEL_ZONE: &str = "bindy.firestoned.io/zone";
199
200pub const DEFAULT_SCOUT_NAMESPACE: &str = "bindy-system";
202
203const MAX_K8S_NAME_LEN: usize = 253;
205
206const ARECORD_NAME_PREFIX: &str = "scout";
208
209const SCOUT_ERROR_REQUEUE_SECS: u64 = 30;
211
212const REFLECTOR_ERROR_BACKOFF_SECS: u64 = 5;
216
217pub struct ScoutContext {
223 pub client: Client,
226 pub remote_client: Client,
230 pub target_namespace: String,
232 pub cluster_name: String,
234 pub excluded_namespaces: Vec<String>,
236 pub default_ips: Vec<String>,
240 pub default_zone: Option<String>,
243 pub zone_store: reflector::Store<DNSZone>,
246}
247
248pub fn is_arecord_enabled(annotations: &BTreeMap<String, String>) -> bool {
257 annotations
258 .get(ANNOTATION_RECORD_KIND)
259 .map(|v| v == RECORD_KIND_ARECORD)
260 .unwrap_or(false)
261}
262
263pub fn is_scout_opted_in(annotations: &BTreeMap<String, String>) -> bool {
273 annotations
274 .get(ANNOTATION_SCOUT_ENABLED)
275 .map(|v| v == "true")
276 .unwrap_or(false)
277 || is_arecord_enabled(annotations)
278}
279
280pub fn resolve_zone(
287 annotations: &BTreeMap<String, String>,
288 default_zone: Option<&str>,
289) -> Option<String> {
290 get_zone_annotation(annotations).or_else(|| default_zone.map(ToString::to_string))
291}
292
293pub fn get_zone_annotation(annotations: &BTreeMap<String, String>) -> Option<String> {
297 annotations
298 .get(ANNOTATION_ZONE)
299 .filter(|v| !v.is_empty())
300 .cloned()
301}
302
303pub fn derive_record_name(host: &str, zone: &str) -> Result<String> {
315 let host = host.trim_end_matches('.');
317
318 if host == zone {
320 return Ok("@".to_string());
321 }
322
323 let zone_suffix = format!(".{zone}");
324 if !host.ends_with(&zone_suffix) {
325 return Err(anyhow!(
326 "host \"{host}\" does not belong to zone \"{zone}\""
327 ));
328 }
329
330 let record_name = &host[..host.len() - zone_suffix.len()];
331 Ok(record_name.to_string())
332}
333
334pub fn get_record_name_annotation(annotations: &BTreeMap<String, String>) -> Option<String> {
339 annotations
340 .get(ANNOTATION_RECORD_NAME)
341 .map(|v| v.trim().to_string())
342 .filter(|v| !v.is_empty())
343}
344
345pub fn resolve_record_name(
358 annotations: &BTreeMap<String, String>,
359 host: &str,
360 zone: &str,
361) -> Result<String> {
362 if let Some(override_name) = get_record_name_annotation(annotations) {
363 return Ok(override_name);
364 }
365 derive_record_name(host, zone)
366}
367
368pub fn resolve_ips_from_annotation(annotations: &BTreeMap<String, String>) -> Option<Vec<String>> {
377 let raw = annotations.get(ANNOTATION_IP)?;
378 let ips: Vec<String> = raw
379 .split(',')
380 .map(str::trim)
381 .filter(|s| !s.is_empty())
382 .map(ToString::to_string)
383 .collect();
384 if ips.is_empty() {
385 None
386 } else {
387 Some(ips)
388 }
389}
390
391pub fn resolve_ips(
399 annotations: &BTreeMap<String, String>,
400 default_ips: &[String],
401 ingress: &Ingress,
402) -> Option<Vec<String>> {
403 if let Some(ips) = resolve_ips_from_annotation(annotations) {
404 return Some(ips);
405 }
406 if !default_ips.is_empty() {
407 return Some(default_ips.to_vec());
408 }
409 resolve_ip_from_lb_status(ingress).map(|ip| vec![ip])
410}
411
412pub fn resolve_ip_from_lb_status(ingress: &Ingress) -> Option<String> {
417 let lb_ingresses = ingress
418 .status
419 .as_ref()?
420 .load_balancer
421 .as_ref()?
422 .ingress
423 .as_ref()?;
424
425 for lb in lb_ingresses {
426 if let Some(ip) = &lb.ip {
427 if !ip.is_empty() {
428 return Some(ip.clone());
429 }
430 }
431 if lb.hostname.is_some() {
432 warn!(
433 ingress = %ingress.name_any(),
434 "Ingress LB status has hostname but no IP — A record requires an IP address; skipping"
435 );
436 }
437 }
438 None
439}
440
441pub fn arecord_cr_name(
449 cluster: &str,
450 namespace: &str,
451 ingress_name: &str,
452 host_index: usize,
453) -> String {
454 let raw = format!("{ARECORD_NAME_PREFIX}-{cluster}-{namespace}-{ingress_name}-{host_index}");
455 let sanitized = sanitize_k8s_name(&raw);
456 sanitized[..sanitized.len().min(MAX_K8S_NAME_LEN)].to_string()
457}
458
459fn sanitize_k8s_name(s: &str) -> String {
466 let lower = s.to_lowercase();
467 let mut result = String::with_capacity(lower.len());
468 let mut last_was_hyphen = false;
469
470 for ch in lower.chars() {
471 if ch.is_ascii_alphanumeric() {
472 result.push(ch);
473 last_was_hyphen = false;
474 } else {
475 if !last_was_hyphen {
477 result.push('-');
478 last_was_hyphen = true;
479 }
480 }
481 }
482
483 let trimmed = result.trim_end_matches('-');
485 trimmed.trim_start_matches('-').to_string()
487}
488
489pub fn has_finalizer(ingress: &Ingress) -> bool {
491 ingress
492 .metadata
493 .finalizers
494 .as_ref()
495 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
496 .unwrap_or(false)
497}
498
499pub fn is_being_deleted(ingress: &Ingress) -> bool {
501 ingress.metadata.deletion_timestamp.is_some()
502}
503
504pub fn arecord_label_selector(cluster: &str, namespace: &str, ingress_name: &str) -> String {
510 format!(
511 "{}={},{cluster_key}={cluster},{ns_key}={namespace},{name_key}={ingress_name}",
512 LABEL_MANAGED_BY,
513 LABEL_MANAGED_BY_SCOUT,
514 cluster_key = LABEL_SOURCE_CLUSTER,
515 ns_key = LABEL_SOURCE_NAMESPACE,
516 name_key = LABEL_SOURCE_NAME,
517 )
518}
519
520pub fn stale_arecord_label_selector(
527 current_cluster: &str,
528 namespace: &str,
529 ingress_name: &str,
530) -> String {
531 format!(
532 "{}={},{cluster_key}!={current_cluster},{ns_key}={namespace},{name_key}={ingress_name}",
533 LABEL_MANAGED_BY,
534 LABEL_MANAGED_BY_SCOUT,
535 cluster_key = LABEL_SOURCE_CLUSTER,
536 ns_key = LABEL_SOURCE_NAMESPACE,
537 name_key = LABEL_SOURCE_NAME,
538 )
539}
540
541pub struct ARecordParams<'a> {
547 pub name: &'a str,
549 pub target_namespace: &'a str,
551 pub record_name: &'a str,
553 pub ips: &'a [String],
555 pub ttl: Option<i32>,
557 pub cluster_name: &'a str,
559 pub ingress_namespace: &'a str,
561 pub ingress_name: &'a str,
563 pub zone: &'a str,
565}
566
567pub fn build_arecord(params: ARecordParams<'_>) -> ARecord {
569 let mut labels = BTreeMap::new();
570 labels.insert(
571 LABEL_MANAGED_BY.to_string(),
572 LABEL_MANAGED_BY_SCOUT.to_string(),
573 );
574 labels.insert(
575 LABEL_SOURCE_CLUSTER.to_string(),
576 params.cluster_name.to_string(),
577 );
578 labels.insert(
579 LABEL_SOURCE_NAMESPACE.to_string(),
580 params.ingress_namespace.to_string(),
581 );
582 labels.insert(
583 LABEL_SOURCE_NAME.to_string(),
584 params.ingress_name.to_string(),
585 );
586 labels.insert(LABEL_ZONE.to_string(), params.zone.to_string());
587
588 let meta = kube::api::ObjectMeta {
589 name: Some(params.name.to_string()),
590 namespace: Some(params.target_namespace.to_string()),
591 labels: Some(labels),
592 ..Default::default()
593 };
594
595 ARecord {
596 metadata: meta,
597 spec: ARecordSpec {
598 name: params.record_name.to_string(),
599 ipv4_addresses: params.ips.to_vec(),
600 ttl: params.ttl,
601 },
602 status: None,
603 }
604}
605
606pub fn is_loadbalancer_service(svc: &Service) -> bool {
615 svc.spec
616 .as_ref()
617 .and_then(|s| s.type_.as_deref())
618 .map(|t| t == "LoadBalancer")
619 .unwrap_or(false)
620}
621
622pub fn resolve_ip_from_service_lb_status(svc: &Service) -> Option<String> {
628 svc.status
629 .as_ref()?
630 .load_balancer
631 .as_ref()?
632 .ingress
633 .as_ref()?
634 .iter()
635 .find_map(|entry| entry.ip.clone().filter(|ip| !ip.is_empty()))
636}
637
638pub fn service_arecord_cr_name(cluster: &str, namespace: &str, service_name: &str) -> String {
645 let raw = format!("{ARECORD_NAME_PREFIX}-{cluster}-{namespace}-{service_name}");
646 let sanitized = sanitize_k8s_name(&raw);
647 sanitized[..sanitized.len().min(MAX_K8S_NAME_LEN)].to_string()
648}
649
650pub fn service_arecord_label_selector(
653 cluster: &str,
654 namespace: &str,
655 service_name: &str,
656) -> String {
657 format!(
658 "{}={},{cluster_key}={cluster},{ns_key}={namespace},{name_key}={service_name}",
659 LABEL_MANAGED_BY,
660 LABEL_MANAGED_BY_SCOUT,
661 cluster_key = LABEL_SOURCE_CLUSTER,
662 ns_key = LABEL_SOURCE_NAMESPACE,
663 name_key = LABEL_SOURCE_NAME,
664 )
665}
666
667pub struct ServiceARecordParams<'a> {
669 pub name: &'a str,
671 pub target_namespace: &'a str,
673 pub record_name: &'a str,
675 pub ips: &'a [String],
677 pub ttl: Option<i32>,
679 pub cluster_name: &'a str,
681 pub service_namespace: &'a str,
683 pub service_name: &'a str,
685 pub zone: &'a str,
687}
688
689pub fn build_service_arecord(params: ServiceARecordParams<'_>) -> ARecord {
691 let mut labels = BTreeMap::new();
692 labels.insert(
693 LABEL_MANAGED_BY.to_string(),
694 LABEL_MANAGED_BY_SCOUT.to_string(),
695 );
696 labels.insert(
697 LABEL_SOURCE_CLUSTER.to_string(),
698 params.cluster_name.to_string(),
699 );
700 labels.insert(
701 LABEL_SOURCE_NAMESPACE.to_string(),
702 params.service_namespace.to_string(),
703 );
704 labels.insert(
705 LABEL_SOURCE_NAME.to_string(),
706 params.service_name.to_string(),
707 );
708 labels.insert(LABEL_ZONE.to_string(), params.zone.to_string());
709
710 let meta = kube::api::ObjectMeta {
711 name: Some(params.name.to_string()),
712 namespace: Some(params.target_namespace.to_string()),
713 labels: Some(labels),
714 ..Default::default()
715 };
716
717 ARecord {
718 metadata: meta,
719 spec: ARecordSpec {
720 name: params.record_name.to_string(),
721 ipv4_addresses: params.ips.to_vec(),
722 ttl: params.ttl,
723 },
724 status: None,
725 }
726}
727
728pub fn httproute_arecord_cr_name(
740 cluster: &str,
741 namespace: &str,
742 route_name: &str,
743 hostname_index: usize,
744) -> String {
745 let raw = format!("{ARECORD_NAME_PREFIX}-{cluster}-{namespace}-{route_name}-{hostname_index}");
746 let sanitized = sanitize_k8s_name(&raw);
747 sanitized[..sanitized.len().min(MAX_K8S_NAME_LEN)].to_string()
748}
749
750pub fn tlsroute_arecord_cr_name(
757 cluster: &str,
758 namespace: &str,
759 route_name: &str,
760 hostname_index: usize,
761) -> String {
762 let raw = format!("{ARECORD_NAME_PREFIX}-{cluster}-{namespace}-{route_name}-{hostname_index}");
763 let sanitized = sanitize_k8s_name(&raw);
764 sanitized[..sanitized.len().min(MAX_K8S_NAME_LEN)].to_string()
765}
766
767pub fn httproute_arecord_label_selector(
770 cluster: &str,
771 namespace: &str,
772 route_name: &str,
773) -> String {
774 format!(
775 "{}={},{cluster_key}={cluster},{ns_key}={namespace},{name_key}={route_name}",
776 LABEL_MANAGED_BY,
777 LABEL_MANAGED_BY_SCOUT,
778 cluster_key = LABEL_SOURCE_CLUSTER,
779 ns_key = LABEL_SOURCE_NAMESPACE,
780 name_key = LABEL_SOURCE_NAME,
781 )
782}
783
784pub fn tlsroute_arecord_label_selector(cluster: &str, namespace: &str, route_name: &str) -> String {
787 format!(
788 "{}={},{cluster_key}={cluster},{ns_key}={namespace},{name_key}={route_name}",
789 LABEL_MANAGED_BY,
790 LABEL_MANAGED_BY_SCOUT,
791 cluster_key = LABEL_SOURCE_CLUSTER,
792 ns_key = LABEL_SOURCE_NAMESPACE,
793 name_key = LABEL_SOURCE_NAME,
794 )
795}
796
797pub fn stale_httproute_arecord_label_selector(
803 current_cluster: &str,
804 namespace: &str,
805 route_name: &str,
806) -> String {
807 format!(
808 "{}={},{cluster_key}!={current_cluster},{ns_key}={namespace},{name_key}={route_name}",
809 LABEL_MANAGED_BY,
810 LABEL_MANAGED_BY_SCOUT,
811 cluster_key = LABEL_SOURCE_CLUSTER,
812 ns_key = LABEL_SOURCE_NAMESPACE,
813 name_key = LABEL_SOURCE_NAME,
814 )
815}
816
817pub fn stale_tlsroute_arecord_label_selector(
820 current_cluster: &str,
821 namespace: &str,
822 route_name: &str,
823) -> String {
824 format!(
825 "{}={},{cluster_key}!={current_cluster},{ns_key}={namespace},{name_key}={route_name}",
826 LABEL_MANAGED_BY,
827 LABEL_MANAGED_BY_SCOUT,
828 cluster_key = LABEL_SOURCE_CLUSTER,
829 ns_key = LABEL_SOURCE_NAMESPACE,
830 name_key = LABEL_SOURCE_NAME,
831 )
832}
833
834pub struct HTTPRouteARecordParams<'a> {
836 pub name: &'a str,
838 pub target_namespace: &'a str,
840 pub record_name: &'a str,
842 pub ips: &'a [String],
844 pub ttl: Option<i32>,
846 pub cluster_name: &'a str,
848 pub route_namespace: &'a str,
850 pub route_name: &'a str,
852 pub zone: &'a str,
854}
855
856pub fn build_httproute_arecord(params: HTTPRouteARecordParams<'_>) -> ARecord {
858 let mut labels = BTreeMap::new();
859 labels.insert(
860 LABEL_MANAGED_BY.to_string(),
861 LABEL_MANAGED_BY_SCOUT.to_string(),
862 );
863 labels.insert(
864 LABEL_SOURCE_CLUSTER.to_string(),
865 params.cluster_name.to_string(),
866 );
867 labels.insert(
868 LABEL_SOURCE_NAMESPACE.to_string(),
869 params.route_namespace.to_string(),
870 );
871 labels.insert(LABEL_SOURCE_NAME.to_string(), params.route_name.to_string());
872 labels.insert(LABEL_ZONE.to_string(), params.zone.to_string());
873
874 let meta = kube::api::ObjectMeta {
875 name: Some(params.name.to_string()),
876 namespace: Some(params.target_namespace.to_string()),
877 labels: Some(labels),
878 ..Default::default()
879 };
880
881 ARecord {
882 metadata: meta,
883 spec: ARecordSpec {
884 name: params.record_name.to_string(),
885 ipv4_addresses: params.ips.to_vec(),
886 ttl: params.ttl,
887 },
888 status: None,
889 }
890}
891
892pub struct TLSRouteARecordParams<'a> {
894 pub name: &'a str,
896 pub target_namespace: &'a str,
898 pub record_name: &'a str,
900 pub ips: &'a [String],
902 pub ttl: Option<i32>,
904 pub cluster_name: &'a str,
906 pub route_namespace: &'a str,
908 pub route_name: &'a str,
910 pub zone: &'a str,
912}
913
914pub fn build_tlsroute_arecord(params: TLSRouteARecordParams<'_>) -> ARecord {
916 let mut labels = BTreeMap::new();
917 labels.insert(
918 LABEL_MANAGED_BY.to_string(),
919 LABEL_MANAGED_BY_SCOUT.to_string(),
920 );
921 labels.insert(
922 LABEL_SOURCE_CLUSTER.to_string(),
923 params.cluster_name.to_string(),
924 );
925 labels.insert(
926 LABEL_SOURCE_NAMESPACE.to_string(),
927 params.route_namespace.to_string(),
928 );
929 labels.insert(LABEL_SOURCE_NAME.to_string(), params.route_name.to_string());
930 labels.insert(LABEL_ZONE.to_string(), params.zone.to_string());
931
932 let meta = kube::api::ObjectMeta {
933 name: Some(params.name.to_string()),
934 namespace: Some(params.target_namespace.to_string()),
935 labels: Some(labels),
936 ..Default::default()
937 };
938
939 ARecord {
940 metadata: meta,
941 spec: ARecordSpec {
942 name: params.record_name.to_string(),
943 ipv4_addresses: params.ips.to_vec(),
944 ttl: params.ttl,
945 },
946 status: None,
947 }
948}
949
950async fn add_finalizer(client: &Client, ingress: &Ingress) -> Result<()> {
959 let namespace = ingress.namespace().unwrap_or_default();
960 let name = ingress.name_any();
961 let api: Api<Ingress> = Api::namespaced(client.clone(), &namespace);
962
963 let mut finalizers = ingress.metadata.finalizers.clone().unwrap_or_default();
964 if !finalizers.contains(&FINALIZER_SCOUT.to_string()) {
965 finalizers.push(FINALIZER_SCOUT.to_string());
966 }
967
968 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
969 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
970 .await?;
971 Ok(())
972}
973
974async fn remove_finalizer(client: &Client, ingress: &Ingress) -> Result<()> {
978 let namespace = ingress.namespace().unwrap_or_default();
979 let name = ingress.name_any();
980 let api: Api<Ingress> = Api::namespaced(client.clone(), &namespace);
981
982 let finalizers: Vec<String> = ingress
983 .metadata
984 .finalizers
985 .clone()
986 .unwrap_or_default()
987 .into_iter()
988 .filter(|f| f != FINALIZER_SCOUT)
989 .collect();
990
991 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
992 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
993 .await?;
994 Ok(())
995}
996
997async fn add_finalizer_to_service(client: &Client, svc: &Service) -> Result<()> {
999 let namespace = svc.namespace().unwrap_or_default();
1000 let name = svc.name_any();
1001 let api: Api<Service> = Api::namespaced(client.clone(), &namespace);
1002
1003 let mut finalizers = svc.metadata.finalizers.clone().unwrap_or_default();
1004 if !finalizers.contains(&FINALIZER_SCOUT.to_string()) {
1005 finalizers.push(FINALIZER_SCOUT.to_string());
1006 }
1007
1008 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
1009 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
1010 .await?;
1011 Ok(())
1012}
1013
1014async fn remove_finalizer_from_service(client: &Client, svc: &Service) -> Result<()> {
1016 let namespace = svc.namespace().unwrap_or_default();
1017 let name = svc.name_any();
1018 let api: Api<Service> = Api::namespaced(client.clone(), &namespace);
1019
1020 let finalizers: Vec<String> = svc
1021 .metadata
1022 .finalizers
1023 .clone()
1024 .unwrap_or_default()
1025 .into_iter()
1026 .filter(|f| f != FINALIZER_SCOUT)
1027 .collect();
1028
1029 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
1030 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
1031 .await?;
1032 Ok(())
1033}
1034
1035async fn add_finalizer_to_httproute(client: &Client, route: &HTTPRoute) -> Result<()> {
1037 let namespace = route.namespace().unwrap_or_default();
1038 let name = route.name_any();
1039 let api: Api<HTTPRoute> = Api::namespaced(client.clone(), &namespace);
1040
1041 let mut finalizers = route.metadata.finalizers.clone().unwrap_or_default();
1042 if !finalizers.contains(&FINALIZER_SCOUT.to_string()) {
1043 finalizers.push(FINALIZER_SCOUT.to_string());
1044 }
1045
1046 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
1047 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
1048 .await?;
1049 Ok(())
1050}
1051
1052async fn remove_finalizer_from_httproute(client: &Client, route: &HTTPRoute) -> Result<()> {
1054 let namespace = route.namespace().unwrap_or_default();
1055 let name = route.name_any();
1056 let api: Api<HTTPRoute> = Api::namespaced(client.clone(), &namespace);
1057
1058 let finalizers: Vec<String> = route
1059 .metadata
1060 .finalizers
1061 .clone()
1062 .unwrap_or_default()
1063 .into_iter()
1064 .filter(|f| f != FINALIZER_SCOUT)
1065 .collect();
1066
1067 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
1068 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
1069 .await?;
1070 Ok(())
1071}
1072
1073async fn add_finalizer_to_tlsroute(client: &Client, route: &TLSRoute) -> Result<()> {
1075 let namespace = route.namespace().unwrap_or_default();
1076 let name = route.name_any();
1077 let api: Api<TLSRoute> = Api::namespaced(client.clone(), &namespace);
1078
1079 let mut finalizers = route.metadata.finalizers.clone().unwrap_or_default();
1080 if !finalizers.contains(&FINALIZER_SCOUT.to_string()) {
1081 finalizers.push(FINALIZER_SCOUT.to_string());
1082 }
1083
1084 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
1085 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
1086 .await?;
1087 Ok(())
1088}
1089
1090async fn remove_finalizer_from_tlsroute(client: &Client, route: &TLSRoute) -> Result<()> {
1092 let namespace = route.namespace().unwrap_or_default();
1093 let name = route.name_any();
1094 let api: Api<TLSRoute> = Api::namespaced(client.clone(), &namespace);
1095
1096 let finalizers: Vec<String> = route
1097 .metadata
1098 .finalizers
1099 .clone()
1100 .unwrap_or_default()
1101 .into_iter()
1102 .filter(|f| f != FINALIZER_SCOUT)
1103 .collect();
1104
1105 let patch = serde_json::json!({ "metadata": { "finalizers": finalizers } });
1106 api.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
1107 .await?;
1108 Ok(())
1109}
1110
1111async fn delete_arecords_for_ingress(
1117 remote_client: &Client,
1118 target_namespace: &str,
1119 cluster: &str,
1120 ingress_namespace: &str,
1121 ingress_name: &str,
1122) -> Result<()> {
1123 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1124 let selector = arecord_label_selector(cluster, ingress_namespace, ingress_name);
1125 let lp = ListParams::default().labels(&selector);
1126
1127 let arecords = api.list(&lp).await?;
1128 for ar in arecords.items {
1129 let ar_name = ar.name_any();
1130 api.delete(&ar_name, &DeleteParams::default()).await?;
1131 info!(
1132 arecord = %ar_name,
1133 ingress = %ingress_name,
1134 ns = %ingress_namespace,
1135 "Deleted ARecord during Ingress cleanup"
1136 );
1137 }
1138 Ok(())
1139}
1140
1141async fn delete_stale_cluster_arecords(
1149 remote_client: &Client,
1150 target_namespace: &str,
1151 current_cluster: &str,
1152 ingress_namespace: &str,
1153 ingress_name: &str,
1154) -> Result<()> {
1155 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1156 let selector = stale_arecord_label_selector(current_cluster, ingress_namespace, ingress_name);
1157 let lp = ListParams::default().labels(&selector);
1158
1159 let arecords = api.list(&lp).await?;
1160 for ar in arecords.items {
1161 let ar_name = ar.name_any();
1162 let old_cluster = ar
1163 .metadata
1164 .labels
1165 .as_ref()
1166 .and_then(|l| l.get(LABEL_SOURCE_CLUSTER))
1167 .map(String::as_str)
1168 .unwrap_or("unknown");
1169 api.delete(&ar_name, &DeleteParams::default()).await?;
1170 info!(
1171 arecord = %ar_name,
1172 old_cluster = %old_cluster,
1173 new_cluster = %current_cluster,
1174 ingress = %ingress_name,
1175 ns = %ingress_namespace,
1176 "Deleted stale ARecord after cluster-name change"
1177 );
1178 }
1179 Ok(())
1180}
1181
1182async fn delete_arecords_for_service(
1186 remote_client: &Client,
1187 target_namespace: &str,
1188 cluster: &str,
1189 svc_namespace: &str,
1190 svc_name: &str,
1191) -> Result<()> {
1192 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1193 let selector = service_arecord_label_selector(cluster, svc_namespace, svc_name);
1194 let lp = ListParams::default().labels(&selector);
1195
1196 let arecords = api.list(&lp).await?;
1197 for ar in arecords.items {
1198 let ar_name = ar.name_any();
1199 api.delete(&ar_name, &DeleteParams::default()).await?;
1200 info!(
1201 arecord = %ar_name,
1202 service = %svc_name,
1203 ns = %svc_namespace,
1204 "Deleted ARecord during Service cleanup"
1205 );
1206 }
1207 Ok(())
1208}
1209
1210async fn delete_arecords_for_httproute(
1212 remote_client: &Client,
1213 target_namespace: &str,
1214 cluster: &str,
1215 route_namespace: &str,
1216 route_name: &str,
1217) -> Result<()> {
1218 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1219 let selector = httproute_arecord_label_selector(cluster, route_namespace, route_name);
1220 let lp = ListParams::default().labels(&selector);
1221
1222 let arecords = api.list(&lp).await?;
1223 for ar in arecords.items {
1224 let ar_name = ar.name_any();
1225 api.delete(&ar_name, &DeleteParams::default()).await?;
1226 info!(
1227 arecord = %ar_name,
1228 httproute = %route_name,
1229 ns = %route_namespace,
1230 "Deleted ARecord during HTTPRoute cleanup"
1231 );
1232 }
1233 Ok(())
1234}
1235
1236async fn delete_arecords_for_tlsroute(
1238 remote_client: &Client,
1239 target_namespace: &str,
1240 cluster: &str,
1241 route_namespace: &str,
1242 route_name: &str,
1243) -> Result<()> {
1244 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1245 let selector = tlsroute_arecord_label_selector(cluster, route_namespace, route_name);
1246 let lp = ListParams::default().labels(&selector);
1247
1248 let arecords = api.list(&lp).await?;
1249 for ar in arecords.items {
1250 let ar_name = ar.name_any();
1251 api.delete(&ar_name, &DeleteParams::default()).await?;
1252 info!(
1253 arecord = %ar_name,
1254 tlsroute = %route_name,
1255 ns = %route_namespace,
1256 "Deleted ARecord during TLSRoute cleanup"
1257 );
1258 }
1259 Ok(())
1260}
1261
1262async fn delete_stale_cluster_httproute_arecords(
1264 remote_client: &Client,
1265 target_namespace: &str,
1266 current_cluster: &str,
1267 route_namespace: &str,
1268 route_name: &str,
1269) -> Result<()> {
1270 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1271 let selector =
1272 stale_httproute_arecord_label_selector(current_cluster, route_namespace, route_name);
1273 let lp = ListParams::default().labels(&selector);
1274
1275 let arecords = api.list(&lp).await?;
1276 for ar in arecords.items {
1277 let ar_name = ar.name_any();
1278 api.delete(&ar_name, &DeleteParams::default()).await?;
1279 info!(
1280 arecord = %ar_name,
1281 httproute = %route_name,
1282 "Deleted stale HTTPRoute ARecord from previous cluster name"
1283 );
1284 }
1285 Ok(())
1286}
1287
1288async fn delete_stale_cluster_tlsroute_arecords(
1290 remote_client: &Client,
1291 target_namespace: &str,
1292 current_cluster: &str,
1293 route_namespace: &str,
1294 route_name: &str,
1295) -> Result<()> {
1296 let api: Api<ARecord> = Api::namespaced(remote_client.clone(), target_namespace);
1297 let selector =
1298 stale_tlsroute_arecord_label_selector(current_cluster, route_namespace, route_name);
1299 let lp = ListParams::default().labels(&selector);
1300
1301 let arecords = api.list(&lp).await?;
1302 for ar in arecords.items {
1303 let ar_name = ar.name_any();
1304 api.delete(&ar_name, &DeleteParams::default()).await?;
1305 info!(
1306 arecord = %ar_name,
1307 tlsroute = %route_name,
1308 "Deleted stale TLSRoute ARecord from previous cluster name"
1309 );
1310 }
1311 Ok(())
1312}
1313
1314async fn reconcile(ingress: Arc<Ingress>, ctx: Arc<ScoutContext>) -> Result<Action, ScoutError> {
1329 let name = ingress.name_any();
1330 let namespace = ingress.namespace().unwrap_or_default();
1331
1332 if ctx.excluded_namespaces.contains(&namespace) {
1334 debug!(ingress = %name, ns = %namespace, "Skipping excluded namespace");
1335 return Ok(Action::await_change());
1336 }
1337
1338 if is_being_deleted(&ingress) {
1340 if has_finalizer(&ingress) {
1341 info!(ingress = %name, ns = %namespace, "Ingress deleting — cleaning up ARecords");
1342 delete_arecords_for_ingress(
1343 &ctx.remote_client,
1344 &ctx.target_namespace,
1345 &ctx.cluster_name,
1346 &namespace,
1347 &name,
1348 )
1349 .await
1350 .map_err(ScoutError::from)?;
1351 delete_stale_cluster_arecords(
1352 &ctx.remote_client,
1353 &ctx.target_namespace,
1354 &ctx.cluster_name,
1355 &namespace,
1356 &name,
1357 )
1358 .await
1359 .map_err(ScoutError::from)?;
1360 remove_finalizer(&ctx.client, &ingress)
1361 .await
1362 .map_err(ScoutError::from)?;
1363 info!(ingress = %name, ns = %namespace, "Finalizer removed — Ingress deletion unblocked");
1364 }
1365 return Ok(Action::await_change());
1366 }
1367
1368 let annotations = ingress
1369 .metadata
1370 .annotations
1371 .as_ref()
1372 .cloned()
1373 .unwrap_or_default();
1374
1375 if !is_scout_opted_in(&annotations) {
1377 if has_finalizer(&ingress) {
1379 info!(ingress = %name, ns = %namespace, "Scout opt-in annotation removed — cleaning up ARecords and finalizer");
1380 delete_arecords_for_ingress(
1381 &ctx.remote_client,
1382 &ctx.target_namespace,
1383 &ctx.cluster_name,
1384 &namespace,
1385 &name,
1386 )
1387 .await
1388 .map_err(ScoutError::from)?;
1389 delete_stale_cluster_arecords(
1390 &ctx.remote_client,
1391 &ctx.target_namespace,
1392 &ctx.cluster_name,
1393 &namespace,
1394 &name,
1395 )
1396 .await
1397 .map_err(ScoutError::from)?;
1398 remove_finalizer(&ctx.client, &ingress)
1399 .await
1400 .map_err(ScoutError::from)?;
1401 }
1402 debug!(ingress = %name, ns = %namespace, "No arecord annotation — skipping");
1403 return Ok(Action::await_change());
1404 }
1405
1406 if !has_finalizer(&ingress) {
1410 add_finalizer(&ctx.client, &ingress)
1411 .await
1412 .map_err(ScoutError::from)?;
1413 debug!(ingress = %name, ns = %namespace, "Finalizer added — re-queuing for record creation");
1414 return Ok(Action::await_change());
1415 }
1416
1417 let zone = match resolve_zone(&annotations, ctx.default_zone.as_deref()) {
1419 Some(z) => z,
1420 None => {
1421 warn!(ingress = %name, ns = %namespace, "No DNS zone available (set bindy.firestoned.io/zone annotation or BINDY_SCOUT_DEFAULT_ZONE) — skipping");
1422 return Ok(Action::requeue(Duration::from_secs(
1423 SCOUT_ERROR_REQUEUE_SECS,
1424 )));
1425 }
1426 };
1427
1428 let zone_exists = ctx
1430 .zone_store
1431 .state()
1432 .iter()
1433 .any(|z| z.spec.zone_name == zone);
1434 if !zone_exists {
1435 warn!(
1436 ingress = %name,
1437 ns = %namespace,
1438 zone = %zone,
1439 "Zone not found in DNSZone store — skipping until zone appears"
1440 );
1441 return Ok(Action::requeue(Duration::from_secs(
1442 SCOUT_ERROR_REQUEUE_SECS,
1443 )));
1444 }
1445
1446 let ips = match resolve_ips(&annotations, &ctx.default_ips, &ingress) {
1448 Some(ips) => ips,
1449 None => {
1450 warn!(ingress = %name, ns = %namespace, "No IP available (no annotation override, no default IPs, no LB status IP) — requeuing");
1451 return Ok(Action::requeue(Duration::from_secs(
1452 SCOUT_ERROR_REQUEUE_SECS,
1453 )));
1454 }
1455 };
1456
1457 let ttl: Option<i32> = annotations.get(ANNOTATION_TTL).and_then(|v| v.parse().ok());
1459
1460 let spec_rules = ingress
1461 .spec
1462 .as_ref()
1463 .and_then(|s| s.rules.as_ref())
1464 .cloned()
1465 .unwrap_or_default();
1466
1467 let arecord_api: Api<ARecord> =
1468 Api::namespaced(ctx.remote_client.clone(), &ctx.target_namespace);
1469
1470 for (idx, rule) in spec_rules.iter().enumerate() {
1471 let host = match rule.host.as_deref() {
1472 Some(h) if !h.is_empty() => h,
1473 _ => {
1474 debug!(ingress = %name, rule_index = idx, "Ingress rule has no host — skipping");
1475 continue;
1476 }
1477 };
1478
1479 let record_name = match resolve_record_name(&annotations, host, &zone) {
1480 Ok(n) => n,
1481 Err(e) => {
1482 warn!(ingress = %name, host = %host, zone = %zone, error = %e, "Host does not belong to zone — skipping rule");
1483 continue;
1484 }
1485 };
1486
1487 let cr_name = arecord_cr_name(&ctx.cluster_name, &namespace, &name, idx);
1488 let arecord = build_arecord(ARecordParams {
1489 name: &cr_name,
1490 target_namespace: &ctx.target_namespace,
1491 record_name: &record_name,
1492 ips: &ips,
1493 ttl,
1494 cluster_name: &ctx.cluster_name,
1495 ingress_namespace: &namespace,
1496 ingress_name: &name,
1497 zone: &zone,
1498 });
1499
1500 let ssapply = kube::api::PatchParams::apply("bindy-scout").force();
1502 match arecord_api
1503 .patch(&cr_name, &ssapply, &kube::api::Patch::Apply(&arecord))
1504 .await
1505 {
1506 Ok(_) => {
1507 info!(arecord = %cr_name, ingress = %name, host = %host, ips = ?ips, "ARecord created/updated");
1508 }
1509 Err(e) => {
1510 error!(arecord = %cr_name, ingress = %name, error = %e, "Failed to apply ARecord");
1511 return Err(ScoutError::from(anyhow!(
1512 "Failed to apply ARecord {cr_name}: {e}"
1513 )));
1514 }
1515 }
1516 }
1517
1518 delete_stale_cluster_arecords(
1521 &ctx.remote_client,
1522 &ctx.target_namespace,
1523 &ctx.cluster_name,
1524 &namespace,
1525 &name,
1526 )
1527 .await
1528 .map_err(ScoutError::from)?;
1529
1530 Ok(Action::await_change())
1531}
1532
1533async fn reconcile_service(
1546 svc: Arc<Service>,
1547 ctx: Arc<ScoutContext>,
1548) -> Result<Action, ScoutError> {
1549 let name = svc.name_any();
1550 let namespace = svc.namespace().unwrap_or_default();
1551
1552 if ctx.excluded_namespaces.contains(&namespace) {
1553 debug!(service = %name, ns = %namespace, "Skipping excluded namespace");
1554 return Ok(Action::await_change());
1555 }
1556
1557 if svc.metadata.deletion_timestamp.is_some() {
1559 if svc
1560 .metadata
1561 .finalizers
1562 .as_ref()
1563 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
1564 .unwrap_or(false)
1565 {
1566 info!(service = %name, ns = %namespace, "Service deleting — cleaning up ARecord");
1567 delete_arecords_for_service(
1568 &ctx.remote_client,
1569 &ctx.target_namespace,
1570 &ctx.cluster_name,
1571 &namespace,
1572 &name,
1573 )
1574 .await
1575 .map_err(ScoutError::from)?;
1576 remove_finalizer_from_service(&ctx.client, &svc)
1577 .await
1578 .map_err(ScoutError::from)?;
1579 info!(service = %name, ns = %namespace, "Finalizer removed — Service deletion unblocked");
1580 }
1581 return Ok(Action::await_change());
1582 }
1583
1584 let annotations = svc
1585 .metadata
1586 .annotations
1587 .as_ref()
1588 .cloned()
1589 .unwrap_or_default();
1590
1591 if !is_scout_opted_in(&annotations) {
1593 let has_fin = svc
1594 .metadata
1595 .finalizers
1596 .as_ref()
1597 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
1598 .unwrap_or(false);
1599 if has_fin {
1600 info!(service = %name, ns = %namespace, "Scout opt-in annotation removed — cleaning up ARecord and finalizer");
1601 delete_arecords_for_service(
1602 &ctx.remote_client,
1603 &ctx.target_namespace,
1604 &ctx.cluster_name,
1605 &namespace,
1606 &name,
1607 )
1608 .await
1609 .map_err(ScoutError::from)?;
1610 remove_finalizer_from_service(&ctx.client, &svc)
1611 .await
1612 .map_err(ScoutError::from)?;
1613 }
1614 debug!(service = %name, ns = %namespace, "No scout-enabled annotation — skipping");
1615 return Ok(Action::await_change());
1616 }
1617
1618 if !is_loadbalancer_service(&svc) {
1620 debug!(service = %name, ns = %namespace, "Service is not LoadBalancer type — skipping");
1621 return Ok(Action::await_change());
1622 }
1623
1624 let has_fin = svc
1626 .metadata
1627 .finalizers
1628 .as_ref()
1629 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
1630 .unwrap_or(false);
1631 if !has_fin {
1632 add_finalizer_to_service(&ctx.client, &svc)
1633 .await
1634 .map_err(ScoutError::from)?;
1635 debug!(service = %name, ns = %namespace, "Finalizer added — re-queuing for record creation");
1636 return Ok(Action::await_change());
1637 }
1638
1639 let zone = match resolve_zone(&annotations, ctx.default_zone.as_deref()) {
1641 Some(z) => z,
1642 None => {
1643 warn!(service = %name, ns = %namespace, "No DNS zone available — skipping");
1644 return Ok(Action::requeue(Duration::from_secs(
1645 SCOUT_ERROR_REQUEUE_SECS,
1646 )));
1647 }
1648 };
1649
1650 let zone_exists = ctx
1652 .zone_store
1653 .state()
1654 .iter()
1655 .any(|z| z.spec.zone_name == zone);
1656 if !zone_exists {
1657 warn!(service = %name, ns = %namespace, zone = %zone, "Zone not found in DNSZone store — requeuing");
1658 return Ok(Action::requeue(Duration::from_secs(
1659 SCOUT_ERROR_REQUEUE_SECS,
1660 )));
1661 }
1662
1663 let ips = {
1665 let from_annotation = resolve_ips_from_annotation(&annotations);
1666 let from_defaults = if ctx.default_ips.is_empty() {
1667 None
1668 } else {
1669 Some(ctx.default_ips.clone())
1670 };
1671 let from_lb = resolve_ip_from_service_lb_status(&svc).map(|ip| vec![ip]);
1672
1673 match from_annotation.or(from_defaults).or(from_lb) {
1674 Some(ips) => ips,
1675 None => {
1676 warn!(service = %name, ns = %namespace, "No external IP yet — requeuing in {}s", SCOUT_ERROR_REQUEUE_SECS);
1677 return Ok(Action::requeue(Duration::from_secs(
1678 SCOUT_ERROR_REQUEUE_SECS,
1679 )));
1680 }
1681 }
1682 };
1683
1684 let ttl: Option<i32> = annotations.get(ANNOTATION_TTL).and_then(|v| v.parse().ok());
1685
1686 let fqdn = format!("{name}.{zone}");
1688 let record_name = match resolve_record_name(&annotations, &fqdn, &zone) {
1689 Ok(n) => n,
1690 Err(e) => {
1691 warn!(service = %name, zone = %zone, error = %e, "Cannot derive record name — skipping");
1692 return Ok(Action::requeue(Duration::from_secs(
1693 SCOUT_ERROR_REQUEUE_SECS,
1694 )));
1695 }
1696 };
1697
1698 let cr_name = service_arecord_cr_name(&ctx.cluster_name, &namespace, &name);
1699 let arecord = build_service_arecord(ServiceARecordParams {
1700 name: &cr_name,
1701 target_namespace: &ctx.target_namespace,
1702 record_name: &record_name,
1703 ips: &ips,
1704 ttl,
1705 cluster_name: &ctx.cluster_name,
1706 service_namespace: &namespace,
1707 service_name: &name,
1708 zone: &zone,
1709 });
1710
1711 let arecord_api: Api<ARecord> =
1712 Api::namespaced(ctx.remote_client.clone(), &ctx.target_namespace);
1713 let ssapply = kube::api::PatchParams::apply("bindy-scout").force();
1714 match arecord_api
1715 .patch(&cr_name, &ssapply, &kube::api::Patch::Apply(&arecord))
1716 .await
1717 {
1718 Ok(_) => {
1719 info!(arecord = %cr_name, service = %name, ips = ?ips, "ARecord created/updated for Service");
1720 }
1721 Err(e) => {
1722 error!(arecord = %cr_name, service = %name, error = %e, "Failed to apply ARecord for Service");
1723 return Err(ScoutError::from(anyhow!(
1724 "Failed to apply ARecord {cr_name}: {e}"
1725 )));
1726 }
1727 }
1728
1729 Ok(Action::await_change())
1730}
1731
1732fn service_error_policy(_obj: Arc<Service>, error: &ScoutError, _ctx: Arc<ScoutContext>) -> Action {
1734 error!(error = %error, "Scout service reconcile error — requeuing");
1735 Action::requeue(Duration::from_secs(SCOUT_ERROR_REQUEUE_SECS))
1736}
1737
1738fn error_policy(_obj: Arc<Ingress>, error: &ScoutError, _ctx: Arc<ScoutContext>) -> Action {
1740 error!(error = %error, "Scout reconcile error — requeuing");
1741 Action::requeue(Duration::from_secs(SCOUT_ERROR_REQUEUE_SECS))
1742}
1743
1744async fn reconcile_httproute(
1769 route: Arc<HTTPRoute>,
1770 ctx: Arc<ScoutContext>,
1771) -> Result<Action, ScoutError> {
1772 let name = route.name_any();
1773 let namespace = route.namespace().unwrap_or_default();
1774
1775 if ctx.excluded_namespaces.contains(&namespace) {
1777 debug!(httproute = %name, ns = %namespace, "Skipping excluded namespace");
1778 return Ok(Action::await_change());
1779 }
1780
1781 if route.metadata.deletion_timestamp.is_some() {
1783 if route
1784 .metadata
1785 .finalizers
1786 .as_ref()
1787 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
1788 .unwrap_or(false)
1789 {
1790 info!(httproute = %name, ns = %namespace, "HTTPRoute deleting — cleaning up ARecords");
1791 delete_arecords_for_httproute(
1792 &ctx.remote_client,
1793 &ctx.target_namespace,
1794 &ctx.cluster_name,
1795 &namespace,
1796 &name,
1797 )
1798 .await
1799 .map_err(ScoutError::from)?;
1800 delete_stale_cluster_httproute_arecords(
1801 &ctx.remote_client,
1802 &ctx.target_namespace,
1803 &ctx.cluster_name,
1804 &namespace,
1805 &name,
1806 )
1807 .await
1808 .map_err(ScoutError::from)?;
1809 remove_finalizer_from_httproute(&ctx.client, &route)
1810 .await
1811 .map_err(ScoutError::from)?;
1812 info!(httproute = %name, ns = %namespace, "Finalizer removed — HTTPRoute deletion unblocked");
1813 }
1814 return Ok(Action::await_change());
1815 }
1816
1817 let annotations = route
1818 .metadata
1819 .annotations
1820 .as_ref()
1821 .cloned()
1822 .unwrap_or_default();
1823
1824 if !is_scout_opted_in(&annotations) {
1826 let has_fin = route
1827 .metadata
1828 .finalizers
1829 .as_ref()
1830 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
1831 .unwrap_or(false);
1832 if has_fin {
1833 info!(httproute = %name, ns = %namespace, "Scout opt-in annotation removed — cleaning up ARecords and finalizer");
1834 delete_arecords_for_httproute(
1835 &ctx.remote_client,
1836 &ctx.target_namespace,
1837 &ctx.cluster_name,
1838 &namespace,
1839 &name,
1840 )
1841 .await
1842 .map_err(ScoutError::from)?;
1843 delete_stale_cluster_httproute_arecords(
1844 &ctx.remote_client,
1845 &ctx.target_namespace,
1846 &ctx.cluster_name,
1847 &namespace,
1848 &name,
1849 )
1850 .await
1851 .map_err(ScoutError::from)?;
1852 remove_finalizer_from_httproute(&ctx.client, &route)
1853 .await
1854 .map_err(ScoutError::from)?;
1855 }
1856 debug!(httproute = %name, ns = %namespace, "No scout-enabled annotation — skipping");
1857 return Ok(Action::await_change());
1858 }
1859
1860 let has_fin = route
1862 .metadata
1863 .finalizers
1864 .as_ref()
1865 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
1866 .unwrap_or(false);
1867 if !has_fin {
1868 add_finalizer_to_httproute(&ctx.client, &route)
1869 .await
1870 .map_err(ScoutError::from)?;
1871 debug!(httproute = %name, ns = %namespace, "Finalizer added — re-queuing for record creation");
1872 return Ok(Action::await_change());
1873 }
1874
1875 let zone = match resolve_zone(&annotations, ctx.default_zone.as_deref()) {
1877 Some(z) => z,
1878 None => {
1879 warn!(httproute = %name, ns = %namespace, "No DNS zone available — skipping");
1880 return Ok(Action::requeue(Duration::from_secs(
1881 SCOUT_ERROR_REQUEUE_SECS,
1882 )));
1883 }
1884 };
1885
1886 let zone_exists = ctx
1888 .zone_store
1889 .state()
1890 .iter()
1891 .any(|z| z.spec.zone_name == zone);
1892 if !zone_exists {
1893 warn!(httproute = %name, ns = %namespace, zone = %zone, "Zone not found in DNSZone store — requeuing");
1894 return Ok(Action::requeue(Duration::from_secs(
1895 SCOUT_ERROR_REQUEUE_SECS,
1896 )));
1897 }
1898
1899 let ips = {
1901 let from_annotation = resolve_ips_from_annotation(&annotations);
1902 let from_defaults = if ctx.default_ips.is_empty() {
1903 None
1904 } else {
1905 Some(ctx.default_ips.clone())
1906 };
1907
1908 match from_annotation.or(from_defaults) {
1909 Some(ips) => ips,
1910 None => {
1911 warn!(httproute = %name, ns = %namespace, "No IP available (no annotation override, no default IPs) — requeuing");
1912 return Ok(Action::requeue(Duration::from_secs(
1913 SCOUT_ERROR_REQUEUE_SECS,
1914 )));
1915 }
1916 }
1917 };
1918
1919 let ttl: Option<i32> = annotations.get(ANNOTATION_TTL).and_then(|v| v.parse().ok());
1920
1921 let hostnames = route
1923 .spec
1924 .as_ref()
1925 .and_then(|s| s.hostnames.as_ref())
1926 .cloned()
1927 .unwrap_or_default();
1928
1929 let arecord_api: Api<ARecord> =
1930 Api::namespaced(ctx.remote_client.clone(), &ctx.target_namespace);
1931
1932 for (idx, hostname) in hostnames.iter().enumerate() {
1933 if hostname.is_empty() {
1934 debug!(httproute = %name, hostname_index = idx, "HTTPRoute hostname is empty — skipping");
1935 continue;
1936 }
1937
1938 let record_name = match resolve_record_name(&annotations, hostname, &zone) {
1939 Ok(n) => n,
1940 Err(e) => {
1941 warn!(httproute = %name, hostname = %hostname, zone = %zone, error = %e, "Hostname does not belong to zone — skipping");
1942 continue;
1943 }
1944 };
1945
1946 let cr_name = httproute_arecord_cr_name(&ctx.cluster_name, &namespace, &name, idx);
1947 let arecord = build_httproute_arecord(HTTPRouteARecordParams {
1948 name: &cr_name,
1949 target_namespace: &ctx.target_namespace,
1950 record_name: &record_name,
1951 ips: &ips,
1952 ttl,
1953 cluster_name: &ctx.cluster_name,
1954 route_namespace: &namespace,
1955 route_name: &name,
1956 zone: &zone,
1957 });
1958
1959 let ssapply = kube::api::PatchParams::apply("bindy-scout").force();
1961 match arecord_api
1962 .patch(&cr_name, &ssapply, &kube::api::Patch::Apply(&arecord))
1963 .await
1964 {
1965 Ok(_) => {
1966 info!(arecord = %cr_name, httproute = %name, hostname = %hostname, ips = ?ips, "ARecord created/updated for HTTPRoute");
1967 }
1968 Err(e) => {
1969 error!(arecord = %cr_name, httproute = %name, error = %e, "Failed to apply ARecord for HTTPRoute");
1970 return Err(ScoutError::from(anyhow!(
1971 "Failed to apply ARecord {cr_name}: {e}"
1972 )));
1973 }
1974 }
1975 }
1976
1977 delete_stale_cluster_httproute_arecords(
1979 &ctx.remote_client,
1980 &ctx.target_namespace,
1981 &ctx.cluster_name,
1982 &namespace,
1983 &name,
1984 )
1985 .await
1986 .map_err(ScoutError::from)?;
1987
1988 Ok(Action::await_change())
1989}
1990
1991async fn reconcile_tlsroute(
2000 route: Arc<TLSRoute>,
2001 ctx: Arc<ScoutContext>,
2002) -> Result<Action, ScoutError> {
2003 let name = route.name_any();
2004 let namespace = route.namespace().unwrap_or_default();
2005
2006 if ctx.excluded_namespaces.contains(&namespace) {
2008 debug!(tlsroute = %name, ns = %namespace, "Skipping excluded namespace");
2009 return Ok(Action::await_change());
2010 }
2011
2012 if route.metadata.deletion_timestamp.is_some() {
2014 if route
2015 .metadata
2016 .finalizers
2017 .as_ref()
2018 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
2019 .unwrap_or(false)
2020 {
2021 info!(tlsroute = %name, ns = %namespace, "TLSRoute deleting — cleaning up ARecords");
2022 delete_arecords_for_tlsroute(
2023 &ctx.remote_client,
2024 &ctx.target_namespace,
2025 &ctx.cluster_name,
2026 &namespace,
2027 &name,
2028 )
2029 .await
2030 .map_err(ScoutError::from)?;
2031 delete_stale_cluster_tlsroute_arecords(
2032 &ctx.remote_client,
2033 &ctx.target_namespace,
2034 &ctx.cluster_name,
2035 &namespace,
2036 &name,
2037 )
2038 .await
2039 .map_err(ScoutError::from)?;
2040 remove_finalizer_from_tlsroute(&ctx.client, &route)
2041 .await
2042 .map_err(ScoutError::from)?;
2043 info!(tlsroute = %name, ns = %namespace, "Finalizer removed — TLSRoute deletion unblocked");
2044 }
2045 return Ok(Action::await_change());
2046 }
2047
2048 let annotations = route
2049 .metadata
2050 .annotations
2051 .as_ref()
2052 .cloned()
2053 .unwrap_or_default();
2054
2055 if !is_scout_opted_in(&annotations) {
2057 let has_fin = route
2058 .metadata
2059 .finalizers
2060 .as_ref()
2061 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
2062 .unwrap_or(false);
2063 if has_fin {
2064 info!(tlsroute = %name, ns = %namespace, "Scout opt-in annotation removed — cleaning up ARecords and finalizer");
2065 delete_arecords_for_tlsroute(
2066 &ctx.remote_client,
2067 &ctx.target_namespace,
2068 &ctx.cluster_name,
2069 &namespace,
2070 &name,
2071 )
2072 .await
2073 .map_err(ScoutError::from)?;
2074 delete_stale_cluster_tlsroute_arecords(
2075 &ctx.remote_client,
2076 &ctx.target_namespace,
2077 &ctx.cluster_name,
2078 &namespace,
2079 &name,
2080 )
2081 .await
2082 .map_err(ScoutError::from)?;
2083 remove_finalizer_from_tlsroute(&ctx.client, &route)
2084 .await
2085 .map_err(ScoutError::from)?;
2086 }
2087 debug!(tlsroute = %name, ns = %namespace, "No scout-enabled annotation — skipping");
2088 return Ok(Action::await_change());
2089 }
2090
2091 let has_fin = route
2093 .metadata
2094 .finalizers
2095 .as_ref()
2096 .map(|fs| fs.iter().any(|f| f == FINALIZER_SCOUT))
2097 .unwrap_or(false);
2098 if !has_fin {
2099 add_finalizer_to_tlsroute(&ctx.client, &route)
2100 .await
2101 .map_err(ScoutError::from)?;
2102 debug!(tlsroute = %name, ns = %namespace, "Finalizer added — re-queuing for record creation");
2103 return Ok(Action::await_change());
2104 }
2105
2106 let zone = match resolve_zone(&annotations, ctx.default_zone.as_deref()) {
2108 Some(z) => z,
2109 None => {
2110 warn!(tlsroute = %name, ns = %namespace, "No DNS zone available — skipping");
2111 return Ok(Action::requeue(Duration::from_secs(
2112 SCOUT_ERROR_REQUEUE_SECS,
2113 )));
2114 }
2115 };
2116
2117 let zone_exists = ctx
2119 .zone_store
2120 .state()
2121 .iter()
2122 .any(|z| z.spec.zone_name == zone);
2123 if !zone_exists {
2124 warn!(tlsroute = %name, ns = %namespace, zone = %zone, "Zone not found in DNSZone store — requeuing");
2125 return Ok(Action::requeue(Duration::from_secs(
2126 SCOUT_ERROR_REQUEUE_SECS,
2127 )));
2128 }
2129
2130 let ips = {
2132 let from_annotation = resolve_ips_from_annotation(&annotations);
2133 let from_defaults = if ctx.default_ips.is_empty() {
2134 None
2135 } else {
2136 Some(ctx.default_ips.clone())
2137 };
2138
2139 match from_annotation.or(from_defaults) {
2140 Some(ips) => ips,
2141 None => {
2142 warn!(tlsroute = %name, ns = %namespace, "No IP available (no annotation override, no default IPs) — requeuing");
2143 return Ok(Action::requeue(Duration::from_secs(
2144 SCOUT_ERROR_REQUEUE_SECS,
2145 )));
2146 }
2147 }
2148 };
2149
2150 let ttl: Option<i32> = annotations.get(ANNOTATION_TTL).and_then(|v| v.parse().ok());
2151
2152 let hostnames = route
2154 .spec
2155 .as_ref()
2156 .and_then(|s| s.hostnames.as_ref())
2157 .cloned()
2158 .unwrap_or_default();
2159
2160 let arecord_api: Api<ARecord> =
2161 Api::namespaced(ctx.remote_client.clone(), &ctx.target_namespace);
2162
2163 for (idx, hostname) in hostnames.iter().enumerate() {
2164 if hostname.is_empty() {
2165 debug!(tlsroute = %name, hostname_index = idx, "TLSRoute hostname is empty — skipping");
2166 continue;
2167 }
2168
2169 let record_name = match resolve_record_name(&annotations, hostname, &zone) {
2170 Ok(n) => n,
2171 Err(e) => {
2172 warn!(tlsroute = %name, hostname = %hostname, zone = %zone, error = %e, "Hostname does not belong to zone — skipping");
2173 continue;
2174 }
2175 };
2176
2177 let cr_name = tlsroute_arecord_cr_name(&ctx.cluster_name, &namespace, &name, idx);
2178 let arecord = build_tlsroute_arecord(TLSRouteARecordParams {
2179 name: &cr_name,
2180 target_namespace: &ctx.target_namespace,
2181 record_name: &record_name,
2182 ips: &ips,
2183 ttl,
2184 cluster_name: &ctx.cluster_name,
2185 route_namespace: &namespace,
2186 route_name: &name,
2187 zone: &zone,
2188 });
2189
2190 let ssapply = kube::api::PatchParams::apply("bindy-scout").force();
2192 match arecord_api
2193 .patch(&cr_name, &ssapply, &kube::api::Patch::Apply(&arecord))
2194 .await
2195 {
2196 Ok(_) => {
2197 info!(arecord = %cr_name, tlsroute = %name, hostname = %hostname, ips = ?ips, "ARecord created/updated for TLSRoute");
2198 }
2199 Err(e) => {
2200 error!(arecord = %cr_name, tlsroute = %name, error = %e, "Failed to apply ARecord for TLSRoute");
2201 return Err(ScoutError::from(anyhow!(
2202 "Failed to apply ARecord {cr_name}: {e}"
2203 )));
2204 }
2205 }
2206 }
2207
2208 delete_stale_cluster_tlsroute_arecords(
2210 &ctx.remote_client,
2211 &ctx.target_namespace,
2212 &ctx.cluster_name,
2213 &namespace,
2214 &name,
2215 )
2216 .await
2217 .map_err(ScoutError::from)?;
2218
2219 Ok(Action::await_change())
2220}
2221
2222fn gateway_route_error_policy(
2224 _obj: Arc<HTTPRoute>,
2225 error: &ScoutError,
2226 _ctx: Arc<ScoutContext>,
2227) -> Action {
2228 error!(error = %error, "Scout HTTPRoute reconcile error — requeuing");
2229 Action::requeue(Duration::from_secs(SCOUT_ERROR_REQUEUE_SECS))
2230}
2231
2232fn tlsroute_error_policy(
2234 _obj: Arc<TLSRoute>,
2235 error: &ScoutError,
2236 _ctx: Arc<ScoutContext>,
2237) -> Action {
2238 error!(error = %error, "Scout TLSRoute reconcile error — requeuing");
2239 Action::requeue(Duration::from_secs(SCOUT_ERROR_REQUEUE_SECS))
2240}
2241
2242async fn build_remote_client(
2257 local_client: &Client,
2258 secret_name: &str,
2259 secret_namespace: &str,
2260) -> Result<Client> {
2261 let api: Api<Secret> = Api::namespaced(local_client.clone(), secret_namespace);
2262 let secret = api.get(secret_name).await.map_err(|e| {
2263 anyhow!("Failed to read kubeconfig Secret {secret_namespace}/{secret_name}: {e}")
2264 })?;
2265
2266 let kubeconfig_bytes = secret
2267 .data
2268 .as_ref()
2269 .and_then(|d| d.get("kubeconfig"))
2270 .ok_or_else(|| {
2271 anyhow!("Secret {secret_namespace}/{secret_name} has no 'kubeconfig' key in .data")
2272 })?;
2273
2274 let kubeconfig_str = std::str::from_utf8(&kubeconfig_bytes.0)
2275 .map_err(|e| anyhow!("kubeconfig in Secret is not valid UTF-8: {e}"))?;
2276
2277 let kubeconfig = Kubeconfig::from_yaml(kubeconfig_str)
2278 .map_err(|e| anyhow!("Failed to parse kubeconfig from Secret: {e}"))?;
2279
2280 let config = kube::Config::from_custom_kubeconfig(kubeconfig, &KubeConfigOptions::default())
2281 .await
2282 .map_err(|e| anyhow!("Failed to build client config from kubeconfig: {e}"))?;
2283
2284 Client::try_from(config).map_err(|e| anyhow!("Failed to create remote Kubernetes client: {e}"))
2285}
2286
2287struct ScoutConfig {
2293 target_namespace: String,
2294 cluster_name: String,
2295 excluded_namespaces: Vec<String>,
2296 default_ips: Vec<String>,
2299 default_zone: Option<String>,
2302 remote_secret_name: Option<String>,
2305 remote_secret_namespace: String,
2307}
2308
2309impl ScoutConfig {
2310 fn from_env(
2314 cli_cluster_name: Option<String>,
2315 cli_namespace: Option<String>,
2316 cli_default_ips: Vec<String>,
2317 cli_default_zone: Option<String>,
2318 ) -> Result<Self> {
2319 let target_namespace = cli_namespace
2320 .filter(|s| !s.is_empty())
2321 .or_else(|| std::env::var("BINDY_SCOUT_NAMESPACE").ok())
2322 .unwrap_or_else(|| DEFAULT_SCOUT_NAMESPACE.to_string());
2323
2324 let cluster_name = cli_cluster_name
2325 .filter(|s| !s.is_empty())
2326 .or_else(|| std::env::var("BINDY_SCOUT_CLUSTER_NAME").ok())
2327 .ok_or_else(|| {
2328 anyhow!("BINDY_SCOUT_CLUSTER_NAME is required (set via --cluster-name or env var)")
2329 })?;
2330
2331 let own_namespace =
2332 std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string());
2333
2334 let mut excluded_namespaces: Vec<String> = std::env::var("BINDY_SCOUT_EXCLUDE_NAMESPACES")
2335 .unwrap_or_default()
2336 .split(',')
2337 .map(str::trim)
2338 .filter(|s| !s.is_empty())
2339 .map(ToString::to_string)
2340 .collect();
2341
2342 if !excluded_namespaces.contains(&own_namespace) {
2344 excluded_namespaces.push(own_namespace.clone());
2345 }
2346
2347 let default_ips = if !cli_default_ips.is_empty() {
2349 cli_default_ips
2350 } else {
2351 std::env::var("BINDY_SCOUT_DEFAULT_IPS")
2352 .unwrap_or_default()
2353 .split(',')
2354 .map(str::trim)
2355 .filter(|s| !s.is_empty())
2356 .map(ToString::to_string)
2357 .collect()
2358 };
2359
2360 let default_zone = cli_default_zone.filter(|s| !s.is_empty()).or_else(|| {
2362 std::env::var("BINDY_SCOUT_DEFAULT_ZONE")
2363 .ok()
2364 .filter(|s| !s.is_empty())
2365 });
2366
2367 let remote_secret_name = std::env::var("BINDY_SCOUT_REMOTE_SECRET")
2368 .ok()
2369 .filter(|s| !s.is_empty());
2370
2371 let remote_secret_namespace =
2372 std::env::var("BINDY_SCOUT_REMOTE_SECRET_NAMESPACE").unwrap_or(own_namespace);
2373
2374 Ok(Self {
2375 target_namespace,
2376 cluster_name,
2377 excluded_namespaces,
2378 default_ips,
2379 default_zone,
2380 remote_secret_name,
2381 remote_secret_namespace,
2382 })
2383 }
2384}
2385
2386fn diagnose_reflector_error(e: &watcher::Error) -> String {
2396 let (phase, client_err) = match e {
2399 watcher::Error::InitialListFailed(e) => ("initial list", e),
2400 watcher::Error::WatchStartFailed(e) => ("watch start", e),
2401 watcher::Error::WatchFailed(e) => ("watch stream", e),
2402 watcher::Error::WatchError(status) => {
2403 return format!(
2404 "API server returned error during watch: {} (HTTP {})",
2405 status.message, status.code
2406 );
2407 }
2408 watcher::Error::NoResourceVersion => {
2409 return "resource does not support watch (no resourceVersion returned)".to_string();
2410 }
2411 };
2412
2413 let detail = match client_err {
2414 KubeError::Api(status) => match status.code {
2415 401 => format!(
2416 "unauthorized — check credentials/token ({})",
2417 status.message
2418 ),
2419 403 => format!("forbidden — check RBAC permissions ({})", status.message),
2420 code => format!("API error HTTP {code} — {}", status.message),
2421 },
2422 KubeError::Auth(e) => format!("authentication error — {e}"),
2423 KubeError::Service(e) => format!("cannot connect to API server — {e}"),
2424 KubeError::HyperError(e) => format!("HTTP transport error — {e}"),
2425 other => format!("{other}"),
2426 };
2427
2428 format!("{phase} failed: {detail}")
2429}
2430
2431pub async fn run_scout(
2441 cli_cluster_name: Option<String>,
2442 cli_namespace: Option<String>,
2443 cli_default_ips: Vec<String>,
2444 cli_default_zone: Option<String>,
2445) -> Result<()> {
2446 let config = ScoutConfig::from_env(
2447 cli_cluster_name,
2448 cli_namespace,
2449 cli_default_ips,
2450 cli_default_zone,
2451 )?;
2452
2453 let local_client = Client::try_default().await?;
2454
2455 let remote_client = if let Some(ref secret_name) = config.remote_secret_name {
2456 info!(
2457 cluster = %config.cluster_name,
2458 target_ns = %config.target_namespace,
2459 secret = %secret_name,
2460 secret_ns = %config.remote_secret_namespace,
2461 excluded = ?config.excluded_namespaces,
2462 default_ips = ?config.default_ips,
2463 default_zone = ?config.default_zone,
2464 "Starting bindy scout in remote cluster mode"
2465 );
2466 build_remote_client(&local_client, secret_name, &config.remote_secret_namespace).await?
2467 } else {
2468 info!(
2469 cluster = %config.cluster_name,
2470 target_ns = %config.target_namespace,
2471 excluded = ?config.excluded_namespaces,
2472 default_ips = ?config.default_ips,
2473 default_zone = ?config.default_zone,
2474 "Starting bindy scout in same-cluster mode"
2475 );
2476 local_client.clone()
2477 };
2478
2479 let dnszone_api: Api<DNSZone> =
2485 Api::namespaced(remote_client.clone(), &config.target_namespace);
2486 let (dnszone_reader, dnszone_writer) = reflector::store();
2487 let dnszone_reflector = reflector(
2488 dnszone_writer,
2489 watcher(dnszone_api, WatcherConfig::default()),
2490 );
2491
2492 tokio::spawn(async move {
2497 dnszone_reflector
2498 .for_each(|event| async move {
2499 match event {
2500 Ok(_) => {}
2501 Err(e) => {
2502 error!(diagnosis = %diagnose_reflector_error(&e), "DNSZone reflector error");
2503 tokio::time::sleep(tokio::time::Duration::from_secs(
2504 REFLECTOR_ERROR_BACKOFF_SECS,
2505 ))
2506 .await;
2507 }
2508 }
2509 })
2510 .await;
2511 });
2512
2513 let ctx = Arc::new(ScoutContext {
2514 client: local_client.clone(),
2515 remote_client,
2516 target_namespace: config.target_namespace,
2517 cluster_name: config.cluster_name,
2518 excluded_namespaces: config.excluded_namespaces,
2519 default_ips: config.default_ips,
2520 default_zone: config.default_zone,
2521 zone_store: dnszone_reader,
2522 });
2523
2524 let ingress_api: Api<Ingress> = Api::all(local_client.clone());
2526 let svc_api: Api<Service> = Api::all(local_client.clone());
2528 let httproute_api: Api<HTTPRoute> = Api::all(local_client.clone());
2530 let tlsroute_api: Api<TLSRoute> = Api::all(local_client.clone());
2532
2533 info!("Scout controller running — watching Ingresses, Services, HTTPRoutes, and TLSRoutes");
2534
2535 let ingress_controller = Controller::new(ingress_api, WatcherConfig::default())
2536 .run(reconcile, error_policy, ctx.clone())
2537 .for_each(|res| async move {
2538 match res {
2539 Ok(obj) => debug!(obj = ?obj, "Reconciled Ingress"),
2540 Err(e) => error!(error = %e, "Ingress reconcile failed"),
2541 }
2542 });
2543
2544 let service_controller = Controller::new(svc_api, WatcherConfig::default())
2545 .run(reconcile_service, service_error_policy, ctx.clone())
2546 .for_each(|res| async move {
2547 match res {
2548 Ok(obj) => debug!(obj = ?obj, "Reconciled Service"),
2549 Err(e) => error!(error = %e, "Service reconcile failed"),
2550 }
2551 });
2552
2553 let httproute_controller = Controller::new(httproute_api, WatcherConfig::default())
2554 .run(reconcile_httproute, gateway_route_error_policy, ctx.clone())
2555 .for_each(|res| async move {
2556 match res {
2557 Ok(obj) => debug!(obj = ?obj, "Reconciled HTTPRoute"),
2558 Err(e) => error!(error = %e, "HTTPRoute reconcile failed"),
2559 }
2560 });
2561
2562 let tlsroute_controller = Controller::new(tlsroute_api, WatcherConfig::default())
2563 .run(reconcile_tlsroute, tlsroute_error_policy, ctx)
2564 .for_each(|res| async move {
2565 match res {
2566 Ok(obj) => debug!(obj = ?obj, "Reconciled TLSRoute"),
2567 Err(e) => error!(error = %e, "TLSRoute reconcile failed"),
2568 }
2569 });
2570
2571 futures::future::join4(
2572 ingress_controller,
2573 service_controller,
2574 httproute_controller,
2575 tlsroute_controller,
2576 )
2577 .await;
2578
2579 Ok(())
2580}