1use async_trait::async_trait;
9use aws_sdk_sqs::types::MessageAttributeValue;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::watch;
14use tracing::{debug, error, info, warn};
15
16use crate::{
17 config::ServerConfig,
18 jobs::{
19 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
20 TransactionSend, TransactionStatusCheck,
21 },
22 models::{DefaultAppState, NetworkType},
23 queues::QueueBackendType,
24 utils::{aws_error::DisplayErrorContext, classify_sdk_error},
25};
26use actix_web::web::ThinData;
27
28use super::{QueueBackend, QueueBackendError, QueueHealth, QueueType, WorkerHandle};
29
30const SQS_MAX_MESSAGE_SIZE_BYTES: usize = 256 * 1024;
32
33fn transaction_message_group_id(
40 network_type: Option<&NetworkType>,
41 _relayer_id: &str,
42 transaction_id: &str,
43) -> String {
44 match network_type {
45 Some(_) | None => transaction_id.to_string(),
46 }
47}
48
49fn status_check_queue_type(network_type: Option<&NetworkType>) -> QueueType {
54 match network_type {
55 Some(NetworkType::Evm) => QueueType::StatusCheckEvm,
56 Some(NetworkType::Stellar) => QueueType::StatusCheckStellar,
57 _ => QueueType::StatusCheck,
58 }
59}
60
61#[derive(Clone)]
67pub struct SqsBackend {
68 sqs_client: aws_sdk_sqs::Client,
70 queue_urls: HashMap<QueueType, String>,
72 dlq_urls: HashMap<QueueType, String>,
75 region: String,
77 shutdown_tx: Arc<watch::Sender<bool>>,
79}
80
81impl std::fmt::Debug for SqsBackend {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("SqsBackend")
84 .field("backend_type", &"sqs")
85 .field("region", &self.region)
86 .field("queue_count", &self.queue_urls.len())
87 .finish()
88 }
89}
90
91fn resolve_queue_type(
101 sqs_queue_type: &str,
102 probe_results: Option<(bool, bool)>,
103 ref_standard_url: &str,
104 ref_fifo_url: &str,
105) -> Result<bool, QueueBackendError> {
106 match sqs_queue_type {
107 "standard" => {
108 info!("Using explicit SQS queue type: standard");
109 Ok(false)
110 }
111 "fifo" => {
112 info!("Using explicit SQS queue type: fifo");
113 Ok(true)
114 }
115 "auto" => {
116 let (standard_exists, fifo_exists) = probe_results.unwrap_or((false, false));
117 match (standard_exists, fifo_exists) {
118 (true, false) => {
119 info!("Detected SQS queue type: standard");
120 Ok(false)
121 }
122 (false, true) => {
123 info!("Detected SQS queue type: fifo");
124 Ok(true)
125 }
126 (true, true) => Err(QueueBackendError::ConfigError(
127 "Ambiguous SQS queue type: both standard and FIFO \
128 'transaction-request' queues exist. Remove one set or set \
129 SQS_QUEUE_TYPE explicitly."
130 .to_string(),
131 )),
132 (false, false) => Err(QueueBackendError::ConfigError(format!(
133 "No SQS queues found. Neither '{ref_standard_url}' nor \
134 '{ref_fifo_url}' is accessible. Create queues before starting \
135 the relayer, or set SQS_QUEUE_TYPE explicitly."
136 ))),
137 }
138 }
139 other => Err(QueueBackendError::ConfigError(format!(
140 "Unsupported SQS_QUEUE_TYPE: '{other}'. Must be 'auto', 'standard', or 'fifo'."
141 ))),
142 }
143}
144
145impl SqsBackend {
146 fn is_fifo_queue_url(queue_url: &str) -> bool {
147 queue_url.ends_with(".fifo")
148 }
149
150 pub async fn new() -> Result<Self, QueueBackendError> {
167 info!("Initializing SQS queue backend");
168
169 let config = aws_config::load_from_env().await;
171 let sqs_client = aws_sdk_sqs::Client::new(&config);
172 let region = config
173 .region()
174 .ok_or_else(|| {
175 QueueBackendError::ConfigError(
176 "AWS_REGION not set. Required for SQS backend.".to_string(),
177 )
178 })?
179 .to_string();
180
181 let prefix = match std::env::var("SQS_QUEUE_URL_PREFIX") {
184 Ok(prefix) => prefix,
185 Err(_) => {
186 let account_id =
187 ServerConfig::get_aws_account_id().map_err(QueueBackendError::ConfigError)?;
188 format!("https://sqs.{region}.amazonaws.com/{account_id}/relayer-")
189 }
190 };
191 info!(
192 region = %region,
193 queue_url_prefix = %prefix,
194 "Resolved SQS queue URL prefix"
195 );
196
197 let sqs_queue_type = ServerConfig::get_sqs_queue_type().to_lowercase();
199 let ref_standard_url = format!("{prefix}transaction-request");
200 let ref_fifo_url = format!("{prefix}transaction-request.fifo");
201
202 let probe_results = if sqs_queue_type == "auto" {
204 let (standard_probe, fifo_probe) = {
205 let client_s = sqs_client.clone();
206 let client_f = sqs_client.clone();
207 let url_s = ref_standard_url.clone();
208 let url_f = ref_fifo_url.clone();
209 tokio::join!(
210 async move {
211 client_s
212 .get_queue_attributes()
213 .queue_url(&url_s)
214 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
215 .send()
216 .await
217 },
218 async move {
219 client_f
220 .get_queue_attributes()
221 .queue_url(&url_f)
222 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
223 .send()
224 .await
225 }
226 )
227 };
228 Some((standard_probe.is_ok(), fifo_probe.is_ok()))
229 } else {
230 None
231 };
232
233 let is_fifo = resolve_queue_type(
234 &sqs_queue_type,
235 probe_results,
236 &ref_standard_url,
237 &ref_fifo_url,
238 )?;
239 let suffix = if is_fifo { ".fifo" } else { "" };
240
241 let queue_urls = HashMap::from([
246 (
247 QueueType::TransactionRequest,
248 format!("{prefix}transaction-request{suffix}"),
249 ),
250 (
251 QueueType::TransactionSubmission,
252 format!("{prefix}transaction-submission{suffix}"),
253 ),
254 (
255 QueueType::StatusCheck,
256 format!("{prefix}status-check{suffix}"),
257 ),
258 (
259 QueueType::StatusCheckEvm,
260 format!("{prefix}status-check-evm{suffix}"),
261 ),
262 (
263 QueueType::StatusCheckStellar,
264 format!("{prefix}status-check-stellar{suffix}"),
265 ),
266 (
267 QueueType::Notification,
268 format!("{prefix}notification{suffix}"),
269 ),
270 (
271 QueueType::TokenSwapRequest,
272 format!("{prefix}token-swap-request{suffix}"),
273 ),
274 (
275 QueueType::RelayerHealthCheck,
276 format!("{prefix}relayer-health-check{suffix}"),
277 ),
278 ]);
279
280 let probe_futures: Vec<_> = queue_urls
285 .iter()
286 .map(|(queue_type, queue_url)| {
287 let client = sqs_client.clone();
288 let qt = *queue_type;
289 let url = queue_url.clone();
290 async move {
291 debug!(
292 queue_type = %qt,
293 queue_url = %url,
294 "Probing SQS queue accessibility at startup"
295 );
296 let probe = client
297 .get_queue_attributes()
298 .queue_url(&url)
299 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
300 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
301 .send()
302 .await;
303 (qt, url, probe)
304 }
305 })
306 .collect();
307
308 let probe_results = futures::future::join_all(probe_futures).await;
309
310 let mut missing_queues = Vec::new();
311 let mut dlq_urls: HashMap<QueueType, String> = HashMap::new();
312
313 for (queue_type, queue_url, probe) in probe_results {
314 match probe {
315 Ok(output) => {
316 debug!(
317 queue_type = %queue_type,
318 queue_url = %queue_url,
319 is_fifo = is_fifo,
320 "SQS queue probe succeeded"
321 );
322
323 if let Some(dlq_url) =
326 Self::resolve_dlq_url_from_attrs(&sqs_client, output.attributes()).await
327 {
328 dlq_urls.insert(queue_type, dlq_url);
329 }
330 }
331 Err(err) => {
332 error!(
334 queue_type = %queue_type,
335 queue_url = %queue_url,
336 error = ?err,
337 "SQS queue probe failed"
338 );
339 missing_queues.push(format!("{queue_type} ({queue_url}): {err:?}"));
340 }
341 }
342 }
343
344 if !missing_queues.is_empty() {
345 return Err(QueueBackendError::ConfigError(format!(
346 "SQS backend initialization failed. Missing/inaccessible queues: {}",
347 missing_queues.join(", ")
348 )));
349 }
350
351 info!(
352 region = %region,
353 queue_count = queue_urls.len(),
354 "SQS backend initialized"
355 );
356
357 let (shutdown_tx, _) = watch::channel(false);
358
359 Ok(Self {
360 sqs_client,
361 queue_urls,
362 dlq_urls,
363 region,
364 shutdown_tx: Arc::new(shutdown_tx),
365 })
366 }
367
368 async fn send_message_to_sqs(
380 &self,
381 queue_url: &str,
382 body: String,
383 message_group_id: String,
384 message_deduplication_id: String,
385 delay_seconds: Option<i32>,
386 target_scheduled_on: Option<i64>,
387 ) -> Result<String, QueueBackendError> {
388 if body.len() > SQS_MAX_MESSAGE_SIZE_BYTES {
389 return Err(QueueBackendError::SqsError(format!(
390 "Message body size ({} bytes) exceeds SQS limit ({} bytes)",
391 body.len(),
392 SQS_MAX_MESSAGE_SIZE_BYTES
393 )));
394 }
395
396 let mut request = self
397 .sqs_client
398 .send_message()
399 .queue_url(queue_url)
400 .message_body(body);
401
402 if Self::is_fifo_queue_url(queue_url) {
405 request = request
406 .message_group_id(message_group_id)
407 .message_deduplication_id(message_deduplication_id);
408 }
409
410 if let Some(timestamp) = target_scheduled_on {
411 request = request.message_attributes(
412 "target_scheduled_on",
413 MessageAttributeValue::builder()
414 .data_type("Number")
415 .string_value(timestamp.to_string())
416 .build()
417 .map_err(|e| {
418 QueueBackendError::SqsError(format!(
419 "Failed to build scheduled-on attribute: {e}"
420 ))
421 })?,
422 );
423 }
424
425 if let Some(delay) = delay_seconds {
428 let clamped_delay = delay.clamp(0, 900);
429 if Self::is_fifo_queue_url(queue_url) {
430 debug!(
431 queue_url = %queue_url,
432 requested_delay_seconds = delay,
433 "Skipping per-message DelaySeconds for FIFO queue; worker-side scheduling will enforce target_scheduled_on"
434 );
435 } else {
436 request = request.delay_seconds(clamped_delay);
437 if delay != clamped_delay {
438 warn!(
439 requested = delay,
440 clamped = clamped_delay,
441 "Delay seconds clamped to SQS limit (0-900)"
442 );
443 }
444 }
445 }
446
447 let response = request.send().await.map_err(|e| {
448 error!(
449 error.kind = classify_sdk_error(&e),
450 error.detail = %DisplayErrorContext(&e),
451 queue_url = %queue_url,
452 "Failed to send message to SQS"
453 );
454 QueueBackendError::SqsError(format!("SendMessage failed: {}", classify_sdk_error(&e)))
455 })?;
456
457 let message_id = response
458 .message_id()
459 .ok_or_else(|| QueueBackendError::SqsError("No message_id returned".to_string()))?
460 .to_string();
461
462 debug!(
463 message_id = %message_id,
464 queue_url = %queue_url,
465 "Message sent to SQS"
466 );
467
468 Ok(message_id)
469 }
470
471 fn calculate_delay_seconds(scheduled_on: Option<i64>) -> Option<i32> {
475 scheduled_on.and_then(|timestamp| {
476 let now = SystemTime::now()
477 .duration_since(SystemTime::UNIX_EPOCH)
478 .ok()?
479 .as_secs() as i64;
480
481 let delay = timestamp - now;
482 if delay > 0 {
483 Some(delay.min(900) as i32) } else {
485 None }
487 })
488 }
489
490 async fn resolve_dlq_url_from_attrs(
494 sqs_client: &aws_sdk_sqs::Client,
495 attrs: Option<&HashMap<aws_sdk_sqs::types::QueueAttributeName, String>>,
496 ) -> Option<String> {
497 let redrive_policy =
498 attrs.and_then(|a| a.get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy))?;
499
500 let dlq_arn = serde_json::from_str::<serde_json::Value>(redrive_policy)
501 .ok()
502 .and_then(|v| v.get("deadLetterTargetArn").cloned())
503 .and_then(|v| v.as_str().map(|s| s.to_string()));
504
505 let dlq_name = dlq_arn.as_deref()?.rsplit(':').next()?;
506
507 match sqs_client.get_queue_url().queue_name(dlq_name).send().await {
508 Ok(output) => output.queue_url().map(str::to_string),
509 Err(err) => {
510 warn!(
511 error.kind = classify_sdk_error(&err),
512 error.detail = %DisplayErrorContext(&err),
513 dlq_name = %dlq_name,
514 "Failed to resolve DLQ URL at startup"
515 );
516 None
517 }
518 }
519 }
520
521 async fn get_dlq_message_count(&self, queue_type: &QueueType) -> u64 {
526 let Some(dlq_url) = self.dlq_urls.get(queue_type) else {
527 return 0;
528 };
529
530 match self
531 .sqs_client
532 .get_queue_attributes()
533 .queue_url(dlq_url)
534 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
535 .send()
536 .await
537 {
538 Ok(output) => output
539 .attributes()
540 .and_then(|attrs| {
541 attrs.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
542 })
543 .and_then(|value| value.parse::<u64>().ok())
544 .unwrap_or(0),
545 Err(err) => {
546 warn!(
547 error.kind = classify_sdk_error(&err),
548 error.detail = %DisplayErrorContext(&err),
549 dlq_url = %dlq_url,
550 "Failed to fetch DLQ depth"
551 );
552 0
553 }
554 }
555 }
556}
557
558#[async_trait]
559impl QueueBackend for SqsBackend {
560 async fn produce_transaction_request(
561 &self,
562 job: Job<TransactionRequest>,
563 scheduled_on: Option<i64>,
564 ) -> Result<String, QueueBackendError> {
565 let queue_url = self
566 .queue_urls
567 .get(&QueueType::TransactionRequest)
568 .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionRequest".to_string()))?;
569
570 let body = serde_json::to_string(&job).map_err(|e| {
571 error!(error = %e, "Failed to serialize TransactionRequest job");
572 QueueBackendError::SerializationError(e.to_string())
573 })?;
574
575 let message_group_id = transaction_message_group_id(
576 job.data.network_type.as_ref(),
577 &job.data.relayer_id,
578 &job.data.transaction_id,
579 );
580 let message_deduplication_id = job.message_id.clone();
581 let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
582
583 self.send_message_to_sqs(
584 queue_url,
585 body,
586 message_group_id,
587 message_deduplication_id,
588 delay_seconds,
589 scheduled_on,
590 )
591 .await
592 }
593
594 async fn produce_transaction_submission(
595 &self,
596 job: Job<TransactionSend>,
597 scheduled_on: Option<i64>,
598 ) -> Result<String, QueueBackendError> {
599 let queue_url = self
600 .queue_urls
601 .get(&QueueType::TransactionSubmission)
602 .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionSubmission".to_string()))?;
603
604 let body = serde_json::to_string(&job).map_err(|e| {
605 error!(error = %e, "Failed to serialize TransactionSend job");
606 QueueBackendError::SerializationError(e.to_string())
607 })?;
608
609 let message_group_id = transaction_message_group_id(
610 job.data.network_type.as_ref(),
611 &job.data.relayer_id,
612 &job.data.transaction_id,
613 );
614 let message_deduplication_id = job.message_id.clone();
615 let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
616
617 self.send_message_to_sqs(
618 queue_url,
619 body,
620 message_group_id,
621 message_deduplication_id,
622 delay_seconds,
623 scheduled_on,
624 )
625 .await
626 }
627
628 async fn produce_transaction_status_check(
629 &self,
630 job: Job<TransactionStatusCheck>,
631 scheduled_on: Option<i64>,
632 ) -> Result<String, QueueBackendError> {
633 let queue_type = status_check_queue_type(job.data.network_type.as_ref());
637 let queue_url = self
638 .queue_urls
639 .get(&queue_type)
640 .ok_or_else(|| QueueBackendError::QueueNotFound(format!("{queue_type}")))?;
641
642 let body = serde_json::to_string(&job).map_err(|e| {
643 error!(error = %e, "Failed to serialize TransactionStatusCheck job");
644 QueueBackendError::SerializationError(e.to_string())
645 })?;
646
647 let message_group_id = job.data.transaction_id.clone();
648 let message_deduplication_id = job.message_id.clone();
649 let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
650
651 self.send_message_to_sqs(
652 queue_url,
653 body,
654 message_group_id,
655 message_deduplication_id,
656 delay_seconds,
657 scheduled_on,
658 )
659 .await
660 }
661
662 async fn produce_notification(
663 &self,
664 job: Job<NotificationSend>,
665 scheduled_on: Option<i64>,
666 ) -> Result<String, QueueBackendError> {
667 let queue_url = self
668 .queue_urls
669 .get(&QueueType::Notification)
670 .ok_or_else(|| QueueBackendError::QueueNotFound("Notification".to_string()))?;
671
672 let body = serde_json::to_string(&job).map_err(|e| {
673 error!(error = %e, "Failed to serialize NotificationSend job");
674 QueueBackendError::SerializationError(e.to_string())
675 })?;
676
677 let message_group_id = job.data.notification_id.clone();
679 let message_deduplication_id = job.message_id.clone();
680 let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
681
682 self.send_message_to_sqs(
683 queue_url,
684 body,
685 message_group_id,
686 message_deduplication_id,
687 delay_seconds,
688 scheduled_on,
689 )
690 .await
691 }
692
693 async fn produce_token_swap_request(
694 &self,
695 job: Job<TokenSwapRequest>,
696 scheduled_on: Option<i64>,
697 ) -> Result<String, QueueBackendError> {
698 let queue_url = self
699 .queue_urls
700 .get(&QueueType::TokenSwapRequest)
701 .ok_or_else(|| QueueBackendError::QueueNotFound("TokenSwapRequest".to_string()))?;
702
703 let body = serde_json::to_string(&job).map_err(|e| {
704 error!(error = %e, "Failed to serialize TokenSwapRequest job");
705 QueueBackendError::SerializationError(e.to_string())
706 })?;
707
708 let message_group_id = job.data.relayer_id.clone();
709 let message_deduplication_id = job.message_id.clone();
710 let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
711
712 self.send_message_to_sqs(
713 queue_url,
714 body,
715 message_group_id,
716 message_deduplication_id,
717 delay_seconds,
718 scheduled_on,
719 )
720 .await
721 }
722
723 async fn produce_relayer_health_check(
724 &self,
725 job: Job<RelayerHealthCheck>,
726 scheduled_on: Option<i64>,
727 ) -> Result<String, QueueBackendError> {
728 let queue_url = self
729 .queue_urls
730 .get(&QueueType::RelayerHealthCheck)
731 .ok_or_else(|| QueueBackendError::QueueNotFound("RelayerHealthCheck".to_string()))?;
732
733 let body = serde_json::to_string(&job).map_err(|e| {
734 error!(error = %e, "Failed to serialize RelayerHealthCheck job");
735 QueueBackendError::SerializationError(e.to_string())
736 })?;
737
738 let message_group_id = job.data.relayer_id.clone();
739 let message_deduplication_id = job.message_id.clone();
740 let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
741
742 self.send_message_to_sqs(
743 queue_url,
744 body,
745 message_group_id,
746 message_deduplication_id,
747 delay_seconds,
748 scheduled_on,
749 )
750 .await
751 }
752
753 async fn initialize_workers(
754 &self,
755 app_state: Arc<ThinData<DefaultAppState>>,
756 ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
757 info!(
758 "Initializing SQS workers for {} queues",
759 self.queue_urls.len()
760 );
761
762 let mut handles = Vec::new();
763
764 for (queue_type, queue_url) in &self.queue_urls {
766 let handle = super::sqs_worker::spawn_worker_for_queue(
767 self.sqs_client.clone(),
768 *queue_type,
769 queue_url.clone(),
770 app_state.clone(),
771 self.shutdown_tx.subscribe(),
772 )
773 .await?;
774
775 handles.push(handle);
776 }
777
778 let cron_scheduler =
780 super::sqs_cron::SqsCronScheduler::new(app_state.clone(), self.shutdown_tx.subscribe());
781 let cron_handles = cron_scheduler.start().await?;
782 handles.extend(cron_handles);
783
784 {
788 let shutdown_tx = self.shutdown_tx.clone();
789 let handle = tokio::spawn(async move {
790 let mut sigint =
791 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
792 .expect("Failed to create SIGINT handler");
793 let mut sigterm =
794 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
795 .expect("Failed to create SIGTERM handler");
796
797 tokio::select! {
798 _ = sigint.recv() => info!("SQS backend: received SIGINT, shutting down workers"),
799 _ = sigterm.recv() => info!("SQS backend: received SIGTERM, shutting down workers"),
800 }
801
802 let _ = shutdown_tx.send(true);
803 });
804 handles.push(WorkerHandle::Tokio(handle));
805 }
806
807 info!(
808 "Successfully spawned {} SQS workers and cron tasks",
809 handles.len()
810 );
811 Ok(handles)
812 }
813
814 async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
815 let mut health_statuses = Vec::new();
816
817 for (queue_type, queue_url) in &self.queue_urls {
818 let result = self
820 .sqs_client
821 .get_queue_attributes()
822 .queue_url(queue_url)
823 .attribute_names(
824 aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages,
825 )
826 .attribute_names(
827 aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
828 )
829 .send()
830 .await;
831
832 let (messages_visible, messages_in_flight, messages_dlq, is_healthy) = match result {
833 Ok(output) => {
834 let attrs = output.attributes();
835 let visible = attrs
836 .and_then(|a| {
837 a.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
838 })
839 .and_then(|v| v.parse::<u64>().ok())
840 .unwrap_or(0);
841 let in_flight = attrs
842 .and_then(|a| {
843 a.get(
844 &aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
845 )
846 })
847 .and_then(|v| v.parse::<u64>().ok())
848 .unwrap_or(0);
849 let dlq_count = self.get_dlq_message_count(queue_type).await;
850 (visible, in_flight, dlq_count, true)
851 }
852 Err(e) => {
853 error!(
854 error = %e,
855 queue_type = ?queue_type,
856 "Failed to get queue attributes"
857 );
858 (0, 0, 0, false)
859 }
860 };
861
862 health_statuses.push(QueueHealth {
863 queue_type: *queue_type,
864 messages_visible,
865 messages_in_flight,
866 messages_dlq,
867 backend: "sqs".to_string(),
868 is_healthy,
869 });
870 }
871
872 Ok(health_statuses)
873 }
874
875 fn backend_type(&self) -> QueueBackendType {
876 QueueBackendType::Sqs
877 }
878
879 fn shutdown(&self) {
880 info!("SQS backend: broadcasting shutdown signal to all workers");
881 let _ = self.shutdown_tx.send(true);
882 }
883}
884
885#[cfg(test)]
886mod tests {
887 use super::*;
888 use crate::jobs::{Job, JobType, TransactionStatusCheck};
889 use crate::models::NetworkType;
890
891 #[test]
892 fn test_calculate_delay_seconds() {
893 assert_eq!(SqsBackend::calculate_delay_seconds(None), None);
895
896 let past = SystemTime::now()
898 .duration_since(SystemTime::UNIX_EPOCH)
899 .unwrap()
900 .as_secs() as i64
901 - 10;
902 assert_eq!(SqsBackend::calculate_delay_seconds(Some(past)), None);
903
904 let future_5s = SystemTime::now()
906 .duration_since(SystemTime::UNIX_EPOCH)
907 .unwrap()
908 .as_secs() as i64
909 + 5;
910 assert_eq!(
911 SqsBackend::calculate_delay_seconds(Some(future_5s)),
912 Some(5)
913 );
914
915 let future_1000s = SystemTime::now()
917 .duration_since(SystemTime::UNIX_EPOCH)
918 .unwrap()
919 .as_secs() as i64
920 + 1000;
921 assert_eq!(
922 SqsBackend::calculate_delay_seconds(Some(future_1000s)),
923 Some(900)
924 );
925 }
926
927 #[test]
928 fn test_calculate_delay_seconds_edge_cases() {
929 let now = SystemTime::now()
931 .duration_since(SystemTime::UNIX_EPOCH)
932 .unwrap()
933 .as_secs() as i64;
934 assert_eq!(SqsBackend::calculate_delay_seconds(Some(now)), None);
935
936 let future_900s = now + 900;
938 assert_eq!(
939 SqsBackend::calculate_delay_seconds(Some(future_900s)),
940 Some(900)
941 );
942
943 let future_901s = now + 901;
945 assert_eq!(
946 SqsBackend::calculate_delay_seconds(Some(future_901s)),
947 Some(900)
948 );
949 }
950
951 #[test]
952 fn test_sqs_backend_type_value() {
953 assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
954 assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
955 }
956
957 #[test]
958 fn test_queue_url_construction() {
959 let mut queue_urls = HashMap::new();
961 let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
962
963 queue_urls.insert(
964 QueueType::TransactionRequest,
965 format!("{prefix}transaction-request.fifo"),
966 );
967 queue_urls.insert(
968 QueueType::TransactionSubmission,
969 format!("{prefix}transaction-submission.fifo"),
970 );
971 queue_urls.insert(QueueType::StatusCheck, format!("{prefix}status-check.fifo"));
972 queue_urls.insert(
973 QueueType::Notification,
974 format!("{prefix}notification.fifo"),
975 );
976 queue_urls.insert(
977 QueueType::TokenSwapRequest,
978 format!("{prefix}token-swap-request.fifo"),
979 );
980 queue_urls.insert(
981 QueueType::RelayerHealthCheck,
982 format!("{prefix}relayer-health-check.fifo"),
983 );
984 queue_urls.insert(
985 QueueType::StatusCheckEvm,
986 format!("{prefix}status-check-evm.fifo"),
987 );
988 queue_urls.insert(
989 QueueType::StatusCheckStellar,
990 format!("{prefix}status-check-stellar.fifo"),
991 );
992
993 assert_eq!(queue_urls.len(), 8);
995 assert!(queue_urls
996 .get(&QueueType::TransactionRequest)
997 .unwrap()
998 .ends_with(".fifo"));
999 assert!(queue_urls
1000 .get(&QueueType::TransactionSubmission)
1001 .unwrap()
1002 .contains("transaction-submission"));
1003 assert!(queue_urls
1004 .get(&QueueType::StatusCheckEvm)
1005 .unwrap()
1006 .contains("status-check-evm"));
1007 assert!(queue_urls
1008 .get(&QueueType::StatusCheckStellar)
1009 .unwrap()
1010 .contains("status-check-stellar"));
1011 }
1012
1013 #[test]
1014 fn test_queue_url_construction_standard() {
1015 let mut queue_urls = HashMap::new();
1017 let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
1018
1019 queue_urls.insert(
1020 QueueType::TransactionRequest,
1021 format!("{prefix}transaction-request"),
1022 );
1023 queue_urls.insert(
1024 QueueType::TransactionSubmission,
1025 format!("{prefix}transaction-submission"),
1026 );
1027 queue_urls.insert(QueueType::StatusCheck, format!("{prefix}status-check"));
1028 queue_urls.insert(QueueType::Notification, format!("{prefix}notification"));
1029 queue_urls.insert(
1030 QueueType::TokenSwapRequest,
1031 format!("{prefix}token-swap-request"),
1032 );
1033 queue_urls.insert(
1034 QueueType::RelayerHealthCheck,
1035 format!("{prefix}relayer-health-check"),
1036 );
1037 queue_urls.insert(
1038 QueueType::StatusCheckEvm,
1039 format!("{prefix}status-check-evm"),
1040 );
1041 queue_urls.insert(
1042 QueueType::StatusCheckStellar,
1043 format!("{prefix}status-check-stellar"),
1044 );
1045
1046 assert_eq!(queue_urls.len(), 8);
1047 for (_, url) in &queue_urls {
1049 assert!(
1050 !url.ends_with(".fifo"),
1051 "Standard queue URL should not end with .fifo: {url}"
1052 );
1053 }
1054 assert!(queue_urls
1055 .get(&QueueType::TransactionRequest)
1056 .unwrap()
1057 .contains("transaction-request"));
1058 }
1059
1060 #[test]
1061 fn test_is_fifo_queue_url_standard() {
1062 assert!(!SqsBackend::is_fifo_queue_url(
1063 "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request"
1064 ));
1065 assert!(!SqsBackend::is_fifo_queue_url(
1066 "http://localstack:4566/000000000000/relayer-status-check"
1067 ));
1068 }
1069
1070 #[test]
1071 fn test_transaction_message_group_id_evm_uses_transaction() {
1072 let group = transaction_message_group_id(Some(&NetworkType::Evm), "relayer-1", "tx-123");
1073 assert_eq!(group, "tx-123");
1074 }
1075
1076 #[test]
1077 fn test_transaction_message_group_id_stellar_uses_transaction() {
1078 let group =
1079 transaction_message_group_id(Some(&NetworkType::Stellar), "relayer-1", "tx-123");
1080 assert_eq!(group, "tx-123");
1081 }
1082
1083 #[test]
1084 fn test_transaction_message_group_id_solana_uses_transaction() {
1085 let group = transaction_message_group_id(Some(&NetworkType::Solana), "relayer-1", "tx-123");
1086 assert_eq!(group, "tx-123");
1087 }
1088
1089 #[test]
1090 fn test_transaction_message_group_id_none_defaults_to_transaction() {
1091 let group = transaction_message_group_id(None, "relayer-1", "tx-123");
1092 assert_eq!(
1093 group, "tx-123",
1094 "Unknown network should default to transaction id"
1095 );
1096 }
1097
1098 #[test]
1099 fn test_status_check_queue_type_evm() {
1100 assert_eq!(
1101 status_check_queue_type(Some(&NetworkType::Evm)),
1102 QueueType::StatusCheckEvm
1103 );
1104 }
1105
1106 #[test]
1107 fn test_status_check_queue_type_stellar() {
1108 assert_eq!(
1109 status_check_queue_type(Some(&NetworkType::Stellar)),
1110 QueueType::StatusCheckStellar
1111 );
1112 }
1113
1114 #[test]
1115 fn test_status_check_queue_type_solana_defaults_to_generic() {
1116 assert_eq!(
1117 status_check_queue_type(Some(&NetworkType::Solana)),
1118 QueueType::StatusCheck
1119 );
1120 }
1121
1122 #[test]
1123 fn test_status_check_queue_type_none_defaults_to_generic() {
1124 assert_eq!(status_check_queue_type(None), QueueType::StatusCheck);
1125 }
1126
1127 #[test]
1128 fn test_sqs_max_message_size_constant() {
1129 assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 256 * 1024);
1130 }
1131
1132 #[test]
1133 fn test_is_fifo_queue_url() {
1134 assert!(SqsBackend::is_fifo_queue_url(
1135 "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
1136 ));
1137 assert!(!SqsBackend::is_fifo_queue_url(
1138 "https://sqs.us-east-1.amazonaws.com/123/queue"
1139 ));
1140 }
1141
1142 const REF_STD: &str = "http://localhost:4566/000000000000/relayer-transaction-request";
1145 const REF_FIFO: &str = "http://localhost:4566/000000000000/relayer-transaction-request.fifo";
1146
1147 #[test]
1148 fn test_resolve_queue_type_explicit_standard() {
1149 let result = resolve_queue_type("standard", None, REF_STD, REF_FIFO);
1150 assert_eq!(result.unwrap(), false);
1151 }
1152
1153 #[test]
1154 fn test_resolve_queue_type_explicit_fifo() {
1155 let result = resolve_queue_type("fifo", None, REF_STD, REF_FIFO);
1156 assert_eq!(result.unwrap(), true);
1157 }
1158
1159 #[test]
1160 fn test_resolve_queue_type_explicit_ignores_probes() {
1161 let result = resolve_queue_type("standard", Some((false, true)), REF_STD, REF_FIFO);
1163 assert_eq!(result.unwrap(), false);
1164 }
1165
1166 #[test]
1167 fn test_resolve_queue_type_auto_standard_only() {
1168 let result = resolve_queue_type("auto", Some((true, false)), REF_STD, REF_FIFO);
1169 assert_eq!(result.unwrap(), false);
1170 }
1171
1172 #[test]
1173 fn test_resolve_queue_type_auto_fifo_only() {
1174 let result = resolve_queue_type("auto", Some((false, true)), REF_STD, REF_FIFO);
1175 assert_eq!(result.unwrap(), true);
1176 }
1177
1178 #[test]
1179 fn test_resolve_queue_type_auto_both_exist_errors() {
1180 let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
1181 assert!(result.is_err());
1182 let err = result.unwrap_err().to_string();
1183 assert!(
1184 err.contains("Ambiguous"),
1185 "Expected 'Ambiguous' error, got: {err}"
1186 );
1187 }
1188
1189 #[test]
1190 fn test_resolve_queue_type_auto_neither_exists_errors() {
1191 let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
1192 assert!(result.is_err());
1193 let err = result.unwrap_err().to_string();
1194 assert!(
1195 err.contains("No SQS queues found"),
1196 "Expected 'No SQS queues found' error, got: {err}"
1197 );
1198 }
1199
1200 #[test]
1201 fn test_resolve_queue_type_auto_no_probes_defaults_to_neither() {
1202 let result = resolve_queue_type("auto", None, REF_STD, REF_FIFO);
1204 assert!(result.is_err());
1205 }
1206
1207 #[test]
1208 fn test_resolve_queue_type_unknown_value_errors() {
1209 let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
1210 assert!(result.is_err());
1211 let err = result.unwrap_err().to_string();
1212 assert!(
1213 err.contains("Unsupported SQS_QUEUE_TYPE"),
1214 "Expected unsupported error, got: {err}"
1215 );
1216 }
1217
1218 #[test]
1221 fn test_resolve_queue_type_auto_neither_error_includes_urls() {
1222 let std_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request";
1223 let fifo_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request.fifo";
1224 let result = resolve_queue_type("auto", Some((false, false)), std_url, fifo_url);
1225 let err = result.unwrap_err();
1226 let msg = err.to_string();
1227 assert!(
1228 msg.contains(std_url),
1229 "Error should include standard URL: {msg}"
1230 );
1231 assert!(
1232 msg.contains(fifo_url),
1233 "Error should include FIFO URL: {msg}"
1234 );
1235 }
1236
1237 #[test]
1238 fn test_resolve_queue_type_returns_config_error_variant() {
1239 let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
1240 assert!(
1241 matches!(result, Err(QueueBackendError::ConfigError(_))),
1242 "Expected ConfigError variant"
1243 );
1244
1245 let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
1246 assert!(
1247 matches!(result, Err(QueueBackendError::ConfigError(_))),
1248 "Ambiguous case should be ConfigError"
1249 );
1250
1251 let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
1252 assert!(
1253 matches!(result, Err(QueueBackendError::ConfigError(_))),
1254 "No queues case should be ConfigError"
1255 );
1256 }
1257
1258 #[test]
1259 fn test_resolve_queue_type_unknown_includes_value_in_error() {
1260 let result = resolve_queue_type("redis", None, REF_STD, REF_FIFO);
1261 let msg = result.unwrap_err().to_string();
1262 assert!(
1263 msg.contains("redis"),
1264 "Error should echo the invalid value: {msg}"
1265 );
1266
1267 let result = resolve_queue_type("", None, REF_STD, REF_FIFO);
1268 assert!(result.is_err(), "Empty string should be rejected");
1269 }
1270
1271 #[test]
1272 fn test_resolve_queue_type_case_sensitive() {
1273 assert!(resolve_queue_type("Standard", None, REF_STD, REF_FIFO).is_err());
1275 assert!(resolve_queue_type("FIFO", None, REF_STD, REF_FIFO).is_err());
1276 assert!(resolve_queue_type("Auto", None, REF_STD, REF_FIFO).is_err());
1277 }
1278
1279 #[test]
1282 fn test_calculate_delay_seconds_one_second_future() {
1283 let future_1s = SystemTime::now()
1284 .duration_since(SystemTime::UNIX_EPOCH)
1285 .unwrap()
1286 .as_secs() as i64
1287 + 2; let result = SqsBackend::calculate_delay_seconds(Some(future_1s));
1289 assert!(result.is_some(), "1-2s in future should yield Some");
1290 assert!(result.unwrap() > 0, "Delay should be positive");
1291 assert!(result.unwrap() <= 2, "Delay should be at most 2s");
1292 }
1293
1294 #[test]
1295 fn test_calculate_delay_seconds_far_past() {
1296 assert_eq!(SqsBackend::calculate_delay_seconds(Some(0)), None);
1298 assert_eq!(SqsBackend::calculate_delay_seconds(Some(-1000)), None);
1300 }
1301
1302 #[test]
1303 fn test_calculate_delay_seconds_very_far_future() {
1304 let far_future = 4_102_444_800_i64; assert_eq!(
1307 SqsBackend::calculate_delay_seconds(Some(far_future)),
1308 Some(900)
1309 );
1310 }
1311
1312 #[test]
1313 fn test_calculate_delay_seconds_exactly_900_boundary() {
1314 let now = SystemTime::now()
1315 .duration_since(SystemTime::UNIX_EPOCH)
1316 .unwrap()
1317 .as_secs() as i64;
1318
1319 let result = SqsBackend::calculate_delay_seconds(Some(now + 899));
1321 assert!(result.is_some());
1322 let val = result.unwrap();
1324 assert!((898..=899).contains(&val), "Expected ~899, got {val}");
1325 }
1326
1327 #[test]
1330 fn test_is_fifo_queue_url_empty() {
1331 assert!(!SqsBackend::is_fifo_queue_url(""));
1332 }
1333
1334 #[test]
1335 fn test_is_fifo_queue_url_case_sensitive() {
1336 assert!(!SqsBackend::is_fifo_queue_url(
1337 "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
1338 ));
1339 assert!(!SqsBackend::is_fifo_queue_url(
1340 "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
1341 ));
1342 }
1343
1344 #[test]
1345 fn test_is_fifo_queue_url_fifo_in_middle() {
1346 assert!(!SqsBackend::is_fifo_queue_url(
1347 "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
1348 ));
1349 }
1350
1351 #[test]
1352 fn test_is_fifo_queue_url_just_suffix() {
1353 assert!(SqsBackend::is_fifo_queue_url(".fifo"));
1354 }
1355
1356 #[test]
1357 fn test_is_fifo_queue_url_localstack() {
1358 assert!(SqsBackend::is_fifo_queue_url(
1359 "http://localhost:4566/000000000000/relayer-tx.fifo"
1360 ));
1361 }
1362
1363 #[test]
1366 fn test_transaction_message_group_id_always_returns_transaction_id() {
1367 let networks: &[Option<NetworkType>] = &[
1370 Some(NetworkType::Evm),
1371 Some(NetworkType::Stellar),
1372 Some(NetworkType::Solana),
1373 None,
1374 ];
1375
1376 for network in networks {
1377 let group = transaction_message_group_id(network.as_ref(), "relayer-99", "tx-abc");
1378 assert_eq!(
1379 group, "tx-abc",
1380 "Expected transaction_id for network {network:?}"
1381 );
1382 }
1383 }
1384
1385 #[test]
1388 fn test_status_check_queue_type_returns_distinct_queue_names() {
1389 let evm = status_check_queue_type(Some(&NetworkType::Evm));
1390 let stellar = status_check_queue_type(Some(&NetworkType::Stellar));
1391 let generic = status_check_queue_type(None);
1392
1393 assert_ne!(evm.queue_name(), stellar.queue_name());
1394 assert_ne!(evm.queue_name(), generic.queue_name());
1395 assert_ne!(stellar.queue_name(), generic.queue_name());
1396 }
1397
1398 #[test]
1399 fn test_status_check_queue_type_all_are_status_checks() {
1400 let networks: &[Option<&NetworkType>] = &[
1401 Some(&NetworkType::Evm),
1402 Some(&NetworkType::Stellar),
1403 Some(&NetworkType::Solana),
1404 None,
1405 ];
1406
1407 for network in networks {
1408 let qt = status_check_queue_type(*network);
1409 assert!(
1410 qt.is_status_check(),
1411 "{qt:?} should be a status check variant"
1412 );
1413 }
1414 }
1415
1416 #[test]
1419 fn test_queue_url_construction_algorithm_fifo() {
1420 let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
1422 let suffix = ".fifo";
1423
1424 let expected_urls = [
1425 (
1426 QueueType::TransactionRequest,
1427 format!("{prefix}transaction-request{suffix}"),
1428 ),
1429 (
1430 QueueType::TransactionSubmission,
1431 format!("{prefix}transaction-submission{suffix}"),
1432 ),
1433 (
1434 QueueType::StatusCheck,
1435 format!("{prefix}status-check{suffix}"),
1436 ),
1437 (
1438 QueueType::StatusCheckEvm,
1439 format!("{prefix}status-check-evm{suffix}"),
1440 ),
1441 (
1442 QueueType::StatusCheckStellar,
1443 format!("{prefix}status-check-stellar{suffix}"),
1444 ),
1445 (
1446 QueueType::Notification,
1447 format!("{prefix}notification{suffix}"),
1448 ),
1449 (
1450 QueueType::TokenSwapRequest,
1451 format!("{prefix}token-swap-request{suffix}"),
1452 ),
1453 (
1454 QueueType::RelayerHealthCheck,
1455 format!("{prefix}relayer-health-check{suffix}"),
1456 ),
1457 ];
1458
1459 for (qt, expected) in &expected_urls {
1460 assert!(
1461 SqsBackend::is_fifo_queue_url(expected),
1462 "{qt:?}: URL should be FIFO: {expected}"
1463 );
1464 }
1465 }
1466
1467 #[test]
1468 fn test_queue_url_construction_algorithm_standard() {
1469 let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
1470 let suffix = "";
1471
1472 let urls = [
1473 format!("{prefix}transaction-request{suffix}"),
1474 format!("{prefix}transaction-submission{suffix}"),
1475 format!("{prefix}status-check{suffix}"),
1476 format!("{prefix}notification{suffix}"),
1477 ];
1478
1479 for url in &urls {
1480 assert!(
1481 !SqsBackend::is_fifo_queue_url(url),
1482 "Standard URL should not be FIFO: {url}"
1483 );
1484 }
1485 }
1486
1487 #[test]
1490 fn test_sqs_max_message_size_matches_aws_limit() {
1491 assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 262_144);
1493 }
1494
1495 #[test]
1498 fn test_resolve_queue_type_suffix_logic() {
1499 let test_cases = [
1501 ("standard", None, false),
1502 ("fifo", None, true),
1503 ("auto", Some((true, false)), false),
1504 ("auto", Some((false, true)), true),
1505 ];
1506
1507 for (sqs_type, probes, expected_fifo) in test_cases {
1508 let is_fifo = resolve_queue_type(sqs_type, probes, REF_STD, REF_FIFO).unwrap();
1509 assert_eq!(is_fifo, expected_fifo, "sqs_type={sqs_type}");
1510
1511 let suffix = if is_fifo { ".fifo" } else { "" };
1512 let url = format!("https://sqs.us-east-1.amazonaws.com/123/relayer-tx{suffix}");
1513 assert_eq!(
1514 SqsBackend::is_fifo_queue_url(&url),
1515 expected_fifo,
1516 "URL FIFO detection mismatch for sqs_type={sqs_type}"
1517 );
1518 }
1519 }
1520
1521 #[tokio::test]
1522 #[ignore]
1523 async fn smoke_push_status_check_to_sqs() {
1524 let backend = SqsBackend::new()
1530 .await
1531 .expect("SQS backend initialization failed");
1532 let job = Job::new(
1533 JobType::TransactionStatusCheck,
1534 TransactionStatusCheck::new("smoke-tx-id", "smoke-relayer", NetworkType::Stellar),
1535 );
1536 let now = SystemTime::now()
1537 .duration_since(SystemTime::UNIX_EPOCH)
1538 .expect("system time before unix epoch")
1539 .as_secs() as i64;
1540 let scheduled_on = Some(now + 2);
1541 let result = backend
1542 .produce_transaction_status_check(job, scheduled_on)
1543 .await;
1544 assert!(
1545 result.is_ok(),
1546 "Expected SendMessage via SQS backend to succeed, got: {result:?}"
1547 );
1548 }
1549}