1use crate::bind9::Bind9Manager;
10use crate::context::Context;
11use crate::crd::{DNSZone, RecordStatus};
12use anyhow::{anyhow, Result};
13use futures::StreamExt;
14use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
15use kube::api::Api;
16use kube::core::NamespaceResourceScope;
17use kube::runtime::controller::Action;
18use kube::runtime::finalizer;
19use kube::runtime::watcher::Config as WatcherConfig;
20use kube::runtime::Controller;
21use kube::{Resource, ResourceExt};
22use serde::de::DeserializeOwned;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::sync::Arc;
26use std::time::Duration;
27use tracing::{error, info};
28
29#[derive(Debug, thiserror::Error)]
31#[error(transparent)]
32pub struct ReconcileError(#[from] anyhow::Error);
33
34#[allow(clippy::needless_pass_by_value)] fn error_policy<T, C>(resource: Arc<T>, err: &ReconcileError, _ctx: Arc<C>) -> Action
39where
40 T: Debug,
41{
42 error!(
43 error = %err,
44 resource = ?resource,
45 "Reconciliation error - will retry in {}s",
46 crate::constants::ERROR_REQUEUE_DURATION_SECS
47 );
48 Action::requeue(Duration::from_secs(
49 crate::constants::ERROR_REQUEUE_DURATION_SECS,
50 ))
51}
52
53pub trait DnsRecordType:
58 Resource<DynamicType = (), Scope = NamespaceResourceScope>
59 + Clone
60 + Debug
61 + DeserializeOwned
62 + Serialize
63 + Send
64 + Sync
65 + 'static
66{
67 const KIND: &'static str;
69
70 const FINALIZER: &'static str;
72
73 const RECORD_TYPE_STR: &'static str;
75
76 fn hickory_record_type() -> hickory_client::rr::RecordType;
78
79 fn reconcile_record(
81 context: Arc<Context>,
82 record: Self,
83 ) -> impl std::future::Future<Output = Result<(), ReconcileError>> + Send;
84
85 fn metadata(&self) -> &ObjectMeta;
87
88 fn status(&self) -> &Option<RecordStatus>;
90}
91
92pub async fn run_generic_record_operator<T>(
106 context: Arc<Context>,
107 bind9_manager: Arc<Bind9Manager>,
108) -> Result<()>
109where
110 T: DnsRecordType,
111{
112 info!("Starting {} operator", T::KIND);
113
114 let client = context.client.clone();
115 let api = Api::<T>::all(client.clone());
116 let dnszone_api = Api::<DNSZone>::all(client.clone());
117
118 let watcher_config = WatcherConfig::default().any_semantic();
120
121 let ctx = Arc::new((context.clone(), bind9_manager));
123
124 Controller::new(api, watcher_config.clone())
125 .watches(dnszone_api, watcher_config, |zone| {
126 let Some(namespace) = zone.namespace() else {
129 return vec![];
130 };
131
132 let empty_vec = Vec::new();
134 let records = zone.status.as_ref().map_or(&empty_vec, |s| &s.records);
135
136 records
137 .iter()
138 .filter(|record_ref| {
139 record_ref.kind == T::KIND
141 && record_ref.last_reconciled_at.is_none()
142 && record_ref.namespace == namespace
143 })
144 .map(|record_ref| {
145 kube::runtime::reflector::ObjectRef::new(&record_ref.name)
146 .within(&record_ref.namespace)
147 })
148 .collect::<Vec<_>>()
149 })
150 .run(
151 move |record: Arc<T>, ctx_clone: Arc<(Arc<Context>, Arc<Bind9Manager>)>| {
152 reconcile_wrapper(record, ctx_clone)
153 },
154 error_policy,
155 ctx,
156 )
157 .for_each(|_| futures::future::ready(()))
158 .await;
159
160 Ok(())
161}
162
163async fn reconcile_wrapper<T>(
171 record: Arc<T>,
172 ctx: Arc<(Arc<Context>, Arc<Bind9Manager>)>,
173) -> Result<Action, ReconcileError>
174where
175 T: DnsRecordType,
176{
177 let start = std::time::Instant::now();
178 let context = ctx.0.clone();
179 let client = context.client.clone();
180 let namespace = record
181 .metadata()
182 .namespace
183 .as_ref()
184 .ok_or_else(|| ReconcileError::from(anyhow!("{} has no namespace", T::KIND)))?;
185 let api: Api<T> = Api::namespaced(client.clone(), namespace);
186
187 let result = finalizer(&api, T::FINALIZER, record.clone(), |event| async {
189 match event {
190 finalizer::Event::Apply(rec) => {
191 T::reconcile_record(context.clone(), (*rec).clone()).await?;
193
194 info!("Successfully reconciled {}: {}", T::KIND, rec.name_any());
195
196 let updated_record = api
198 .get(&rec.name_any())
199 .await
200 .map_err(|e| ReconcileError::from(anyhow::Error::from(e)))?;
201
202 let is_ready = crate::record_wrappers::is_resource_ready(updated_record.status());
204
205 Ok(crate::record_wrappers::requeue_based_on_readiness(is_ready))
206 }
207 finalizer::Event::Cleanup(rec) => {
208 use crate::reconcilers::delete_record;
210
211 delete_record(
212 &client,
213 &*rec,
214 T::RECORD_TYPE_STR,
215 T::hickory_record_type(),
216 &context.stores,
217 )
218 .await
219 .map_err(ReconcileError::from)?;
220
221 info!(
222 "Successfully deleted {} from BIND9: {}",
223 T::KIND,
224 rec.name_any()
225 );
226 crate::metrics::record_resource_deleted(T::KIND);
227 Ok(Action::await_change())
228 }
229 }
230 })
231 .await;
232
233 let duration = start.elapsed();
234 if result.is_ok() {
235 crate::metrics::record_reconciliation_success(T::KIND, duration);
236 } else {
237 crate::metrics::record_reconciliation_error(T::KIND, duration);
238 crate::metrics::record_error(T::KIND, crate::record_wrappers::ERROR_TYPE_RECONCILE);
239 }
240
241 result.map_err(|e: finalizer::Error<ReconcileError>| match e {
242 finalizer::Error::ApplyFailed(err) | finalizer::Error::CleanupFailed(err) => err,
243 finalizer::Error::AddFinalizer(err) | finalizer::Error::RemoveFinalizer(err) => {
244 ReconcileError::from(anyhow!("Finalizer error: {err}"))
245 }
246 finalizer::Error::UnnamedObject => ReconcileError::from(anyhow!("{} has no name", T::KIND)),
247 finalizer::Error::InvalidFinalizer => {
248 ReconcileError::from(anyhow!("Invalid finalizer for {}", T::KIND))
249 }
250 })
251}