1use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14 DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::metrics::observe_queue_pickup_latency;
23use crate::queues::{backoff_config_for_queue, retry_delay_secs};
24use crate::{
25 config::ServerConfig,
26 jobs::{
27 notification_handler, relayer_health_check_handler, token_swap_request_handler,
28 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
29 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30 TransactionSend, TransactionStatusCheck,
31 },
32 utils::{aws_error::DisplayErrorContext, classify_sdk_error},
33};
34
35use super::{HandlerError, WorkerContext};
36use super::{QueueBackendError, QueueType, WorkerHandle};
37
/// Failure classification for a single job, as seen by the SQS worker.
///
/// Produced by `process_job` / `map_handler_error` and consumed by
/// `process_message` to decide whether the message is retried or dropped.
#[derive(Debug)]
enum ProcessingError {
    /// The handler asked for a retry (`HandlerError::Retry`); the message is
    /// rescheduled with a backoff delay.
    Retryable(String),
    /// The job cannot succeed (`HandlerError::Abort` or a payload that fails to
    /// deserialize); the message is deleted.
    Permanent(String),
}
43
/// What the poll loop should do with a message once `process_message` returns.
#[derive(Debug)]
enum MessageOutcome {
    /// Remove the message from the queue; the receipt handle is collected by the
    /// poll loop and deleted in a batch.
    Delete { receipt_handle: String },
    /// Leave the message in the queue (nothing is deleted by the worker).
    Retain,
}
54
/// Per-poller settings handed to `run_poll_loop`.
#[derive(Clone)]
struct PollLoopConfig {
    /// Queue this poller serves; selects the job handler and appears in logs.
    queue_type: QueueType,
    /// Passed to SQS as `wait_time_seconds` on every receive call.
    polling_interval: u64,
    /// Passed to SQS as the `visibility_timeout` for received messages.
    visibility_timeout: u32,
    /// Upper bound on how long one message handler may run.
    handler_timeout: Duration,
    /// Retry limit forwarded to `process_message`.
    max_retries: usize,
    /// Zero-based index of this poller within the queue's poller group.
    poller_id: usize,
    /// Number of pollers sharing the queue's semaphore; used to split permits.
    poller_count: usize,
}
67
68pub async fn spawn_worker_for_queue(
82 sqs_client: aws_sdk_sqs::Client,
83 queue_type: QueueType,
84 queue_url: String,
85 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
86 shutdown_rx: watch::Receiver<bool>,
87) -> Result<WorkerHandle, QueueBackendError> {
88 let concurrency = get_concurrency_for_queue(queue_type);
89 let max_retries = queue_type.max_retries();
90 let polling_interval = get_wait_time_for_queue(queue_type);
91 let poller_count = get_poller_count_for_queue(queue_type);
92 let visibility_timeout = queue_type.visibility_timeout_secs();
93 let handler_timeout_secs = handler_timeout_secs(queue_type);
94 let handler_timeout = Duration::from_secs(handler_timeout_secs);
95
96 info!(
97 queue_type = ?queue_type,
98 queue_url = %queue_url,
99 concurrency = concurrency,
100 max_retries = max_retries,
101 polling_interval_secs = polling_interval,
102 poller_count = poller_count,
103 visibility_timeout_secs = visibility_timeout,
104 handler_timeout_secs = handler_timeout_secs,
105 "Spawning SQS worker"
106 );
107
108 let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
110
111 let handle: JoinHandle<()> = tokio::spawn(async move {
112 let mut poller_handles: JoinSet<()> = JoinSet::new();
113
114 for poller_id in 0..poller_count {
115 let client = sqs_client.clone();
116 let url = queue_url.clone();
117 let state = app_state.clone();
118 let sem = semaphore.clone();
119 let mut rx = shutdown_rx.clone();
120 let config = PollLoopConfig {
121 queue_type,
122 polling_interval,
123 visibility_timeout,
124 handler_timeout,
125 max_retries,
126 poller_id,
127 poller_count,
128 };
129
130 poller_handles.spawn(async move {
131 run_poll_loop(client, url, state, sem, &mut rx, config).await;
132 });
133 }
134
135 while let Some(join_result) = poller_handles.join_next().await {
137 if let Err(err) = join_result {
138 error!(
139 queue_type = ?queue_type,
140 error = %err,
141 "SQS poller task terminated unexpectedly"
142 );
143 }
144 }
145 info!(queue_type = ?queue_type, "SQS worker stopped");
146 });
147
148 Ok(WorkerHandle::Tokio(handle))
149}
150
/// Runs one SQS poller until shutdown is signalled.
///
/// Each iteration:
/// 1. reaps finished handler tasks and collects the receipt handles they want
///    deleted,
/// 2. flushes those handles with a batch delete,
/// 3. checks the shutdown flag,
/// 4. computes this poller's share of the free semaphore permits,
/// 5. long-polls SQS for up to that many messages, and
/// 6. spawns one handler task per message, each holding a semaphore permit.
///
/// After the loop exits, in-flight handlers are drained for up to 30 seconds
/// and any remaining receipt handles are flushed.
///
/// # Arguments
/// * `sqs_client` - client used for receive and delete calls.
/// * `queue_url` - URL of the queue being polled.
/// * `app_state` - shared application state cloned into each handler task.
/// * `semaphore` - permit pool shared by every poller of this queue.
/// * `shutdown_rx` - watch channel; a `true` value or any change stops the poller.
/// * `config` - per-poller settings (see `PollLoopConfig`).
async fn run_poll_loop(
    sqs_client: aws_sdk_sqs::Client,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    semaphore: Arc<tokio::sync::Semaphore>,
    shutdown_rx: &mut watch::Receiver<bool>,
    config: PollLoopConfig,
) {
    let PollLoopConfig {
        queue_type,
        polling_interval,
        visibility_timeout,
        handler_timeout,
        max_retries,
        poller_id,
        poller_count,
    } = config;
    // Handler tasks resolve to `Some(receipt_handle)` when the message should be
    // deleted and `None` when it should stay in the queue.
    let mut inflight: JoinSet<Option<String>> = JoinSet::new();
    let mut consecutive_poll_errors: u32 = 0;
    let mut pending_deletes: Vec<String> = Vec::new();

    loop {
        // Non-blocking reap of handlers that have already finished.
        while let Some(result) = inflight.try_join_next() {
            match result {
                Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                Ok(None) => {}
                Err(e) => {
                    warn!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        error = %e,
                        "In-flight task failed"
                    );
                }
            }
        }

        // Delete everything collected so far; failures are only logged by
        // `flush_delete_batch`, so the list is cleared either way.
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
            pending_deletes.clear();
        }

        if *shutdown_rx.borrow() {
            info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
            break;
        }

        // Split the currently free permits across pollers; the first `remainder`
        // pollers (by id) take one extra so no free permit is left unclaimed.
        let available_permits = semaphore.available_permits();
        let base_share = available_permits / poller_count;
        let remainder = available_permits % poller_count;
        let my_share = base_share + usize::from(poller_id < remainder);
        if my_share == 0 {
            // No capacity for this poller right now: wait briefly and re-check,
            // unless shutdown arrives first.
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
                    break;
                }
            }
        }

        // Never request more than 10 messages per receive call (the SQS
        // ReceiveMessage maximum).
        let batch_size = my_share.min(10) as i32;

        // Long-poll SQS, but abandon the request as soon as shutdown is signalled.
        let messages_result = tokio::select! {
            result = sqs_client
                .receive_message()
                .queue_url(&queue_url)
                .max_number_of_messages(batch_size)
                .wait_time_seconds(polling_interval as i32)
                .visibility_timeout(visibility_timeout as i32)
                .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                .message_system_attribute_names(MessageSystemAttributeName::SentTimestamp)
                .message_attribute_names("target_scheduled_on")
                .message_attribute_names("retry_attempt")
                .send() => result,
            _ = shutdown_rx.changed() => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during SQS poll, stopping poller");
                break;
            }
        };

        match messages_result {
            Ok(output) => {
                if consecutive_poll_errors > 0 {
                    info!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        previous_errors = consecutive_poll_errors,
                        "SQS polling recovered after consecutive errors"
                    );
                }
                consecutive_poll_errors = 0;

                if let Some(messages) = output.messages {
                    if !messages.is_empty() {
                        debug!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            message_count = messages.len(),
                            "Received messages from SQS"
                        );

                        for message in messages {
                            // Waits here if other pollers took the permits since
                            // the share was computed above.
                            let permit = match semaphore.clone().acquire_owned().await {
                                Ok(permit) => permit,
                                Err(err) => {
                                    error!(
                                        queue_type = ?queue_type,
                                        poller_id = poller_id,
                                        error = %err,
                                        "Semaphore closed, stopping SQS poller loop"
                                    );
                                    return;
                                }
                            };
                            let client = sqs_client.clone();
                            let url = queue_url.clone();
                            let state = app_state.clone();

                            inflight.spawn(async move {
                                // Held for the lifetime of the task so the permit is
                                // released on success, error, panic, or timeout.
                                let _permit = permit;
                                // Nesting of `result`: timeout -> catch_unwind -> handler.
                                let result = tokio::time::timeout(
                                    handler_timeout,
                                    AssertUnwindSafe(process_message(
                                        client.clone(),
                                        message,
                                        queue_type,
                                        &url,
                                        state,
                                        max_retries,
                                    ))
                                    .catch_unwind(),
                                )
                                .await;

                                match result {
                                    Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                        Some(receipt_handle)
                                    }
                                    Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                    Ok(Ok(Err(e))) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %e,
                                            "Failed to process message"
                                        );
                                        None
                                    }
                                    Ok(Err(panic_info)) => {
                                        // Panic payloads are usually `String` or `&str`.
                                        let msg = panic_info
                                            .downcast_ref::<String>()
                                            .map(|s| s.as_str())
                                            .or_else(|| {
                                                panic_info.downcast_ref::<&str>().copied()
                                            })
                                            .unwrap_or("unknown panic");
                                        error!(
                                            queue_type = ?queue_type,
                                            panic = %msg,
                                            "Message handler panicked"
                                        );
                                        None
                                    }
                                    Err(_) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            timeout_secs = handler_timeout.as_secs(),
                                            "Message handler timed out; message will be retried after visibility timeout"
                                        );
                                        None
                                    }
                                }
                            });
                        }
                    }
                }
            }
            Err(e) => {
                // Receive failed: back off (see `poll_error_backoff_secs`) but stay
                // responsive to shutdown while sleeping.
                consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                let (error_code, error_message) = match &e {
                    SdkError::ServiceError(ctx) => (ctx.err().code(), ctx.err().message()),
                    _ => (None, None),
                };
                error!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    error.kind = classify_sdk_error(&e),
                    error.detail = %DisplayErrorContext(&e),
                    error_code = error_code.unwrap_or("unknown"),
                    error_message = error_message.unwrap_or("n/a"),
                    consecutive_errors = consecutive_poll_errors,
                    backoff_secs = backoff_secs,
                    "Failed to receive messages from SQS, backing off"
                );
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during backoff, stopping poller");
                        break;
                    }
                }
            }
        }
    }

    // Shutdown path: give running handlers up to 30 seconds to finish, then abort
    // whatever is left.
    if !inflight.is_empty() {
        info!(
            queue_type = ?queue_type,
            poller_id = poller_id,
            count = inflight.len(),
            "Draining in-flight tasks before shutdown"
        );
        match tokio::time::timeout(Duration::from_secs(30), async {
            while let Some(result) = inflight.join_next().await {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            error = %e,
                            "In-flight task failed during drain"
                        );
                    }
                }
            }
        })
        .await
        {
            Ok(()) => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "All in-flight tasks drained")
            }
            Err(_) => {
                warn!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    remaining = inflight.len(),
                    "Drain timeout, abandoning remaining tasks"
                );
                inflight.abort_all();
            }
        }
    }

    // Final flush for receipt handles collected during the drain.
    if !pending_deletes.is_empty() {
        flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
    }
}
419
420async fn process_message(
425 sqs_client: aws_sdk_sqs::Client,
426 message: Message,
427 queue_type: QueueType,
428 queue_url: &str,
429 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
430 max_retries: usize,
431) -> Result<MessageOutcome, QueueBackendError> {
432 let body = message
433 .body()
434 .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
435
436 let receipt_handle = message
437 .receipt_handle()
438 .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
439
440 if let Some(baseline) = queue_pickup_baseline_ms(&message) {
454 let now_ms = chrono::Utc::now().timestamp_millis();
455 let delta_ms = now_ms - baseline;
460 if parse_target_scheduled_on(&message).is_none()
461 && delta_ms > PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS
462 {
463 warn!(
464 queue_type = ?queue_type,
465 latency_ms = delta_ms,
466 "queue_pickup_latency above sanity threshold for non-scheduled SQS message; check broker/consumer clock skew"
467 );
468 }
469 observe_queue_pickup_latency(
470 queue_type.queue_name(),
471 "sqs",
472 pickup_latency_secs(baseline, now_ms),
473 );
474 }
475
476 if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
478 let now = std::time::SystemTime::now()
479 .duration_since(std::time::SystemTime::UNIX_EPOCH)
480 .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
481 .as_secs() as i64;
482 let remaining = target_scheduled_on - now;
483 if remaining > 0 {
484 let should_delete_original = defer_message(
485 &sqs_client,
486 queue_url,
487 body.to_string(),
488 &message,
489 target_scheduled_on,
490 remaining.min(900) as i32,
491 )
492 .await?;
493
494 debug!(
495 queue_type = ?queue_type,
496 remaining_seconds = remaining,
497 "Deferred scheduled SQS message for next delay hop"
498 );
499 return if should_delete_original {
500 Ok(MessageOutcome::Delete {
501 receipt_handle: receipt_handle.to_string(),
502 })
503 } else {
504 Ok(MessageOutcome::Retain)
505 };
506 }
507 }
508
509 let receive_count = message
511 .attributes()
512 .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
513 .and_then(|count| count.parse::<usize>().ok())
514 .unwrap_or(1);
515 let attempt_number = receive_count.saturating_sub(1);
517 let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
520
521 let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
523
524 debug!(
525 queue_type = ?queue_type,
526 message_id = %sqs_message_id,
527 attempt = attempt_number,
528 receive_count = receive_count,
529 max_retries = max_retries,
530 "Processing message"
531 );
532
533 let result = match queue_type {
535 QueueType::TransactionRequest => {
536 process_job::<TransactionRequest, _, _>(
537 body,
538 app_state,
539 attempt_number,
540 sqs_message_id,
541 "TransactionRequest",
542 transaction_request_handler,
543 )
544 .await
545 }
546 QueueType::TransactionSubmission => {
547 process_job::<TransactionSend, _, _>(
548 body,
549 app_state,
550 attempt_number,
551 sqs_message_id,
552 "TransactionSend",
553 transaction_submission_handler,
554 )
555 .await
556 }
557 QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
558 process_job::<TransactionStatusCheck, _, _>(
559 body,
560 app_state,
561 attempt_number,
562 sqs_message_id,
563 "TransactionStatusCheck",
564 transaction_status_handler,
565 )
566 .await
567 }
568 QueueType::Notification => {
569 process_job::<NotificationSend, _, _>(
570 body,
571 app_state,
572 attempt_number,
573 sqs_message_id,
574 "NotificationSend",
575 notification_handler,
576 )
577 .await
578 }
579 QueueType::TokenSwapRequest => {
580 process_job::<TokenSwapRequest, _, _>(
581 body,
582 app_state,
583 attempt_number,
584 sqs_message_id,
585 "TokenSwapRequest",
586 token_swap_request_handler,
587 )
588 .await
589 }
590 QueueType::RelayerHealthCheck => {
591 process_job::<RelayerHealthCheck, _, _>(
592 body,
593 app_state,
594 attempt_number,
595 sqs_message_id,
596 "RelayerHealthCheck",
597 relayer_health_check_handler,
598 )
599 .await
600 }
601 };
602
603 match result {
604 Ok(()) => {
605 debug!(
606 queue_type = ?queue_type,
607 attempt = attempt_number,
608 "Message processed successfully"
609 );
610
611 Ok(MessageOutcome::Delete {
612 receipt_handle: receipt_handle.to_string(),
613 })
614 }
615 Err(ProcessingError::Permanent(e)) => {
616 error!(
617 queue_type = ?queue_type,
618 attempt = attempt_number,
619 error = %e,
620 "Permanent handler failure, message will be deleted"
621 );
622
623 Ok(MessageOutcome::Delete {
624 receipt_handle: receipt_handle.to_string(),
625 })
626 }
627 Err(ProcessingError::Retryable(e)) => {
628 if max_retries != usize::MAX && receive_count > max_retries {
630 error!(
631 queue_type = ?queue_type,
632 attempt = attempt_number,
633 receive_count = receive_count,
634 max_retries = max_retries,
635 error = %e,
636 "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
637 );
638 return Ok(MessageOutcome::Retain);
639 }
640
641 let delay = if queue_type.is_status_check() {
645 compute_status_retry_delay(body, logical_retry_attempt)
646 } else {
647 retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
648 };
649
650 if is_fifo_queue_url(queue_url) {
653 if let Err(err) = sqs_client
654 .change_message_visibility()
655 .queue_url(queue_url)
656 .receipt_handle(receipt_handle)
657 .visibility_timeout(delay.clamp(1, 900))
658 .send()
659 .await
660 {
661 error!(
662 queue_type = ?queue_type,
663 error = %err,
664 "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
665 );
666 return Ok(MessageOutcome::Retain);
667 }
668
669 debug!(
670 queue_type = ?queue_type,
671 attempt = logical_retry_attempt,
672 delay_seconds = delay,
673 error = %e,
674 "Retry scheduled via visibility timeout"
675 );
676
677 return Ok(MessageOutcome::Retain);
678 }
679
680 let next_retry_attempt = logical_retry_attempt.saturating_add(1);
681
682 if let Err(send_err) = sqs_client
686 .send_message()
687 .queue_url(queue_url)
688 .message_body(body.to_string())
689 .delay_seconds(delay)
690 .message_attributes(
691 "retry_attempt",
692 MessageAttributeValue::builder()
693 .data_type("Number")
694 .string_value(next_retry_attempt.to_string())
695 .build()
696 .map_err(|err| {
697 QueueBackendError::SqsError(format!(
698 "Failed to build retry_attempt attribute: {err}"
699 ))
700 })?,
701 )
702 .send()
703 .await
704 {
705 error!(
706 queue_type = ?queue_type,
707 error.kind = classify_sdk_error(&send_err),
708 error.detail = %DisplayErrorContext(&send_err),
709 "Failed to re-enqueue message; leaving original for visibility timeout retry"
710 );
711 return Ok(MessageOutcome::Retain);
713 }
714
715 debug!(
716 queue_type = ?queue_type,
717 attempt = logical_retry_attempt,
718 delay_seconds = delay,
719 error = %e,
720 "Message re-enqueued with backoff delay"
721 );
722
723 Ok(MessageOutcome::Delete {
725 receipt_handle: receipt_handle.to_string(),
726 })
727 }
728 }
729}
730
731async fn process_job<T, F, Fut>(
734 body: &str,
735 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
736 attempt: usize,
737 task_id: String,
738 type_name: &str,
739 handler: F,
740) -> Result<(), ProcessingError>
741where
742 T: DeserializeOwned,
743 F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
744 Fut: Future<Output = Result<(), HandlerError>>,
745{
746 let job: Job<T> = serde_json::from_str(body).map_err(|e| {
747 error!(error = %e, "Failed to deserialize {} job", type_name);
748 ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
750 })?;
751
752 let ctx = WorkerContext::new(attempt, task_id);
753 handler(job, (*app_state).clone(), ctx)
754 .await
755 .map_err(map_handler_error)
756}
757
758fn map_handler_error(error: HandlerError) -> ProcessingError {
759 match error {
760 HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
761 HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
762 }
763}
764
765fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
766 message
767 .message_attributes()
768 .and_then(|attrs| attrs.get("target_scheduled_on"))
769 .and_then(|value| value.string_value())
770 .and_then(|value| value.parse::<i64>().ok())
771}
772
773fn parse_retry_attempt(message: &Message) -> Option<usize> {
774 message
775 .message_attributes()
776 .and_then(|attrs| attrs.get("retry_attempt"))
777 .and_then(|value| value.string_value())
778 .and_then(|value| value.parse::<usize>().ok())
779}
780
/// Converts a pickup baseline and the current time (both in milliseconds) into
/// a latency in seconds.
///
/// Negative differences (baseline later than `now_ms`) are reported as `0.0`.
/// The subtraction saturates, so extreme inputs cannot overflow `i64`.
fn pickup_latency_secs(baseline_ms: i64, now_ms: i64) -> f64 {
    let elapsed_ms = now_ms.saturating_sub(baseline_ms).max(0);
    elapsed_ms as f64 / 1000.0
}
787
/// Pickup latency, in milliseconds, above which a non-scheduled message
/// triggers a clock-skew warning: 3600 seconds, i.e. one hour.
const PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS: i64 = 3_600 * 1_000;
793
794fn queue_pickup_baseline_ms(message: &Message) -> Option<i64> {
795 let receive_count = message
821 .attributes()
822 .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
823 .and_then(|count| count.parse::<usize>().ok())
824 .unwrap_or(1);
825 if receive_count != 1 {
826 return None;
827 }
828
829 if parse_retry_attempt(message).is_some_and(|n| n > 0) {
833 return None;
834 }
835
836 parse_target_scheduled_on(message)
837 .map(|ts_secs| ts_secs * 1000)
838 .or_else(|| {
839 message
840 .attributes()
841 .and_then(|a| a.get(&MessageSystemAttributeName::SentTimestamp))
842 .and_then(|v| v.parse::<i64>().ok())
843 })
844}
845
/// Reports whether `queue_url` ends with the `.fifo` suffix.
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.strip_suffix(".fifo").is_some()
}
849
850async fn defer_message(
851 sqs_client: &aws_sdk_sqs::Client,
852 queue_url: &str,
853 body: String,
854 message: &Message,
855 target_scheduled_on: i64,
856 delay_seconds: i32,
857) -> Result<bool, QueueBackendError> {
858 if is_fifo_queue_url(queue_url) {
859 let receipt_handle = message.receipt_handle().ok_or_else(|| {
860 QueueBackendError::QueueError(
861 "Cannot defer FIFO message: missing receipt handle".to_string(),
862 )
863 })?;
864
865 sqs_client
866 .change_message_visibility()
867 .queue_url(queue_url)
868 .receipt_handle(receipt_handle)
869 .visibility_timeout(delay_seconds.clamp(1, 900))
870 .send()
871 .await
872 .map_err(|e| {
873 error!(
874 error.kind = classify_sdk_error(&e),
875 error.detail = %DisplayErrorContext(&e),
876 queue_url = %queue_url,
877 "Failed to defer FIFO message via visibility timeout"
878 );
879 QueueBackendError::SqsError(format!(
880 "Failed to defer FIFO message via visibility timeout: {}",
881 classify_sdk_error(&e)
882 ))
883 })?;
884
885 return Ok(false);
886 }
887
888 let request = sqs_client
891 .send_message()
892 .queue_url(queue_url)
893 .message_body(body)
894 .delay_seconds(delay_seconds.clamp(1, 900))
895 .message_attributes(
896 "target_scheduled_on",
897 MessageAttributeValue::builder()
898 .data_type("Number")
899 .string_value(target_scheduled_on.to_string())
900 .build()
901 .map_err(|e| {
902 QueueBackendError::SqsError(format!(
903 "Failed to build deferred scheduled attribute: {e}"
904 ))
905 })?,
906 );
907
908 request.send().await.map_err(|e| {
909 error!(
910 error.kind = classify_sdk_error(&e),
911 error.detail = %DisplayErrorContext(&e),
912 queue_url = %queue_url,
913 "Failed to defer scheduled message"
914 );
915 QueueBackendError::SqsError(format!(
916 "Failed to defer scheduled message: {}",
917 classify_sdk_error(&e)
918 ))
919 })?;
920
921 Ok(true)
922}
923
/// Minimal view of a status-check job's `data` object: only the optional
/// `network_type` field is read.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    /// Network of the transaction being checked, when present in the payload.
    network_type: Option<crate::models::NetworkType>,
}
932
/// Minimal view of a status-check job body, used by
/// `compute_status_retry_delay` to read the network type without deserializing
/// the full job.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    /// The job's `data` object.
    data: StatusCheckData,
}
941
942fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
949 let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
950 .ok()
951 .and_then(|j| j.data.network_type);
952
953 crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
954}
955
956fn get_wait_time_for_queue(queue_type: QueueType) -> u64 {
958 ServerConfig::get_sqs_wait_time(
959 queue_type.sqs_env_key(),
960 queue_type.default_wait_time_secs(),
961 )
962}
963
964fn get_poller_count_for_queue(queue_type: QueueType) -> usize {
966 let configured = ServerConfig::get_sqs_poller_count(
967 queue_type.sqs_env_key(),
968 queue_type.default_poller_count(),
969 );
970 if configured == 0 {
971 warn!(
972 queue_type = ?queue_type,
973 "Configured poller count is 0; clamping to 1"
974 );
975 1
976 } else {
977 configured
978 }
979}
980
981fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
983 let configured = ServerConfig::get_worker_concurrency(
984 queue_type.concurrency_env_key(),
985 queue_type.default_concurrency(),
986 );
987 if configured == 0 {
988 warn!(
989 queue_type = ?queue_type,
990 "Configured concurrency is 0; clamping to 1"
991 );
992 1
993 } else {
994 configured
995 }
996}
997
998fn handler_timeout_secs(queue_type: QueueType) -> u64 {
1002 u64::from(queue_type.visibility_timeout_secs().max(1))
1003}
1004
/// Upper bound, in seconds, on the delay between failed SQS receive attempts.
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Once errors persist, every error count that is a multiple of this value
/// uses the short base delay instead of the capped backoff.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Computes how long to wait, in seconds, after `consecutive_errors` failed
/// receive calls in a row.
///
/// The delay starts at 5 seconds and doubles with each further error, capped at
/// `MAX_POLL_BACKOFF_SECS`. From the 7th error onward, counts divisible by
/// `RECOVERY_PROBE_EVERY` drop back to 5 seconds so recovery is noticed sooner.
/// A count of 0 yields the base delay.
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    const BASE_SECS: u64 = 5;

    let is_recovery_probe =
        consecutive_errors >= 7 && consecutive_errors.is_multiple_of(RECOVERY_PROBE_EVERY);
    if is_recovery_probe {
        return BASE_SECS;
    }

    // Capping the number of doublings at 16 keeps the shift and multiply well
    // inside u64 range before the final clamp.
    let doublings = consecutive_errors.saturating_sub(1).min(16);
    let uncapped = BASE_SECS.saturating_mul(1_u64 << doublings);
    uncapped.min(MAX_POLL_BACKOFF_SECS)
}
1029
1030async fn flush_delete_batch(
1036 sqs_client: &aws_sdk_sqs::Client,
1037 queue_url: &str,
1038 batch: &[String],
1039 queue_type: QueueType,
1040) -> usize {
1041 if batch.is_empty() {
1042 return 0;
1043 }
1044
1045 let mut deleted = 0;
1046
1047 for chunk in batch.chunks(10) {
1048 let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
1049 .iter()
1050 .enumerate()
1051 .map(|(i, handle)| {
1052 DeleteMessageBatchRequestEntry::builder()
1053 .id(i.to_string())
1054 .receipt_handle(handle)
1055 .build()
1056 .expect("id and receipt_handle are always set")
1057 })
1058 .collect();
1059
1060 match sqs_client
1061 .delete_message_batch()
1062 .queue_url(queue_url)
1063 .set_entries(Some(entries))
1064 .send()
1065 .await
1066 {
1067 Ok(output) => {
1068 deleted += output.successful().len();
1069
1070 for f in output.failed() {
1071 warn!(
1072 queue_type = ?queue_type,
1073 id = %f.id(),
1074 code = %f.code(),
1075 message = f.message().unwrap_or("unknown"),
1076 "Batch delete entry failed (message will be redelivered)"
1077 );
1078 }
1079 }
1080 Err(e) => {
1081 error!(
1082 queue_type = ?queue_type,
1083 error.kind = classify_sdk_error(&e),
1084 error.detail = %DisplayErrorContext(&e),
1085 batch_size = chunk.len(),
1086 "Batch delete API call failed (messages will be redelivered)"
1087 );
1088 }
1089 }
1090 }
1091
1092 deleted
1093}
1094
1095#[cfg(test)]
1096mod tests {
1097 use super::*;
1098
1099 #[test]
1100 fn test_get_concurrency_for_queue() {
1101 let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
1103 assert!(concurrency > 0);
1104
1105 let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
1106 assert!(concurrency > 0);
1107 }
1108
1109 #[test]
1110 fn test_handler_timeout_secs_is_positive() {
1111 let all = [
1112 QueueType::TransactionRequest,
1113 QueueType::TransactionSubmission,
1114 QueueType::StatusCheck,
1115 QueueType::StatusCheckEvm,
1116 QueueType::StatusCheckStellar,
1117 QueueType::Notification,
1118 QueueType::TokenSwapRequest,
1119 QueueType::RelayerHealthCheck,
1120 ];
1121 for queue_type in all {
1122 assert!(handler_timeout_secs(queue_type) > 0);
1123 }
1124 }
1125
1126 #[test]
1127 fn test_handler_timeout_secs_uses_visibility_timeout() {
1128 assert_eq!(
1129 handler_timeout_secs(QueueType::StatusCheckEvm),
1130 QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
1131 );
1132 assert_eq!(
1133 handler_timeout_secs(QueueType::Notification),
1134 QueueType::Notification.visibility_timeout_secs() as u64
1135 );
1136 }
1137
1138 #[test]
1139 fn test_parse_target_scheduled_on() {
1140 let message = Message::builder().build();
1142
1143 assert_eq!(parse_target_scheduled_on(&message), None);
1145
1146 let message = Message::builder()
1148 .message_attributes(
1149 "target_scheduled_on",
1150 MessageAttributeValue::builder()
1151 .data_type("Number")
1152 .string_value("1234567890")
1153 .build()
1154 .unwrap(),
1155 )
1156 .build();
1157
1158 assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
1159 }
1160
1161 #[test]
1162 fn test_parse_retry_attempt() {
1163 let message = Message::builder().build();
1164 assert_eq!(parse_retry_attempt(&message), None);
1165
1166 let message = Message::builder()
1167 .message_attributes(
1168 "retry_attempt",
1169 MessageAttributeValue::builder()
1170 .data_type("Number")
1171 .string_value("7")
1172 .build()
1173 .unwrap(),
1174 )
1175 .build();
1176 assert_eq!(parse_retry_attempt(&message), Some(7));
1177 }
1178
1179 #[test]
1180 fn test_map_handler_error() {
1181 let error = HandlerError::Abort("Validation failed".to_string());
1183 let result = map_handler_error(error);
1184 assert!(matches!(result, ProcessingError::Permanent(_)));
1185
1186 let error = HandlerError::Retry("Network timeout".to_string());
1188 let result = map_handler_error(error);
1189 assert!(matches!(result, ProcessingError::Retryable(_)));
1190 }
1191
1192 #[test]
1193 fn test_is_fifo_queue_url() {
1194 assert!(is_fifo_queue_url(
1195 "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
1196 ));
1197 assert!(!is_fifo_queue_url(
1198 "https://sqs.us-east-1.amazonaws.com/123/queue"
1199 ));
1200 }
1201
1202 #[test]
1203 fn test_compute_status_retry_delay_evm() {
1204 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1206 assert_eq!(compute_status_retry_delay(body, 0), 8);
1207 assert_eq!(compute_status_retry_delay(body, 1), 12);
1208 assert_eq!(compute_status_retry_delay(body, 8), 12);
1209 }
1210
1211 #[test]
1212 fn test_compute_status_retry_delay_stellar() {
1213 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
1214 assert_eq!(compute_status_retry_delay(body, 0), 2);
1215 assert_eq!(compute_status_retry_delay(body, 1), 3);
1216 assert_eq!(compute_status_retry_delay(body, 8), 3);
1217 }
1218
1219 #[test]
1220 fn test_compute_status_retry_delay_solana() {
1221 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
1222 assert_eq!(compute_status_retry_delay(body, 0), 5);
1223 assert_eq!(compute_status_retry_delay(body, 1), 8);
1224 assert_eq!(compute_status_retry_delay(body, 8), 8);
1225 }
1226
1227 #[test]
1228 fn test_compute_status_retry_delay_missing_network() {
1229 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
1230 assert_eq!(compute_status_retry_delay(body, 0), 5);
1231 assert_eq!(compute_status_retry_delay(body, 1), 8);
1232 assert_eq!(compute_status_retry_delay(body, 8), 8);
1233 }
1234
1235 #[test]
1236 fn test_compute_status_retry_delay_invalid_body() {
1237 assert_eq!(compute_status_retry_delay("not json", 0), 5);
1238 assert_eq!(compute_status_retry_delay("not json", 1), 8);
1239 assert_eq!(compute_status_retry_delay("not json", 8), 8);
1240 }
1241
1242 #[tokio::test]
1243 async fn test_semaphore_released_on_panic() {
1244 let sem = Arc::new(tokio::sync::Semaphore::new(1));
1245 let permit = sem.clone().acquire_owned().await.unwrap();
1246
1247 let handle = tokio::spawn(async move {
1248 let _permit = permit; let _ = AssertUnwindSafe(async { panic!("test panic") })
1250 .catch_unwind()
1251 .await;
1252 });
1253
1254 handle.await.unwrap();
1255 let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
1257 .await
1258 .expect("permit should be available after panic");
1259 }
1260
1261 #[test]
1262 fn test_poll_error_backoff_secs() {
1263 assert_eq!(poll_error_backoff_secs(1), 5);
1265 assert_eq!(poll_error_backoff_secs(2), 10);
1267 assert_eq!(poll_error_backoff_secs(3), 20);
1269 assert_eq!(poll_error_backoff_secs(4), 40);
1271 assert_eq!(poll_error_backoff_secs(5), 60);
1273 assert_eq!(poll_error_backoff_secs(6), 60);
1274 assert_eq!(poll_error_backoff_secs(7), 60);
1275 assert_eq!(poll_error_backoff_secs(8), 5);
1277 assert_eq!(poll_error_backoff_secs(9), 60);
1278 assert_eq!(poll_error_backoff_secs(12), 5); }
1280
1281 #[test]
1282 fn test_poll_error_backoff_zero_errors() {
1283 assert_eq!(poll_error_backoff_secs(0), 5);
1285 }
1286
1287 #[test]
1288 fn test_poll_error_backoff_recovery_probes() {
1289 for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
1291 assert_eq!(
1292 poll_error_backoff_secs(i as u32),
1293 5,
1294 "Expected recovery probe at error {i}"
1295 );
1296 }
1297 }
1298
/// A `MessageOutcome::Delete` value exposes the receipt handle it was built with.
#[test]
fn test_message_outcome_delete_carries_receipt_handle() {
    let expected = "test-receipt-handle-123".to_string();
    let outcome = MessageOutcome::Delete {
        receipt_handle: expected.clone(),
    };

    if let MessageOutcome::Delete { receipt_handle } = outcome {
        assert_eq!(receipt_handle, expected);
    } else {
        panic!("Expected Delete variant");
    }
}
1312
/// A freshly constructed `MessageOutcome::Retain` matches the `Retain` pattern.
#[test]
fn test_message_outcome_retain() {
    let is_retain = matches!(MessageOutcome::Retain, MessageOutcome::Retain);
    assert!(is_retain);
}
1318
/// Builds one `DeleteMessageBatchRequestEntry` per receipt handle, using the
/// handle's position as the entry id, and spot-checks the first and last entry.
#[test]
fn test_batch_delete_entry_builder() {
    let handles: Vec<String> = (0..3).map(|n| format!("receipt-{n}")).collect();

    let mut entries: Vec<DeleteMessageBatchRequestEntry> = Vec::with_capacity(handles.len());
    for (idx, receipt) in handles.iter().enumerate() {
        let entry = DeleteMessageBatchRequestEntry::builder()
            .id(idx.to_string())
            .receipt_handle(receipt)
            .build()
            .expect("id and receipt_handle are set");
        entries.push(entry);
    }

    assert_eq!(entries.len(), 3);

    let first = &entries[0];
    assert_eq!(first.id(), "0");
    assert_eq!(first.receipt_handle(), "receipt-0");

    let last = &entries[2];
    assert_eq!(last.id(), "2");
    assert_eq!(last.receipt_handle(), "receipt-2");
}
1346
/// Splitting 25 receipt handles into chunks of 10 yields chunks of 10, 10 and 5.
#[test]
fn test_batch_chunking_logic() {
    let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();

    let chunk_sizes: Vec<usize> = handles.chunks(10).map(|chunk| chunk.len()).collect();

    assert_eq!(chunk_sizes.len(), 3);
    assert_eq!(chunk_sizes, [10, 10, 5]);
}
1359
/// Flattening a list of optional receipt handles keeps only the `Some` values,
/// in their original order.
#[test]
fn test_outcome_collection_pattern() {
    let receipt = |n: u32| Some(format!("receipt-{n}"));
    let outcomes = vec![receipt(1), None, receipt(2), None, receipt(3)];

    let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();

    assert_eq!(pending_deletes.len(), 3);
    assert_eq!(pending_deletes, ["receipt-1", "receipt-2", "receipt-3"]);
}
1379
/// A `target_scheduled_on` attribute holding `"not-a-number"` parses to `None`.
#[test]
fn test_parse_target_scheduled_on_non_numeric_string() {
    let attr = MessageAttributeValue::builder()
        .data_type("String")
        .string_value("not-a-number")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", attr)
        .build();

    assert_eq!(parse_target_scheduled_on(&msg), None);
}
1396
/// A `target_scheduled_on` attribute with an empty string value parses to `None`.
#[test]
fn test_parse_target_scheduled_on_empty_string() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", attr)
        .build();

    assert_eq!(parse_target_scheduled_on(&msg), None);
}
1411
/// A `target_scheduled_on` attribute of `"-1000"` is accepted and parses to
/// `Some(-1000)`.
#[test]
fn test_parse_target_scheduled_on_negative_value() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("-1000")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", attr)
        .build();

    assert_eq!(parse_target_scheduled_on(&msg), Some(-1000));
}
1427
/// A `target_scheduled_on` attribute with a fractional value (`"1234567890.5"`)
/// parses to `None`.
#[test]
fn test_parse_target_scheduled_on_float_string() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("1234567890.5")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", attr)
        .build();

    assert_eq!(parse_target_scheduled_on(&msg), None);
}
1443
/// A `target_scheduled_on` attribute of `"0"` parses to `Some(0)`.
#[test]
fn test_parse_target_scheduled_on_zero() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("0")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", attr)
        .build();

    assert_eq!(parse_target_scheduled_on(&msg), Some(0));
}
1458
/// A numeric attribute stored under a different key (`wrong_key`) is not picked
/// up as `target_scheduled_on`; the result is `None`.
#[test]
fn test_parse_target_scheduled_on_wrong_attribute_name() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("1234567890")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("wrong_key", attr)
        .build();

    assert_eq!(parse_target_scheduled_on(&msg), None);
}
1474
/// A `retry_attempt` attribute holding `"abc"` parses to `None`.
#[test]
fn test_parse_retry_attempt_non_numeric_string() {
    let attr = MessageAttributeValue::builder()
        .data_type("String")
        .string_value("abc")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("retry_attempt", attr)
        .build();

    assert_eq!(parse_retry_attempt(&msg), None);
}
1491
/// A `retry_attempt` attribute of `"-1"` is rejected and parses to `None`.
#[test]
fn test_parse_retry_attempt_negative_value() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("-1")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("retry_attempt", attr)
        .build();

    assert_eq!(parse_retry_attempt(&msg), None);
}
1507
/// A `retry_attempt` attribute of `"0"` parses to `Some(0)`.
#[test]
fn test_parse_retry_attempt_zero() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("0")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("retry_attempt", attr)
        .build();

    assert_eq!(parse_retry_attempt(&msg), Some(0));
}
1522
/// A `retry_attempt` attribute of `"999999"` parses to `Some(999999)`.
#[test]
fn test_parse_retry_attempt_large_value() {
    let attr = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("999999")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("retry_attempt", attr)
        .build();

    assert_eq!(parse_retry_attempt(&msg), Some(999999));
}
1537
/// When both `target_scheduled_on` ("123") and a `SentTimestamp` system
/// attribute ("999999") are present, the baseline is 123_000: the scheduled
/// value wins over the sent timestamp.
#[test]
fn test_queue_pickup_baseline_ms_uses_scheduled_time_on_first_delivery() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("123")
        .build()
        .unwrap();
    let system_attrs = std::collections::HashMap::from([(
        MessageSystemAttributeName::SentTimestamp,
        "999999".to_string(),
    )]);
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .set_attributes(Some(system_attrs))
        .build();

    assert_eq!(queue_pickup_baseline_ms(&msg), Some(123_000));
}
1558
/// With no message attributes and only a `SentTimestamp` system attribute
/// ("123456"), the baseline is that timestamp unchanged.
#[test]
fn test_queue_pickup_baseline_ms_falls_back_to_sent_timestamp() {
    let system_attrs = std::collections::HashMap::from([(
        MessageSystemAttributeName::SentTimestamp,
        "123456".to_string(),
    )]);
    let msg = Message::builder()
        .set_attributes(Some(system_attrs))
        .build();

    assert_eq!(queue_pickup_baseline_ms(&msg), Some(123456));
}
1570
/// A baseline 5_000 ms later than "now" yields a latency of exactly 0.0 rather
/// than a negative value.
#[test]
fn test_pickup_latency_secs_clamps_negative_skew() {
    let now_ms: i64 = 1_000_000;
    let future_baseline_ms = now_ms + 5_000;

    let latency = pickup_latency_secs(future_baseline_ms, now_ms);

    assert_eq!(latency, 0.0);
}
1579
/// A baseline 2_500 ms before "now" yields a latency of 2.5.
#[test]
fn test_pickup_latency_secs_positive_delta() {
    let latency = pickup_latency_secs(1_000_000, 1_002_500);
    assert_eq!(latency, 2.5);
}
1585
/// A message carrying `retry_attempt` = "1" produces no baseline (`None`), even
/// though `target_scheduled_on` and `SentTimestamp` are both present.
#[test]
fn test_queue_pickup_baseline_ms_skips_when_retry_attempt_positive() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("123")
        .build()
        .unwrap();
    let retry_attempt = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("1")
        .build()
        .unwrap();
    let system_attrs = std::collections::HashMap::from([(
        MessageSystemAttributeName::SentTimestamp,
        "123456".to_string(),
    )]);
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .message_attributes("retry_attempt", retry_attempt)
        .set_attributes(Some(system_attrs))
        .build();

    assert_eq!(queue_pickup_baseline_ms(&msg), None);
}
1614
/// A message carrying `retry_attempt` = "0" still produces a baseline: with
/// `target_scheduled_on` = "777" the result is `Some(777_000)`.
#[test]
fn test_queue_pickup_baseline_ms_accepts_retry_attempt_zero() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("777")
        .build()
        .unwrap();
    let retry_attempt = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("0")
        .build()
        .unwrap();
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .message_attributes("retry_attempt", retry_attempt)
        .build();

    assert_eq!(queue_pickup_baseline_ms(&msg), Some(777_000));
}
1640
/// A message whose `ApproximateReceiveCount` system attribute is "2" produces
/// no baseline (`None`), despite having `target_scheduled_on` and
/// `SentTimestamp` set.
#[test]
fn test_queue_pickup_baseline_ms_skips_when_receive_count_gt_one() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("500")
        .build()
        .unwrap();
    let system_attrs = std::collections::HashMap::from([
        (
            MessageSystemAttributeName::ApproximateReceiveCount,
            "2".to_string(),
        ),
        (MessageSystemAttributeName::SentTimestamp, "999".to_string()),
    ]);
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .set_attributes(Some(system_attrs))
        .build();

    assert_eq!(queue_pickup_baseline_ms(&msg), None);
}
1667
/// A message whose `ApproximateReceiveCount` system attribute is "1" still
/// produces a baseline: with `target_scheduled_on` = "250" the result is
/// `Some(250_000)`.
#[test]
fn test_queue_pickup_baseline_ms_observes_when_receive_count_explicitly_one() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("250")
        .build()
        .unwrap();
    let system_attrs = std::collections::HashMap::from([(
        MessageSystemAttributeName::ApproximateReceiveCount,
        "1".to_string(),
    )]);
    let msg = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .set_attributes(Some(system_attrs))
        .build();

    assert_eq!(queue_pickup_baseline_ms(&msg), Some(250_000));
}
1690
/// The empty string is not classified as a FIFO queue URL.
#[test]
fn test_is_fifo_queue_url_empty_string() {
    let url = "";
    assert!(!is_fifo_queue_url(url));
}
1697
/// A bare queue name ending in `.fifo` (no scheme or host) is classified as FIFO.
#[test]
fn test_is_fifo_queue_url_just_fifo_suffix() {
    let url = "my-queue.fifo";
    assert!(is_fifo_queue_url(url));
}
1702
/// `.fifo` appearing as a middle path segment, not at the end of the URL, does
/// not make the URL a FIFO queue URL.
#[test]
fn test_is_fifo_queue_url_fifo_in_middle() {
    let url = "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue";
    assert!(!is_fifo_queue_url(url));
}
1710
/// Upper- or mixed-case suffixes (`.FIFO`, `.Fifo`) are not treated as FIFO.
#[test]
fn test_is_fifo_queue_url_case_sensitive() {
    let wrong_case_urls = [
        "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO",
        "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo",
    ];
    for url in wrong_case_urls {
        assert!(!is_fifo_queue_url(url));
    }
}
1720
/// Several URLs without a `.fifo` suffix (two AWS regions and a localhost
/// endpoint) are all classified as non-FIFO.
#[test]
fn test_is_fifo_queue_url_standard_queue_variations() {
    let standard_urls = [
        "https://sqs.us-east-1.amazonaws.com/123456789/my-queue",
        "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request",
        "http://localhost:4566/000000000000/test-queue",
    ];
    for url in standard_urls {
        assert!(!is_fifo_queue_url(url));
    }
}
1733
/// A localhost endpoint URL ending in `.fifo` is classified as FIFO.
#[test]
fn test_is_fifo_queue_url_localstack() {
    let url = "http://localhost:4566/000000000000/test-queue.fifo";
    assert!(is_fifo_queue_url(url));
}
1741
/// `HandlerError::Abort` maps to `ProcessingError::Permanent` and keeps its
/// message text unchanged.
#[test]
fn test_map_handler_error_preserves_abort_message() {
    let msg = "Validation failed: invalid nonce";
    let mapped = map_handler_error(HandlerError::Abort(msg.to_string()));

    if let ProcessingError::Permanent(s) = mapped {
        assert_eq!(s, msg);
    } else {
        panic!("Expected Permanent");
    }
}
1753
/// `HandlerError::Retry` maps to `ProcessingError::Retryable` and keeps its
/// message text unchanged.
#[test]
fn test_map_handler_error_preserves_retry_message() {
    let msg = "RPC timeout after 30s";
    let mapped = map_handler_error(HandlerError::Retry(msg.to_string()));

    if let ProcessingError::Retryable(s) = mapped {
        assert_eq!(s, msg);
    } else {
        panic!("Expected Retryable");
    }
}
1763
/// An `Abort` error with an empty message still maps to `Permanent`, and the
/// message stays empty.
#[test]
fn test_map_handler_error_empty_message() {
    let mapped = map_handler_error(HandlerError::Abort(String::new()));

    if let ProcessingError::Permanent(s) = mapped {
        assert!(s.is_empty());
    } else {
        panic!("Expected Permanent");
    }
}
1772
/// For every listed queue type, `handler_timeout_secs` must equal the queue's
/// visibility timeout, raised to at least 1 and widened to `u64`.
#[test]
fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let actual = handler_timeout_secs(qt);
        let expected = qt.visibility_timeout_secs().max(1) as u64;
        assert_eq!(
            actual, expected,
            "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
        );
    }
}
1795
/// For every listed queue type, the configured concurrency is strictly positive.
#[test]
fn test_get_concurrency_for_queue_all_types_positive() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let concurrency = get_concurrency_for_queue(qt);
        assert!(
            concurrency > 0,
            "{qt:?}: concurrency must be positive (clamped to at least 1)"
        );
    }
}
1817
/// For error counts 0 through 199, the backoff never exceeds
/// `MAX_POLL_BACKOFF_SECS`.
#[test]
fn test_poll_error_backoff_never_exceeds_max() {
    (0..200).for_each(|i| {
        let backoff = poll_error_backoff_secs(i);
        assert!(
            backoff <= MAX_POLL_BACKOFF_SECS,
            "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
        );
    });
}
1830
/// Even at `u32::MAX` consecutive errors the backoff stays within
/// `(0, MAX_POLL_BACKOFF_SECS]`.
#[test]
fn test_poll_error_backoff_u32_max_does_not_overflow() {
    let at_max = poll_error_backoff_secs(u32::MAX);

    assert!(at_max <= MAX_POLL_BACKOFF_SECS);
    assert!(at_max > 0);
}
1837
/// For error counts 0 through 199, the backoff is always greater than zero.
#[test]
fn test_poll_error_backoff_always_positive() {
    (0..200).for_each(|i| {
        let backoff = poll_error_backoff_secs(i);
        assert!(backoff > 0, "Error count {i}: backoff must be positive");
    });
}
1847
/// Across error counts 0 through 4, each backoff is at least as large as the
/// one for the previous count.
#[test]
fn test_poll_error_backoff_monotonic_before_cap() {
    // Sample counts 0..=4, then compare each neighbouring pair.
    let samples: Vec<_> = (0..=4).map(poll_error_backoff_secs).collect();

    for (offset, pair) in samples.windows(2).enumerate() {
        let (prev, curr) = (pair[0], pair[1]);
        // `pair` covers counts (offset, offset + 1); report the later one.
        let i = offset + 1;
        assert!(
            curr >= prev,
            "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
        );
    }
}
1861
/// Sanity-checks the `MAX_POLL_BACKOFF_SECS` constant: it must lie in the
/// inclusive range 10..=300.
#[test]
fn test_max_poll_backoff_is_reasonable() {
    let max_backoff = MAX_POLL_BACKOFF_SECS;

    assert!(
        max_backoff >= 10,
        "Max backoff should be at least 10s to avoid tight error loops"
    );
    assert!(
        max_backoff <= 300,
        "Max backoff should be at most 5 minutes to detect recovery promptly"
    );
}
1875
/// Sanity-checks the `RECOVERY_PROBE_EVERY` constant: it must lie in the
/// inclusive range 2..=10.
#[test]
fn test_recovery_probe_every_is_valid() {
    let probe_every = RECOVERY_PROBE_EVERY;

    assert!(
        probe_every >= 2,
        "Recovery probe interval must be at least 2 to avoid probing every attempt"
    );
    assert!(
        probe_every <= 10,
        "Recovery probe interval should not be too large or recovery detection is slow"
    );
}
1887
/// For an EVM status-check job body, very large attempt numbers (1000 and
/// `usize::MAX`) both yield a retry delay of 12.
#[test]
fn test_compute_status_retry_delay_very_high_attempt() {
    let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;

    for attempt in [1000, usize::MAX] {
        assert_eq!(compute_status_retry_delay(body, attempt), 12);
    }
}
1897
/// An empty body and an empty JSON object both yield a retry delay of 5 at
/// attempt 0.
#[test]
fn test_compute_status_retry_delay_empty_body() {
    for body in ["", "{}"] {
        assert_eq!(compute_status_retry_delay(body, 0), 5);
    }
}
1904
/// At attempt 0, a body with an empty `data` object yields 5, while a body
/// whose `data` carries only `network_type: "evm"` yields 8.
#[test]
fn test_compute_status_retry_delay_partial_json() {
    let without_network = r#"{"data":{}}"#;
    let evm_only = r#"{"data":{"network_type":"evm"}}"#;

    assert_eq!(compute_status_retry_delay(without_network, 0), 5);
    assert_eq!(compute_status_retry_delay(evm_only, 0), 8);
}
1914
/// `PartialStatusCheckJob` picks up `data.network_type` ("evm") and tolerates
/// an extra field inside `data`.
#[test]
fn test_partial_status_check_job_deserializes_network_type() {
    let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;

    let job = serde_json::from_str::<PartialStatusCheckJob>(body).unwrap();

    let expected = Some(crate::models::NetworkType::Evm);
    assert_eq!(job.data.network_type, expected);
}
1926
/// When `data` has no `network_type` key, deserialization succeeds and the
/// field is `None`.
#[test]
fn test_partial_status_check_job_handles_missing_network_type() {
    let body = r#"{"data":{"transaction_id":"tx1"}}"#;

    let job = serde_json::from_str::<PartialStatusCheckJob>(body).unwrap();

    assert_eq!(job.data.network_type, None);
}
1933
/// A JSON object without a `data` key fails to deserialize into
/// `PartialStatusCheckJob`.
#[test]
fn test_partial_status_check_job_rejects_missing_data() {
    let body = r#"{"not_data":{}}"#;
    let outcome: Result<PartialStatusCheckJob, _> = serde_json::from_str(body);
    assert!(outcome.is_err());
}
1940
/// The same queue name is classified as non-FIFO without the `.fifo` suffix and
/// as FIFO with it.
#[test]
fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
    let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
    let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";

    let standard_is_fifo = is_fifo_queue_url(standard);
    let fifo_is_fifo = is_fifo_queue_url(fifo);

    assert!(!standard_is_fifo);
    assert!(fifo_is_fifo);
}
1955
/// For every listed queue type, the wait time does not exceed 20 seconds.
///
/// Despite the test name, only the upper bound is asserted here.
#[test]
fn test_get_wait_time_for_queue_returns_positive() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let wt = get_wait_time_for_queue(qt);
        let within_limit = wt <= 20;
        assert!(
            within_limit,
            "{qt:?}: wait time {wt} exceeds SQS maximum of 20s"
        );
    }
}
1978
/// With no override in effect, `get_wait_time_for_queue` returns each queue
/// type's `default_wait_time_secs()`.
///
/// Marked `serial` because the function under test reads
/// `SQS_*_WAIT_TIME_SECONDS` environment variables, and other tests in this
/// module set the StatusCheck variable. Without serialization this test can
/// observe that temporary override and fail intermittently.
#[test]
#[serial_test::serial]
fn test_get_wait_time_for_queue_matches_defaults() {
    assert_eq!(
        get_wait_time_for_queue(QueueType::TransactionRequest),
        QueueType::TransactionRequest.default_wait_time_secs()
    );
    assert_eq!(
        get_wait_time_for_queue(QueueType::StatusCheck),
        QueueType::StatusCheck.default_wait_time_secs()
    );
}
1991
/// Setting `SQS_<key>_WAIT_TIME_SECONDS` for the StatusCheck queue to "12"
/// makes `get_wait_time_for_queue` return 12.
#[test]
#[serial_test::serial]
fn test_get_wait_time_for_queue_respects_env_override() {
    let env_var = format!(
        "SQS_{}_WAIT_TIME_SECONDS",
        QueueType::StatusCheck.sqs_env_key()
    );
    std::env::set_var(&env_var, "12");
    let observed = get_wait_time_for_queue(QueueType::StatusCheck);
    // Remove the override before asserting so a failed assertion cannot leak
    // the variable into tests that run afterwards.
    std::env::remove_var(&env_var);

    assert_eq!(observed, 12);
}
2004
/// An override of "99" for the Notification queue's wait time is clamped to 20.
#[test]
#[serial_test::serial]
fn test_get_wait_time_for_queue_env_override_clamped_to_20() {
    let env_var = format!(
        "SQS_{}_WAIT_TIME_SECONDS",
        QueueType::Notification.sqs_env_key()
    );
    std::env::set_var(&env_var, "99");
    let observed = get_wait_time_for_queue(QueueType::Notification);
    // Remove the override before asserting so a failed assertion cannot leak
    // the variable into tests that run afterwards.
    std::env::remove_var(&env_var);

    assert_eq!(observed, 20, "Should clamp to SQS maximum of 20");
}
2020
/// For every listed queue type, the poller count is at least 1.
#[test]
fn test_get_poller_count_for_queue_all_types_positive() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let pollers = get_poller_count_for_queue(qt);
        assert!(pollers >= 1, "{qt:?}: poller count must be at least 1");
    }
}
2042
/// With no override in effect, `get_poller_count_for_queue` returns each queue
/// type's `default_poller_count()`, raised to at least 1.
///
/// Marked `serial` because the function under test reads `SQS_*_POLLER_COUNT`
/// environment variables, and other tests in this module set the Notification
/// variable. Without serialization this test can observe that temporary
/// override and fail intermittently.
#[test]
#[serial_test::serial]
fn test_get_poller_count_for_queue_matches_defaults() {
    assert_eq!(
        get_poller_count_for_queue(QueueType::TransactionRequest),
        QueueType::TransactionRequest.default_poller_count().max(1)
    );
    assert_eq!(
        get_poller_count_for_queue(QueueType::Notification),
        QueueType::Notification.default_poller_count().max(1)
    );
}
2055
/// Setting `SQS_<key>_POLLER_COUNT` for the Notification queue to "5" makes
/// `get_poller_count_for_queue` return 5.
#[test]
#[serial_test::serial]
fn test_get_poller_count_for_queue_respects_env_override() {
    let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::Notification.sqs_env_key());
    std::env::set_var(&env_var, "5");
    let observed = get_poller_count_for_queue(QueueType::Notification);
    // Remove the override before asserting so a failed assertion cannot leak
    // the variable into tests that run afterwards.
    std::env::remove_var(&env_var);

    assert_eq!(observed, 5);
}
2064
/// An override of "0" for the StatusCheck queue's poller count is clamped to 1.
#[test]
#[serial_test::serial]
fn test_get_poller_count_for_queue_env_zero_clamped_to_1() {
    let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::StatusCheck.sqs_env_key());
    std::env::set_var(&env_var, "0");
    let observed = get_poller_count_for_queue(QueueType::StatusCheck);
    // Remove the override before asserting so a failed assertion cannot leak
    // the variable into tests that run afterwards.
    std::env::remove_var(&env_var);

    assert_eq!(
        observed, 1,
        "Zero poller count from env should be clamped to 1"
    );
}
2077
/// Cloning a `PollLoopConfig` preserves its field values.
///
/// Every field except `queue_type` is compared against the value it was built
/// with. `queue_type` is left unchecked because no equality comparison on
/// `QueueType` is visible from this test.
#[test]
fn test_poll_loop_config_clone() {
    let config = PollLoopConfig {
        queue_type: QueueType::TransactionRequest,
        polling_interval: 15,
        visibility_timeout: 120,
        handler_timeout: Duration::from_secs(120),
        max_retries: 3,
        poller_id: 0,
        poller_count: 2,
    };

    let cloned = config.clone();

    assert_eq!(cloned.polling_interval, 15);
    assert_eq!(cloned.visibility_timeout, 120);
    assert_eq!(cloned.handler_timeout, Duration::from_secs(120));
    assert_eq!(cloned.max_retries, 3);
    assert_eq!(cloned.poller_id, 0);
    assert_eq!(cloned.poller_count, 2);
}
2097}