bindy/reconcilers/records/
status_helpers.rs1#[allow(clippy::wildcard_imports)]
7use super::types::*;
8
9pub(super) async fn create_event<T>(
10 client: &Client,
11 record: &T,
12 event_type: &str,
13 reason: &str,
14 message: &str,
15) -> Result<()>
16where
17 T: Resource<DynamicType = ()> + ResourceExt,
18{
19 let namespace = record.namespace().unwrap_or_default();
20 let name = record.name_any();
21 let event_api: Api<Event> = Api::namespaced(client.clone(), &namespace);
22
23 let now = Time(k8s_openapi::jiff::Timestamp::now());
24 let event = Event {
25 metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
26 generate_name: Some(format!("{name}-")),
27 namespace: Some(namespace.clone()),
28 ..Default::default()
29 },
30 involved_object: ObjectReference {
31 api_version: Some(T::api_version(&()).to_string()),
32 kind: Some(T::kind(&()).to_string()),
33 name: Some(name.clone()),
34 namespace: Some(namespace),
35 uid: record.meta().uid.clone(),
36 ..Default::default()
37 },
38 reason: Some(reason.to_string()),
39 message: Some(message.to_string()),
40 type_: Some(event_type.to_string()),
41 first_timestamp: Some(now.clone()),
42 last_timestamp: Some(now),
43 count: Some(1),
44 ..Default::default()
45 };
46
47 match event_api.create(&PostParams::default(), &event).await {
48 Ok(_) => Ok(()),
49 Err(e) => {
50 warn!("Failed to create event for {}: {}", name, e);
51 Ok(()) }
53 }
54}
55
56#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
77pub(super) async fn update_record_status<T>(
78 client: &Client,
79 record: &T,
80 condition_type: &str,
81 status: &str,
82 reason: &str,
83 message: &str,
84 observed_generation: Option<i64>,
85 record_hash: Option<String>,
86 last_updated: Option<String>,
87 addresses: Option<String>,
88) -> Result<()>
89where
90 T: Resource<DynamicType = (), Scope = k8s_openapi::NamespaceResourceScope>
91 + ResourceExt
92 + Clone
93 + std::fmt::Debug
94 + serde::Serialize
95 + for<'de> serde::Deserialize<'de>,
96{
97 let namespace = record.namespace().unwrap_or_default();
98 let name = record.name_any();
99 let api: Api<T> = Api::namespaced(client.clone(), &namespace);
100
101 let current = api
103 .get(&name)
104 .await
105 .context("Failed to fetch current resource")?;
106
107 let current_json = serde_json::to_value(¤t)?;
110 let needs_update = if let Some(current_status) = current_json.get("status") {
111 if let Some(observed_gen) = current_status.get("observedGeneration") {
112 if observed_gen == &json!(record.meta().generation) {
114 if let Some(conditions) =
115 current_status.get("conditions").and_then(|c| c.as_array())
116 {
117 let matching_condition = conditions.iter().find(|cond| {
119 cond.get("type").and_then(|t| t.as_str()) == Some(condition_type)
120 });
121
122 if let Some(cond) = matching_condition {
123 let status_matches =
124 cond.get("status").and_then(|s| s.as_str()) == Some(status);
125 let reason_matches =
126 cond.get("reason").and_then(|r| r.as_str()) == Some(reason);
127 let message_matches =
128 cond.get("message").and_then(|m| m.as_str()) == Some(message);
129 !(status_matches && reason_matches && message_matches)
131 } else {
132 true }
134 } else {
135 true }
137 } else {
138 true }
140 } else {
141 true }
143 } else {
144 true };
146
147 if !needs_update {
148 return Ok(());
150 }
151
152 let last_transition_time = if let Some(current_status) = current_json.get("status") {
154 if let Some(conditions) = current_status.get("conditions").and_then(|c| c.as_array()) {
155 let matching_condition = conditions
157 .iter()
158 .find(|cond| cond.get("type").and_then(|t| t.as_str()) == Some(condition_type));
159
160 if let Some(cond) = matching_condition {
161 let status_changed = cond.get("status").and_then(|s| s.as_str()) != Some(status);
162 if status_changed {
163 Utc::now().to_rfc3339()
165 } else {
166 cond.get("lastTransitionTime")
168 .and_then(|t| t.as_str())
169 .unwrap_or(&Utc::now().to_rfc3339())
170 .to_string()
171 }
172 } else {
173 Utc::now().to_rfc3339()
175 }
176 } else {
177 Utc::now().to_rfc3339()
178 }
179 } else {
180 Utc::now().to_rfc3339()
181 };
182
183 let condition = Condition {
184 r#type: condition_type.to_string(),
185 status: status.to_string(),
186 reason: Some(reason.to_string()),
187 message: Some(message.to_string()),
188 last_transition_time: Some(last_transition_time),
189 };
190
191 let zone = current_json
193 .get("status")
194 .and_then(|s| s.get("zone"))
195 .and_then(|z| z.as_str())
196 .map(ToString::to_string);
197
198 let zone_ref = current_json
200 .get("status")
201 .and_then(|s| s.get("zoneRef"))
202 .and_then(|z| serde_json::from_value::<crate::crd::ZoneReference>(z.clone()).ok());
203
204 let status_addresses = addresses.or_else(|| {
206 current_json
207 .get("status")
208 .and_then(|s| s.get("addresses"))
209 .and_then(|a| a.as_str())
210 .map(ToString::to_string)
211 });
212
213 #[allow(deprecated)] let record_status = RecordStatus {
215 conditions: vec![condition],
216 observed_generation: observed_generation.or(record.meta().generation),
217 zone,
218 zone_ref, record_hash,
220 last_updated,
221 addresses: status_addresses, };
223
224 let status_patch = json!({
225 "status": record_status
226 });
227
228 api.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status_patch))
229 .await
230 .context("Failed to update record status")?;
231
232 info!(
233 "Updated status for {}/{}: {} = {}",
234 namespace, name, condition_type, status
235 );
236
237 let event_type = if status == "True" {
239 "Normal"
240 } else {
241 "Warning"
242 };
243 create_event(client, record, event_type, reason, message).await?;
244
245 Ok(())
246}