openzeppelin_relayer/queues/sqs/
backend.rs

1//! AWS SQS backend implementation.
2//!
3//! This module provides an AWS SQS-backed implementation of the QueueBackend trait.
4//! Supports both Standard and FIFO queues. By default (`SQS_QUEUE_TYPE=auto`),
5//! the queue type is auto-detected at startup by probing a reference queue.
6//! Can also be set explicitly to `standard` or `fifo`.
7
8use async_trait::async_trait;
9use aws_sdk_sqs::types::MessageAttributeValue;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::watch;
14use tracing::{debug, error, info, warn};
15
16use crate::{
17    config::ServerConfig,
18    jobs::{
19        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
20        TransactionSend, TransactionStatusCheck,
21    },
22    models::{DefaultAppState, NetworkType},
23    queues::QueueBackendType,
24    utils::{aws_error::DisplayErrorContext, classify_sdk_error},
25};
26use actix_web::web::ThinData;
27
28use super::{QueueBackend, QueueBackendError, QueueHealth, QueueType, WorkerHandle};
29
/// Largest message body, in bytes, that this backend will attempt to send
/// to SQS (256 KiB). Bodies longer than this are rejected before any
/// network call is made.
const SQS_MAX_MESSAGE_SIZE_BYTES: usize = 256 * 1024;
32
33/// Chooses the FIFO message group ID based on network type.
34///
35/// EVM requires per-relayer ordering (nonce management), so uses `relayer_id`.
36/// Non-EVM networks (Stellar, Solana) can safely parallelize per transaction,
37/// so uses `transaction_id` for better throughput.
38/// Falls back to `relayer_id` when network type is unknown (conservative/safe).
39fn transaction_message_group_id(
40    network_type: Option<&NetworkType>,
41    _relayer_id: &str,
42    transaction_id: &str,
43) -> String {
44    match network_type {
45        Some(_) | None => transaction_id.to_string(),
46    }
47}
48
49/// Selects the status-check queue for a given network type.
50///
51/// EVM and Stellar use dedicated queues; all other/unknown network types use
52/// the generic status-check queue.
53fn status_check_queue_type(network_type: Option<&NetworkType>) -> QueueType {
54    match network_type {
55        Some(NetworkType::Evm) => QueueType::StatusCheckEvm,
56        Some(NetworkType::Stellar) => QueueType::StatusCheckStellar,
57        _ => QueueType::StatusCheck,
58    }
59}
60
61/// AWS SQS backend for job queue operations.
62///
63/// Supports both Standard and FIFO queues (auto-detected at startup, or set via `SQS_QUEUE_TYPE`).
64/// FIFO mode provides message ordering and exactly-once delivery;
65/// Standard mode offers higher throughput and native per-message delays.
66#[derive(Clone)]
67pub struct SqsBackend {
68    /// AWS SQS client for all operations (send, delete, poll, change visibility)
69    sqs_client: aws_sdk_sqs::Client,
70    /// Mapping of queue types to SQS queue URLs
71    queue_urls: HashMap<QueueType, String>,
72    /// Cached DLQ URLs resolved once at startup, keyed by the source queue type.
73    /// Avoids repeated `get_queue_url` calls on every health check.
74    dlq_urls: HashMap<QueueType, String>,
75    /// AWS region
76    region: String,
77    /// Shutdown signal sender — sending `true` tells all workers and cron tasks to stop
78    shutdown_tx: Arc<watch::Sender<bool>>,
79}
80
81impl std::fmt::Debug for SqsBackend {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("SqsBackend")
84            .field("backend_type", &"sqs")
85            .field("region", &self.region)
86            .field("queue_count", &self.queue_urls.len())
87            .finish()
88    }
89}
90
91/// Resolves the queue type from the configured `SQS_QUEUE_TYPE` value and
92/// probe results.
93///
94/// - `"standard"` / `"fifo"` → returns immediately (probes ignored).
95/// - `"auto"` → decides based on which probe succeeded.
96/// - anything else → error.
97///
98/// `probe_results` is `Option<(bool, bool)>`: `Some((standard_ok, fifo_ok))`
99/// when `sqs_queue_type == "auto"`, `None` otherwise.
100fn resolve_queue_type(
101    sqs_queue_type: &str,
102    probe_results: Option<(bool, bool)>,
103    ref_standard_url: &str,
104    ref_fifo_url: &str,
105) -> Result<bool, QueueBackendError> {
106    match sqs_queue_type {
107        "standard" => {
108            info!("Using explicit SQS queue type: standard");
109            Ok(false)
110        }
111        "fifo" => {
112            info!("Using explicit SQS queue type: fifo");
113            Ok(true)
114        }
115        "auto" => {
116            let (standard_exists, fifo_exists) = probe_results.unwrap_or((false, false));
117            match (standard_exists, fifo_exists) {
118                (true, false) => {
119                    info!("Detected SQS queue type: standard");
120                    Ok(false)
121                }
122                (false, true) => {
123                    info!("Detected SQS queue type: fifo");
124                    Ok(true)
125                }
126                (true, true) => Err(QueueBackendError::ConfigError(
127                    "Ambiguous SQS queue type: both standard and FIFO \
128                     'transaction-request' queues exist. Remove one set or set \
129                     SQS_QUEUE_TYPE explicitly."
130                        .to_string(),
131                )),
132                (false, false) => Err(QueueBackendError::ConfigError(format!(
133                    "No SQS queues found. Neither '{ref_standard_url}' nor \
134                     '{ref_fifo_url}' is accessible. Create queues before starting \
135                     the relayer, or set SQS_QUEUE_TYPE explicitly."
136                ))),
137            }
138        }
139        other => Err(QueueBackendError::ConfigError(format!(
140            "Unsupported SQS_QUEUE_TYPE: '{other}'. Must be 'auto', 'standard', or 'fifo'."
141        ))),
142    }
143}
144
145impl SqsBackend {
146    fn is_fifo_queue_url(queue_url: &str) -> bool {
147        queue_url.ends_with(".fifo")
148    }
149
150    /// Creates a new SQS backend.
151    ///
152    /// Loads AWS configuration from environment and builds queue URLs.
153    /// Queue type is determined by `SQS_QUEUE_TYPE`:
154    /// - `auto` (default): probes a reference queue at startup to detect the type
155    /// - `standard` / `fifo`: uses the specified type directly, skipping probing
156    ///
157    /// # Environment Variables
158    /// - `AWS_REGION` - AWS region (required)
159    /// - `SQS_QUEUE_URL_PREFIX` - Optional custom prefix
160    /// - `AWS_ACCOUNT_ID` - Required only when `SQS_QUEUE_URL_PREFIX` is not set
161    /// - `SQS_QUEUE_TYPE` - Queue type: `auto` (default), `standard`, or `fifo`
162    ///
163    /// # Errors
164    /// Returns ConfigError if required environment variables are missing or
165    /// if queue type cannot be determined (no queues found, or both types exist).
166    pub async fn new() -> Result<Self, QueueBackendError> {
167        info!("Initializing SQS queue backend");
168
169        // Load AWS config from environment
170        let config = aws_config::load_from_env().await;
171        let sqs_client = aws_sdk_sqs::Client::new(&config);
172        let region = config
173            .region()
174            .ok_or_else(|| {
175                QueueBackendError::ConfigError(
176                    "AWS_REGION not set. Required for SQS backend.".to_string(),
177                )
178            })?
179            .to_string();
180
181        // Build queue URL prefix.
182        // If an explicit prefix is provided, avoid forcing AWS_ACCOUNT_ID.
183        let prefix = match std::env::var("SQS_QUEUE_URL_PREFIX") {
184            Ok(prefix) => prefix,
185            Err(_) => {
186                let account_id =
187                    ServerConfig::get_aws_account_id().map_err(QueueBackendError::ConfigError)?;
188                format!("https://sqs.{region}.amazonaws.com/{account_id}/relayer-")
189            }
190        };
191        info!(
192            region = %region,
193            queue_url_prefix = %prefix,
194            "Resolved SQS queue URL prefix"
195        );
196
197        // Determine queue type: explicit override or auto-detect by probing.
198        let sqs_queue_type = ServerConfig::get_sqs_queue_type().to_lowercase();
199        let ref_standard_url = format!("{prefix}transaction-request");
200        let ref_fifo_url = format!("{prefix}transaction-request.fifo");
201
202        // Only probe when auto-detecting; explicit values skip the network call.
203        let probe_results = if sqs_queue_type == "auto" {
204            let (standard_probe, fifo_probe) = {
205                let client_s = sqs_client.clone();
206                let client_f = sqs_client.clone();
207                let url_s = ref_standard_url.clone();
208                let url_f = ref_fifo_url.clone();
209                tokio::join!(
210                    async move {
211                        client_s
212                            .get_queue_attributes()
213                            .queue_url(&url_s)
214                            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
215                            .send()
216                            .await
217                    },
218                    async move {
219                        client_f
220                            .get_queue_attributes()
221                            .queue_url(&url_f)
222                            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
223                            .send()
224                            .await
225                    }
226                )
227            };
228            Some((standard_probe.is_ok(), fifo_probe.is_ok()))
229        } else {
230            None
231        };
232
233        let is_fifo = resolve_queue_type(
234            &sqs_queue_type,
235            probe_results,
236            &ref_standard_url,
237            &ref_fifo_url,
238        )?;
239        let suffix = if is_fifo { ".fifo" } else { "" };
240
241        // Build queue URL mapping.
242        // Status checks use per-network queues (EVM, Stellar, generic/Solana)
243        // to match the Redis backend's separate worker setup with independent
244        // concurrency pools and network-tuned polling intervals.
245        let queue_urls = HashMap::from([
246            (
247                QueueType::TransactionRequest,
248                format!("{prefix}transaction-request{suffix}"),
249            ),
250            (
251                QueueType::TransactionSubmission,
252                format!("{prefix}transaction-submission{suffix}"),
253            ),
254            (
255                QueueType::StatusCheck,
256                format!("{prefix}status-check{suffix}"),
257            ),
258            (
259                QueueType::StatusCheckEvm,
260                format!("{prefix}status-check-evm{suffix}"),
261            ),
262            (
263                QueueType::StatusCheckStellar,
264                format!("{prefix}status-check-stellar{suffix}"),
265            ),
266            (
267                QueueType::Notification,
268                format!("{prefix}notification{suffix}"),
269            ),
270            (
271                QueueType::TokenSwapRequest,
272                format!("{prefix}token-swap-request{suffix}"),
273            ),
274            (
275                QueueType::RelayerHealthCheck,
276                format!("{prefix}relayer-health-check{suffix}"),
277            ),
278        ]);
279
280        // Fail fast at startup when expected queues are missing/misconfigured.
281        // This avoids silent runtime polling failures and makes infra drift explicit.
282        // Probe all queues concurrently to avoid slow sequential startup under
283        // SQS throttling during scale-out events.
284        let probe_futures: Vec<_> = queue_urls
285            .iter()
286            .map(|(queue_type, queue_url)| {
287                let client = sqs_client.clone();
288                let qt = *queue_type;
289                let url = queue_url.clone();
290                async move {
291                    debug!(
292                        queue_type = %qt,
293                        queue_url = %url,
294                        "Probing SQS queue accessibility at startup"
295                    );
296                    let probe = client
297                        .get_queue_attributes()
298                        .queue_url(&url)
299                        .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
300                        .attribute_names(aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
301                        .send()
302                        .await;
303                    (qt, url, probe)
304                }
305            })
306            .collect();
307
308        let probe_results = futures::future::join_all(probe_futures).await;
309
310        let mut missing_queues = Vec::new();
311        let mut dlq_urls: HashMap<QueueType, String> = HashMap::new();
312
313        for (queue_type, queue_url, probe) in probe_results {
314            match probe {
315                Ok(output) => {
316                    debug!(
317                        queue_type = %queue_type,
318                        queue_url = %queue_url,
319                        is_fifo = is_fifo,
320                        "SQS queue probe succeeded"
321                    );
322
323                    // Resolve and cache DLQ URL from the redrive policy while we
324                    // already have the attributes, avoiding per-health-check lookups.
325                    if let Some(dlq_url) =
326                        Self::resolve_dlq_url_from_attrs(&sqs_client, output.attributes()).await
327                    {
328                        dlq_urls.insert(queue_type, dlq_url);
329                    }
330                }
331                Err(err) => {
332                    // Include debug details because Display often collapses to "service error".
333                    error!(
334                        queue_type = %queue_type,
335                        queue_url = %queue_url,
336                        error = ?err,
337                        "SQS queue probe failed"
338                    );
339                    missing_queues.push(format!("{queue_type} ({queue_url}): {err:?}"));
340                }
341            }
342        }
343
344        if !missing_queues.is_empty() {
345            return Err(QueueBackendError::ConfigError(format!(
346                "SQS backend initialization failed. Missing/inaccessible queues: {}",
347                missing_queues.join(", ")
348            )));
349        }
350
351        info!(
352            region = %region,
353            queue_count = queue_urls.len(),
354            "SQS backend initialized"
355        );
356
357        let (shutdown_tx, _) = watch::channel(false);
358
359        Ok(Self {
360            sqs_client,
361            queue_urls,
362            dlq_urls,
363            region,
364            shutdown_tx: Arc::new(shutdown_tx),
365        })
366    }
367
368    /// Sends a message to SQS with FIFO parameters.
369    ///
370    /// # Arguments
371    /// * `queue_url` - SQS queue URL
372    /// * `body` - JSON-serialized job
373    /// * `message_group_id` - FIFO group ID (for ordering)
374    /// * `message_deduplication_id` - Deduplication ID (prevent duplicates)
375    /// * `delay_seconds` - Optional delay (0-900 seconds). Applied only for non-FIFO queues.
376    ///
377    /// # Returns
378    /// SQS message ID on success
379    async fn send_message_to_sqs(
380        &self,
381        queue_url: &str,
382        body: String,
383        message_group_id: String,
384        message_deduplication_id: String,
385        delay_seconds: Option<i32>,
386        target_scheduled_on: Option<i64>,
387    ) -> Result<String, QueueBackendError> {
388        if body.len() > SQS_MAX_MESSAGE_SIZE_BYTES {
389            return Err(QueueBackendError::SqsError(format!(
390                "Message body size ({} bytes) exceeds SQS limit ({} bytes)",
391                body.len(),
392                SQS_MAX_MESSAGE_SIZE_BYTES
393            )));
394        }
395
396        let mut request = self
397            .sqs_client
398            .send_message()
399            .queue_url(queue_url)
400            .message_body(body);
401
402        // FIFO queues require MessageGroupId and MessageDeduplicationId;
403        // standard queues reject these parameters.
404        if Self::is_fifo_queue_url(queue_url) {
405            request = request
406                .message_group_id(message_group_id)
407                .message_deduplication_id(message_deduplication_id);
408        }
409
410        if let Some(timestamp) = target_scheduled_on {
411            request = request.message_attributes(
412                "target_scheduled_on",
413                MessageAttributeValue::builder()
414                    .data_type("Number")
415                    .string_value(timestamp.to_string())
416                    .build()
417                    .map_err(|e| {
418                        QueueBackendError::SqsError(format!(
419                            "Failed to build scheduled-on attribute: {e}"
420                        ))
421                    })?,
422            );
423        }
424
425        // Add delay if specified (max 900 seconds = 15 minutes).
426        // FIFO queues do not support per-message DelaySeconds.
427        if let Some(delay) = delay_seconds {
428            let clamped_delay = delay.clamp(0, 900);
429            if Self::is_fifo_queue_url(queue_url) {
430                debug!(
431                    queue_url = %queue_url,
432                    requested_delay_seconds = delay,
433                    "Skipping per-message DelaySeconds for FIFO queue; worker-side scheduling will enforce target_scheduled_on"
434                );
435            } else {
436                request = request.delay_seconds(clamped_delay);
437                if delay != clamped_delay {
438                    warn!(
439                        requested = delay,
440                        clamped = clamped_delay,
441                        "Delay seconds clamped to SQS limit (0-900)"
442                    );
443                }
444            }
445        }
446
447        let response = request.send().await.map_err(|e| {
448            error!(
449                error.kind = classify_sdk_error(&e),
450                error.detail = %DisplayErrorContext(&e),
451                queue_url = %queue_url,
452                "Failed to send message to SQS"
453            );
454            QueueBackendError::SqsError(format!("SendMessage failed: {}", classify_sdk_error(&e)))
455        })?;
456
457        let message_id = response
458            .message_id()
459            .ok_or_else(|| QueueBackendError::SqsError("No message_id returned".to_string()))?
460            .to_string();
461
462        debug!(
463            message_id = %message_id,
464            queue_url = %queue_url,
465            "Message sent to SQS"
466        );
467
468        Ok(message_id)
469    }
470
471    /// Calculates delay in seconds from Unix timestamp.
472    ///
473    /// Returns None if scheduled_on is in the past or None.
474    fn calculate_delay_seconds(scheduled_on: Option<i64>) -> Option<i32> {
475        scheduled_on.and_then(|timestamp| {
476            let now = SystemTime::now()
477                .duration_since(SystemTime::UNIX_EPOCH)
478                .ok()?
479                .as_secs() as i64;
480
481            let delay = timestamp - now;
482            if delay > 0 {
483                Some(delay.min(900) as i32) // SQS max delay: 900 seconds
484            } else {
485                None // Already past scheduled time
486            }
487        })
488    }
489
490    /// Extracts the DLQ ARN from a redrive policy and resolves its queue URL.
491    ///
492    /// Called once at startup so the URL can be cached in `dlq_urls`.
493    async fn resolve_dlq_url_from_attrs(
494        sqs_client: &aws_sdk_sqs::Client,
495        attrs: Option<&HashMap<aws_sdk_sqs::types::QueueAttributeName, String>>,
496    ) -> Option<String> {
497        let redrive_policy =
498            attrs.and_then(|a| a.get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy))?;
499
500        let dlq_arn = serde_json::from_str::<serde_json::Value>(redrive_policy)
501            .ok()
502            .and_then(|v| v.get("deadLetterTargetArn").cloned())
503            .and_then(|v| v.as_str().map(|s| s.to_string()));
504
505        let dlq_name = dlq_arn.as_deref()?.rsplit(':').next()?;
506
507        match sqs_client.get_queue_url().queue_name(dlq_name).send().await {
508            Ok(output) => output.queue_url().map(str::to_string),
509            Err(err) => {
510                warn!(
511                    error.kind = classify_sdk_error(&err),
512                    error.detail = %DisplayErrorContext(&err),
513                    dlq_name = %dlq_name,
514                    "Failed to resolve DLQ URL at startup"
515                );
516                None
517            }
518        }
519    }
520
521    /// Returns the approximate message count for a cached DLQ URL.
522    ///
523    /// Uses URLs resolved and cached at startup, requiring only a single
524    /// `get_queue_attributes` call per health check (no URL resolution).
525    async fn get_dlq_message_count(&self, queue_type: &QueueType) -> u64 {
526        let Some(dlq_url) = self.dlq_urls.get(queue_type) else {
527            return 0;
528        };
529
530        match self
531            .sqs_client
532            .get_queue_attributes()
533            .queue_url(dlq_url)
534            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
535            .send()
536            .await
537        {
538            Ok(output) => output
539                .attributes()
540                .and_then(|attrs| {
541                    attrs.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
542                })
543                .and_then(|value| value.parse::<u64>().ok())
544                .unwrap_or(0),
545            Err(err) => {
546                warn!(
547                    error.kind = classify_sdk_error(&err),
548                    error.detail = %DisplayErrorContext(&err),
549                    dlq_url = %dlq_url,
550                    "Failed to fetch DLQ depth"
551                );
552                0
553            }
554        }
555    }
556}
557
558#[async_trait]
559impl QueueBackend for SqsBackend {
560    async fn produce_transaction_request(
561        &self,
562        job: Job<TransactionRequest>,
563        scheduled_on: Option<i64>,
564    ) -> Result<String, QueueBackendError> {
565        let queue_url = self
566            .queue_urls
567            .get(&QueueType::TransactionRequest)
568            .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionRequest".to_string()))?;
569
570        let body = serde_json::to_string(&job).map_err(|e| {
571            error!(error = %e, "Failed to serialize TransactionRequest job");
572            QueueBackendError::SerializationError(e.to_string())
573        })?;
574
575        let message_group_id = transaction_message_group_id(
576            job.data.network_type.as_ref(),
577            &job.data.relayer_id,
578            &job.data.transaction_id,
579        );
580        let message_deduplication_id = job.message_id.clone();
581        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
582
583        self.send_message_to_sqs(
584            queue_url,
585            body,
586            message_group_id,
587            message_deduplication_id,
588            delay_seconds,
589            scheduled_on,
590        )
591        .await
592    }
593
594    async fn produce_transaction_submission(
595        &self,
596        job: Job<TransactionSend>,
597        scheduled_on: Option<i64>,
598    ) -> Result<String, QueueBackendError> {
599        let queue_url = self
600            .queue_urls
601            .get(&QueueType::TransactionSubmission)
602            .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionSubmission".to_string()))?;
603
604        let body = serde_json::to_string(&job).map_err(|e| {
605            error!(error = %e, "Failed to serialize TransactionSend job");
606            QueueBackendError::SerializationError(e.to_string())
607        })?;
608
609        let message_group_id = transaction_message_group_id(
610            job.data.network_type.as_ref(),
611            &job.data.relayer_id,
612            &job.data.transaction_id,
613        );
614        let message_deduplication_id = job.message_id.clone();
615        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
616
617        self.send_message_to_sqs(
618            queue_url,
619            body,
620            message_group_id,
621            message_deduplication_id,
622            delay_seconds,
623            scheduled_on,
624        )
625        .await
626    }
627
628    async fn produce_transaction_status_check(
629        &self,
630        job: Job<TransactionStatusCheck>,
631        scheduled_on: Option<i64>,
632    ) -> Result<String, QueueBackendError> {
633        // Route to network-specific queue based on network type.
634        // EVM and Stellar get dedicated queues with tuned concurrency/polling;
635        // Solana and unknown network types use the generic StatusCheck queue.
636        let queue_type = status_check_queue_type(job.data.network_type.as_ref());
637        let queue_url = self
638            .queue_urls
639            .get(&queue_type)
640            .ok_or_else(|| QueueBackendError::QueueNotFound(format!("{queue_type}")))?;
641
642        let body = serde_json::to_string(&job).map_err(|e| {
643            error!(error = %e, "Failed to serialize TransactionStatusCheck job");
644            QueueBackendError::SerializationError(e.to_string())
645        })?;
646
647        let message_group_id = job.data.transaction_id.clone();
648        let message_deduplication_id = job.message_id.clone();
649        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
650
651        self.send_message_to_sqs(
652            queue_url,
653            body,
654            message_group_id,
655            message_deduplication_id,
656            delay_seconds,
657            scheduled_on,
658        )
659        .await
660    }
661
662    async fn produce_notification(
663        &self,
664        job: Job<NotificationSend>,
665        scheduled_on: Option<i64>,
666    ) -> Result<String, QueueBackendError> {
667        let queue_url = self
668            .queue_urls
669            .get(&QueueType::Notification)
670            .ok_or_else(|| QueueBackendError::QueueNotFound("Notification".to_string()))?;
671
672        let body = serde_json::to_string(&job).map_err(|e| {
673            error!(error = %e, "Failed to serialize NotificationSend job");
674            QueueBackendError::SerializationError(e.to_string())
675        })?;
676
677        // Notifications use notification_id as the group ID
678        let message_group_id = job.data.notification_id.clone();
679        let message_deduplication_id = job.message_id.clone();
680        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
681
682        self.send_message_to_sqs(
683            queue_url,
684            body,
685            message_group_id,
686            message_deduplication_id,
687            delay_seconds,
688            scheduled_on,
689        )
690        .await
691    }
692
693    async fn produce_token_swap_request(
694        &self,
695        job: Job<TokenSwapRequest>,
696        scheduled_on: Option<i64>,
697    ) -> Result<String, QueueBackendError> {
698        let queue_url = self
699            .queue_urls
700            .get(&QueueType::TokenSwapRequest)
701            .ok_or_else(|| QueueBackendError::QueueNotFound("TokenSwapRequest".to_string()))?;
702
703        let body = serde_json::to_string(&job).map_err(|e| {
704            error!(error = %e, "Failed to serialize TokenSwapRequest job");
705            QueueBackendError::SerializationError(e.to_string())
706        })?;
707
708        let message_group_id = job.data.relayer_id.clone();
709        let message_deduplication_id = job.message_id.clone();
710        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
711
712        self.send_message_to_sqs(
713            queue_url,
714            body,
715            message_group_id,
716            message_deduplication_id,
717            delay_seconds,
718            scheduled_on,
719        )
720        .await
721    }
722
723    async fn produce_relayer_health_check(
724        &self,
725        job: Job<RelayerHealthCheck>,
726        scheduled_on: Option<i64>,
727    ) -> Result<String, QueueBackendError> {
728        let queue_url = self
729            .queue_urls
730            .get(&QueueType::RelayerHealthCheck)
731            .ok_or_else(|| QueueBackendError::QueueNotFound("RelayerHealthCheck".to_string()))?;
732
733        let body = serde_json::to_string(&job).map_err(|e| {
734            error!(error = %e, "Failed to serialize RelayerHealthCheck job");
735            QueueBackendError::SerializationError(e.to_string())
736        })?;
737
738        let message_group_id = job.data.relayer_id.clone();
739        let message_deduplication_id = job.message_id.clone();
740        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
741
742        self.send_message_to_sqs(
743            queue_url,
744            body,
745            message_group_id,
746            message_deduplication_id,
747            delay_seconds,
748            scheduled_on,
749        )
750        .await
751    }
752
753    async fn initialize_workers(
754        &self,
755        app_state: Arc<ThinData<DefaultAppState>>,
756    ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
757        info!(
758            "Initializing SQS workers for {} queues",
759            self.queue_urls.len()
760        );
761
762        let mut handles = Vec::new();
763
764        // Spawn a worker for each queue type
765        for (queue_type, queue_url) in &self.queue_urls {
766            let handle = super::sqs_worker::spawn_worker_for_queue(
767                self.sqs_client.clone(),
768                *queue_type,
769                queue_url.clone(),
770                app_state.clone(),
771                self.shutdown_tx.subscribe(),
772            )
773            .await?;
774
775            handles.push(handle);
776        }
777
778        // Start cron scheduler for periodic tasks (cleanup, token swaps)
779        let cron_scheduler =
780            super::sqs_cron::SqsCronScheduler::new(app_state.clone(), self.shutdown_tx.subscribe());
781        let cron_handles = cron_scheduler.start().await?;
782        handles.extend(cron_handles);
783
784        // Internal shutdown signal handler — listens for SIGINT/SIGTERM and
785        // broadcasts shutdown to all SQS workers and cron tasks.
786        // (Redis/Apalis workers handle signals via their own Monitor.)
787        {
788            let shutdown_tx = self.shutdown_tx.clone();
789            let handle = tokio::spawn(async move {
790                let mut sigint =
791                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
792                        .expect("Failed to create SIGINT handler");
793                let mut sigterm =
794                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
795                        .expect("Failed to create SIGTERM handler");
796
797                tokio::select! {
798                    _ = sigint.recv() => info!("SQS backend: received SIGINT, shutting down workers"),
799                    _ = sigterm.recv() => info!("SQS backend: received SIGTERM, shutting down workers"),
800                }
801
802                let _ = shutdown_tx.send(true);
803            });
804            handles.push(WorkerHandle::Tokio(handle));
805        }
806
807        info!(
808            "Successfully spawned {} SQS workers and cron tasks",
809            handles.len()
810        );
811        Ok(handles)
812    }
813
814    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
815        let mut health_statuses = Vec::new();
816
817        for (queue_type, queue_url) in &self.queue_urls {
818            // Get queue attributes to check health
819            let result = self
820                .sqs_client
821                .get_queue_attributes()
822                .queue_url(queue_url)
823                .attribute_names(
824                    aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages,
825                )
826                .attribute_names(
827                    aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
828                )
829                .send()
830                .await;
831
832            let (messages_visible, messages_in_flight, messages_dlq, is_healthy) = match result {
833                Ok(output) => {
834                    let attrs = output.attributes();
835                    let visible = attrs
836                        .and_then(|a| {
837                            a.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
838                        })
839                        .and_then(|v| v.parse::<u64>().ok())
840                        .unwrap_or(0);
841                    let in_flight = attrs
842                        .and_then(|a| {
843                            a.get(
844                                &aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
845                            )
846                        })
847                        .and_then(|v| v.parse::<u64>().ok())
848                        .unwrap_or(0);
849                    let dlq_count = self.get_dlq_message_count(queue_type).await;
850                    (visible, in_flight, dlq_count, true)
851                }
852                Err(e) => {
853                    error!(
854                        error = %e,
855                        queue_type = ?queue_type,
856                        "Failed to get queue attributes"
857                    );
858                    (0, 0, 0, false)
859                }
860            };
861
862            health_statuses.push(QueueHealth {
863                queue_type: *queue_type,
864                messages_visible,
865                messages_in_flight,
866                messages_dlq,
867                backend: "sqs".to_string(),
868                is_healthy,
869            });
870        }
871
872        Ok(health_statuses)
873    }
874
875    fn backend_type(&self) -> QueueBackendType {
876        QueueBackendType::Sqs
877    }
878
879    fn shutdown(&self) {
880        info!("SQS backend: broadcasting shutdown signal to all workers");
881        let _ = self.shutdown_tx.send(true);
882    }
883}
884
#[cfg(test)]
mod tests {
    //! Unit tests for the SQS backend helpers.
    //!
    //! Tests that depend on wall-clock time tolerate a one-second tick between
    //! reading "now" and calling the function under test, so they stay stable
    //! when the clock rolls over mid-test.

    use super::*;
    use crate::jobs::{Job, JobType, TransactionStatusCheck};
    use crate::models::NetworkType;

    /// URL prefix used by the queue-URL construction tests.
    const AWS_PREFIX: &str = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";

    /// Queue types paired with the name segment used in their queue URL.
    const QUEUE_NAMES: [(QueueType, &str); 8] = [
        (QueueType::TransactionRequest, "transaction-request"),
        (QueueType::TransactionSubmission, "transaction-submission"),
        (QueueType::StatusCheck, "status-check"),
        (QueueType::StatusCheckEvm, "status-check-evm"),
        (QueueType::StatusCheckStellar, "status-check-stellar"),
        (QueueType::Notification, "notification"),
        (QueueType::TokenSwapRequest, "token-swap-request"),
        (QueueType::RelayerHealthCheck, "relayer-health-check"),
    ];

    /// Current Unix time in whole seconds.
    fn unix_now() -> i64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system time before unix epoch")
            .as_secs() as i64
    }

    /// Builds `prefix + name + suffix` for every entry in [`QUEUE_NAMES`].
    fn build_queue_urls(prefix: &str, suffix: &str) -> HashMap<QueueType, String> {
        QUEUE_NAMES
            .iter()
            .map(|(queue_type, name)| (*queue_type, format!("{prefix}{name}{suffix}")))
            .collect()
    }

    #[test]
    fn test_calculate_delay_seconds() {
        // No scheduled time
        assert_eq!(SqsBackend::calculate_delay_seconds(None), None);

        // Past time
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(unix_now() - 10)),
            None
        );

        // Future time within SQS limit (< 900s). The clock may tick once
        // between reading "now" and the call, so accept 4 or 5.
        let delay = SqsBackend::calculate_delay_seconds(Some(unix_now() + 5))
            .expect("5s in the future should yield a delay");
        assert!((4..=5).contains(&delay), "Expected ~5, got {delay}");

        // Future time beyond SQS limit (> 900s) - should clamp to 900
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(unix_now() + 1000)),
            Some(900)
        );
    }

    #[test]
    fn test_calculate_delay_seconds_edge_cases() {
        let now = unix_now();

        // Exactly at current time (should return None)
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(now)), None);

        // Exactly at SQS limit (900s). A clock tick may shave one second off.
        let at_limit = SqsBackend::calculate_delay_seconds(Some(now + 900))
            .expect("900s in the future should yield a delay");
        assert!(
            (899..=900).contains(&at_limit),
            "Expected ~900, got {at_limit}"
        );

        // Just over SQS limit (901s) - should clamp to 900
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(now + 901)),
            Some(900)
        );
    }

    #[test]
    fn test_sqs_backend_type_value() {
        assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
        assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
    }

    #[test]
    fn test_queue_url_construction() {
        // Test that queue URLs are correctly constructed
        let queue_urls = build_queue_urls(AWS_PREFIX, ".fifo");

        // Verify all queue types have URLs
        assert_eq!(queue_urls.len(), 8);
        assert!(queue_urls[&QueueType::TransactionRequest].ends_with(".fifo"));
        assert!(queue_urls[&QueueType::TransactionSubmission].contains("transaction-submission"));
        assert!(queue_urls[&QueueType::StatusCheckEvm].contains("status-check-evm"));
        assert!(queue_urls[&QueueType::StatusCheckStellar].contains("status-check-stellar"));
    }

    #[test]
    fn test_queue_url_construction_standard() {
        // Test that standard queue URLs do not have .fifo suffix
        let queue_urls = build_queue_urls(AWS_PREFIX, "");

        assert_eq!(queue_urls.len(), 8);
        // Standard queue URLs should NOT end with .fifo
        for url in queue_urls.values() {
            assert!(
                !url.ends_with(".fifo"),
                "Standard queue URL should not end with .fifo: {url}"
            );
        }
        assert!(queue_urls[&QueueType::TransactionRequest].contains("transaction-request"));
    }

    #[test]
    fn test_is_fifo_queue_url_standard() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "http://localstack:4566/000000000000/relayer-status-check"
        ));
    }

    #[test]
    fn test_transaction_message_group_id_evm_uses_transaction() {
        let group = transaction_message_group_id(Some(&NetworkType::Evm), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_stellar_uses_transaction() {
        let group =
            transaction_message_group_id(Some(&NetworkType::Stellar), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_solana_uses_transaction() {
        let group = transaction_message_group_id(Some(&NetworkType::Solana), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_none_defaults_to_transaction() {
        let group = transaction_message_group_id(None, "relayer-1", "tx-123");
        assert_eq!(
            group, "tx-123",
            "Unknown network should default to transaction id"
        );
    }

    #[test]
    fn test_status_check_queue_type_evm() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Evm)),
            QueueType::StatusCheckEvm
        );
    }

    #[test]
    fn test_status_check_queue_type_stellar() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Stellar)),
            QueueType::StatusCheckStellar
        );
    }

    #[test]
    fn test_status_check_queue_type_solana_defaults_to_generic() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Solana)),
            QueueType::StatusCheck
        );
    }

    #[test]
    fn test_status_check_queue_type_none_defaults_to_generic() {
        assert_eq!(status_check_queue_type(None), QueueType::StatusCheck);
    }

    #[test]
    fn test_sqs_max_message_size_constant() {
        assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 256 * 1024);
    }

    #[test]
    fn test_is_fifo_queue_url() {
        assert!(SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue"
        ));
    }

    // --- resolve_queue_type tests ---

    const REF_STD: &str = "http://localhost:4566/000000000000/relayer-transaction-request";
    const REF_FIFO: &str = "http://localhost:4566/000000000000/relayer-transaction-request.fifo";

    #[test]
    fn test_resolve_queue_type_explicit_standard() {
        let is_fifo = resolve_queue_type("standard", None, REF_STD, REF_FIFO).unwrap();
        assert!(!is_fifo);
    }

    #[test]
    fn test_resolve_queue_type_explicit_fifo() {
        let is_fifo = resolve_queue_type("fifo", None, REF_STD, REF_FIFO).unwrap();
        assert!(is_fifo);
    }

    #[test]
    fn test_resolve_queue_type_explicit_ignores_probes() {
        // Even if probes say FIFO exists, explicit "standard" wins
        let is_fifo =
            resolve_queue_type("standard", Some((false, true)), REF_STD, REF_FIFO).unwrap();
        assert!(!is_fifo);
    }

    #[test]
    fn test_resolve_queue_type_auto_standard_only() {
        let is_fifo = resolve_queue_type("auto", Some((true, false)), REF_STD, REF_FIFO).unwrap();
        assert!(!is_fifo);
    }

    #[test]
    fn test_resolve_queue_type_auto_fifo_only() {
        let is_fifo = resolve_queue_type("auto", Some((false, true)), REF_STD, REF_FIFO).unwrap();
        assert!(is_fifo);
    }

    #[test]
    fn test_resolve_queue_type_auto_both_exist_errors() {
        let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Ambiguous"),
            "Expected 'Ambiguous' error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_neither_exists_errors() {
        let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("No SQS queues found"),
            "Expected 'No SQS queues found' error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_no_probes_defaults_to_neither() {
        // None probe results (shouldn't happen in practice) treated as (false, false)
        let result = resolve_queue_type("auto", None, REF_STD, REF_FIFO);
        assert!(result.is_err());
    }

    #[test]
    fn test_resolve_queue_type_unknown_value_errors() {
        let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Unsupported SQS_QUEUE_TYPE"),
            "Expected unsupported error, got: {err}"
        );
    }

    // ── resolve_queue_type: error variant and message checks ──────────

    #[test]
    fn test_resolve_queue_type_auto_neither_error_includes_urls() {
        let std_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request";
        let fifo_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request.fifo";
        let msg = resolve_queue_type("auto", Some((false, false)), std_url, fifo_url)
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains(std_url),
            "Error should include standard URL: {msg}"
        );
        assert!(
            msg.contains(fifo_url),
            "Error should include FIFO URL: {msg}"
        );
    }

    #[test]
    fn test_resolve_queue_type_returns_config_error_variant() {
        let cases = [
            ("invalid", None, "Expected ConfigError variant"),
            (
                "auto",
                Some((true, true)),
                "Ambiguous case should be ConfigError",
            ),
            (
                "auto",
                Some((false, false)),
                "No queues case should be ConfigError",
            ),
        ];

        for (sqs_type, probes, why) in cases {
            let result = resolve_queue_type(sqs_type, probes, REF_STD, REF_FIFO);
            assert!(
                matches!(result, Err(QueueBackendError::ConfigError(_))),
                "{why}"
            );
        }
    }

    #[test]
    fn test_resolve_queue_type_unknown_includes_value_in_error() {
        let msg = resolve_queue_type("redis", None, REF_STD, REF_FIFO)
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("redis"),
            "Error should echo the invalid value: {msg}"
        );

        let result = resolve_queue_type("", None, REF_STD, REF_FIFO);
        assert!(result.is_err(), "Empty string should be rejected");
    }

    #[test]
    fn test_resolve_queue_type_case_sensitive() {
        // The function matches exact lowercase strings; mixed case is unsupported
        for value in ["Standard", "FIFO", "Auto"] {
            assert!(
                resolve_queue_type(value, None, REF_STD, REF_FIFO).is_err(),
                "Mixed-case value should be rejected: {value}"
            );
        }
    }

    // ── calculate_delay_seconds: additional edge cases ────────────────

    #[test]
    fn test_calculate_delay_seconds_one_second_future() {
        // Schedule +2s rather than +1s so a clock tick cannot push it into the past.
        let delay = SqsBackend::calculate_delay_seconds(Some(unix_now() + 2))
            .expect("1-2s in future should yield Some");
        assert!(delay > 0, "Delay should be positive");
        assert!(delay <= 2, "Delay should be at most 2s");
    }

    #[test]
    fn test_calculate_delay_seconds_far_past() {
        // Unix epoch itself
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(0)), None);
        // Negative timestamp (before epoch)
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(-1000)), None);
    }

    #[test]
    fn test_calculate_delay_seconds_very_far_future() {
        // Year ~2100 — should still clamp to 900
        let far_future = 4_102_444_800_i64; // 2100-01-01T00:00:00Z
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(far_future)),
            Some(900)
        );
    }

    #[test]
    fn test_calculate_delay_seconds_exactly_900_boundary() {
        // 899s future → just under the 900s cap, so the value is not clamped.
        // Allow ±1 for timing.
        let val = SqsBackend::calculate_delay_seconds(Some(unix_now() + 899))
            .expect("899s in the future should yield a delay");
        assert!((898..=899).contains(&val), "Expected ~899, got {val}");
    }

    // ── is_fifo_queue_url: edge cases ─────────────────────────────────

    #[test]
    fn test_is_fifo_queue_url_empty() {
        assert!(!SqsBackend::is_fifo_queue_url(""));
    }

    #[test]
    fn test_is_fifo_queue_url_case_sensitive() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_fifo_in_middle() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_just_suffix() {
        assert!(SqsBackend::is_fifo_queue_url(".fifo"));
    }

    #[test]
    fn test_is_fifo_queue_url_localstack() {
        assert!(SqsBackend::is_fifo_queue_url(
            "http://localhost:4566/000000000000/relayer-tx.fifo"
        ));
    }

    // ── transaction_message_group_id: consistency ─────────────────────

    #[test]
    fn test_transaction_message_group_id_always_returns_transaction_id() {
        // Current implementation returns transaction_id for all network types.
        // This test documents that behavior and catches accidental changes.
        let networks: &[Option<NetworkType>] = &[
            Some(NetworkType::Evm),
            Some(NetworkType::Stellar),
            Some(NetworkType::Solana),
            None,
        ];

        for network in networks {
            let group = transaction_message_group_id(network.as_ref(), "relayer-99", "tx-abc");
            assert_eq!(
                group, "tx-abc",
                "Expected transaction_id for network {network:?}"
            );
        }
    }

    // ── status_check_queue_type: returned queue names ──────────────────

    #[test]
    fn test_status_check_queue_type_returns_distinct_queue_names() {
        let evm = status_check_queue_type(Some(&NetworkType::Evm));
        let stellar = status_check_queue_type(Some(&NetworkType::Stellar));
        let generic = status_check_queue_type(None);

        assert_ne!(evm.queue_name(), stellar.queue_name());
        assert_ne!(evm.queue_name(), generic.queue_name());
        assert_ne!(stellar.queue_name(), generic.queue_name());
    }

    #[test]
    fn test_status_check_queue_type_all_are_status_checks() {
        let networks: &[Option<&NetworkType>] = &[
            Some(&NetworkType::Evm),
            Some(&NetworkType::Stellar),
            Some(&NetworkType::Solana),
            None,
        ];

        for network in networks {
            let qt = status_check_queue_type(*network);
            assert!(
                qt.is_status_check(),
                "{qt:?} should be a status check variant"
            );
        }
    }

    // ── Queue URL construction algorithm ──────────────────────────────

    #[test]
    fn test_queue_url_construction_algorithm_fifo() {
        // Replicate the algorithm from SqsBackend::new()
        let expected_urls = build_queue_urls(AWS_PREFIX, ".fifo");

        for (qt, expected) in &expected_urls {
            assert!(
                SqsBackend::is_fifo_queue_url(expected),
                "{qt:?}: URL should be FIFO: {expected}"
            );
        }
    }

    #[test]
    fn test_queue_url_construction_algorithm_standard() {
        let urls = build_queue_urls(AWS_PREFIX, "");

        for url in urls.values() {
            assert!(
                !SqsBackend::is_fifo_queue_url(url),
                "Standard URL should not be FIFO: {url}"
            );
        }
    }

    // ── SQS_MAX_MESSAGE_SIZE_BYTES ────────────────────────────────────

    #[test]
    fn test_sqs_max_message_size_matches_aws_limit() {
        // AWS SQS maximum message body size is exactly 256 KiB
        assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 262_144);
    }

    // ── resolve_queue_type: all branches produce expected is_fifo ─────

    #[test]
    fn test_resolve_queue_type_suffix_logic() {
        // Verify the suffix derived from resolve_queue_type produces correct URLs
        let test_cases = [
            ("standard", None, false),
            ("fifo", None, true),
            ("auto", Some((true, false)), false),
            ("auto", Some((false, true)), true),
        ];

        for (sqs_type, probes, expected_fifo) in test_cases {
            let is_fifo = resolve_queue_type(sqs_type, probes, REF_STD, REF_FIFO).unwrap();
            assert_eq!(is_fifo, expected_fifo, "sqs_type={sqs_type}");

            let suffix = if is_fifo { ".fifo" } else { "" };
            let url = format!("https://sqs.us-east-1.amazonaws.com/123/relayer-tx{suffix}");
            assert_eq!(
                SqsBackend::is_fifo_queue_url(&url),
                expected_fifo,
                "URL FIFO detection mismatch for sqs_type={sqs_type}"
            );
        }
    }

    #[tokio::test]
    #[ignore]
    async fn smoke_push_status_check_to_sqs() {
        // Requires real AWS credentials and queue env config.
        // Expected env:
        // - AWS_REGION
        // - SQS_QUEUE_URL_PREFIX
        // - optional AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY/AWS_SESSION_TOKEN
        let backend = SqsBackend::new()
            .await
            .expect("SQS backend initialization failed");
        let job = Job::new(
            JobType::TransactionStatusCheck,
            TransactionStatusCheck::new("smoke-tx-id", "smoke-relayer", NetworkType::Stellar),
        );
        let scheduled_on = Some(unix_now() + 2);
        let result = backend
            .produce_transaction_status_check(job, scheduled_on)
            .await;
        assert!(
            result.is_ok(),
            "Expected SendMessage via SQS backend to succeed, got: {result:?}"
        );
    }
}