bindy/reconcilers/
pagination.rs

1// Copyright (c) 2025 Erick Bourgeois, firestoned
2// SPDX-License-Identifier: MIT
3
4//! Pagination helpers for Kubernetes API list operations.
5//!
6//! This module provides utilities for efficiently listing large resource sets
7//! by fetching them in pages, reducing memory usage and API server load.
8
9use crate::constants::KUBE_LIST_PAGE_SIZE;
10use anyhow::Result;
11use kube::{api::ListParams, Api, Resource};
12use serde::de::DeserializeOwned;
13use std::fmt::Debug;
14use tracing::{debug, error};
15
/// Hard upper bound on the number of pages a single paginated list may fetch.
///
/// Acts as a last-resort guard against a Kubernetes API server that keeps
/// handing out continue tokens forever (tokens that never become `None`, or
/// that keep changing without the listing ever finishing).
///
/// Assuming a page size of 100 items (verify against `KUBE_LIST_PAGE_SIZE`),
/// 10,000 pages corresponds to at most 1,000,000 resources.
const MAX_REASONABLE_PAGES: usize = 10_000;
22
23/// List all resources with automatic pagination.
24///
25/// Fetches resources in pages to reduce memory usage and API server load.
26/// This is especially important when listing hundreds or thousands of resources
27/// (e.g., 1000+ `DNSZone`s per namespace).
28///
29/// # Arguments
30///
31/// * `api` - Kubernetes API client for the resource type
32/// * `list_params` - Base list parameters (labels, fields, etc.)
33///
34/// # Returns
35///
36/// Vector of all resources, fetched in pages
37///
38/// # Example
39///
40/// ```no_run
41/// use kube::{Api, Client, api::ListParams};
42/// use bindy::crd::DNSZone;
43/// use bindy::reconcilers::pagination::list_all_paginated;
44///
45/// # async fn example() -> anyhow::Result<()> {
46/// let client = Client::try_default().await?;
47/// let api: Api<DNSZone> = Api::namespaced(client, "default");
48///
49/// let zones = list_all_paginated(&api, ListParams::default()).await?;
50/// println!("Found {} zones", zones.len());
51/// # Ok(())
52/// # }
53/// ```
54///
55/// # Errors
56///
57/// Returns an error if Kubernetes API operations fail.
58pub async fn list_all_paginated<K>(api: &Api<K>, mut list_params: ListParams) -> Result<Vec<K>>
59where
60    K: Resource<DynamicType = ()> + Clone + DeserializeOwned + Debug,
61    K::DynamicType: Default,
62{
63    // Configure pagination
64    list_params.limit = Some(KUBE_LIST_PAGE_SIZE);
65
66    let mut all_items = Vec::new();
67    let mut page_count = 0;
68    let mut last_continue_token: Option<String> = None;
69
70    loop {
71        page_count += 1;
72
73        // Log current pagination state for debugging
74        debug!(
75            page = page_count,
76            continue_token = ?list_params.continue_token,
77            limit = ?list_params.limit,
78            "About to fetch page from Kubernetes API"
79        );
80
81        let result = api.list(&list_params).await?;
82
83        let item_count = result.items.len();
84
85        // CRITICAL: Treat empty string continue tokens as None
86        // The Kubernetes API sometimes returns Some("") instead of None for the last page
87        let new_continue_token = result
88            .metadata
89            .continue_
90            .clone()
91            .filter(|token| !token.is_empty());
92
93        // CRITICAL: Check if we're stuck in an infinite loop (same continue token repeated)
94        if let Some(ref new_token) = new_continue_token {
95            if last_continue_token.as_ref() == Some(new_token) {
96                error!(
97                    page = page_count,
98                    continue_token = ?new_token,
99                    items_in_page = item_count,
100                    "PAGINATION INFINITE LOOP DETECTED: Same continue token returned twice! Breaking loop to prevent infinite paging."
101                );
102                break;
103            }
104        }
105
106        // CRITICAL: Check for empty page with continue token (API bug)
107        if item_count == 0 && new_continue_token.is_some() {
108            error!(
109                page = page_count,
110                continue_token = ?new_continue_token,
111                total_items = all_items.len(),
112                "PAGINATION API BUG DETECTED: Received 0 items but got a continue token! Breaking loop to prevent infinite paging."
113            );
114            break;
115        }
116
117        all_items.extend(result.items);
118
119        debug!(
120            page = page_count,
121            items_in_page = item_count,
122            total_items = all_items.len(),
123            continue_token = ?new_continue_token,
124            "Fetched page from Kubernetes API"
125        );
126
127        // Check if there are more pages
128        if let Some(continue_token) = new_continue_token.clone() {
129            last_continue_token = Some(continue_token.clone());
130            list_params.continue_token = Some(continue_token);
131        } else {
132            debug!(
133                page = page_count,
134                total_items = all_items.len(),
135                "No continue token - pagination complete"
136            );
137            break;
138        }
139
140        // Safety check: Prevent infinite loops if we somehow exceed a reasonable page count
141        if page_count >= MAX_REASONABLE_PAGES {
142            error!(
143                page = page_count,
144                total_items = all_items.len(),
145                "PAGINATION SAFETY LIMIT EXCEEDED: More than {} pages fetched! Breaking loop to prevent infinite paging. This indicates a serious bug.",
146                MAX_REASONABLE_PAGES
147            );
148            break;
149        }
150    }
151
152    debug!(
153        total_pages = page_count,
154        total_items = all_items.len(),
155        "Completed paginated list operation"
156    );
157
158    Ok(all_items)
159}
160
// Unit tests for this module are kept out of line in `pagination_tests.rs`
// (resolved relative to this file's directory) and are compiled only for
// test builds.
#[cfg(test)]
#[path = "pagination_tests.rs"]
mod pagination_tests;