openzeppelin_relayer/queues/sqs/
worker.rs

1//! SQS worker implementation for polling and processing messages.
2//!
3//! This module provides worker tasks that poll SQS queues and process jobs
4//! using the existing handler functions.
5
6use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14    DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::metrics::observe_queue_pickup_latency;
23use crate::queues::{backoff_config_for_queue, retry_delay_secs};
24use crate::{
25    config::ServerConfig,
26    jobs::{
27        notification_handler, relayer_health_check_handler, token_swap_request_handler,
28        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
29        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30        TransactionSend, TransactionStatusCheck,
31    },
32    utils::{aws_error::DisplayErrorContext, classify_sdk_error},
33};
34
35use super::{HandlerError, WorkerContext};
36use super::{QueueBackendError, QueueType, WorkerHandle};
37
/// Classification of a failed job, used by `process_message` to decide what
/// happens to the SQS message that carried it.
#[derive(Debug)]
enum ProcessingError {
    /// The failure may succeed on a later attempt; `process_message` schedules
    /// a retry for the message. The payload is the error text.
    Retryable(String),
    /// Retrying cannot help (e.g. the payload failed to deserialize or the
    /// handler aborted); `process_message` logs it and deletes the message.
    /// The payload is the error text.
    Permanent(String),
}
43
/// Outcome of processing a single SQS message.
///
/// The poll loop uses this to decide whether the message's receipt handle is
/// queued for the next batch delete or the message is left in the queue.
#[derive(Debug)]
enum MessageOutcome {
    /// The message is finished with (processed, permanently failed, or
    /// replaced by a re-enqueued copy) and should be deleted from the queue.
    /// `receipt_handle` is the handle the delete request must carry.
    Delete { receipt_handle: String },
    /// The message should remain in the queue (e.g. a retry scheduled via a
    /// visibility change, or a retryable error awaiting visibility timeout).
    Retain,
}
54
/// Configuration for a single SQS poll loop, bundling parameters that
/// would otherwise require too many function arguments.
#[derive(Clone)]
struct PollLoopConfig {
    /// Queue this poller serves; selects the handler and labels log lines.
    queue_type: QueueType,
    /// Long-poll wait passed to `ReceiveMessage` as `wait_time_seconds`.
    polling_interval: u64,
    /// Visibility timeout requested on every `ReceiveMessage` call.
    visibility_timeout: u32,
    /// Upper bound on how long one message handler may run before it is
    /// abandoned and the message is left for redelivery.
    handler_timeout: Duration,
    /// Retry budget forwarded to `process_message`.
    max_retries: usize,
    /// Zero-based index of this poller among the pollers of the same queue.
    poller_id: usize,
    /// Total number of pollers sharing the queue's semaphore; used to split
    /// the available permits between them.
    poller_count: usize,
}
67
68/// Spawns a worker task for a specific SQS queue.
69///
70/// The worker continuously polls the queue, processes messages, and handles
71/// retries via SQS visibility timeout.
72///
73/// # Arguments
74/// * `sqs_client` - AWS SQS client for all operations (poll, send, delete, change visibility)
75/// * `queue_type` - Type of queue (determines handler and concurrency)
76/// * `queue_url` - SQS queue URL
77/// * `app_state` - Application state with repositories and services
78///
79/// # Returns
80/// JoinHandle to the spawned worker task
81pub async fn spawn_worker_for_queue(
82    sqs_client: aws_sdk_sqs::Client,
83    queue_type: QueueType,
84    queue_url: String,
85    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
86    shutdown_rx: watch::Receiver<bool>,
87) -> Result<WorkerHandle, QueueBackendError> {
88    let concurrency = get_concurrency_for_queue(queue_type);
89    let max_retries = queue_type.max_retries();
90    let polling_interval = get_wait_time_for_queue(queue_type);
91    let poller_count = get_poller_count_for_queue(queue_type);
92    let visibility_timeout = queue_type.visibility_timeout_secs();
93    let handler_timeout_secs = handler_timeout_secs(queue_type);
94    let handler_timeout = Duration::from_secs(handler_timeout_secs);
95
96    info!(
97        queue_type = ?queue_type,
98        queue_url = %queue_url,
99        concurrency = concurrency,
100        max_retries = max_retries,
101        polling_interval_secs = polling_interval,
102        poller_count = poller_count,
103        visibility_timeout_secs = visibility_timeout,
104        handler_timeout_secs = handler_timeout_secs,
105        "Spawning SQS worker"
106    );
107
108    // All pollers share the same semaphore so total concurrency is bounded.
109    let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
110
111    let handle: JoinHandle<()> = tokio::spawn(async move {
112        let mut poller_handles: JoinSet<()> = JoinSet::new();
113
114        for poller_id in 0..poller_count {
115            let client = sqs_client.clone();
116            let url = queue_url.clone();
117            let state = app_state.clone();
118            let sem = semaphore.clone();
119            let mut rx = shutdown_rx.clone();
120            let config = PollLoopConfig {
121                queue_type,
122                polling_interval,
123                visibility_timeout,
124                handler_timeout,
125                max_retries,
126                poller_id,
127                poller_count,
128            };
129
130            poller_handles.spawn(async move {
131                run_poll_loop(client, url, state, sem, &mut rx, config).await;
132            });
133        }
134
135        // Wait for all pollers to finish (they exit on shutdown signal)
136        while let Some(join_result) = poller_handles.join_next().await {
137            if let Err(err) = join_result {
138                error!(
139                    queue_type = ?queue_type,
140                    error = %err,
141                    "SQS poller task terminated unexpectedly"
142                );
143            }
144        }
145        info!(queue_type = ?queue_type, "SQS worker stopped");
146    });
147
148    Ok(WorkerHandle::Tokio(handle))
149}
150
/// Runs a single SQS poll loop. Multiple instances may share the same semaphore
/// to increase pickup smoothness without exceeding handler concurrency limits.
///
/// Each iteration:
/// 1. reaps finished handler tasks without blocking and batch-deletes the
///    messages they reported as done,
/// 2. exits if the shutdown flag is set,
/// 3. computes this poller's share of the currently free semaphore permits and
///    receives at most that many messages (capped at 10),
/// 4. spawns one task per received message; each task holds a semaphore permit
///    for its whole lifetime and runs `process_message` under
///    `handler_timeout` with panics caught.
///
/// A failed receive increments a consecutive-error counter and sleeps for
/// `poll_error_backoff_secs(..)` seconds before the next iteration.
///
/// After the loop ends, in-flight tasks are awaited for up to 30 seconds
/// (then aborted) and any receipt handles collected on the way are flushed.
///
/// # Arguments
/// * `sqs_client` - client used for receive and delete calls; cloned per task
/// * `queue_url` - URL of the queue being polled
/// * `app_state` - application state handed to every handler
/// * `semaphore` - permit pool shared by all pollers of this queue
/// * `shutdown_rx` - watch channel checked at the top of each iteration and
///   raced against every wait inside the loop
/// * `config` - per-poller settings, see [`PollLoopConfig`]
async fn run_poll_loop(
    sqs_client: aws_sdk_sqs::Client,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    semaphore: Arc<tokio::sync::Semaphore>,
    shutdown_rx: &mut watch::Receiver<bool>,
    config: PollLoopConfig,
) {
    let PollLoopConfig {
        queue_type,
        polling_interval,
        visibility_timeout,
        handler_timeout,
        max_retries,
        poller_id,
        poller_count,
    } = config;
    // Each task resolves to `Some(receipt_handle)` when its message should be
    // deleted and `None` when the message stays in the queue.
    let mut inflight: JoinSet<Option<String>> = JoinSet::new();
    let mut consecutive_poll_errors: u32 = 0;
    let mut pending_deletes: Vec<String> = Vec::new();

    loop {
        // Reap completed tasks and collect receipt handles for batch delete.
        // `try_join_next` never waits, so unfinished tasks are left running.
        while let Some(result) = inflight.try_join_next() {
            match result {
                Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                Ok(None) => {} // Retained message, no delete needed
                Err(e) => {
                    warn!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        error = %e,
                        "In-flight task failed"
                    );
                }
            }
        }

        // Flush any accumulated deletes as a batch
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
            pending_deletes.clear();
        }

        // Check shutdown before each iteration
        if *shutdown_rx.borrow() {
            info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
            break;
        }

        // Distribute available permits fairly across pollers to prevent
        // collective overfetch. Each poller gets floor(available / N)
        // messages, and the first (available % N) pollers (by poller_id)
        // each get one extra from the remainder. This ensures:
        // - No stall: at least one poller polls when any permits exist
        // - Bounded overfetch: at most poller_count extra from racing
        let available_permits = semaphore.available_permits();
        let base_share = available_permits / poller_count;
        let remainder = available_permits % poller_count;
        let my_share = base_share + usize::from(poller_id < remainder);
        if my_share == 0 {
            // Nothing to fetch for this poller right now: wait briefly and
            // re-evaluate, unless shutdown arrives first.
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
                    break;
                }
            }
        }

        // SQS MaxNumberOfMessages must be 1-10.
        let batch_size = my_share.min(10) as i32;

        // Poll SQS for messages, racing with shutdown signal
        let messages_result = tokio::select! {
            result = sqs_client
                .receive_message()
                .queue_url(&queue_url)
                .max_number_of_messages(batch_size) // SQS max is 10
                .wait_time_seconds(polling_interval as i32)
                .visibility_timeout(visibility_timeout as i32)
                .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                .message_system_attribute_names(MessageSystemAttributeName::SentTimestamp)
                .message_attribute_names("target_scheduled_on")
                .message_attribute_names("retry_attempt")
                .send() => result,
            _ = shutdown_rx.changed() => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during SQS poll, stopping poller");
                break;
            }
        };

        match messages_result {
            Ok(output) => {
                // Log once when polling works again after a run of failures.
                if consecutive_poll_errors > 0 {
                    info!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        previous_errors = consecutive_poll_errors,
                        "SQS polling recovered after consecutive errors"
                    );
                }
                consecutive_poll_errors = 0;

                if let Some(messages) = output.messages {
                    if !messages.is_empty() {
                        debug!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            message_count = messages.len(),
                            "Received messages from SQS"
                        );

                        // Process messages concurrently (up to semaphore limit)
                        for message in messages {
                            // Waits here if other pollers took the permits in
                            // the meantime.
                            let permit = match semaphore.clone().acquire_owned().await {
                                Ok(permit) => permit,
                                Err(err) => {
                                    error!(
                                        queue_type = ?queue_type,
                                        poller_id = poller_id,
                                        error = %err,
                                        "Semaphore closed, stopping SQS poller loop"
                                    );
                                    // NOTE(review): this `return` bypasses the
                                    // drain and final delete flush below.
                                    return;
                                }
                            };
                            let client = sqs_client.clone();
                            let url = queue_url.clone();
                            let state = app_state.clone();

                            inflight.spawn(async move {
                                let _permit = permit; // always dropped, even on panic

                                // Result layers, outermost first:
                                // timeout -> caught panic -> process_message result.
                                let result = tokio::time::timeout(
                                    handler_timeout,
                                    AssertUnwindSafe(process_message(
                                        client.clone(),
                                        message,
                                        queue_type,
                                        &url,
                                        state,
                                        max_retries,
                                    ))
                                    .catch_unwind(),
                                )
                                .await;

                                match result {
                                    Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                        Some(receipt_handle)
                                    }
                                    Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                    Ok(Ok(Err(e))) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %e,
                                            "Failed to process message"
                                        );
                                        None
                                    }
                                    Ok(Err(panic_info)) => {
                                        // Panic payloads are usually `String`
                                        // or `&str`; anything else is reported
                                        // generically.
                                        let msg = panic_info
                                            .downcast_ref::<String>()
                                            .map(|s| s.as_str())
                                            .or_else(|| {
                                                panic_info.downcast_ref::<&str>().copied()
                                            })
                                            .unwrap_or("unknown panic");
                                        error!(
                                            queue_type = ?queue_type,
                                            panic = %msg,
                                            "Message handler panicked"
                                        );
                                        None
                                    }
                                    Err(_) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            timeout_secs = handler_timeout.as_secs(),
                                            "Message handler timed out; message will be retried after visibility timeout"
                                        );
                                        None
                                    }
                                }
                            });
                        }
                    }
                }
            }
            Err(e) => {
                consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                // Service errors carry a code and message; other SDK error
                // kinds are logged with placeholders.
                let (error_code, error_message) = match &e {
                    SdkError::ServiceError(ctx) => (ctx.err().code(), ctx.err().message()),
                    _ => (None, None),
                };
                error!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    error.kind = classify_sdk_error(&e),
                    error.detail = %DisplayErrorContext(&e),
                    error_code = error_code.unwrap_or("unknown"),
                    error_message = error_message.unwrap_or("n/a"),
                    consecutive_errors = consecutive_poll_errors,
                    backoff_secs = backoff_secs,
                    "Failed to receive messages from SQS, backing off"
                );
                // Back off, but wake immediately on shutdown.
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during backoff, stopping poller");
                        break;
                    }
                }
            }
        }
    }

    // Drain in-flight tasks before shutdown, collecting final deletes
    if !inflight.is_empty() {
        info!(
            queue_type = ?queue_type,
            poller_id = poller_id,
            count = inflight.len(),
            "Draining in-flight tasks before shutdown"
        );
        match tokio::time::timeout(Duration::from_secs(30), async {
            while let Some(result) = inflight.join_next().await {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            error = %e,
                            "In-flight task failed during drain"
                        );
                    }
                }
            }
        })
        .await
        {
            Ok(()) => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "All in-flight tasks drained")
            }
            Err(_) => {
                warn!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    remaining = inflight.len(),
                    "Drain timeout, abandoning remaining tasks"
                );
                inflight.abort_all();
            }
        }
    }

    // Flush any remaining deletes accumulated during drain. Handles gathered
    // before a drain timeout are still flushed here.
    if !pending_deletes.is_empty() {
        flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
    }
}
419
420/// Processes a single SQS message.
421///
422/// Routes the message to the appropriate handler based on queue type,
423/// handles success/failure, and manages message deletion/retry.
424async fn process_message(
425    sqs_client: aws_sdk_sqs::Client,
426    message: Message,
427    queue_type: QueueType,
428    queue_url: &str,
429    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
430    max_retries: usize,
431) -> Result<MessageOutcome, QueueBackendError> {
432    let body = message
433        .body()
434        .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
435
436    let receipt_handle = message
437        .receipt_handle()
438        .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
439
440    // Observe queue pickup latency on the FIRST physical delivery, before the
441    // defer block consumes it. Placement here is deliberate:
442    //   - Standard queues hold scheduled messages invisible via DelaySeconds
443    //     and deliver at ~target_scheduled_on, so latency reflects actual
444    //     sub-second pickup delay.
445    //   - FIFO queues deliver scheduled messages immediately (no native
446    //     DelaySeconds) and the consumer then defers via visibility timeout.
447    //     The negative `now - target_scheduled_on` clamps to 0, which honestly
448    //     says "consumer is keeping up with the schedule".
449    // Either way: receive_count==1 in `queue_pickup_baseline_ms` ensures we
450    // observe exactly once per logical message lifecycle. FIFO defer/retry
451    // re-deliveries (which bump receive_count) are skipped; standard-queue
452    // retries are skipped via the `retry_attempt` attribute.
453    if let Some(baseline) = queue_pickup_baseline_ms(&message) {
454        let now_ms = chrono::Utc::now().timestamp_millis();
455        // SentTimestamp is set by the AWS broker; if the consumer clock runs
456        // ahead of the broker by more than this threshold for a non-scheduled
457        // message, the latency is almost certainly clock skew, not a real
458        // backlog. Log so operators can detect bad data rather than alert on it.
459        let delta_ms = now_ms - baseline;
460        if parse_target_scheduled_on(&message).is_none()
461            && delta_ms > PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS
462        {
463            warn!(
464                queue_type = ?queue_type,
465                latency_ms = delta_ms,
466                "queue_pickup_latency above sanity threshold for non-scheduled SQS message; check broker/consumer clock skew"
467            );
468        }
469        observe_queue_pickup_latency(
470            queue_type.queue_name(),
471            "sqs",
472            pickup_latency_secs(baseline, now_ms),
473        );
474    }
475
476    // For jobs with scheduling beyond SQS 15-minute max delay, keep deferring in hops.
477    if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
478        let now = std::time::SystemTime::now()
479            .duration_since(std::time::SystemTime::UNIX_EPOCH)
480            .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
481            .as_secs() as i64;
482        let remaining = target_scheduled_on - now;
483        if remaining > 0 {
484            let should_delete_original = defer_message(
485                &sqs_client,
486                queue_url,
487                body.to_string(),
488                &message,
489                target_scheduled_on,
490                remaining.min(900) as i32,
491            )
492            .await?;
493
494            debug!(
495                queue_type = ?queue_type,
496                remaining_seconds = remaining,
497                "Deferred scheduled SQS message for next delay hop"
498            );
499            return if should_delete_original {
500                Ok(MessageOutcome::Delete {
501                    receipt_handle: receipt_handle.to_string(),
502                })
503            } else {
504                Ok(MessageOutcome::Retain)
505            };
506        }
507    }
508
509    // Get retry attempt count from message attributes
510    let receive_count = message
511        .attributes()
512        .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
513        .and_then(|count| count.parse::<usize>().ok())
514        .unwrap_or(1);
515    // SQS receive count starts at 1; Apalis Attempt starts at 0.
516    let attempt_number = receive_count.saturating_sub(1);
517    // Persisted retry attempt for self-reenqueued status checks. Falls back to receive_count-based
518    // attempt when attribute is missing.
519    let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
520
521    // Use SQS MessageId as the worker task_id for log correlation.
522    let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
523
524    debug!(
525        queue_type = ?queue_type,
526        message_id = %sqs_message_id,
527        attempt = attempt_number,
528        receive_count = receive_count,
529        max_retries = max_retries,
530        "Processing message"
531    );
532
533    // Route to appropriate handler
534    let result = match queue_type {
535        QueueType::TransactionRequest => {
536            process_job::<TransactionRequest, _, _>(
537                body,
538                app_state,
539                attempt_number,
540                sqs_message_id,
541                "TransactionRequest",
542                transaction_request_handler,
543            )
544            .await
545        }
546        QueueType::TransactionSubmission => {
547            process_job::<TransactionSend, _, _>(
548                body,
549                app_state,
550                attempt_number,
551                sqs_message_id,
552                "TransactionSend",
553                transaction_submission_handler,
554            )
555            .await
556        }
557        QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
558            process_job::<TransactionStatusCheck, _, _>(
559                body,
560                app_state,
561                attempt_number,
562                sqs_message_id,
563                "TransactionStatusCheck",
564                transaction_status_handler,
565            )
566            .await
567        }
568        QueueType::Notification => {
569            process_job::<NotificationSend, _, _>(
570                body,
571                app_state,
572                attempt_number,
573                sqs_message_id,
574                "NotificationSend",
575                notification_handler,
576            )
577            .await
578        }
579        QueueType::TokenSwapRequest => {
580            process_job::<TokenSwapRequest, _, _>(
581                body,
582                app_state,
583                attempt_number,
584                sqs_message_id,
585                "TokenSwapRequest",
586                token_swap_request_handler,
587            )
588            .await
589        }
590        QueueType::RelayerHealthCheck => {
591            process_job::<RelayerHealthCheck, _, _>(
592                body,
593                app_state,
594                attempt_number,
595                sqs_message_id,
596                "RelayerHealthCheck",
597                relayer_health_check_handler,
598            )
599            .await
600        }
601    };
602
603    match result {
604        Ok(()) => {
605            debug!(
606                queue_type = ?queue_type,
607                attempt = attempt_number,
608                "Message processed successfully"
609            );
610
611            Ok(MessageOutcome::Delete {
612                receipt_handle: receipt_handle.to_string(),
613            })
614        }
615        Err(ProcessingError::Permanent(e)) => {
616            error!(
617                queue_type = ?queue_type,
618                attempt = attempt_number,
619                error = %e,
620                "Permanent handler failure, message will be deleted"
621            );
622
623            Ok(MessageOutcome::Delete {
624                receipt_handle: receipt_handle.to_string(),
625            })
626        }
627        Err(ProcessingError::Retryable(e)) => {
628            // Check max retries for non-infinite queues (status checks use usize::MAX)
629            if max_retries != usize::MAX && receive_count > max_retries {
630                error!(
631                    queue_type = ?queue_type,
632                    attempt = attempt_number,
633                    receive_count = receive_count,
634                    max_retries = max_retries,
635                    error = %e,
636                    "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
637                );
638                return Ok(MessageOutcome::Retain);
639            }
640
641            // Compute retry delay based on queue type:
642            // - Status checks use network-type-aware backoff from the message body
643            // - All other queues use their configured backoff profile from retry_config
644            let delay = if queue_type.is_status_check() {
645                compute_status_retry_delay(body, logical_retry_attempt)
646            } else {
647                retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
648            };
649
650            // FIFO queues do not support per-message DelaySeconds. Use visibility
651            // timeout on the in-flight message to schedule the retry.
652            if is_fifo_queue_url(queue_url) {
653                if let Err(err) = sqs_client
654                    .change_message_visibility()
655                    .queue_url(queue_url)
656                    .receipt_handle(receipt_handle)
657                    .visibility_timeout(delay.clamp(1, 900))
658                    .send()
659                    .await
660                {
661                    error!(
662                        queue_type = ?queue_type,
663                        error = %err,
664                        "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
665                    );
666                    return Ok(MessageOutcome::Retain);
667                }
668
669                debug!(
670                    queue_type = ?queue_type,
671                    attempt = logical_retry_attempt,
672                    delay_seconds = delay,
673                    error = %e,
674                    "Retry scheduled via visibility timeout"
675                );
676
677                return Ok(MessageOutcome::Retain);
678            }
679
680            let next_retry_attempt = logical_retry_attempt.saturating_add(1);
681
682            // Standard queues: re-enqueue with native DelaySeconds,
683            // no group_id or dedup_id needed. Duplicate deliveries are
684            // harmless because handlers are idempotent.
685            if let Err(send_err) = sqs_client
686                .send_message()
687                .queue_url(queue_url)
688                .message_body(body.to_string())
689                .delay_seconds(delay)
690                .message_attributes(
691                    "retry_attempt",
692                    MessageAttributeValue::builder()
693                        .data_type("Number")
694                        .string_value(next_retry_attempt.to_string())
695                        .build()
696                        .map_err(|err| {
697                            QueueBackendError::SqsError(format!(
698                                "Failed to build retry_attempt attribute: {err}"
699                            ))
700                        })?,
701                )
702                .send()
703                .await
704            {
705                error!(
706                    queue_type = ?queue_type,
707                    error.kind = classify_sdk_error(&send_err),
708                    error.detail = %DisplayErrorContext(&send_err),
709                    "Failed to re-enqueue message; leaving original for visibility timeout retry"
710                );
711                // Fall through — original message will retry after visibility timeout
712                return Ok(MessageOutcome::Retain);
713            }
714
715            debug!(
716                queue_type = ?queue_type,
717                attempt = logical_retry_attempt,
718                delay_seconds = delay,
719                error = %e,
720                "Message re-enqueued with backoff delay"
721            );
722
723            // Delete the original message now that the re-enqueue succeeded
724            Ok(MessageOutcome::Delete {
725                receipt_handle: receipt_handle.to_string(),
726            })
727        }
728    }
729}
730
731/// Generic job processor — deserializes `Job<T>`, creates a `WorkerContext`,
732/// and delegates to the provided handler function.
733async fn process_job<T, F, Fut>(
734    body: &str,
735    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
736    attempt: usize,
737    task_id: String,
738    type_name: &str,
739    handler: F,
740) -> Result<(), ProcessingError>
741where
742    T: DeserializeOwned,
743    F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
744    Fut: Future<Output = Result<(), HandlerError>>,
745{
746    let job: Job<T> = serde_json::from_str(body).map_err(|e| {
747        error!(error = %e, "Failed to deserialize {} job", type_name);
748        // Malformed payload is not recoverable by retrying the same message body.
749        ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
750    })?;
751
752    let ctx = WorkerContext::new(attempt, task_id);
753    handler(job, (*app_state).clone(), ctx)
754        .await
755        .map_err(map_handler_error)
756}
757
758fn map_handler_error(error: HandlerError) -> ProcessingError {
759    match error {
760        HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
761        HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
762    }
763}
764
765fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
766    message
767        .message_attributes()
768        .and_then(|attrs| attrs.get("target_scheduled_on"))
769        .and_then(|value| value.string_value())
770        .and_then(|value| value.parse::<i64>().ok())
771}
772
773fn parse_retry_attempt(message: &Message) -> Option<usize> {
774    message
775        .message_attributes()
776        .and_then(|attrs| attrs.get("retry_attempt"))
777        .and_then(|value| value.string_value())
778        .and_then(|value| value.parse::<usize>().ok())
779}
780
/// Compute pickup latency in seconds, clamping negative deltas to 0 so a
/// consumer clock running ahead of the AWS broker (or a future-dated
/// `target_scheduled_on`) cannot produce a negative-then-huge cast value.
///
/// Both arguments are epoch timestamps in milliseconds. The subtraction
/// saturates instead of overflowing: `baseline_ms` can originate from a
/// message attribute holding an arbitrary `i64`, and a plain `now - baseline`
/// would panic in debug builds (or wrap in release builds) for extreme values.
fn pickup_latency_secs(baseline_ms: i64, now_ms: i64) -> f64 {
    now_ms.saturating_sub(baseline_ms).max(0) as f64 / 1000.0
}
787
/// Sanity threshold (ms) for non-scheduled latency observations. Above this,
/// the consumer clock is almost certainly skewed relative to the AWS broker
/// rather than the queue genuinely being backed up by an hour+. Used only to
/// emit a warning — the value is still observed in the histogram.
///
/// The value is one hour expressed in milliseconds
/// (60 min × 60 s × 1000 ms = 3 600 000).
const PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS: i64 = 60 * 60 * 1000;
793
794fn queue_pickup_baseline_ms(message: &Message) -> Option<i64> {
795    // Observe pickup latency only on the very first physical delivery of
796    // a message. The gate is intentionally narrow because the relayer
797    // supports both standard and FIFO SQS queues — and FIFO defer-hops and
798    // error retries both reuse the same physical message via
799    // `change_message_visibility`, which cannot mutate the `retry_attempt`
800    // attribute. Without the receive-count gate, every FIFO redelivery
801    // would re-observe the latency, conflating the metric with retry
802    // backoff time.
803    //
804    // The trade-offs:
805    //   - Standard queues: behaves correctly. Initial delivery has
806    //     receive_count=1; standard-queue retries re-send a new message
807    //     (also receive_count=1) but carry `retry_attempt > 0`, so the
808    //     second check below skips them.
809    //   - Standard queues with defer-hop (only triggered when
810    //     scheduled_on - now > 900s): each defer-hop creates a new message
811    //     with receive_count=1 and no `retry_attempt`, so the metric
812    //     observes each hop. This is an accepted limitation; long-delay
813    //     scheduling is rare in this codebase (status checks use seconds-
814    //     scale backoff).
815    //   - FIFO queues: only the very first delivery observes. For
816    //     scheduled jobs that arrive before `target_scheduled_on`, the
817    //     computed latency is clamped to 0 by the caller. This is narrower
818    //     than the standard-queue semantic but consistent and free of
819    //     retry inflation.
820    let receive_count = message
821        .attributes()
822        .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
823        .and_then(|count| count.parse::<usize>().ok())
824        .unwrap_or(1);
825    if receive_count != 1 {
826        return None;
827    }
828
829    // Standard-queue retries re-enqueue as new messages with receive_count=1
830    // and an explicit `retry_attempt` attribute. Skip those so the metric
831    // doesn't include retry backoff time.
832    if parse_retry_attempt(message).is_some_and(|n| n > 0) {
833        return None;
834    }
835
836    parse_target_scheduled_on(message)
837        .map(|ts_secs| ts_secs * 1000)
838        .or_else(|| {
839            message
840                .attributes()
841                .and_then(|a| a.get(&MessageSystemAttributeName::SentTimestamp))
842                .and_then(|v| v.parse::<i64>().ok())
843        })
844}
845
/// Reports whether `queue_url` names a FIFO queue, i.e. whether the URL ends
/// with the literal, case-sensitive suffix `.fifo`.
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.strip_suffix(".fifo").is_some()
}
849
850async fn defer_message(
851    sqs_client: &aws_sdk_sqs::Client,
852    queue_url: &str,
853    body: String,
854    message: &Message,
855    target_scheduled_on: i64,
856    delay_seconds: i32,
857) -> Result<bool, QueueBackendError> {
858    if is_fifo_queue_url(queue_url) {
859        let receipt_handle = message.receipt_handle().ok_or_else(|| {
860            QueueBackendError::QueueError(
861                "Cannot defer FIFO message: missing receipt handle".to_string(),
862            )
863        })?;
864
865        sqs_client
866            .change_message_visibility()
867            .queue_url(queue_url)
868            .receipt_handle(receipt_handle)
869            .visibility_timeout(delay_seconds.clamp(1, 900))
870            .send()
871            .await
872            .map_err(|e| {
873                error!(
874                    error.kind = classify_sdk_error(&e),
875                    error.detail = %DisplayErrorContext(&e),
876                    queue_url = %queue_url,
877                    "Failed to defer FIFO message via visibility timeout"
878                );
879                QueueBackendError::SqsError(format!(
880                    "Failed to defer FIFO message via visibility timeout: {}",
881                    classify_sdk_error(&e)
882                ))
883            })?;
884
885        return Ok(false);
886    }
887
888    // Standard queues support native per-message DelaySeconds — no need for
889    // group_id or dedup_id. Just re-send with the delay and scheduling attribute.
890    let request = sqs_client
891        .send_message()
892        .queue_url(queue_url)
893        .message_body(body)
894        .delay_seconds(delay_seconds.clamp(1, 900))
895        .message_attributes(
896            "target_scheduled_on",
897            MessageAttributeValue::builder()
898                .data_type("Number")
899                .string_value(target_scheduled_on.to_string())
900                .build()
901                .map_err(|e| {
902                    QueueBackendError::SqsError(format!(
903                        "Failed to build deferred scheduled attribute: {e}"
904                    ))
905                })?,
906        );
907
908    request.send().await.map_err(|e| {
909        error!(
910            error.kind = classify_sdk_error(&e),
911            error.detail = %DisplayErrorContext(&e),
912            queue_url = %queue_url,
913            "Failed to defer scheduled message"
914        );
915        QueueBackendError::SqsError(format!(
916            "Failed to defer scheduled message: {}",
917            classify_sdk_error(&e)
918        ))
919    })?;
920
921    Ok(true)
922}
923
/// Partial struct for deserializing only the `network_type` field from a status check job.
///
/// Used to avoid deserializing the entire `Job<TransactionStatusCheck>` when we only
/// need the network type to determine retry delay. Any other keys in the
/// payload's `data` object are ignored during deserialization.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    // `Option` so a payload without `network_type` still deserializes (as `None`).
    network_type: Option<crate::models::NetworkType>,
}
932
/// Partial struct matching `Job<TransactionStatusCheck>` structure.
///
/// Used for efficient partial deserialization to extract only the `network_type`
/// field without parsing the entire job payload. Only the top-level `data`
/// key is read; the remaining envelope fields are ignored.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    // The job payload; only its `network_type` is modelled.
    data: StatusCheckData,
}
941
942/// Extracts `network_type` from a status check payload and computes retry delay.
943///
944/// This uses hardcoded network-specific backoff windows aligned with Redis/Apalis:
945/// - EVM: 8s -> 12s cap
946/// - Stellar: 2s -> 3s cap
947/// - Solana/default: 5s -> 8s cap
948fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
949    let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
950        .ok()
951        .and_then(|j| j.data.network_type);
952
953    crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
954}
955
956/// Gets the SQS long-poll wait time for a queue type from environment or default.
957fn get_wait_time_for_queue(queue_type: QueueType) -> u64 {
958    ServerConfig::get_sqs_wait_time(
959        queue_type.sqs_env_key(),
960        queue_type.default_wait_time_secs(),
961    )
962}
963
964/// Gets the number of poll loops to run for a queue type from environment or default.
965fn get_poller_count_for_queue(queue_type: QueueType) -> usize {
966    let configured = ServerConfig::get_sqs_poller_count(
967        queue_type.sqs_env_key(),
968        queue_type.default_poller_count(),
969    );
970    if configured == 0 {
971        warn!(
972            queue_type = ?queue_type,
973            "Configured poller count is 0; clamping to 1"
974        );
975        1
976    } else {
977        configured
978    }
979}
980
981/// Gets the concurrency limit for a queue type from environment.
982fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
983    let configured = ServerConfig::get_worker_concurrency(
984        queue_type.concurrency_env_key(),
985        queue_type.default_concurrency(),
986    );
987    if configured == 0 {
988        warn!(
989            queue_type = ?queue_type,
990            "Configured concurrency is 0; clamping to 1"
991        );
992        1
993    } else {
994        configured
995    }
996}
997
998/// Maximum allowed wall-clock processing time per message before the handler task is canceled.
999///
1000/// Keep this bounded so permits cannot be held forever by hung handlers.
1001fn handler_timeout_secs(queue_type: QueueType) -> u64 {
1002    u64::from(queue_type.visibility_timeout_secs().max(1))
1003}
1004
/// Ceiling, in seconds, for the delay between polls after errors (1 minute).
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Spacing of recovery probes, counted in consecutive errors.
/// Once the backoff sits at `MAX_POLL_BACKOFF_SECS`, every error count that
/// is a multiple of this value uses the base interval (5s) instead, so a
/// recovered SQS endpoint is noticed quickly.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Delay, in seconds, to wait after `consecutive_errors` failed polls in a row.
///
/// Doubles from 5s up to the ceiling, then interleaves short recovery probes:
/// 5, 10, 20, 40, 60, 60, 60, **5** (probe), 60, 60, 60, **5**, ...
/// A count of 0 yields the base interval.
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    const BASE_SECS: u64 = 5;
    // Probes only begin once the plain backoff has been at the ceiling for a while.
    const PROBE_THRESHOLD: u32 = 7;

    let is_recovery_probe = consecutive_errors >= PROBE_THRESHOLD
        && consecutive_errors.is_multiple_of(RECOVERY_PROBE_EVERY);

    if is_recovery_probe {
        BASE_SECS
    } else {
        // The doubling count is capped so the power cannot grow without bound.
        let doublings = consecutive_errors.saturating_sub(1).min(16);
        let uncapped = BASE_SECS.saturating_mul(2_u64.saturating_pow(doublings));
        uncapped.min(MAX_POLL_BACKOFF_SECS)
    }
}
1029
1030/// Deletes messages from SQS in batches of up to 10 (the SQS maximum per call).
1031///
1032/// Returns the total number of successfully deleted messages. Any per-entry
1033/// failures are logged as warnings — SQS will redeliver those messages after
1034/// the visibility timeout expires.
1035async fn flush_delete_batch(
1036    sqs_client: &aws_sdk_sqs::Client,
1037    queue_url: &str,
1038    batch: &[String],
1039    queue_type: QueueType,
1040) -> usize {
1041    if batch.is_empty() {
1042        return 0;
1043    }
1044
1045    let mut deleted = 0;
1046
1047    for chunk in batch.chunks(10) {
1048        let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
1049            .iter()
1050            .enumerate()
1051            .map(|(i, handle)| {
1052                DeleteMessageBatchRequestEntry::builder()
1053                    .id(i.to_string())
1054                    .receipt_handle(handle)
1055                    .build()
1056                    .expect("id and receipt_handle are always set")
1057            })
1058            .collect();
1059
1060        match sqs_client
1061            .delete_message_batch()
1062            .queue_url(queue_url)
1063            .set_entries(Some(entries))
1064            .send()
1065            .await
1066        {
1067            Ok(output) => {
1068                deleted += output.successful().len();
1069
1070                for f in output.failed() {
1071                    warn!(
1072                        queue_type = ?queue_type,
1073                        id = %f.id(),
1074                        code = %f.code(),
1075                        message = f.message().unwrap_or("unknown"),
1076                        "Batch delete entry failed (message will be redelivered)"
1077                    );
1078                }
1079            }
1080            Err(e) => {
1081                error!(
1082                    queue_type = ?queue_type,
1083                    error.kind = classify_sdk_error(&e),
1084                    error.detail = %DisplayErrorContext(&e),
1085                    batch_size = chunk.len(),
1086                    "Batch delete API call failed (messages will be redelivered)"
1087                );
1088            }
1089        }
1090    }
1091
1092    deleted
1093}
1094
1095#[cfg(test)]
1096mod tests {
1097    use super::*;
1098
1099    #[test]
1100    fn test_get_concurrency_for_queue() {
1101        // Test that concurrency is retrieved (exact value depends on env)
1102        let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
1103        assert!(concurrency > 0);
1104
1105        let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
1106        assert!(concurrency > 0);
1107    }
1108
1109    #[test]
1110    fn test_handler_timeout_secs_is_positive() {
1111        let all = [
1112            QueueType::TransactionRequest,
1113            QueueType::TransactionSubmission,
1114            QueueType::StatusCheck,
1115            QueueType::StatusCheckEvm,
1116            QueueType::StatusCheckStellar,
1117            QueueType::Notification,
1118            QueueType::TokenSwapRequest,
1119            QueueType::RelayerHealthCheck,
1120        ];
1121        for queue_type in all {
1122            assert!(handler_timeout_secs(queue_type) > 0);
1123        }
1124    }
1125
1126    #[test]
1127    fn test_handler_timeout_secs_uses_visibility_timeout() {
1128        assert_eq!(
1129            handler_timeout_secs(QueueType::StatusCheckEvm),
1130            QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
1131        );
1132        assert_eq!(
1133            handler_timeout_secs(QueueType::Notification),
1134            QueueType::Notification.visibility_timeout_secs() as u64
1135        );
1136    }
1137
1138    #[test]
1139    fn test_parse_target_scheduled_on() {
1140        // Test parsing target_scheduled_on from message attributes
1141        let message = Message::builder().build();
1142
1143        // Message without attribute should return None
1144        assert_eq!(parse_target_scheduled_on(&message), None);
1145
1146        // Message with valid attribute
1147        let message = Message::builder()
1148            .message_attributes(
1149                "target_scheduled_on",
1150                MessageAttributeValue::builder()
1151                    .data_type("Number")
1152                    .string_value("1234567890")
1153                    .build()
1154                    .unwrap(),
1155            )
1156            .build();
1157
1158        assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
1159    }
1160
1161    #[test]
1162    fn test_parse_retry_attempt() {
1163        let message = Message::builder().build();
1164        assert_eq!(parse_retry_attempt(&message), None);
1165
1166        let message = Message::builder()
1167            .message_attributes(
1168                "retry_attempt",
1169                MessageAttributeValue::builder()
1170                    .data_type("Number")
1171                    .string_value("7")
1172                    .build()
1173                    .unwrap(),
1174            )
1175            .build();
1176        assert_eq!(parse_retry_attempt(&message), Some(7));
1177    }
1178
1179    #[test]
1180    fn test_map_handler_error() {
1181        // Test Abort maps to Permanent
1182        let error = HandlerError::Abort("Validation failed".to_string());
1183        let result = map_handler_error(error);
1184        assert!(matches!(result, ProcessingError::Permanent(_)));
1185
1186        // Test Retry maps to Retryable
1187        let error = HandlerError::Retry("Network timeout".to_string());
1188        let result = map_handler_error(error);
1189        assert!(matches!(result, ProcessingError::Retryable(_)));
1190    }
1191
1192    #[test]
1193    fn test_is_fifo_queue_url() {
1194        assert!(is_fifo_queue_url(
1195            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
1196        ));
1197        assert!(!is_fifo_queue_url(
1198            "https://sqs.us-east-1.amazonaws.com/123/queue"
1199        ));
1200    }
1201
1202    #[test]
1203    fn test_compute_status_retry_delay_evm() {
1204        // NetworkType uses #[serde(rename_all = "lowercase")]
1205        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1206        assert_eq!(compute_status_retry_delay(body, 0), 8);
1207        assert_eq!(compute_status_retry_delay(body, 1), 12);
1208        assert_eq!(compute_status_retry_delay(body, 8), 12);
1209    }
1210
1211    #[test]
1212    fn test_compute_status_retry_delay_stellar() {
1213        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
1214        assert_eq!(compute_status_retry_delay(body, 0), 2);
1215        assert_eq!(compute_status_retry_delay(body, 1), 3);
1216        assert_eq!(compute_status_retry_delay(body, 8), 3);
1217    }
1218
1219    #[test]
1220    fn test_compute_status_retry_delay_solana() {
1221        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
1222        assert_eq!(compute_status_retry_delay(body, 0), 5);
1223        assert_eq!(compute_status_retry_delay(body, 1), 8);
1224        assert_eq!(compute_status_retry_delay(body, 8), 8);
1225    }
1226
1227    #[test]
1228    fn test_compute_status_retry_delay_missing_network() {
1229        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
1230        assert_eq!(compute_status_retry_delay(body, 0), 5);
1231        assert_eq!(compute_status_retry_delay(body, 1), 8);
1232        assert_eq!(compute_status_retry_delay(body, 8), 8);
1233    }
1234
1235    #[test]
1236    fn test_compute_status_retry_delay_invalid_body() {
1237        assert_eq!(compute_status_retry_delay("not json", 0), 5);
1238        assert_eq!(compute_status_retry_delay("not json", 1), 8);
1239        assert_eq!(compute_status_retry_delay("not json", 8), 8);
1240    }
1241
    /// A semaphore permit moved into a task must be released even when the
    /// work inside that task panics and the panic is caught with
    /// `catch_unwind`.
    #[tokio::test]
    async fn test_semaphore_released_on_panic() {
        // Single-permit semaphore: while `permit` is alive nothing else can acquire.
        let sem = Arc::new(tokio::sync::Semaphore::new(1));
        let permit = sem.clone().acquire_owned().await.unwrap();

        let handle = tokio::spawn(async move {
            let _permit = permit; // dropped on scope exit, even after panic
            // The panic is caught here, so the spawned task itself finishes normally.
            let _ = AssertUnwindSafe(async { panic!("test panic") })
                .catch_unwind()
                .await;
        });

        handle.await.unwrap();
        // If the permit had leaked, this acquire would never complete and the
        // 100ms timeout would fail the test.
        let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
            .await
            .expect("permit should be available after panic");
    }
1260
1261    #[test]
1262    fn test_poll_error_backoff_secs() {
1263        // First error: 5s
1264        assert_eq!(poll_error_backoff_secs(1), 5);
1265        // Second: 10s
1266        assert_eq!(poll_error_backoff_secs(2), 10);
1267        // Third: 20s
1268        assert_eq!(poll_error_backoff_secs(3), 20);
1269        // Fourth: 40s
1270        assert_eq!(poll_error_backoff_secs(4), 40);
1271        // Capped at MAX_POLL_BACKOFF_SECS (60)
1272        assert_eq!(poll_error_backoff_secs(5), 60);
1273        assert_eq!(poll_error_backoff_secs(6), 60);
1274        assert_eq!(poll_error_backoff_secs(7), 60);
1275        // Recovery probe: base interval at multiples of RECOVERY_PROBE_EVERY (>= 7)
1276        assert_eq!(poll_error_backoff_secs(8), 5);
1277        assert_eq!(poll_error_backoff_secs(9), 60);
1278        assert_eq!(poll_error_backoff_secs(12), 5); // next probe
1279    }
1280
1281    #[test]
1282    fn test_poll_error_backoff_zero_errors() {
1283        // Zero consecutive errors should still produce a reasonable value
1284        assert_eq!(poll_error_backoff_secs(0), 5);
1285    }
1286
1287    #[test]
1288    fn test_poll_error_backoff_recovery_probes() {
1289        // Verify probes repeat at regular intervals once past threshold
1290        for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
1291            assert_eq!(
1292                poll_error_backoff_secs(i as u32),
1293                5,
1294                "Expected recovery probe at error {i}"
1295            );
1296        }
1297    }
1298
1299    #[test]
1300    fn test_message_outcome_delete_carries_receipt_handle() {
1301        let handle = "test-receipt-handle-123".to_string();
1302        let outcome = MessageOutcome::Delete {
1303            receipt_handle: handle.clone(),
1304        };
1305        match outcome {
1306            MessageOutcome::Delete { receipt_handle } => {
1307                assert_eq!(receipt_handle, handle);
1308            }
1309            MessageOutcome::Retain => panic!("Expected Delete variant"),
1310        }
1311    }
1312
1313    #[test]
1314    fn test_message_outcome_retain() {
1315        let outcome = MessageOutcome::Retain;
1316        assert!(matches!(outcome, MessageOutcome::Retain));
1317    }
1318
1319    #[test]
1320    fn test_batch_delete_entry_builder() {
1321        // Verify DeleteMessageBatchRequestEntry builds correctly with sequential IDs,
1322        // matching the pattern used in flush_delete_batch.
1323        let handles = vec![
1324            "receipt-0".to_string(),
1325            "receipt-1".to_string(),
1326            "receipt-2".to_string(),
1327        ];
1328        let entries: Vec<DeleteMessageBatchRequestEntry> = handles
1329            .iter()
1330            .enumerate()
1331            .map(|(i, handle)| {
1332                DeleteMessageBatchRequestEntry::builder()
1333                    .id(i.to_string())
1334                    .receipt_handle(handle)
1335                    .build()
1336                    .expect("id and receipt_handle are set")
1337            })
1338            .collect();
1339
1340        assert_eq!(entries.len(), 3);
1341        assert_eq!(entries[0].id(), "0");
1342        assert_eq!(entries[0].receipt_handle(), "receipt-0");
1343        assert_eq!(entries[2].id(), "2");
1344        assert_eq!(entries[2].receipt_handle(), "receipt-2");
1345    }
1346
1347    #[test]
1348    fn test_batch_chunking_logic() {
1349        // Verify that chunks(10) correctly splits receipt handles,
1350        // matching the pattern used in flush_delete_batch.
1351        let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();
1352        let chunks: Vec<&[String]> = handles.chunks(10).collect();
1353
1354        assert_eq!(chunks.len(), 3);
1355        assert_eq!(chunks[0].len(), 10);
1356        assert_eq!(chunks[1].len(), 10);
1357        assert_eq!(chunks[2].len(), 5);
1358    }
1359
1360    #[test]
1361    fn test_outcome_collection_pattern() {
1362        // Verify the pattern used in the main loop to collect receipt handles
1363        // from a mix of Delete and Retain outcomes.
1364        let outcomes = vec![
1365            Some("receipt-1".to_string()), // Delete
1366            None,                          // Retain
1367            Some("receipt-2".to_string()), // Delete
1368            None,                          // Retain
1369            Some("receipt-3".to_string()), // Delete
1370        ];
1371
1372        let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();
1373
1374        assert_eq!(pending_deletes.len(), 3);
1375        assert_eq!(pending_deletes[0], "receipt-1");
1376        assert_eq!(pending_deletes[1], "receipt-2");
1377        assert_eq!(pending_deletes[2], "receipt-3");
1378    }
1379
1380    // ── parse_target_scheduled_on: edge cases ─────────────────────────
1381
1382    #[test]
1383    fn test_parse_target_scheduled_on_non_numeric_string() {
1384        let message = Message::builder()
1385            .message_attributes(
1386                "target_scheduled_on",
1387                MessageAttributeValue::builder()
1388                    .data_type("String")
1389                    .string_value("not-a-number")
1390                    .build()
1391                    .unwrap(),
1392            )
1393            .build();
1394        assert_eq!(parse_target_scheduled_on(&message), None);
1395    }
1396
1397    #[test]
1398    fn test_parse_target_scheduled_on_empty_string() {
1399        let message = Message::builder()
1400            .message_attributes(
1401                "target_scheduled_on",
1402                MessageAttributeValue::builder()
1403                    .data_type("Number")
1404                    .string_value("")
1405                    .build()
1406                    .unwrap(),
1407            )
1408            .build();
1409        assert_eq!(parse_target_scheduled_on(&message), None);
1410    }
1411
1412    #[test]
1413    fn test_parse_target_scheduled_on_negative_value() {
1414        let message = Message::builder()
1415            .message_attributes(
1416                "target_scheduled_on",
1417                MessageAttributeValue::builder()
1418                    .data_type("Number")
1419                    .string_value("-1000")
1420                    .build()
1421                    .unwrap(),
1422            )
1423            .build();
1424        // Negative values parse fine as i64
1425        assert_eq!(parse_target_scheduled_on(&message), Some(-1000));
1426    }
1427
1428    #[test]
1429    fn test_parse_target_scheduled_on_float_string() {
1430        let message = Message::builder()
1431            .message_attributes(
1432                "target_scheduled_on",
1433                MessageAttributeValue::builder()
1434                    .data_type("Number")
1435                    .string_value("1234567890.5")
1436                    .build()
1437                    .unwrap(),
1438            )
1439            .build();
1440        // Floats can't parse as i64
1441        assert_eq!(parse_target_scheduled_on(&message), None);
1442    }
1443
1444    #[test]
1445    fn test_parse_target_scheduled_on_zero() {
1446        let message = Message::builder()
1447            .message_attributes(
1448                "target_scheduled_on",
1449                MessageAttributeValue::builder()
1450                    .data_type("Number")
1451                    .string_value("0")
1452                    .build()
1453                    .unwrap(),
1454            )
1455            .build();
1456        assert_eq!(parse_target_scheduled_on(&message), Some(0));
1457    }
1458
1459    #[test]
1460    fn test_parse_target_scheduled_on_wrong_attribute_name() {
1461        // Attribute exists but under a different key
1462        let message = Message::builder()
1463            .message_attributes(
1464                "wrong_key",
1465                MessageAttributeValue::builder()
1466                    .data_type("Number")
1467                    .string_value("1234567890")
1468                    .build()
1469                    .unwrap(),
1470            )
1471            .build();
1472        assert_eq!(parse_target_scheduled_on(&message), None);
1473    }
1474
1475    // ── parse_retry_attempt: edge cases ───────────────────────────────
1476
1477    #[test]
1478    fn test_parse_retry_attempt_non_numeric_string() {
1479        let message = Message::builder()
1480            .message_attributes(
1481                "retry_attempt",
1482                MessageAttributeValue::builder()
1483                    .data_type("String")
1484                    .string_value("abc")
1485                    .build()
1486                    .unwrap(),
1487            )
1488            .build();
1489        assert_eq!(parse_retry_attempt(&message), None);
1490    }
1491
1492    #[test]
1493    fn test_parse_retry_attempt_negative_value() {
1494        let message = Message::builder()
1495            .message_attributes(
1496                "retry_attempt",
1497                MessageAttributeValue::builder()
1498                    .data_type("Number")
1499                    .string_value("-1")
1500                    .build()
1501                    .unwrap(),
1502            )
1503            .build();
1504        // Negative values can't parse as usize
1505        assert_eq!(parse_retry_attempt(&message), None);
1506    }
1507
1508    #[test]
1509    fn test_parse_retry_attempt_zero() {
1510        let message = Message::builder()
1511            .message_attributes(
1512                "retry_attempt",
1513                MessageAttributeValue::builder()
1514                    .data_type("Number")
1515                    .string_value("0")
1516                    .build()
1517                    .unwrap(),
1518            )
1519            .build();
1520        assert_eq!(parse_retry_attempt(&message), Some(0));
1521    }
1522
1523    #[test]
1524    fn test_parse_retry_attempt_large_value() {
1525        let message = Message::builder()
1526            .message_attributes(
1527                "retry_attempt",
1528                MessageAttributeValue::builder()
1529                    .data_type("Number")
1530                    .string_value("999999")
1531                    .build()
1532                    .unwrap(),
1533            )
1534            .build();
1535        assert_eq!(parse_retry_attempt(&message), Some(999999));
1536    }
1537
1538    #[test]
1539    fn test_queue_pickup_baseline_ms_uses_scheduled_time_on_first_delivery() {
1540        let message = Message::builder()
1541            .message_attributes(
1542                "target_scheduled_on",
1543                MessageAttributeValue::builder()
1544                    .data_type("Number")
1545                    .string_value("123")
1546                    .build()
1547                    .unwrap(),
1548            )
1549            .set_attributes(Some(std::collections::HashMap::from([(
1550                MessageSystemAttributeName::SentTimestamp,
1551                "999999".to_string(),
1552            )])))
1553            .build();
1554
1555        // No retry_attempt attribute → first attempt
1556        assert_eq!(queue_pickup_baseline_ms(&message), Some(123_000));
1557    }
1558
1559    #[test]
1560    fn test_queue_pickup_baseline_ms_falls_back_to_sent_timestamp() {
1561        let message = Message::builder()
1562            .set_attributes(Some(std::collections::HashMap::from([(
1563                MessageSystemAttributeName::SentTimestamp,
1564                "123456".to_string(),
1565            )])))
1566            .build();
1567
1568        assert_eq!(queue_pickup_baseline_ms(&message), Some(123456));
1569    }
1570
1571    #[test]
1572    fn test_pickup_latency_secs_clamps_negative_skew() {
1573        // Consumer clock running 5s behind the broker (baseline is "in the future")
1574        // must not produce a negative-then-huge cast value — clamp to 0.
1575        let now_ms = 1_000_000_i64;
1576        let baseline_ms = now_ms + 5_000;
1577        assert_eq!(pickup_latency_secs(baseline_ms, now_ms), 0.0);
1578    }
1579
1580    #[test]
1581    fn test_pickup_latency_secs_positive_delta() {
1582        // 2.5s positive delta should be reported in seconds with ms precision.
1583        assert_eq!(pickup_latency_secs(1_000_000, 1_002_500), 2.5);
1584    }
1585
1586    #[test]
1587    fn test_queue_pickup_baseline_ms_skips_when_retry_attempt_positive() {
1588        let message = Message::builder()
1589            .message_attributes(
1590                "target_scheduled_on",
1591                MessageAttributeValue::builder()
1592                    .data_type("Number")
1593                    .string_value("123")
1594                    .build()
1595                    .unwrap(),
1596            )
1597            .message_attributes(
1598                "retry_attempt",
1599                MessageAttributeValue::builder()
1600                    .data_type("Number")
1601                    .string_value("1")
1602                    .build()
1603                    .unwrap(),
1604            )
1605            .set_attributes(Some(std::collections::HashMap::from([(
1606                MessageSystemAttributeName::SentTimestamp,
1607                "123456".to_string(),
1608            )])))
1609            .build();
1610
1611        // retry_attempt > 0 → genuine retry, skip
1612        assert_eq!(queue_pickup_baseline_ms(&message), None);
1613    }
1614
1615    #[test]
1616    fn test_queue_pickup_baseline_ms_accepts_retry_attempt_zero() {
1617        // Explicit retry_attempt=0 should be treated as first attempt
1618        // (consistent with absent attribute).
1619        let message = Message::builder()
1620            .message_attributes(
1621                "target_scheduled_on",
1622                MessageAttributeValue::builder()
1623                    .data_type("Number")
1624                    .string_value("777")
1625                    .build()
1626                    .unwrap(),
1627            )
1628            .message_attributes(
1629                "retry_attempt",
1630                MessageAttributeValue::builder()
1631                    .data_type("Number")
1632                    .string_value("0")
1633                    .build()
1634                    .unwrap(),
1635            )
1636            .build();
1637
1638        assert_eq!(queue_pickup_baseline_ms(&message), Some(777_000));
1639    }
1640
1641    #[test]
1642    fn test_queue_pickup_baseline_ms_skips_when_receive_count_gt_one() {
1643        // Receive count > 1 means the message has been delivered before,
1644        // which on FIFO queues happens for both scheduling defer-hops and
1645        // error retries — neither of which we want to record as a fresh
1646        // pickup. Gating on receive_count == 1 prevents this conflation.
1647        let message = Message::builder()
1648            .message_attributes(
1649                "target_scheduled_on",
1650                MessageAttributeValue::builder()
1651                    .data_type("Number")
1652                    .string_value("500")
1653                    .build()
1654                    .unwrap(),
1655            )
1656            .set_attributes(Some(std::collections::HashMap::from([
1657                (
1658                    MessageSystemAttributeName::ApproximateReceiveCount,
1659                    "2".to_string(),
1660                ),
1661                (MessageSystemAttributeName::SentTimestamp, "999".to_string()),
1662            ])))
1663            .build();
1664
1665        assert_eq!(queue_pickup_baseline_ms(&message), None);
1666    }
1667
1668    #[test]
1669    fn test_queue_pickup_baseline_ms_observes_when_receive_count_explicitly_one() {
1670        // Explicit receive_count=1 (first physical delivery) should be
1671        // observed, mirroring the implicit default when the attribute is
1672        // missing.
1673        let message = Message::builder()
1674            .message_attributes(
1675                "target_scheduled_on",
1676                MessageAttributeValue::builder()
1677                    .data_type("Number")
1678                    .string_value("250")
1679                    .build()
1680                    .unwrap(),
1681            )
1682            .set_attributes(Some(std::collections::HashMap::from([(
1683                MessageSystemAttributeName::ApproximateReceiveCount,
1684                "1".to_string(),
1685            )])))
1686            .build();
1687
1688        assert_eq!(queue_pickup_baseline_ms(&message), Some(250_000));
1689    }
1690
1691    // ── is_fifo_queue_url: comprehensive cases ────────────────────────
1692
1693    #[test]
1694    fn test_is_fifo_queue_url_empty_string() {
1695        assert!(!is_fifo_queue_url(""));
1696    }
1697
1698    #[test]
1699    fn test_is_fifo_queue_url_just_fifo_suffix() {
1700        assert!(is_fifo_queue_url("my-queue.fifo"));
1701    }
1702
1703    #[test]
1704    fn test_is_fifo_queue_url_fifo_in_middle() {
1705        // .fifo appearing in the path but not as suffix
1706        assert!(!is_fifo_queue_url(
1707            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
1708        ));
1709    }
1710
1711    #[test]
1712    fn test_is_fifo_queue_url_case_sensitive() {
1713        assert!(!is_fifo_queue_url(
1714            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
1715        ));
1716        assert!(!is_fifo_queue_url(
1717            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
1718        ));
1719    }
1720
1721    #[test]
1722    fn test_is_fifo_queue_url_standard_queue_variations() {
1723        assert!(!is_fifo_queue_url(
1724            "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
1725        ));
1726        assert!(!is_fifo_queue_url(
1727            "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request"
1728        ));
1729        assert!(!is_fifo_queue_url(
1730            "http://localhost:4566/000000000000/test-queue"
1731        ));
1732    }
1733
1734    #[test]
1735    fn test_is_fifo_queue_url_localstack() {
1736        // LocalStack FIFO queue URL format
1737        assert!(is_fifo_queue_url(
1738            "http://localhost:4566/000000000000/test-queue.fifo"
1739        ));
1740    }
1741
1742    // ── map_handler_error: message preservation ───────────────────────
1743
1744    #[test]
1745    fn test_map_handler_error_preserves_abort_message() {
1746        let msg = "Validation failed: invalid nonce";
1747        let error = HandlerError::Abort(msg.to_string());
1748        match map_handler_error(error) {
1749            ProcessingError::Permanent(s) => assert_eq!(s, msg),
1750            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
1751        }
1752    }
1753
1754    #[test]
1755    fn test_map_handler_error_preserves_retry_message() {
1756        let msg = "RPC timeout after 30s";
1757        let error = HandlerError::Retry(msg.to_string());
1758        match map_handler_error(error) {
1759            ProcessingError::Retryable(s) => assert_eq!(s, msg),
1760            ProcessingError::Permanent(_) => panic!("Expected Retryable"),
1761        }
1762    }
1763
1764    #[test]
1765    fn test_map_handler_error_empty_message() {
1766        let error = HandlerError::Abort(String::new());
1767        match map_handler_error(error) {
1768            ProcessingError::Permanent(s) => assert!(s.is_empty()),
1769            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
1770        }
1771    }
1772
1773    // ── handler_timeout_secs: all queue types ─────────────────────────
1774
1775    #[test]
1776    fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
1777        let all = [
1778            QueueType::TransactionRequest,
1779            QueueType::TransactionSubmission,
1780            QueueType::StatusCheck,
1781            QueueType::StatusCheckEvm,
1782            QueueType::StatusCheckStellar,
1783            QueueType::Notification,
1784            QueueType::TokenSwapRequest,
1785            QueueType::RelayerHealthCheck,
1786        ];
1787        for qt in all {
1788            assert_eq!(
1789                handler_timeout_secs(qt),
1790                qt.visibility_timeout_secs().max(1) as u64,
1791                "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
1792            );
1793        }
1794    }
1795
1796    // ── get_concurrency_for_queue: all queue types ────────────────────
1797
1798    #[test]
1799    fn test_get_concurrency_for_queue_all_types_positive() {
1800        let all = [
1801            QueueType::TransactionRequest,
1802            QueueType::TransactionSubmission,
1803            QueueType::StatusCheck,
1804            QueueType::StatusCheckEvm,
1805            QueueType::StatusCheckStellar,
1806            QueueType::Notification,
1807            QueueType::TokenSwapRequest,
1808            QueueType::RelayerHealthCheck,
1809        ];
1810        for qt in all {
1811            assert!(
1812                get_concurrency_for_queue(qt) > 0,
1813                "{qt:?}: concurrency must be positive (clamped to at least 1)"
1814            );
1815        }
1816    }
1817
1818    // ── poll_error_backoff_secs: overflow and invariants ───────────────
1819
1820    #[test]
1821    fn test_poll_error_backoff_never_exceeds_max() {
1822        for i in 0..200 {
1823            let backoff = poll_error_backoff_secs(i);
1824            assert!(
1825                backoff <= MAX_POLL_BACKOFF_SECS,
1826                "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
1827            );
1828        }
1829    }
1830
1831    #[test]
1832    fn test_poll_error_backoff_u32_max_does_not_overflow() {
1833        let backoff = poll_error_backoff_secs(u32::MAX);
1834        assert!(backoff <= MAX_POLL_BACKOFF_SECS);
1835        assert!(backoff > 0);
1836    }
1837
1838    #[test]
1839    fn test_poll_error_backoff_always_positive() {
1840        for i in 0..200 {
1841            assert!(
1842                poll_error_backoff_secs(i) > 0,
1843                "Error count {i}: backoff must be positive"
1844            );
1845        }
1846    }
1847
1848    #[test]
1849    fn test_poll_error_backoff_monotonic_before_cap() {
1850        // Before hitting the cap, backoff should be non-decreasing
1851        let mut prev = poll_error_backoff_secs(0);
1852        for i in 1..=4 {
1853            let curr = poll_error_backoff_secs(i);
1854            assert!(
1855                curr >= prev,
1856                "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
1857            );
1858            prev = curr;
1859        }
1860    }
1861
1862    // ── Constants validation ──────────────────────────────────────────
1863
1864    #[test]
1865    fn test_max_poll_backoff_is_reasonable() {
1866        assert!(
1867            MAX_POLL_BACKOFF_SECS >= 10,
1868            "Max backoff should be at least 10s to avoid tight error loops"
1869        );
1870        assert!(
1871            MAX_POLL_BACKOFF_SECS <= 300,
1872            "Max backoff should be at most 5 minutes to detect recovery promptly"
1873        );
1874    }
1875
1876    #[test]
1877    fn test_recovery_probe_every_is_valid() {
1878        assert!(
1879            RECOVERY_PROBE_EVERY >= 2,
1880            "Recovery probe interval must be at least 2 to avoid probing every attempt"
1881        );
1882        assert!(
1883            RECOVERY_PROBE_EVERY <= 10,
1884            "Recovery probe interval should not be too large or recovery detection is slow"
1885        );
1886    }
1887
1888    // ── compute_status_retry_delay: edge cases ────────────────────────
1889
1890    #[test]
1891    fn test_compute_status_retry_delay_very_high_attempt() {
1892        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1893        // Very high attempts should stay capped at the max (12s for EVM)
1894        assert_eq!(compute_status_retry_delay(body, 1000), 12);
1895        assert_eq!(compute_status_retry_delay(body, usize::MAX), 12);
1896    }
1897
1898    #[test]
1899    fn test_compute_status_retry_delay_empty_body() {
1900        // Empty JSON body should fall back to generic/Solana defaults
1901        assert_eq!(compute_status_retry_delay("", 0), 5);
1902        assert_eq!(compute_status_retry_delay("{}", 0), 5);
1903    }
1904
1905    #[test]
1906    fn test_compute_status_retry_delay_partial_json() {
1907        // JSON with missing inner structure
1908        assert_eq!(compute_status_retry_delay(r#"{"data":{}}"#, 0), 5);
1909        assert_eq!(
1910            compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0),
1911            8
1912        );
1913    }
1914
1915    // ── PartialStatusCheckJob deserialization ──────────────────────────
1916
1917    #[test]
1918    fn test_partial_status_check_job_deserializes_network_type() {
1919        let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
1920        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1921        assert_eq!(
1922            parsed.data.network_type,
1923            Some(crate::models::NetworkType::Evm)
1924        );
1925    }
1926
1927    #[test]
1928    fn test_partial_status_check_job_handles_missing_network_type() {
1929        let body = r#"{"data":{"transaction_id":"tx1"}}"#;
1930        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1931        assert_eq!(parsed.data.network_type, None);
1932    }
1933
1934    #[test]
1935    fn test_partial_status_check_job_rejects_missing_data() {
1936        let body = r#"{"not_data":{}}"#;
1937        let result = serde_json::from_str::<PartialStatusCheckJob>(body);
1938        assert!(result.is_err());
1939    }
1940
1941    // ── is_fifo_queue_url used consistently ───────────────────────────
1942
1943    #[test]
1944    fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
1945        // Both defer_message and the retry path in process_message use
1946        // is_fifo_queue_url to decide between visibility-timeout vs re-enqueue.
1947        // Verify our standard and FIFO URLs are classified identically by both
1948        // call sites (they both call the same function).
1949        let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
1950        let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";
1951
1952        assert!(!is_fifo_queue_url(standard));
1953        assert!(is_fifo_queue_url(fifo));
1954    }
1955
1956    // ── get_wait_time_for_queue ──────────────────────────────────────────
1957
1958    #[test]
1959    fn test_get_wait_time_for_queue_returns_positive() {
1960        let all = [
1961            QueueType::TransactionRequest,
1962            QueueType::TransactionSubmission,
1963            QueueType::StatusCheck,
1964            QueueType::StatusCheckEvm,
1965            QueueType::StatusCheckStellar,
1966            QueueType::Notification,
1967            QueueType::TokenSwapRequest,
1968            QueueType::RelayerHealthCheck,
1969        ];
1970        for qt in all {
1971            let wt = get_wait_time_for_queue(qt);
1972            assert!(
1973                wt <= 20,
1974                "{qt:?}: wait time {wt} exceeds SQS maximum of 20s"
1975            );
1976        }
1977    }
1978
1979    #[test]
1980    fn test_get_wait_time_for_queue_matches_defaults() {
1981        // Without env overrides the helper should return the queue's default
1982        assert_eq!(
1983            get_wait_time_for_queue(QueueType::TransactionRequest),
1984            QueueType::TransactionRequest.default_wait_time_secs()
1985        );
1986        assert_eq!(
1987            get_wait_time_for_queue(QueueType::StatusCheck),
1988            QueueType::StatusCheck.default_wait_time_secs()
1989        );
1990    }
1991
1992    #[test]
1993    #[serial_test::serial]
1994    fn test_get_wait_time_for_queue_respects_env_override() {
1995        // StatusCheck default is 5; override to 12 via the real env var path
1996        let env_var = format!(
1997            "SQS_{}_WAIT_TIME_SECONDS",
1998            QueueType::StatusCheck.sqs_env_key()
1999        );
2000        std::env::set_var(&env_var, "12");
2001        assert_eq!(get_wait_time_for_queue(QueueType::StatusCheck), 12);
2002        std::env::remove_var(&env_var);
2003    }
2004
2005    #[test]
2006    #[serial_test::serial]
2007    fn test_get_wait_time_for_queue_env_override_clamped_to_20() {
2008        let env_var = format!(
2009            "SQS_{}_WAIT_TIME_SECONDS",
2010            QueueType::Notification.sqs_env_key()
2011        );
2012        std::env::set_var(&env_var, "99");
2013        assert_eq!(
2014            get_wait_time_for_queue(QueueType::Notification),
2015            20,
2016            "Should clamp to SQS maximum of 20"
2017        );
2018        std::env::remove_var(&env_var);
2019    }
2020
2021    // ── get_poller_count_for_queue ───────────────────────────────────────
2022
2023    #[test]
2024    fn test_get_poller_count_for_queue_all_types_positive() {
2025        let all = [
2026            QueueType::TransactionRequest,
2027            QueueType::TransactionSubmission,
2028            QueueType::StatusCheck,
2029            QueueType::StatusCheckEvm,
2030            QueueType::StatusCheckStellar,
2031            QueueType::Notification,
2032            QueueType::TokenSwapRequest,
2033            QueueType::RelayerHealthCheck,
2034        ];
2035        for qt in all {
2036            assert!(
2037                get_poller_count_for_queue(qt) >= 1,
2038                "{qt:?}: poller count must be at least 1"
2039            );
2040        }
2041    }
2042
2043    #[test]
2044    fn test_get_poller_count_for_queue_matches_defaults() {
2045        // Without env overrides the helper should return the queue's default (clamped to >= 1)
2046        assert_eq!(
2047            get_poller_count_for_queue(QueueType::TransactionRequest),
2048            QueueType::TransactionRequest.default_poller_count().max(1)
2049        );
2050        assert_eq!(
2051            get_poller_count_for_queue(QueueType::Notification),
2052            QueueType::Notification.default_poller_count().max(1)
2053        );
2054    }
2055
2056    #[test]
2057    #[serial_test::serial]
2058    fn test_get_poller_count_for_queue_respects_env_override() {
2059        let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::Notification.sqs_env_key());
2060        std::env::set_var(&env_var, "5");
2061        assert_eq!(get_poller_count_for_queue(QueueType::Notification), 5);
2062        std::env::remove_var(&env_var);
2063    }
2064
2065    #[test]
2066    #[serial_test::serial]
2067    fn test_get_poller_count_for_queue_env_zero_clamped_to_1() {
2068        let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::StatusCheck.sqs_env_key());
2069        std::env::set_var(&env_var, "0");
2070        assert_eq!(
2071            get_poller_count_for_queue(QueueType::StatusCheck),
2072            1,
2073            "Zero poller count from env should be clamped to 1"
2074        );
2075        std::env::remove_var(&env_var);
2076    }
2077
2078    // ── PollLoopConfig ──────────────────────────────────────────────────
2079
2080    #[test]
2081    fn test_poll_loop_config_clone() {
2082        let config = PollLoopConfig {
2083            queue_type: QueueType::TransactionRequest,
2084            polling_interval: 15,
2085            visibility_timeout: 120,
2086            handler_timeout: Duration::from_secs(120),
2087            max_retries: 3,
2088            poller_id: 0,
2089            poller_count: 2,
2090        };
2091        let cloned = config.clone();
2092        assert_eq!(cloned.polling_interval, 15);
2093        assert_eq!(cloned.poller_id, 0);
2094        assert_eq!(cloned.poller_count, 2);
2095        assert_eq!(cloned.max_retries, 3);
2096    }
2097}