1#[allow(clippy::wildcard_imports)]
10use super::types::*;
11use crate::constants::{API_GROUP_VERSION, KIND_BIND9_CLUSTER, KIND_BIND9_INSTANCE};
12use crate::reconcilers::pagination::list_all_paginated;
13
14#[allow(clippy::too_many_lines)]
30pub(super) async fn reconcile_managed_instances(
31 ctx: &Context,
32 cluster: &Bind9Cluster,
33) -> Result<()> {
34 let client = ctx.client.clone();
35 let namespace = cluster.namespace().unwrap_or_default();
36 let cluster_name = cluster.name_any();
37
38 info!(
39 "Reconciling managed instances for cluster {}/{}",
40 namespace, cluster_name
41 );
42
43 let primary_replicas = cluster
45 .spec
46 .common
47 .primary
48 .as_ref()
49 .and_then(|p| p.replicas)
50 .unwrap_or(0);
51
52 let secondary_replicas = cluster
53 .spec
54 .common
55 .secondary
56 .as_ref()
57 .and_then(|s| s.replicas)
58 .unwrap_or(0);
59
60 debug!(
61 "Desired replicas: {} primary, {} secondary",
62 primary_replicas, secondary_replicas
63 );
64
65 if primary_replicas == 0 && secondary_replicas == 0 {
66 debug!(
67 "No instances requested for cluster {}/{}",
68 namespace, cluster_name
69 );
70 return Ok(());
71 }
72
73 let api: Api<Bind9Instance> = Api::namespaced(client.clone(), &namespace);
75 let instances = list_all_paginated(&api, ListParams::default()).await?;
76
77 let managed_instances: Vec<_> = instances
79 .into_iter()
80 .filter(|instance| {
81 instance.metadata.labels.as_ref().is_some_and(|labels| {
83 labels.get(BINDY_MANAGED_BY_LABEL) == Some(&MANAGED_BY_BIND9_CLUSTER.to_string())
84 && labels.get(BINDY_CLUSTER_LABEL) == Some(&cluster_name)
85 })
86 })
87 .collect();
88
89 debug!(
90 "Found {} managed instances for cluster {}/{}",
91 managed_instances.len(),
92 namespace,
93 cluster_name
94 );
95
96 let existing_primary: Vec<_> = managed_instances
98 .iter()
99 .filter(|i| i.spec.role == ServerRole::Primary)
100 .collect();
101
102 let existing_secondary: Vec<_> = managed_instances
103 .iter()
104 .filter(|i| i.spec.role == ServerRole::Secondary)
105 .collect();
106
107 debug!(
108 "Existing instances: {} primary, {} secondary",
109 existing_primary.len(),
110 existing_secondary.len()
111 );
112
113 let owner_ref = k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference {
115 api_version: API_GROUP_VERSION.to_string(),
116 kind: KIND_BIND9_CLUSTER.to_string(),
117 name: cluster_name.clone(),
118 uid: cluster.metadata.uid.clone().unwrap_or_default(),
119 controller: Some(true),
120 block_owner_deletion: Some(true),
121 };
122
123 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
127 let mut primaries_to_create = 0;
128 {
129 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
131 let desired_primary_names: std::collections::HashSet<String> = (0..(primary_replicas
132 as usize))
133 .map(|i| format!("{cluster_name}-primary-{i}"))
134 .collect();
135
136 let existing_primary_names: std::collections::HashSet<String> = existing_primary
138 .iter()
139 .map(|instance| instance.name_any())
140 .collect();
141
142 let missing_primaries: Vec<_> = desired_primary_names
144 .difference(&existing_primary_names)
145 .collect();
146
147 for instance_name in missing_primaries {
149 let index = instance_name
151 .rsplit('-')
152 .next()
153 .and_then(|s| s.parse::<usize>().ok())
154 .unwrap_or(0);
155
156 create_managed_instance_with_owner(
157 &client,
158 &namespace,
159 &cluster_name,
160 ServerRole::Primary,
161 index,
162 &cluster.spec.common,
163 Some(owner_ref.clone()),
164 )
165 .await?;
166 primaries_to_create += 1;
167 }
168 }
169
170 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
172 let primaries_to_delete = existing_primary
173 .len()
174 .saturating_sub(primary_replicas as usize);
175 if primaries_to_delete > 0 {
176 let mut sorted_primary: Vec<_> = existing_primary.iter().collect();
178 sorted_primary.sort_by_key(|instance| {
179 instance
180 .metadata
181 .annotations
182 .as_ref()
183 .and_then(|a| a.get(BINDY_INSTANCE_INDEX_ANNOTATION))
184 .and_then(|idx| idx.parse::<usize>().ok())
185 .unwrap_or(0)
186 });
187 sorted_primary.reverse();
188
189 for instance in sorted_primary.iter().take(primaries_to_delete) {
190 let instance_name = instance.name_any();
191 delete_managed_instance(&client, &namespace, &instance_name).await?;
192 }
193 }
194
195 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
199 let mut secondaries_to_create = 0;
200 {
201 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
203 let desired_secondary_names: std::collections::HashSet<String> = (0..(secondary_replicas
204 as usize))
205 .map(|i| format!("{cluster_name}-secondary-{i}"))
206 .collect();
207
208 let existing_secondary_names: std::collections::HashSet<String> = existing_secondary
210 .iter()
211 .map(|instance| instance.name_any())
212 .collect();
213
214 let missing_secondaries: Vec<_> = desired_secondary_names
216 .difference(&existing_secondary_names)
217 .collect();
218
219 for instance_name in missing_secondaries {
221 let index = instance_name
223 .rsplit('-')
224 .next()
225 .and_then(|s| s.parse::<usize>().ok())
226 .unwrap_or(0);
227
228 create_managed_instance_with_owner(
229 &client,
230 &namespace,
231 &cluster_name,
232 ServerRole::Secondary,
233 index,
234 &cluster.spec.common,
235 Some(owner_ref.clone()),
236 )
237 .await?;
238 secondaries_to_create += 1;
239 }
240 }
241
242 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
244 let secondaries_to_delete = existing_secondary
245 .len()
246 .saturating_sub(secondary_replicas as usize);
247 if secondaries_to_delete > 0 {
248 let mut sorted_secondary: Vec<_> = existing_secondary.iter().collect();
250 sorted_secondary.sort_by_key(|instance| {
251 instance
252 .metadata
253 .annotations
254 .as_ref()
255 .and_then(|a| a.get(BINDY_INSTANCE_INDEX_ANNOTATION))
256 .and_then(|idx| idx.parse::<usize>().ok())
257 .unwrap_or(0)
258 });
259 sorted_secondary.reverse();
260
261 for instance in sorted_secondary.iter().take(secondaries_to_delete) {
262 let instance_name = instance.name_any();
263 delete_managed_instance(&client, &namespace, &instance_name).await?;
264 }
265 }
266
267 if primaries_to_create > 0
268 || secondaries_to_create > 0
269 || primaries_to_delete > 0
270 || secondaries_to_delete > 0
271 {
272 info!(
273 "Scaled cluster {}/{}: created {} primary, {} secondary; deleted {} primary, {} secondary",
274 namespace,
275 cluster_name,
276 primaries_to_create,
277 secondaries_to_create,
278 primaries_to_delete,
279 secondaries_to_delete
280 );
281 } else {
282 debug!(
283 "Cluster {}/{} already at desired scale",
284 namespace, cluster_name
285 );
286 }
287
288 update_existing_managed_instances(
290 &client,
291 &namespace,
292 &cluster_name,
293 &cluster.spec.common,
294 &managed_instances,
295 )
296 .await?;
297
298 ensure_managed_instance_resources(&client, cluster, &managed_instances).await?;
300
301 Ok(())
302}
303
304pub(super) async fn update_existing_managed_instances(
324 client: &Client,
325 namespace: &str,
326 cluster_name: &str,
327 common_spec: &crate::crd::Bind9ClusterCommonSpec,
328 managed_instances: &[Bind9Instance],
329) -> Result<()> {
330 if managed_instances.is_empty() {
331 return Ok(());
332 }
333
334 let instance_api: Api<Bind9Instance> = Api::namespaced(client.clone(), namespace);
335 let mut updated_count = 0;
336
337 for instance in managed_instances {
338 let instance_name = instance.name_any();
339
340 let desired_bindcar_config = common_spec
342 .global
343 .as_ref()
344 .and_then(|g| g.bindcar_config.clone());
345
346 let needs_update = instance.spec.version != common_spec.version
348 || instance.spec.image != common_spec.image
349 || instance.spec.config_map_refs != common_spec.config_map_refs
350 || instance.spec.volumes != common_spec.volumes
351 || instance.spec.volume_mounts != common_spec.volume_mounts
352 || instance.spec.bindcar_config != desired_bindcar_config;
353
354 if needs_update {
355 debug!(
356 "Instance {}/{} spec differs from cluster spec, updating",
357 namespace, instance_name
358 );
359
360 #[allow(deprecated)]
362 let updated_spec = Bind9InstanceSpec {
364 cluster_ref: instance.spec.cluster_ref.clone(),
365 role: instance.spec.role.clone(),
366 replicas: instance.spec.replicas, version: common_spec.version.clone(),
368 image: common_spec.image.clone(),
369 config_map_refs: common_spec.config_map_refs.clone(),
370 config: None, primary_servers: instance.spec.primary_servers.clone(), volumes: common_spec.volumes.clone(),
373 volume_mounts: common_spec.volume_mounts.clone(),
374 rndc_secret_ref: instance.spec.rndc_secret_ref.clone(), rndc_key: instance.spec.rndc_key.clone(), storage: instance.spec.storage.clone(), bindcar_config: desired_bindcar_config,
378 };
379
380 let patch = serde_json::json!({
382 "apiVersion": API_GROUP_VERSION,
383 "kind": KIND_BIND9_INSTANCE,
384 "metadata": {
385 "name": instance_name,
386 "namespace": namespace,
387 },
388 "spec": updated_spec,
389 });
390
391 match instance_api
392 .patch(
393 &instance_name,
394 &PatchParams::apply("bindy-controller").force(),
395 &Patch::Apply(&patch),
396 )
397 .await
398 {
399 Ok(_) => {
400 info!(
401 "Updated managed instance {}/{} to match cluster spec",
402 namespace, instance_name
403 );
404 updated_count += 1;
405 }
406 Err(e) => {
407 error!(
408 "Failed to update managed instance {}/{}: {}",
409 namespace, instance_name, e
410 );
411 return Err(e.into());
412 }
413 }
414 } else {
415 debug!(
416 "Instance {}/{} spec matches cluster spec, no update needed",
417 namespace, instance_name
418 );
419 }
420 }
421
422 if updated_count > 0 {
423 info!(
424 "Updated {} managed instances in cluster {}/{} to match current spec",
425 updated_count, namespace, cluster_name
426 );
427 }
428
429 Ok(())
430}
431
432pub(super) async fn ensure_managed_instance_resources(
448 client: &Client,
449 cluster: &Bind9Cluster,
450 managed_instances: &[Bind9Instance],
451) -> Result<()> {
452 let namespace = cluster.namespace().unwrap_or_default();
453 let cluster_name = cluster.name_any();
454
455 if managed_instances.is_empty() {
456 return Ok(());
457 }
458
459 debug!(
460 "Ensuring child resources exist for {} managed instances in cluster {}/{}",
461 managed_instances.len(),
462 namespace,
463 cluster_name
464 );
465
466 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
467 let secret_api: Api<Secret> = Api::namespaced(client.clone(), &namespace);
468 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
469 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
470 let instance_api: Api<Bind9Instance> = Api::namespaced(client.clone(), &namespace);
471
472 let cluster_configmap_name = format!("{cluster_name}-config");
474
475 for instance in managed_instances {
476 let instance_name = instance.name_any();
477 let mut missing_resources = Vec::new();
478
479 if configmap_api.get(&cluster_configmap_name).await.is_err() {
481 missing_resources.push("ConfigMap");
482 }
483
484 let secret_name = format!("{instance_name}-rndc-key");
486 if secret_api.get(&secret_name).await.is_err() {
487 missing_resources.push("Secret");
488 }
489
490 if service_api.get(&instance_name).await.is_err() {
492 missing_resources.push("Service");
493 }
494
495 if deployment_api.get(&instance_name).await.is_err() {
497 missing_resources.push("Deployment");
498 }
499
500 if missing_resources.is_empty() {
502 debug!(
503 "All child resources exist for managed instance {}/{}",
504 namespace, instance_name
505 );
506 } else {
507 warn!(
508 "Missing resources for managed instance {}/{}: {}. Triggering reconciliation.",
509 namespace,
510 instance_name,
511 missing_resources.join(", ")
512 );
513
514 let patch = json!({
516 "metadata": {
517 "annotations": {
518 BINDY_RECONCILE_TRIGGER_ANNOTATION: Utc::now().to_rfc3339()
519 }
520 }
521 });
522
523 instance_api
524 .patch(
525 &instance_name,
526 &PatchParams::apply("bindy-cluster-controller"),
527 &Patch::Merge(&patch),
528 )
529 .await?;
530
531 info!(
532 "Triggered reconciliation for instance {}/{} to recreate: {}",
533 namespace,
534 instance_name,
535 missing_resources.join(", ")
536 );
537 }
538 }
539
540 Ok(())
541}
542
543#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
561pub async fn create_managed_instance(
562 client: &Client,
563 namespace: &str,
564 cluster_name: &str,
565 role: ServerRole,
566 index: usize,
567 common_spec: &crate::crd::Bind9ClusterCommonSpec,
568 _is_global: bool,
569) -> Result<()> {
570 create_managed_instance_with_owner(
571 client,
572 namespace,
573 cluster_name,
574 role,
575 index,
576 common_spec,
577 None, )
579 .await
580}
581
582#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
591async fn create_managed_instance_with_owner(
592 client: &Client,
593 namespace: &str,
594 cluster_name: &str,
595 role: ServerRole,
596 index: usize,
597 common_spec: &crate::crd::Bind9ClusterCommonSpec,
598 owner_ref: Option<k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference>,
599) -> Result<()> {
600 let role_str = match role {
601 ServerRole::Primary => ROLE_PRIMARY,
602 ServerRole::Secondary => ROLE_SECONDARY,
603 };
604
605 let instance_name = format!("{cluster_name}-{role_str}-{index}");
606
607 info!(
608 "Creating managed instance {}/{} for cluster {} (role: {:?}, index: {})",
609 namespace, instance_name, cluster_name, role, index
610 );
611
612 let mut labels = BTreeMap::new();
614 labels.insert(
615 BINDY_MANAGED_BY_LABEL.to_string(),
616 MANAGED_BY_BIND9_CLUSTER.to_string(),
617 );
618 labels.insert(BINDY_CLUSTER_LABEL.to_string(), cluster_name.to_string());
619 labels.insert(BINDY_ROLE_LABEL.to_string(), role_str.to_string());
620 labels.insert(K8S_PART_OF.to_string(), PART_OF_BINDY.to_string());
621
622 match role {
624 ServerRole::Primary => {
625 if let Some(primary_config) = &common_spec.primary {
626 if let Some(custom_labels) = &primary_config.labels {
627 for (key, value) in custom_labels {
628 labels.insert(key.clone(), value.clone());
629 }
630 }
631 }
632 }
633 ServerRole::Secondary => {
634 if let Some(secondary_config) = &common_spec.secondary {
635 if let Some(custom_labels) = &secondary_config.labels {
636 for (key, value) in custom_labels {
637 labels.insert(key.clone(), value.clone());
638 }
639 }
640 }
641 }
642 }
643
644 let mut annotations = BTreeMap::new();
646 annotations.insert(
647 BINDY_INSTANCE_INDEX_ANNOTATION.to_string(),
648 index.to_string(),
649 );
650
651 #[allow(deprecated)] let instance_spec = Bind9InstanceSpec {
654 cluster_ref: cluster_name.to_string(),
655 role,
656 replicas: Some(1), version: common_spec.version.clone(),
658 image: common_spec.image.clone(),
659 config_map_refs: common_spec.config_map_refs.clone(),
660 config: None, primary_servers: None, volumes: common_spec.volumes.clone(),
663 volume_mounts: common_spec.volume_mounts.clone(),
664 rndc_secret_ref: None, rndc_key: None, storage: None, bindcar_config: common_spec
668 .global
669 .as_ref()
670 .and_then(|g| g.bindcar_config.clone()),
671 };
672
673 let instance = Bind9Instance {
674 metadata: ObjectMeta {
675 name: Some(instance_name.clone()),
676 namespace: Some(namespace.to_string()),
677 labels: Some(labels.clone()),
678 annotations: Some(annotations),
679 owner_references: owner_ref.map(|r| vec![r]),
680 ..Default::default()
681 },
682 spec: instance_spec,
683 status: None,
684 };
685
686 let api: Api<Bind9Instance> = Api::namespaced(client.clone(), namespace);
687
688 match api.create(&PostParams::default(), &instance).await {
689 Ok(_) => {
690 info!(
691 "Successfully created managed instance {}/{}",
692 namespace, instance_name
693 );
694 Ok(())
695 }
696 Err(e) => {
697 if e.to_string().contains("AlreadyExists") {
699 debug!(
700 "Managed instance {}/{} already exists, patching with updated spec",
701 namespace, instance_name
702 );
703
704 let labels_json: serde_json::Map<String, serde_json::Value> = labels
707 .iter()
708 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
709 .collect();
710
711 let patch = serde_json::json!({
712 "apiVersion": API_GROUP_VERSION,
713 "kind": KIND_BIND9_INSTANCE,
714 "metadata": {
715 "name": instance_name,
716 "namespace": namespace,
717 "labels": labels_json,
718 "annotations": {
719 BINDY_INSTANCE_INDEX_ANNOTATION: index.to_string(),
720 },
721 "ownerReferences": instance.metadata.owner_references,
722 },
723 "spec": instance.spec,
724 });
725
726 match api
728 .patch(
729 &instance_name,
730 &PatchParams::apply("bindy-controller").force(),
731 &Patch::Apply(&patch),
732 )
733 .await
734 {
735 Ok(_) => {
736 info!(
737 "Successfully patched managed instance {}/{} with updated spec",
738 namespace, instance_name
739 );
740 Ok(())
741 }
742 Err(patch_err) => {
743 error!(
744 "Failed to patch managed instance {}/{}: {}",
745 namespace, instance_name, patch_err
746 );
747 Err(patch_err.into())
748 }
749 }
750 } else {
751 error!(
752 "Failed to create managed instance {}/{}: {}",
753 namespace, instance_name, e
754 );
755 Err(e.into())
756 }
757 }
758 }
759}
760
761pub async fn delete_managed_instance(
775 client: &Client,
776 namespace: &str,
777 instance_name: &str,
778) -> Result<()> {
779 let api: Api<Bind9Instance> = Api::namespaced(client.clone(), namespace);
780
781 match api.delete(instance_name, &DeleteParams::default()).await {
782 Ok(_) => {
783 info!(
784 "Successfully deleted managed instance {}/{}",
785 namespace, instance_name
786 );
787 Ok(())
788 }
789 Err(e) if e.to_string().contains("NotFound") => {
790 debug!(
791 "Managed instance {}/{} already deleted",
792 namespace, instance_name
793 );
794 Ok(())
795 }
796 Err(e) => {
797 error!(
798 "Failed to delete managed instance {}/{}: {}",
799 namespace, instance_name, e
800 );
801 Err(e.into())
802 }
803 }
804}
805
806pub(super) async fn delete_cluster_instances(
820 client: &Client,
821 namespace: &str,
822 cluster_name: &str,
823) -> Result<()> {
824 let api: Api<Bind9Instance> = Api::namespaced(client.clone(), namespace);
825
826 info!(
827 "Finding all Bind9Instance resources for cluster {}/{}",
828 namespace, cluster_name
829 );
830
831 let instances = list_all_paginated(&api, ListParams::default()).await?;
833
834 let cluster_instances: Vec<_> = instances
836 .into_iter()
837 .filter(|instance| instance.spec.cluster_ref == cluster_name)
838 .collect();
839
840 if cluster_instances.is_empty() {
841 info!(
842 "No Bind9Instance resources found for cluster {}/{}",
843 namespace, cluster_name
844 );
845 return Ok(());
846 }
847
848 info!(
849 "Found {} Bind9Instance resources for cluster {}/{}, deleting...",
850 cluster_instances.len(),
851 namespace,
852 cluster_name
853 );
854
855 for instance in cluster_instances {
857 let instance_name = instance.name_any();
858 info!(
859 "Deleting Bind9Instance {}/{} (clusterRef: {})",
860 namespace, instance_name, cluster_name
861 );
862
863 match api.delete(&instance_name, &DeleteParams::default()).await {
864 Ok(_) => {
865 info!(
866 "Successfully deleted Bind9Instance {}/{}",
867 namespace, instance_name
868 );
869 }
870 Err(e) => {
871 if e.to_string().contains("NotFound") {
873 warn!(
874 "Bind9Instance {}/{} already deleted",
875 namespace, instance_name
876 );
877 } else {
878 error!(
879 "Failed to delete Bind9Instance {}/{}: {}",
880 namespace, instance_name, e
881 );
882 return Err(e.into());
883 }
884 }
885 }
886 }
887
888 info!(
889 "Successfully deleted all Bind9Instance resources for cluster {}/{}",
890 namespace, cluster_name
891 );
892
893 Ok(())
894}
895
/// Deletion hook for a `Bind9Cluster`; currently a no-op.
///
/// Both arguments are ignored and `Ok(())` is returned unconditionally.
// NOTE(review): presumably cleanup is handled elsewhere (e.g. via owner
// references or `delete_cluster_instances`) — confirm against the callers.
pub async fn delete_bind9cluster(_client: Client, _cluster: Bind9Cluster) -> Result<()> {
    Ok(())
}
908
// Test module for this file: compiled only for test builds and loaded from
// `instances_tests.rs` via the explicit `#[path]` attribute.
#[cfg(test)]
#[path = "instances_tests.rs"]
mod instances_tests;