openzeppelin_relayer/queues/redis/
worker.rs

1//! Redis/Apalis worker initialization.
2//!
3//! This module contains all Apalis-specific worker creation logic for the Redis
4//! queue backend, including WorkerBuilder configurations, Monitor setup,
5//! backoff strategies, and token swap cron workers.
6
7use actix_web::web::ThinData;
8
9use crate::{
10    config::ServerConfig,
11    constants::{
12        SYSTEM_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_CRON_SCHEDULE,
13        WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14        WORKER_TRANSACTION_CLEANUP_RETRIES,
15    },
16    jobs::{
17        notification_handler, relayer_health_check_handler, system_cleanup_handler,
18        token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
19        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
20        Job, JobProducerTrait, NotificationSend, RelayerHealthCheck, SystemCleanupCronReminder,
21        TokenSwapCronReminder, TokenSwapRequest, TransactionCleanupCronReminder,
22        TransactionRequest, TransactionSend, TransactionStatusCheck,
23    },
24    models::{
25        DefaultAppState, NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy,
26        RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
27    },
28    repositories::{
29        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30        Repository, TransactionCounterTrait, TransactionRepository,
31    },
32};
33use apalis::prelude::*;
34
35use apalis::layers::retry::backoff::MakeBackoff;
36use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
37use apalis::layers::ErrorHandlingLayer;
38
39/// Re-exports from [`tower::util`]
40pub use tower::util::rng::HasherRng;
41
42use apalis_cron::CronStream;
43use eyre::Result;
44use std::{str::FromStr, time::Duration};
45use tokio::signal::unix::SignalKind;
46use tracing::{debug, error, info};
47
48use crate::metrics::observe_queue_pickup_latency;
49
50use super::{filter_relayers_for_swap, QueueType, WorkerContext};
51use crate::queues::retry_config::{
52    RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF,
53    STATUS_GENERIC_BACKOFF, STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF,
54    TOKEN_SWAP_CRON_BACKOFF, TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF,
55    TX_SUBMISSION_BACKOFF,
56};
57
58// ---------------------------------------------------------------------------
59// Apalis adapter functions
60//
61// These thin adapters are the ONLY place where Apalis-specific handler types
62// (Data, Attempt, Worker<Context>, TaskId, RedisContext) appear. They convert
63// Apalis types → WorkerContext and HandlerError → apalis::prelude::Error,
64// keeping all handler business logic backend-neutral.
65// ---------------------------------------------------------------------------
66
/// Sanity ceiling, in milliseconds, for pickup latencies measured from `Job.timestamp`.
///
/// When a job had no scheduled `available_at` and its measured pickup latency still
/// exceeds this value (one hour), the number most likely reflects clock skew between the
/// producer and the consumer rather than a genuine backlog. The observation is still
/// recorded (so the `+Inf` bucket stays truthful), but a warning is logged so operators
/// can spot the bad data instead of alerting on it.
const PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS: i64 = 3_600_000; // 1 hour
75
76/// Observe queue pickup latency for Redis/Apalis workers.
77///
78/// Uses `available_at` (the intended availability time) when present to exclude
79/// intentional scheduling delay. Falls back to `timestamp` (job creation time)
80/// for immediate jobs. Only records on the initial attempt — apalis
81/// `Attempt::current()` is 1-indexed, so `attempt == 1` is the first delivery;
82/// subsequent attempts would inflate the metric with retry backoff time.
83///
84/// If the chosen baseline fails to parse, the alternative is tried so a single
85/// corrupted field cannot silently drop the observation.
86fn observe_redis_pickup_latency(
87    attempt: usize,
88    available_at: Option<&String>,
89    job_timestamp: &str,
90    queue_type: &str,
91) {
92    if attempt != 1 {
93        return;
94    }
95    let baseline_epoch_secs = available_at
96        .and_then(|s| s.parse::<i64>().ok())
97        .or_else(|| job_timestamp.parse::<i64>().ok());
98    let Some(baseline_epoch_secs) = baseline_epoch_secs else {
99        tracing::warn!(
100            queue_type = queue_type,
101            available_at = ?available_at,
102            job_timestamp = %job_timestamp,
103            "skipping queue_pickup_latency: failed to parse both available_at and job_timestamp"
104        );
105        return;
106    };
107    let now_ms = chrono::Utc::now().timestamp_millis();
108    let delta_ms = now_ms - baseline_epoch_secs * 1000;
109    if available_at.is_none() && delta_ms > PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS {
110        tracing::warn!(
111            queue_type = queue_type,
112            latency_ms = delta_ms,
113            "queue_pickup_latency above sanity threshold for non-scheduled job; check producer/consumer clock skew"
114        );
115    }
116    let latency_secs = delta_ms.max(0) as f64 / 1000.0;
117    observe_queue_pickup_latency(queue_type, "redis", latency_secs);
118}
119
120async fn apalis_transaction_request_handler(
121    job: Job<TransactionRequest>,
122    state: Data<ThinData<DefaultAppState>>,
123    attempt: Attempt,
124    task_id: TaskId,
125) -> Result<(), apalis::prelude::Error> {
126    observe_redis_pickup_latency(
127        attempt.current(),
128        job.available_at.as_ref(),
129        &job.timestamp,
130        QueueType::TransactionRequest.queue_name(),
131    );
132    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
133    transaction_request_handler(job, (*state).clone(), ctx)
134        .await
135        .map_err(Into::into)
136}
137
138async fn apalis_transaction_submission_handler(
139    job: Job<TransactionSend>,
140    state: Data<ThinData<DefaultAppState>>,
141    attempt: Attempt,
142    task_id: TaskId,
143) -> Result<(), apalis::prelude::Error> {
144    observe_redis_pickup_latency(
145        attempt.current(),
146        job.available_at.as_ref(),
147        &job.timestamp,
148        QueueType::TransactionSubmission.queue_name(),
149    );
150    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
151    transaction_submission_handler(job, (*state).clone(), ctx)
152        .await
153        .map_err(Into::into)
154}
155
156async fn apalis_transaction_status_handler(
157    job: Job<TransactionStatusCheck>,
158    state: Data<ThinData<DefaultAppState>>,
159    attempt: Attempt,
160    task_id: TaskId,
161) -> Result<(), apalis::prelude::Error> {
162    observe_redis_pickup_latency(
163        attempt.current(),
164        job.available_at.as_ref(),
165        &job.timestamp,
166        QueueType::StatusCheck.queue_name(),
167    );
168    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
169    transaction_status_handler(job, (*state).clone(), ctx)
170        .await
171        .map_err(Into::into)
172}
173
174async fn apalis_transaction_status_evm_handler(
175    job: Job<TransactionStatusCheck>,
176    state: Data<ThinData<DefaultAppState>>,
177    attempt: Attempt,
178    task_id: TaskId,
179) -> Result<(), apalis::prelude::Error> {
180    observe_redis_pickup_latency(
181        attempt.current(),
182        job.available_at.as_ref(),
183        &job.timestamp,
184        QueueType::StatusCheckEvm.queue_name(),
185    );
186    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
187    transaction_status_handler(job, (*state).clone(), ctx)
188        .await
189        .map_err(Into::into)
190}
191
192async fn apalis_transaction_status_stellar_handler(
193    job: Job<TransactionStatusCheck>,
194    state: Data<ThinData<DefaultAppState>>,
195    attempt: Attempt,
196    task_id: TaskId,
197) -> Result<(), apalis::prelude::Error> {
198    observe_redis_pickup_latency(
199        attempt.current(),
200        job.available_at.as_ref(),
201        &job.timestamp,
202        QueueType::StatusCheckStellar.queue_name(),
203    );
204    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
205    transaction_status_handler(job, (*state).clone(), ctx)
206        .await
207        .map_err(Into::into)
208}
209
210async fn apalis_notification_handler(
211    job: Job<NotificationSend>,
212    state: Data<ThinData<DefaultAppState>>,
213    attempt: Attempt,
214    task_id: TaskId,
215) -> Result<(), apalis::prelude::Error> {
216    observe_redis_pickup_latency(
217        attempt.current(),
218        job.available_at.as_ref(),
219        &job.timestamp,
220        QueueType::Notification.queue_name(),
221    );
222    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
223    notification_handler(job, (*state).clone(), ctx)
224        .await
225        .map_err(Into::into)
226}
227
228async fn apalis_token_swap_request_handler(
229    job: Job<TokenSwapRequest>,
230    state: Data<ThinData<DefaultAppState>>,
231    attempt: Attempt,
232    task_id: TaskId,
233) -> Result<(), apalis::prelude::Error> {
234    observe_redis_pickup_latency(
235        attempt.current(),
236        job.available_at.as_ref(),
237        &job.timestamp,
238        QueueType::TokenSwapRequest.queue_name(),
239    );
240    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
241    token_swap_request_handler(job, (*state).clone(), ctx)
242        .await
243        .map_err(Into::into)
244}
245
246async fn apalis_relayer_health_check_handler(
247    job: Job<RelayerHealthCheck>,
248    state: Data<ThinData<DefaultAppState>>,
249    attempt: Attempt,
250    task_id: TaskId,
251) -> Result<(), apalis::prelude::Error> {
252    observe_redis_pickup_latency(
253        attempt.current(),
254        job.available_at.as_ref(),
255        &job.timestamp,
256        QueueType::RelayerHealthCheck.queue_name(),
257    );
258    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
259    relayer_health_check_handler(job, (*state).clone(), ctx)
260        .await
261        .map_err(Into::into)
262}
263
264async fn apalis_transaction_cleanup_handler(
265    _job: TransactionCleanupCronReminder,
266    state: Data<ThinData<DefaultAppState>>,
267    attempt: Attempt,
268    task_id: TaskId,
269) -> Result<(), apalis::prelude::Error> {
270    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
271    transaction_cleanup_handler(TransactionCleanupCronReminder(), (*state).clone(), ctx)
272        .await
273        .map_err(Into::into)
274}
275
276async fn apalis_system_cleanup_handler(
277    _job: SystemCleanupCronReminder,
278    state: Data<ThinData<DefaultAppState>>,
279    attempt: Attempt,
280    task_id: TaskId,
281) -> Result<(), apalis::prelude::Error> {
282    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
283    system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
284        .await
285        .map_err(Into::into)
286}
287
288async fn apalis_token_swap_cron_handler(
289    _job: TokenSwapCronReminder,
290    relayer_id: Data<String>,
291    state: Data<ThinData<DefaultAppState>>,
292    attempt: Attempt,
293    task_id: TaskId,
294) -> Result<(), apalis::prelude::Error> {
295    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
296    token_swap_cron_handler(
297        TokenSwapCronReminder(),
298        (*relayer_id).clone(),
299        (*state).clone(),
300        ctx,
301    )
302    .await
303    .map_err(Into::into)
304}
305
// Worker names registered with the Apalis monitor; they identify workers in events
// and logs.

/// Worker name for the transaction request queue.
const TRANSACTION_REQUEST: &str = "transaction_request";
/// Worker name for the transaction submission queue.
const TRANSACTION_SENDER: &str = "transaction_sender";
/// Worker name for the generic (network-agnostic) transaction status checker.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
/// Worker name for the EVM-specific transaction status checker.
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
/// Worker name for the Stellar-specific transaction status checker.
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
/// Worker name for the notification queue.
const NOTIFICATION_SENDER: &str = "notification_sender";
/// Worker name for the token swap request queue.
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
/// Worker name for the transaction cleanup cron worker.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
/// Worker name for the relayer health check queue.
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
/// Worker name for the system cleanup cron worker.
const SYSTEM_CLEANUP: &str = "system_cleanup";
318
319/// Creates an exponential backoff with configurable parameters
320///
321/// # Arguments
322/// * `initial_ms` - Initial delay in milliseconds (e.g., 200)
323/// * `max_ms` - Maximum delay in milliseconds (e.g., 5000)
324/// * `jitter` - Jitter factor 0.0-1.0 (e.g., 0.99 for high jitter)
325///
326/// # Returns
327/// A configured backoff instance ready for use with RetryPolicy
328fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
329    let maker = ExponentialBackoffMaker::new(
330        Duration::from_millis(initial_ms),
331        Duration::from_millis(max_ms),
332        jitter,
333        HasherRng::default(),
334    )?;
335
336    Ok(maker)
337}
338
339fn create_backoff_from_config(cfg: RetryBackoffConfig) -> Result<ExponentialBackoffMaker> {
340    create_backoff(cfg.initial_ms, cfg.max_ms, cfg.jitter)
341}
342
343/// Initializes Redis/Apalis workers and starts the lifecycle monitor.
344///
345/// # Arguments
346/// * `app_state` - Application state containing the job producer and configuration
347pub async fn initialize_redis_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
348    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
349) -> Result<()>
350where
351    J: JobProducerTrait + Send + Sync + 'static,
352    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
353    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
354    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
355    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
356    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
357    TCR: TransactionCounterTrait + Send + Sync + 'static,
358    PR: PluginRepositoryTrait + Send + Sync + 'static,
359    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
360{
361    let queue_backend = app_state
362        .job_producer
363        .get_queue_backend()
364        .ok_or_else(|| eyre::eyre!("Queue backend is not available"))?;
365    let queue = queue_backend
366        .queue()
367        .cloned()
368        .ok_or_else(|| eyre::eyre!("Redis queue is not available for active backend"))?;
369
370    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
371        .layer(ErrorHandlingLayer::new())
372        .retry(
373            RetryPolicy::retries(QueueType::TransactionRequest.max_retries())
374                .with_backoff(create_backoff_from_config(TX_REQUEST_BACKOFF)?.make_backoff()),
375        )
376        .enable_tracing()
377        .catch_panic()
378        .concurrency(ServerConfig::get_worker_concurrency(
379            QueueType::TransactionRequest.concurrency_env_key(),
380            QueueType::TransactionRequest.default_concurrency(),
381        ))
382        .data(app_state.clone())
383        .backend(queue.transaction_request_queue.clone())
384        .build_fn(apalis_transaction_request_handler);
385
386    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
387        .layer(ErrorHandlingLayer::new())
388        .enable_tracing()
389        .catch_panic()
390        .retry(
391            RetryPolicy::retries(QueueType::TransactionSubmission.max_retries())
392                .with_backoff(create_backoff_from_config(TX_SUBMISSION_BACKOFF)?.make_backoff()),
393        )
394        .concurrency(ServerConfig::get_worker_concurrency(
395            QueueType::TransactionSubmission.concurrency_env_key(),
396            QueueType::TransactionSubmission.default_concurrency(),
397        ))
398        .data(app_state.clone())
399        .backend(queue.transaction_submission_queue.clone())
400        .build_fn(apalis_transaction_submission_handler);
401
402    // Generic status checker
403    // Uses medium settings that work reasonably for most chains
404    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
405        .layer(ErrorHandlingLayer::new())
406        .enable_tracing()
407        .catch_panic()
408        .retry(
409            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
410                .with_backoff(create_backoff_from_config(STATUS_GENERIC_BACKOFF)?.make_backoff()),
411        )
412        .concurrency(ServerConfig::get_worker_concurrency(
413            QueueType::StatusCheck.concurrency_env_key(),
414            QueueType::StatusCheck.default_concurrency(),
415        ))
416        .data(app_state.clone())
417        .backend(queue.transaction_status_queue.clone())
418        .build_fn(apalis_transaction_status_handler);
419
420    // EVM status checker - slower retries to avoid premature resubmission
421    // EVM has longer block times (~12s) and needs time for resubmission logic
422    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
423        .layer(ErrorHandlingLayer::new())
424        .enable_tracing()
425        .catch_panic()
426        .retry(
427            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
428                .with_backoff(create_backoff_from_config(STATUS_EVM_BACKOFF)?.make_backoff()),
429        )
430        .concurrency(ServerConfig::get_worker_concurrency(
431            QueueType::StatusCheckEvm.concurrency_env_key(),
432            QueueType::StatusCheckEvm.default_concurrency(),
433        ))
434        .data(app_state.clone())
435        .backend(queue.transaction_status_queue_evm.clone())
436        .build_fn(apalis_transaction_status_evm_handler);
437
438    // Stellar status checker - fast retries for fast finality
439    // Stellar has sub-second finality, needs more frequent status checks
440    let transaction_status_queue_worker_stellar =
441        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
442            .layer(ErrorHandlingLayer::new())
443            .enable_tracing()
444            .catch_panic()
445            .retry(
446                RetryPolicy::retries(QueueType::StatusCheckStellar.max_retries()).with_backoff(
447                    create_backoff_from_config(STATUS_STELLAR_BACKOFF)?.make_backoff(),
448                ),
449            )
450            .concurrency(ServerConfig::get_worker_concurrency(
451                QueueType::StatusCheckStellar.concurrency_env_key(),
452                QueueType::StatusCheckStellar.default_concurrency(),
453            ))
454            .data(app_state.clone())
455            .backend(queue.transaction_status_queue_stellar.clone())
456            .build_fn(apalis_transaction_status_stellar_handler);
457
458    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
459        .layer(ErrorHandlingLayer::new())
460        .enable_tracing()
461        .catch_panic()
462        .retry(
463            RetryPolicy::retries(QueueType::Notification.max_retries())
464                .with_backoff(create_backoff_from_config(NOTIFICATION_BACKOFF)?.make_backoff()),
465        )
466        .concurrency(ServerConfig::get_worker_concurrency(
467            QueueType::Notification.concurrency_env_key(),
468            QueueType::Notification.default_concurrency(),
469        ))
470        .data(app_state.clone())
471        .backend(queue.notification_queue.clone())
472        .build_fn(apalis_notification_handler);
473
474    let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
475        .layer(ErrorHandlingLayer::new())
476        .enable_tracing()
477        .catch_panic()
478        .retry(
479            RetryPolicy::retries(QueueType::TokenSwapRequest.max_retries()).with_backoff(
480                create_backoff_from_config(TOKEN_SWAP_REQUEST_BACKOFF)?.make_backoff(),
481            ),
482        )
483        .concurrency(ServerConfig::get_worker_concurrency(
484            QueueType::TokenSwapRequest.concurrency_env_key(),
485            QueueType::TokenSwapRequest.default_concurrency(),
486        ))
487        .data(app_state.clone())
488        .backend(queue.token_swap_request_queue.clone())
489        .build_fn(apalis_token_swap_request_handler);
490
491    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
492        .layer(ErrorHandlingLayer::new())
493        .enable_tracing()
494        .catch_panic()
495        .retry(
496            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
497                .with_backoff(create_backoff_from_config(TX_CLEANUP_BACKOFF)?.make_backoff()),
498        )
499        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) // Default to 1 to avoid DB conflicts
500        .data(app_state.clone())
501        .backend(CronStream::new(
502            apalis_cron::Schedule::from_str(TRANSACTION_CLEANUP_CRON_SCHEDULE)?,
503        ))
504        .build_fn(apalis_transaction_cleanup_handler);
505
506    let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
507        .layer(ErrorHandlingLayer::new())
508        .enable_tracing()
509        .catch_panic()
510        .retry(
511            RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
512                .with_backoff(create_backoff_from_config(SYSTEM_CLEANUP_BACKOFF)?.make_backoff()),
513        )
514        .concurrency(1)
515        .data(app_state.clone())
516        .backend(CronStream::new(apalis_cron::Schedule::from_str(
517            SYSTEM_CLEANUP_CRON_SCHEDULE,
518        )?))
519        .build_fn(apalis_system_cleanup_handler);
520
521    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
522        .layer(ErrorHandlingLayer::new())
523        .enable_tracing()
524        .catch_panic()
525        .retry(
526            RetryPolicy::retries(QueueType::RelayerHealthCheck.max_retries())
527                .with_backoff(create_backoff_from_config(RELAYER_HEALTH_BACKOFF)?.make_backoff()),
528        )
529        .concurrency(ServerConfig::get_worker_concurrency(
530            QueueType::RelayerHealthCheck.concurrency_env_key(),
531            QueueType::RelayerHealthCheck.default_concurrency(),
532        ))
533        .data(app_state.clone())
534        .backend(queue.relayer_health_check_queue.clone())
535        .build_fn(apalis_relayer_health_check_handler);
536
537    let monitor = Monitor::new()
538        .register(transaction_request_queue_worker)
539        .register(transaction_submission_queue_worker)
540        .register(transaction_status_queue_worker)
541        .register(transaction_status_queue_worker_evm)
542        .register(transaction_status_queue_worker_stellar)
543        .register(notification_queue_worker)
544        .register(token_swap_request_queue_worker)
545        .register(transaction_cleanup_queue_worker)
546        .register(system_cleanup_queue_worker)
547        .register(relayer_health_check_worker)
548        .on_event(monitor_handle_event)
549        .shutdown_timeout(Duration::from_millis(5000));
550
551    let monitor_future = monitor.run_with_signal(async {
552        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
553            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
554        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
555            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
556
557        debug!("Workers monitor started");
558
559        tokio::select! {
560            _ = sigint.recv() => debug!("Received SIGINT."),
561            _ = sigterm.recv() => debug!("Received SIGTERM."),
562        };
563
564        debug!("Workers monitor shutting down");
565
566        Ok(())
567    });
568    tokio::spawn(async move {
569        if let Err(e) = monitor_future.await {
570            error!(error = %e, "monitor error");
571        }
572    });
573    debug!("Workers monitor shutdown complete");
574
575    Ok(())
576}
577
578/// Initializes swap workers for Solana and Stellar relayers.
579/// This function creates and registers workers for relayers that have swap enabled and cron schedule set.
580pub async fn initialize_redis_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
581    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
582) -> Result<()>
583where
584    J: JobProducerTrait + Send + Sync + 'static,
585    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
586    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
587    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
588    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
589    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
590    TCR: TransactionCounterTrait + Send + Sync + 'static,
591    PR: PluginRepositoryTrait + Send + Sync + 'static,
592    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
593{
594    let active_relayers = app_state.relayer_repository.list_active().await?;
595    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
596
597    if relayers_with_swap_enabled.is_empty() {
598        debug!("No relayers with swap enabled");
599        return Ok(());
600    }
601    info!(
602        "Found {} relayers with swap enabled",
603        relayers_with_swap_enabled.len()
604    );
605
606    let mut workers = Vec::new();
607
608    let swap_backoff = create_backoff_from_config(TOKEN_SWAP_CRON_BACKOFF)?.make_backoff();
609
610    for relayer in relayers_with_swap_enabled {
611        debug!(relayer = ?relayer, "found relayer with swap enabled");
612
613        let (cron_schedule, network_type) = match &relayer.policies {
614            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
615                Some(config) => match config.cron_schedule {
616                    Some(schedule) => (schedule, "solana".to_string()),
617                    None => {
618                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
619                        continue;
620                    }
621                },
622                None => {
623                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
624                    continue;
625                }
626            },
627            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
628                Some(config) => match config.cron_schedule {
629                    Some(schedule) => (schedule, "stellar".to_string()),
630                    None => {
631                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
632                        continue;
633                    }
634                },
635                None => {
636                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
637                    continue;
638                }
639            },
640            RelayerNetworkPolicy::Evm(_) => {
641                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
642                continue;
643            }
644        };
645
646        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
647            Ok(schedule) => schedule,
648            Err(e) => {
649                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
650                continue;
651            }
652        };
653
654        // Create worker and add to the workers vector
655        let worker = WorkerBuilder::new(format!(
656            "{}-swap-schedule-{}",
657            network_type,
658            relayer.id.clone()
659        ))
660        .layer(ErrorHandlingLayer::new())
661        .enable_tracing()
662        .catch_panic()
663        .retry(
664            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
665                .with_backoff(swap_backoff.clone()),
666        )
667        .concurrency(1)
668        .data(relayer.id.clone())
669        .data(app_state.clone())
670        .backend(CronStream::new(calendar_schedule))
671        .build_fn(apalis_token_swap_cron_handler);
672
673        workers.push(worker);
674        debug!(
675            relayer_id = %relayer.id,
676            network_type = %network_type,
677            "Created worker for relayer with swap enabled"
678        );
679    }
680
681    let mut monitor = Monitor::new()
682        .on_event(monitor_handle_event)
683        .shutdown_timeout(Duration::from_millis(5000));
684
685    // Register all workers with the monitor
686    for worker in workers {
687        monitor = monitor.register(worker);
688    }
689
690    let monitor_future = monitor.run_with_signal(async {
691        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
692            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
693        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
694            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
695
696        debug!("Swap Monitor started");
697
698        tokio::select! {
699            _ = sigint.recv() => debug!("Received SIGINT."),
700            _ = sigterm.recv() => debug!("Received SIGTERM."),
701        };
702
703        debug!("Swap Monitor shutting down");
704
705        Ok(())
706    });
707    tokio::spawn(async move {
708        if let Err(e) = monitor_future.await {
709            error!(error = %e, "monitor error");
710        }
711    });
712    Ok(())
713}
714
715fn monitor_handle_event(e: Worker<Event>) {
716    let worker_id = e.id();
717    match e.inner() {
718        Event::Engage(task_id) => {
719            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
720        }
721        Event::Error(e) => {
722            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
723        }
724        Event::Exit => {
725            debug!(worker_id = %worker_id, "worker exited");
726        }
727        Event::Idle => {
728            debug!(worker_id = %worker_id, "worker is idle");
729        }
730        Event::Start => {
731            debug!(worker_id = %worker_id, "worker started");
732        }
733        Event::Stop => {
734            debug!(worker_id = %worker_id, "worker stopped");
735        }
736        _ => {}
737    }
738}
739
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queues::retry_config::{
        NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, STATUS_GENERIC_BACKOFF,
        STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF, TOKEN_SWAP_CRON_BACKOFF,
        TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF, TX_SUBMISSION_BACKOFF,
    };
    use std::collections::HashSet;

    // ── shared fixtures ────────────────────────────────────────────────

    /// The retry backoff profiles exercised by these tests, each paired with a
    /// short label used in assertion messages.
    ///
    /// Single source of truth for every backoff test below: when a profile is
    /// added to `retry_config`, add it here once.
    fn backoff_profiles() -> [(&'static str, RetryBackoffConfig); 11] {
        [
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ]
    }

    /// The worker name constants checked by the naming tests below.
    fn worker_names() -> [&'static str; 10] {
        [
            TRANSACTION_REQUEST,
            TRANSACTION_SENDER,
            TRANSACTION_STATUS_CHECKER,
            TRANSACTION_STATUS_CHECKER_EVM,
            TRANSACTION_STATUS_CHECKER_STELLAR,
            NOTIFICATION_SENDER,
            TOKEN_SWAP_REQUEST,
            TRANSACTION_CLEANUP,
            RELAYER_HEALTH_CHECK,
            SYSTEM_CLEANUP,
        ]
    }

    // ── create_backoff tests ───────────────────────────────────────────

    #[test]
    fn test_create_backoff_with_valid_parameters() {
        let result = create_backoff(200, 5000, 0.99);
        assert!(
            result.is_ok(),
            "Should create backoff with valid parameters"
        );
    }

    #[test]
    fn test_create_backoff_with_zero_initial() {
        let result = create_backoff(0, 5000, 0.99);
        assert!(
            result.is_ok(),
            "Should handle zero initial delay (edge case)"
        );
    }

    #[test]
    fn test_create_backoff_with_equal_initial_and_max() {
        let result = create_backoff(1000, 1000, 0.5);
        assert!(result.is_ok(), "Should handle equal initial and max delays");
    }

    #[test]
    fn test_create_backoff_with_zero_jitter() {
        let result = create_backoff(500, 5000, 0.0);
        assert!(result.is_ok(), "Should handle zero jitter");
    }

    #[test]
    fn test_create_backoff_with_max_jitter() {
        let result = create_backoff(500, 5000, 1.0);
        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
    }

    #[test]
    fn test_create_backoff_with_small_values() {
        let result = create_backoff(1, 10, 0.5);
        assert!(result.is_ok(), "Should handle very small delay values");
    }

    #[test]
    fn test_create_backoff_with_large_values() {
        let result = create_backoff(10000, 60000, 0.99);
        assert!(result.is_ok(), "Should handle large delay values");
    }

    #[test]
    fn test_create_backoff_from_config_profiles() {
        for (name, cfg) in backoff_profiles() {
            assert!(
                create_backoff_from_config(cfg).is_ok(),
                "{name}: backoff profile should be constructible: {cfg:?}"
            );
        }
    }

    #[test]
    fn test_create_backoff_from_config_produces_usable_backoff() {
        for (name, cfg) in backoff_profiles() {
            let mut maker = create_backoff_from_config(cfg)
                .unwrap_or_else(|e| panic!("{name}: failed to build backoff maker: {e:?}"));
            // Calling make_backoff() should not panic
            let _backoff = maker.make_backoff();
        }
    }

    #[test]
    fn test_create_backoff_with_initial_greater_than_max_errors() {
        let result = create_backoff(10000, 100, 0.5);
        assert!(
            result.is_err(),
            "initial > max should be rejected by ExponentialBackoffMaker"
        );
    }

    // ── Backoff config invariant tests ─────────────────────────────────

    #[test]
    fn test_all_backoff_configs_have_valid_initial_le_max() {
        for (name, cfg) in backoff_profiles() {
            assert!(
                cfg.initial_ms <= cfg.max_ms,
                "{name}: initial_ms ({}) must be <= max_ms ({})",
                cfg.initial_ms,
                cfg.max_ms
            );
        }
    }

    #[test]
    fn test_all_backoff_configs_have_valid_jitter_range() {
        for (name, cfg) in backoff_profiles() {
            assert!(
                (0.0..=1.0).contains(&cfg.jitter),
                "{name}: jitter ({}) must be in [0.0, 1.0]",
                cfg.jitter
            );
        }
    }

    #[test]
    fn test_all_backoff_configs_have_positive_initial_ms() {
        for (name, cfg) in backoff_profiles() {
            assert!(
                cfg.initial_ms > 0,
                "{name}: initial_ms must be positive, got {}",
                cfg.initial_ms
            );
        }
    }

    // ── Worker name constant tests ─────────────────────────────────────

    #[test]
    fn test_worker_name_constants_are_nonempty() {
        for name in worker_names() {
            assert!(!name.is_empty(), "Worker name constant must not be empty");
        }
    }

    #[test]
    fn test_worker_name_constants_are_unique() {
        // `HashSet::insert` returns false when the value is already present,
        // so a single pass detects any duplicate.
        let mut seen = HashSet::new();
        for name in worker_names() {
            assert!(
                seen.insert(name),
                "Worker names must be unique: '{name}' appears more than once"
            );
        }
    }

    #[test]
    fn test_worker_names_match_concurrency_env_keys() {
        // The WorkerBuilder name for each queue-type-backed worker should match
        // the concurrency_env_key used with ServerConfig::get_worker_concurrency,
        // so that concurrency configuration picks up the correct env var.
        assert_eq!(
            TRANSACTION_REQUEST,
            QueueType::TransactionRequest.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_SENDER,
            QueueType::TransactionSubmission.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER,
            QueueType::StatusCheck.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER_EVM,
            QueueType::StatusCheckEvm.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER_STELLAR,
            QueueType::StatusCheckStellar.concurrency_env_key()
        );
        assert_eq!(
            NOTIFICATION_SENDER,
            QueueType::Notification.concurrency_env_key()
        );
        assert_eq!(
            TOKEN_SWAP_REQUEST,
            QueueType::TokenSwapRequest.concurrency_env_key()
        );
        assert_eq!(
            RELAYER_HEALTH_CHECK,
            QueueType::RelayerHealthCheck.concurrency_env_key()
        );
    }

    // ── monitor_handle_event tests ─────────────────────────────────────

    fn make_worker_event(event: Event) -> Worker<Event> {
        let worker_id = WorkerId::from_str("test-worker").unwrap();
        Worker::new(worker_id, event)
    }

    #[test]
    fn test_monitor_handle_event_start_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Start));
    }

    #[test]
    fn test_monitor_handle_event_engage_does_not_panic() {
        let task_id = TaskId::new();
        monitor_handle_event(make_worker_event(Event::Engage(task_id)));
    }

    #[test]
    fn test_monitor_handle_event_idle_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Idle));
    }

    #[test]
    fn test_monitor_handle_event_error_does_not_panic() {
        let error: Box<dyn std::error::Error + Send + Sync> = "test error".to_string().into();
        monitor_handle_event(make_worker_event(Event::Error(error)));
    }

    #[test]
    fn test_monitor_handle_event_stop_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Stop));
    }

    #[test]
    fn test_monitor_handle_event_exit_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Exit));
    }

    #[test]
    fn test_monitor_handle_event_custom_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Custom("test-custom".to_string())));
    }

    // ── observe_redis_pickup_latency tests ─────────────────────────────

    fn pickup_sample_count(queue_type: &str) -> u64 {
        crate::metrics::QUEUE_PICKUP_LATENCY
            .with_label_values(&[queue_type, "redis"])
            .get_sample_count()
    }

    #[test]
    fn test_observe_redis_pickup_latency_records_on_first_attempt() {
        let queue = "test-pickup-first-attempt";
        let before = pickup_sample_count(queue);

        let ts = chrono::Utc::now().timestamp().to_string();
        observe_redis_pickup_latency(1, None, &ts, queue);

        assert_eq!(pickup_sample_count(queue), before + 1);
    }

    #[test]
    fn test_observe_redis_pickup_latency_skips_retry_attempts() {
        let queue = "test-pickup-skip-retry";
        let before = pickup_sample_count(queue);

        let ts = chrono::Utc::now().timestamp().to_string();
        observe_redis_pickup_latency(2, None, &ts, queue);
        observe_redis_pickup_latency(99, None, &ts, queue);

        assert_eq!(pickup_sample_count(queue), before);
    }

    #[test]
    fn test_observe_redis_pickup_latency_prefers_available_at_over_timestamp() {
        // available_at is "now" so latency should be ~0; job_timestamp is far in the
        // past, so if the fallback were used we'd see a large value. We verify the
        // preference indirectly via the histogram sum delta.
        let queue = "test-pickup-prefers-available-at";
        let histogram = crate::metrics::QUEUE_PICKUP_LATENCY.with_label_values(&[queue, "redis"]);
        let sum_before = histogram.get_sample_sum();

        let now = chrono::Utc::now().timestamp();
        let available_at = now.to_string();
        let stale_timestamp = (now - 3600).to_string();

        observe_redis_pickup_latency(1, Some(&available_at), &stale_timestamp, queue);

        let delta = histogram.get_sample_sum() - sum_before;
        assert!(
            delta < 5.0,
            "expected near-zero latency when available_at is now, got {delta}"
        );
    }

    #[test]
    fn test_observe_redis_pickup_latency_falls_back_to_timestamp_when_available_at_absent() {
        // With no available_at, the job timestamp must be used as the baseline.
        // A job timestamp 60s in the past makes that observable: the recorded
        // latency has to reflect the timestamp's age instead of being ~0.
        let queue = "test-pickup-fallback-timestamp";
        let histogram = crate::metrics::QUEUE_PICKUP_LATENCY.with_label_values(&[queue, "redis"]);
        let count_before = histogram.get_sample_count();
        let sum_before = histogram.get_sample_sum();

        let stale_ts = (chrono::Utc::now().timestamp() - 60).to_string();
        observe_redis_pickup_latency(1, None, &stale_ts, queue);

        assert_eq!(histogram.get_sample_count(), count_before + 1);
        let delta = histogram.get_sample_sum() - sum_before;
        assert!(
            delta >= 59.0,
            "expected latency measured from the 60s-old job timestamp, got {delta}"
        );
    }

    #[test]
    fn test_observe_redis_pickup_latency_clamps_negative_skew_to_zero() {
        // baseline in the future (producer clock ahead of consumer) → delta negative
        // → clamped to 0, but the observation still records.
        let queue = "test-pickup-clamps-negative";
        let histogram = crate::metrics::QUEUE_PICKUP_LATENCY.with_label_values(&[queue, "redis"]);
        let sum_before = histogram.get_sample_sum();
        let count_before = histogram.get_sample_count();

        let future_ts = (chrono::Utc::now().timestamp() + 3600).to_string();
        observe_redis_pickup_latency(1, None, &future_ts, queue);

        assert_eq!(histogram.get_sample_count(), count_before + 1);
        let delta_sum = histogram.get_sample_sum() - sum_before;
        assert!(
            delta_sum.abs() < f64::EPSILON,
            "expected 0 latency for future baseline, got {delta_sum}"
        );
    }

    #[test]
    fn test_observe_redis_pickup_latency_skips_when_baseline_unparsable() {
        let queue = "test-pickup-unparsable";
        let before = pickup_sample_count(queue);

        observe_redis_pickup_latency(1, None, "not-a-number", queue);

        assert_eq!(pickup_sample_count(queue), before);
    }
}