1use crate::constants::{API_GROUP_VERSION, KIND_BIND9_CLUSTER, KIND_CLUSTER_BIND9_PROVIDER};
16use crate::context::Context;
17use crate::crd::{
18 Bind9Cluster, Bind9ClusterStatus, Bind9Instance, ClusterBind9Provider, Condition,
19};
20use crate::labels::FINALIZER_BIND9_CLUSTER;
21use crate::reconcilers::finalizers::{
22 ensure_cluster_finalizer, handle_cluster_deletion, FinalizerCleanup,
23};
24use crate::status_reasons::{
25 CONDITION_TYPE_READY, REASON_ALL_READY, REASON_NOT_READY, REASON_NO_CHILDREN,
26 REASON_PARTIALLY_READY,
27};
28use anyhow::Result;
29use chrono::Utc;
30use kube::{
31 api::{ListParams, Patch, PatchParams},
32 client::Client,
33 Api, ResourceExt,
34};
35use serde_json::json;
36use std::sync::Arc;
37use tracing::{debug, error, info, warn};
38
39#[async_trait::async_trait]
44impl FinalizerCleanup for ClusterBind9Provider {
45 async fn cleanup(&self, client: &Client) -> Result<()> {
46 use crate::labels::{
47 BINDY_CLUSTER_LABEL, BINDY_MANAGED_BY_LABEL, MANAGED_BY_CLUSTER_BIND9_PROVIDER,
48 };
49 use kube::api::DeleteParams;
50
51 let name = self.name_any();
52
53 info!(
55 "Deleting managed Bind9Cluster resources for global cluster {}",
56 name
57 );
58
59 let clusters_api: Api<Bind9Cluster> = Api::all(client.clone());
60 let all_clusters = clusters_api.list(&ListParams::default()).await?;
61
62 let managed_clusters: Vec<_> = all_clusters
64 .items
65 .iter()
66 .filter(|c| {
67 c.metadata.labels.as_ref().is_some_and(|labels| {
68 labels.get(BINDY_MANAGED_BY_LABEL)
69 == Some(&MANAGED_BY_CLUSTER_BIND9_PROVIDER.to_string())
70 && labels.get(BINDY_CLUSTER_LABEL) == Some(&name.clone())
71 })
72 })
73 .collect();
74
75 if !managed_clusters.is_empty() {
76 info!(
77 "Found {} managed Bind9Cluster resources to delete for global cluster {}",
78 managed_clusters.len(),
79 name
80 );
81
82 for managed_cluster in managed_clusters {
83 let cluster_name = managed_cluster.name_any();
84 let cluster_namespace = managed_cluster.namespace().unwrap_or_default();
85
86 info!(
87 "Deleting managed Bind9Cluster {}/{} for global cluster {}",
88 cluster_namespace, cluster_name, name
89 );
90
91 let api: Api<Bind9Cluster> = Api::namespaced(client.clone(), &cluster_namespace);
92 match api.delete(&cluster_name, &DeleteParams::default()).await {
93 Ok(_) => {
94 info!(
95 "Successfully deleted Bind9Cluster {}/{}",
96 cluster_namespace, cluster_name
97 );
98 }
99 Err(e) => {
100 if e.to_string().contains("NotFound") {
102 debug!(
103 "Bind9Cluster {}/{} already deleted",
104 cluster_namespace, cluster_name
105 );
106 } else {
107 error!(
108 "Failed to delete Bind9Cluster {}/{}: {}",
109 cluster_namespace, cluster_name, e
110 );
111 return Err(e.into());
112 }
113 }
114 }
115 }
116 }
117
118 let instances_api: Api<Bind9Instance> = Api::all(client.clone());
121 let instances = instances_api.list(&ListParams::default()).await?;
122
123 let referencing_instances: Vec<_> = instances
124 .items
125 .iter()
126 .filter(|inst| inst.spec.cluster_ref == name)
127 .collect();
128
129 if !referencing_instances.is_empty() {
130 warn!(
131 "ClusterBind9Provider {} still has {} referencing instances. \
132 These will be cleaned up by their parent Bind9Cluster finalizers.",
133 name,
134 referencing_instances.len()
135 );
136 }
137
138 Ok(())
139 }
140}
141
142pub async fn reconcile_clusterbind9provider(
164 ctx: Arc<Context>,
165 cluster: ClusterBind9Provider,
166) -> Result<()> {
167 let client = ctx.client.clone();
168 let name = cluster.name_any();
169
170 info!("Reconciling ClusterBind9Provider: {}", name);
171 debug!(
172 name = %name,
173 generation = ?cluster.metadata.generation,
174 "Starting ClusterBind9Provider reconciliation (cluster-scoped)"
175 );
176
177 if cluster.metadata.deletion_timestamp.is_some() {
179 return handle_cluster_deletion(&client, &cluster, FINALIZER_BIND9_CLUSTER).await;
180 }
181
182 ensure_cluster_finalizer(&client, &cluster, FINALIZER_BIND9_CLUSTER).await?;
184
185 let current_generation = cluster.metadata.generation;
187 let observed_generation = cluster.status.as_ref().and_then(|s| s.observed_generation);
188
189 let spec_changed =
191 crate::reconcilers::should_reconcile(current_generation, observed_generation);
192
193 let drift_detected = if spec_changed {
195 false
196 } else {
197 detect_cluster_drift(&client, &cluster).await?
198 };
199
200 if spec_changed || drift_detected {
201 if drift_detected {
202 info!(
203 "Spec unchanged but cluster drift detected for ClusterBind9Provider {}",
204 name
205 );
206 } else {
207 debug!(
208 "Reconciliation needed: current_generation={:?}, observed_generation={:?}",
209 current_generation, observed_generation
210 );
211 }
212
213 reconcile_namespace_clusters(&client, &cluster).await?;
217 } else {
218 debug!(
219 "Spec unchanged (generation={:?}) and no drift detected, skipping resource reconciliation",
220 current_generation
221 );
222 }
223
224 update_cluster_status(&client, &cluster).await?;
226
227 Ok(())
228}
229
230#[allow(clippy::too_many_lines)]
245async fn reconcile_namespace_clusters(
246 client: &Client,
247 cluster_provider: &ClusterBind9Provider,
248) -> Result<()> {
249 use crate::crd::{Bind9Cluster, Bind9ClusterSpec};
250 use crate::labels::{
251 BINDY_CLUSTER_LABEL, BINDY_MANAGED_BY_LABEL, MANAGED_BY_CLUSTER_BIND9_PROVIDER,
252 };
253 use kube::api::{ListParams, PostParams};
254 use std::collections::{BTreeMap, HashSet};
255
256 let cluster_provider_name = cluster_provider.name_any();
257
258 let target_namespace = cluster_provider.spec.namespace.as_ref().map_or_else(
260 || std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "bindy-system".to_string()),
261 std::clone::Clone::clone,
262 );
263
264 debug!(
265 "Reconciling namespace-scoped Bind9Cluster for global cluster {} in namespace {}",
266 cluster_provider_name, target_namespace
267 );
268
269 let instances_api: Api<Bind9Instance> = Api::all(client.clone());
271 let all_instances = instances_api.list(&ListParams::default()).await?;
272
273 let namespaces: HashSet<String> = all_instances
275 .items
276 .iter()
277 .filter(|inst| inst.spec.cluster_ref == cluster_provider_name)
278 .filter_map(kube::ResourceExt::namespace)
279 .collect();
280
281 let namespaces_to_reconcile: HashSet<String> = if namespaces.is_empty() {
283 let mut set = HashSet::new();
284 set.insert(target_namespace);
285 set
286 } else {
287 namespaces
288 };
289
290 debug!(
291 "Found {} namespace(s) needing Bind9Cluster for global cluster {}",
292 namespaces_to_reconcile.len(),
293 cluster_provider_name
294 );
295
296 for namespace in namespaces_to_reconcile {
298 let cluster_name = cluster_provider_name.clone();
300
301 info!(
302 "Creating/updating Bind9Cluster {}/{} for global cluster {}",
303 namespace, cluster_name, cluster_provider_name
304 );
305
306 let mut labels = BTreeMap::new();
308 labels.insert(
309 BINDY_MANAGED_BY_LABEL.to_string(),
310 MANAGED_BY_CLUSTER_BIND9_PROVIDER.to_string(),
311 );
312 labels.insert(
313 BINDY_CLUSTER_LABEL.to_string(),
314 cluster_provider_name.clone(),
315 );
316
317 let owner_ref = k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference {
319 api_version: API_GROUP_VERSION.to_string(),
320 kind: KIND_CLUSTER_BIND9_PROVIDER.to_string(),
321 name: cluster_provider_name.clone(),
322 uid: cluster_provider.metadata.uid.clone().unwrap_or_default(),
323 controller: Some(true),
324 block_owner_deletion: Some(true),
325 };
326
327 let cluster_spec = Bind9ClusterSpec {
329 common: cluster_provider.spec.common.clone(),
330 };
331
332 let cluster = Bind9Cluster {
333 metadata: kube::api::ObjectMeta {
334 name: Some(cluster_name.clone()),
335 namespace: Some(namespace.clone()),
336 labels: Some(labels),
337 owner_references: Some(vec![owner_ref]),
338 ..Default::default()
339 },
340 spec: cluster_spec,
341 status: None,
342 };
343
344 let api: Api<Bind9Cluster> = Api::namespaced(client.clone(), &namespace);
345
346 match api.create(&PostParams::default(), &cluster).await {
348 Ok(_) => {
349 info!(
350 "Successfully created Bind9Cluster {}/{}",
351 namespace, cluster_name
352 );
353 }
354 Err(e) => {
355 if e.to_string().contains("AlreadyExists") {
357 debug!(
358 "Bind9Cluster {}/{} already exists, patching with updated spec",
359 namespace, cluster_name
360 );
361
362 let patch = serde_json::json!({
364 "apiVersion": API_GROUP_VERSION,
365 "kind": KIND_BIND9_CLUSTER,
366 "metadata": {
367 "name": cluster_name,
368 "namespace": namespace,
369 "ownerReferences": cluster.metadata.owner_references,
370 },
371 "spec": cluster.spec,
372 });
373
374 match api
376 .patch(
377 &cluster_name,
378 &PatchParams::apply("bindy-controller").force(),
379 &Patch::Apply(&patch),
380 )
381 .await
382 {
383 Ok(_) => {
384 info!(
385 "Successfully patched Bind9Cluster {}/{} with updated spec",
386 namespace, cluster_name
387 );
388 }
389 Err(patch_err) => {
390 warn!(
391 "Failed to patch Bind9Cluster {}/{}: {}",
392 namespace, cluster_name, patch_err
393 );
394 return Err(patch_err.into());
395 }
396 }
397 } else {
398 warn!(
399 "Failed to create Bind9Cluster {}/{}: {}",
400 namespace, cluster_name, e
401 );
402 return Err(e.into());
403 }
404 }
405 }
406 }
407
408 Ok(())
409}
410
411async fn update_cluster_status(client: &Client, cluster: &ClusterBind9Provider) -> Result<()> {
417 let name = cluster.name_any();
418
419 let instances_api: Api<Bind9Instance> = Api::all(client.clone());
421 let lp = ListParams::default();
422 let all_instances = instances_api.list(&lp).await?;
423
424 let instances: Vec<_> = all_instances
426 .items
427 .into_iter()
428 .filter(|inst| inst.spec.cluster_ref == name)
429 .collect();
430
431 debug!(
432 "Found {} instances referencing ClusterBind9Provider {}",
433 instances.len(),
434 name
435 );
436
437 let new_status = calculate_cluster_status(&instances, cluster.metadata.generation);
439
440 let status_changed = if let Some(current_status) = &cluster.status {
442 if current_status.instance_count != new_status.instance_count
444 || current_status.ready_instances != new_status.ready_instances
445 {
446 true
447 } else if let Some(current_condition) = current_status.conditions.first() {
448 let new_condition = new_status.conditions.first();
450 match new_condition {
451 Some(new_cond) => {
452 current_condition.r#type != new_cond.r#type
453 || current_condition.status != new_cond.status
454 || current_condition.message != new_cond.message
455 }
456 None => true, }
458 } else {
459 !new_status.conditions.is_empty()
461 }
462 } else {
463 true
465 };
466
467 if !status_changed {
469 debug!(
470 "Status unchanged for ClusterBind9Provider {}, skipping patch",
471 name
472 );
473 return Ok(());
474 }
475
476 let api: Api<ClusterBind9Provider> = Api::all(client.clone());
478 let status_patch = json!({
479 "status": new_status
480 });
481
482 api.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status_patch))
483 .await?;
484
485 debug!("Updated status for ClusterBind9Provider: {}", name);
486 Ok(())
487}
488
489#[must_use]
500pub fn calculate_cluster_status(
501 instances: &[Bind9Instance],
502 generation: Option<i64>,
503) -> Bind9ClusterStatus {
504 let now = Utc::now();
505
506 let ready_instances = instances
508 .iter()
509 .filter(|inst| {
510 inst.status
511 .as_ref()
512 .and_then(|s| s.conditions.iter().find(|c| c.r#type == "Ready"))
513 .is_some_and(|c| c.status == "True")
514 })
515 .count();
516
517 let total_instances = instances.len();
518
519 let (status, reason, message) = if total_instances == 0 {
521 (
522 "False",
523 REASON_NO_CHILDREN,
524 "No instances found for this cluster".to_string(),
525 )
526 } else if ready_instances == total_instances {
527 (
528 "True",
529 REASON_ALL_READY,
530 format!("All {total_instances} instances are ready"),
531 )
532 } else if ready_instances > 0 {
533 (
534 "False",
535 REASON_PARTIALLY_READY,
536 format!("{ready_instances}/{total_instances} instances are ready"),
537 )
538 } else {
539 (
540 "False",
541 REASON_NOT_READY,
542 "No instances are ready".to_string(),
543 )
544 };
545
546 let instance_names: Vec<String> = instances
548 .iter()
549 .map(|inst| {
550 let name = inst.name_any();
551 let ns = inst.namespace().unwrap_or_default();
552 format!("{ns}/{name}")
553 })
554 .collect();
555
556 Bind9ClusterStatus {
557 conditions: vec![Condition {
558 r#type: CONDITION_TYPE_READY.to_string(),
559 status: status.to_string(),
560 reason: Some(reason.to_string()),
561 message: Some(message.clone()),
562 last_transition_time: Some(now.to_rfc3339()),
563 }],
564 instances: instance_names,
565 observed_generation: generation,
566 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
567 instance_count: Some(total_instances as i32),
568 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
569 ready_instances: Some(ready_instances as i32),
570 }
571}
572
573async fn detect_cluster_drift(
593 client: &Client,
594 cluster_provider: &ClusterBind9Provider,
595) -> Result<bool> {
596 use crate::crd::Bind9Cluster;
597 use crate::labels::{
598 BINDY_CLUSTER_LABEL, BINDY_MANAGED_BY_LABEL, MANAGED_BY_CLUSTER_BIND9_PROVIDER,
599 };
600 use kube::api::ListParams;
601
602 let cluster_provider_name = cluster_provider.name_any();
603
604 let target_namespace = cluster_provider.spec.namespace.as_ref().map_or_else(
606 || std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "bindy-system".to_string()),
607 std::clone::Clone::clone,
608 );
609
610 let clusters_api: Api<Bind9Cluster> = Api::namespaced(client.clone(), &target_namespace);
612 let clusters = clusters_api.list(&ListParams::default()).await?;
613
614 let managed_clusters: Vec<_> = clusters
616 .items
617 .into_iter()
618 .filter(|cluster| {
619 cluster.metadata.labels.as_ref().is_some_and(|labels| {
620 labels.get(BINDY_MANAGED_BY_LABEL)
621 == Some(&MANAGED_BY_CLUSTER_BIND9_PROVIDER.to_string())
622 && labels.get(BINDY_CLUSTER_LABEL) == Some(&cluster_provider_name.clone())
623 })
624 })
625 .collect();
626
627 let expected_count = 1;
629 let actual_count = managed_clusters.len();
630
631 if actual_count != expected_count {
633 info!(
634 "Cluster count drift detected for ClusterBind9Provider {}: expected {} Bind9Cluster in namespace {}, found {}",
635 cluster_provider_name, expected_count, target_namespace, actual_count
636 );
637 return Ok(true);
638 }
639
640 if let Some(managed_cluster) = managed_clusters.first() {
642 let desired_common_spec = &cluster_provider.spec.common;
644 let actual_common_spec = &managed_cluster.spec.common;
645
646 if desired_common_spec != actual_common_spec {
648 info!(
649 "Cluster spec drift detected for ClusterBind9Provider {} in namespace {}: \
650 managed Bind9Cluster spec differs from desired spec",
651 cluster_provider_name, target_namespace
652 );
653 return Ok(true);
654 }
655 }
656
657 Ok(false)
659}
660
661pub async fn delete_clusterbind9provider(
680 ctx: Arc<Context>,
681 cluster: ClusterBind9Provider,
682) -> Result<()> {
683 let name = cluster.name_any();
684 info!("Deleting ClusterBind9Provider: {}", name);
685
686 Box::pin(reconcile_clusterbind9provider(ctx, cluster)).await
688}