1use actix_web::web::ThinData;
8
9use crate::{
10 config::ServerConfig,
11 constants::{
12 SYSTEM_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_CRON_SCHEDULE,
13 WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14 WORKER_TRANSACTION_CLEANUP_RETRIES,
15 },
16 jobs::{
17 notification_handler, relayer_health_check_handler, system_cleanup_handler,
18 token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
19 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
20 Job, JobProducerTrait, NotificationSend, RelayerHealthCheck, SystemCleanupCronReminder,
21 TokenSwapCronReminder, TokenSwapRequest, TransactionCleanupCronReminder,
22 TransactionRequest, TransactionSend, TransactionStatusCheck,
23 },
24 models::{
25 DefaultAppState, NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy,
26 RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
27 },
28 repositories::{
29 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30 Repository, TransactionCounterTrait, TransactionRepository,
31 },
32};
33use apalis::prelude::*;
34
35use apalis::layers::retry::backoff::MakeBackoff;
36use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
37use apalis::layers::ErrorHandlingLayer;
38
39pub use tower::util::rng::HasherRng;
41
42use apalis_cron::CronStream;
43use eyre::Result;
44use std::{str::FromStr, time::Duration};
45use tokio::signal::unix::SignalKind;
46use tracing::{debug, error, info};
47
48use crate::metrics::observe_queue_pickup_latency;
49
50use super::{filter_relayers_for_swap, QueueType, WorkerContext};
51use crate::queues::retry_config::{
52 RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF,
53 STATUS_GENERIC_BACKOFF, STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF,
54 TOKEN_SWAP_CRON_BACKOFF, TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF,
55 TX_SUBMISSION_BACKOFF,
56};
57
/// Pickup-latency threshold (one hour, in milliseconds) above which a non-scheduled job
/// triggers a producer/consumer clock-skew warning in `observe_redis_pickup_latency`.
/// The latency sample is still recorded when the warning fires.
const PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS: i64 = 3_600 * 1_000;
75
76fn observe_redis_pickup_latency(
87 attempt: usize,
88 available_at: Option<&String>,
89 job_timestamp: &str,
90 queue_type: &str,
91) {
92 if attempt != 1 {
93 return;
94 }
95 let baseline_epoch_secs = available_at
96 .and_then(|s| s.parse::<i64>().ok())
97 .or_else(|| job_timestamp.parse::<i64>().ok());
98 let Some(baseline_epoch_secs) = baseline_epoch_secs else {
99 tracing::warn!(
100 queue_type = queue_type,
101 available_at = ?available_at,
102 job_timestamp = %job_timestamp,
103 "skipping queue_pickup_latency: failed to parse both available_at and job_timestamp"
104 );
105 return;
106 };
107 let now_ms = chrono::Utc::now().timestamp_millis();
108 let delta_ms = now_ms - baseline_epoch_secs * 1000;
109 if available_at.is_none() && delta_ms > PICKUP_LATENCY_CLOCK_SKEW_THRESHOLD_MS {
110 tracing::warn!(
111 queue_type = queue_type,
112 latency_ms = delta_ms,
113 "queue_pickup_latency above sanity threshold for non-scheduled job; check producer/consumer clock skew"
114 );
115 }
116 let latency_secs = delta_ms.max(0) as f64 / 1000.0;
117 observe_queue_pickup_latency(queue_type, "redis", latency_secs);
118}
119
120async fn apalis_transaction_request_handler(
121 job: Job<TransactionRequest>,
122 state: Data<ThinData<DefaultAppState>>,
123 attempt: Attempt,
124 task_id: TaskId,
125) -> Result<(), apalis::prelude::Error> {
126 observe_redis_pickup_latency(
127 attempt.current(),
128 job.available_at.as_ref(),
129 &job.timestamp,
130 QueueType::TransactionRequest.queue_name(),
131 );
132 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
133 transaction_request_handler(job, (*state).clone(), ctx)
134 .await
135 .map_err(Into::into)
136}
137
138async fn apalis_transaction_submission_handler(
139 job: Job<TransactionSend>,
140 state: Data<ThinData<DefaultAppState>>,
141 attempt: Attempt,
142 task_id: TaskId,
143) -> Result<(), apalis::prelude::Error> {
144 observe_redis_pickup_latency(
145 attempt.current(),
146 job.available_at.as_ref(),
147 &job.timestamp,
148 QueueType::TransactionSubmission.queue_name(),
149 );
150 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
151 transaction_submission_handler(job, (*state).clone(), ctx)
152 .await
153 .map_err(Into::into)
154}
155
156async fn apalis_transaction_status_handler(
157 job: Job<TransactionStatusCheck>,
158 state: Data<ThinData<DefaultAppState>>,
159 attempt: Attempt,
160 task_id: TaskId,
161) -> Result<(), apalis::prelude::Error> {
162 observe_redis_pickup_latency(
163 attempt.current(),
164 job.available_at.as_ref(),
165 &job.timestamp,
166 QueueType::StatusCheck.queue_name(),
167 );
168 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
169 transaction_status_handler(job, (*state).clone(), ctx)
170 .await
171 .map_err(Into::into)
172}
173
174async fn apalis_transaction_status_evm_handler(
175 job: Job<TransactionStatusCheck>,
176 state: Data<ThinData<DefaultAppState>>,
177 attempt: Attempt,
178 task_id: TaskId,
179) -> Result<(), apalis::prelude::Error> {
180 observe_redis_pickup_latency(
181 attempt.current(),
182 job.available_at.as_ref(),
183 &job.timestamp,
184 QueueType::StatusCheckEvm.queue_name(),
185 );
186 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
187 transaction_status_handler(job, (*state).clone(), ctx)
188 .await
189 .map_err(Into::into)
190}
191
192async fn apalis_transaction_status_stellar_handler(
193 job: Job<TransactionStatusCheck>,
194 state: Data<ThinData<DefaultAppState>>,
195 attempt: Attempt,
196 task_id: TaskId,
197) -> Result<(), apalis::prelude::Error> {
198 observe_redis_pickup_latency(
199 attempt.current(),
200 job.available_at.as_ref(),
201 &job.timestamp,
202 QueueType::StatusCheckStellar.queue_name(),
203 );
204 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
205 transaction_status_handler(job, (*state).clone(), ctx)
206 .await
207 .map_err(Into::into)
208}
209
210async fn apalis_notification_handler(
211 job: Job<NotificationSend>,
212 state: Data<ThinData<DefaultAppState>>,
213 attempt: Attempt,
214 task_id: TaskId,
215) -> Result<(), apalis::prelude::Error> {
216 observe_redis_pickup_latency(
217 attempt.current(),
218 job.available_at.as_ref(),
219 &job.timestamp,
220 QueueType::Notification.queue_name(),
221 );
222 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
223 notification_handler(job, (*state).clone(), ctx)
224 .await
225 .map_err(Into::into)
226}
227
228async fn apalis_token_swap_request_handler(
229 job: Job<TokenSwapRequest>,
230 state: Data<ThinData<DefaultAppState>>,
231 attempt: Attempt,
232 task_id: TaskId,
233) -> Result<(), apalis::prelude::Error> {
234 observe_redis_pickup_latency(
235 attempt.current(),
236 job.available_at.as_ref(),
237 &job.timestamp,
238 QueueType::TokenSwapRequest.queue_name(),
239 );
240 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
241 token_swap_request_handler(job, (*state).clone(), ctx)
242 .await
243 .map_err(Into::into)
244}
245
246async fn apalis_relayer_health_check_handler(
247 job: Job<RelayerHealthCheck>,
248 state: Data<ThinData<DefaultAppState>>,
249 attempt: Attempt,
250 task_id: TaskId,
251) -> Result<(), apalis::prelude::Error> {
252 observe_redis_pickup_latency(
253 attempt.current(),
254 job.available_at.as_ref(),
255 &job.timestamp,
256 QueueType::RelayerHealthCheck.queue_name(),
257 );
258 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
259 relayer_health_check_handler(job, (*state).clone(), ctx)
260 .await
261 .map_err(Into::into)
262}
263
264async fn apalis_transaction_cleanup_handler(
265 _job: TransactionCleanupCronReminder,
266 state: Data<ThinData<DefaultAppState>>,
267 attempt: Attempt,
268 task_id: TaskId,
269) -> Result<(), apalis::prelude::Error> {
270 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
271 transaction_cleanup_handler(TransactionCleanupCronReminder(), (*state).clone(), ctx)
272 .await
273 .map_err(Into::into)
274}
275
276async fn apalis_system_cleanup_handler(
277 _job: SystemCleanupCronReminder,
278 state: Data<ThinData<DefaultAppState>>,
279 attempt: Attempt,
280 task_id: TaskId,
281) -> Result<(), apalis::prelude::Error> {
282 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
283 system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
284 .await
285 .map_err(Into::into)
286}
287
288async fn apalis_token_swap_cron_handler(
289 _job: TokenSwapCronReminder,
290 relayer_id: Data<String>,
291 state: Data<ThinData<DefaultAppState>>,
292 attempt: Attempt,
293 task_id: TaskId,
294) -> Result<(), apalis::prelude::Error> {
295 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
296 token_swap_cron_handler(
297 TokenSwapCronReminder(),
298 (*relayer_id).clone(),
299 (*state).clone(),
300 ctx,
301 )
302 .await
303 .map_err(Into::into)
304}
305
// Worker names passed to `WorkerBuilder::new`. The queue-backed names are expected (by the
// tests in this module) to equal the matching `QueueType::concurrency_env_key()`, and
// `TRANSACTION_CLEANUP` is also used directly as a concurrency key.

/// Worker name for the transaction-request queue.
const TRANSACTION_REQUEST: &str = "transaction_request";
/// Worker name for the transaction-submission queue.
const TRANSACTION_SENDER: &str = "transaction_sender";
/// Worker name for the generic status-check queue.
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
/// Worker name for the EVM status-check queue.
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
/// Worker name for the Stellar status-check queue.
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
/// Worker name for the notification queue.
const NOTIFICATION_SENDER: &str = "notification_sender";
/// Worker name for the token-swap-request queue.
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
/// Worker name for the transaction-cleanup cron worker.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
/// Worker name for the relayer health-check queue.
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
/// Worker name for the system-cleanup cron worker.
const SYSTEM_CLEANUP: &str = "system_cleanup";
318
319fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
329 let maker = ExponentialBackoffMaker::new(
330 Duration::from_millis(initial_ms),
331 Duration::from_millis(max_ms),
332 jitter,
333 HasherRng::default(),
334 )?;
335
336 Ok(maker)
337}
338
339fn create_backoff_from_config(cfg: RetryBackoffConfig) -> Result<ExponentialBackoffMaker> {
340 create_backoff(cfg.initial_ms, cfg.max_ms, cfg.jitter)
341}
342
343pub async fn initialize_redis_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
348 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
349) -> Result<()>
350where
351 J: JobProducerTrait + Send + Sync + 'static,
352 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
353 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
354 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
355 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
356 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
357 TCR: TransactionCounterTrait + Send + Sync + 'static,
358 PR: PluginRepositoryTrait + Send + Sync + 'static,
359 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
360{
361 let queue_backend = app_state
362 .job_producer
363 .get_queue_backend()
364 .ok_or_else(|| eyre::eyre!("Queue backend is not available"))?;
365 let queue = queue_backend
366 .queue()
367 .cloned()
368 .ok_or_else(|| eyre::eyre!("Redis queue is not available for active backend"))?;
369
370 let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
371 .layer(ErrorHandlingLayer::new())
372 .retry(
373 RetryPolicy::retries(QueueType::TransactionRequest.max_retries())
374 .with_backoff(create_backoff_from_config(TX_REQUEST_BACKOFF)?.make_backoff()),
375 )
376 .enable_tracing()
377 .catch_panic()
378 .concurrency(ServerConfig::get_worker_concurrency(
379 QueueType::TransactionRequest.concurrency_env_key(),
380 QueueType::TransactionRequest.default_concurrency(),
381 ))
382 .data(app_state.clone())
383 .backend(queue.transaction_request_queue.clone())
384 .build_fn(apalis_transaction_request_handler);
385
386 let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
387 .layer(ErrorHandlingLayer::new())
388 .enable_tracing()
389 .catch_panic()
390 .retry(
391 RetryPolicy::retries(QueueType::TransactionSubmission.max_retries())
392 .with_backoff(create_backoff_from_config(TX_SUBMISSION_BACKOFF)?.make_backoff()),
393 )
394 .concurrency(ServerConfig::get_worker_concurrency(
395 QueueType::TransactionSubmission.concurrency_env_key(),
396 QueueType::TransactionSubmission.default_concurrency(),
397 ))
398 .data(app_state.clone())
399 .backend(queue.transaction_submission_queue.clone())
400 .build_fn(apalis_transaction_submission_handler);
401
402 let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
405 .layer(ErrorHandlingLayer::new())
406 .enable_tracing()
407 .catch_panic()
408 .retry(
409 RetryPolicy::retries(QueueType::StatusCheck.max_retries())
410 .with_backoff(create_backoff_from_config(STATUS_GENERIC_BACKOFF)?.make_backoff()),
411 )
412 .concurrency(ServerConfig::get_worker_concurrency(
413 QueueType::StatusCheck.concurrency_env_key(),
414 QueueType::StatusCheck.default_concurrency(),
415 ))
416 .data(app_state.clone())
417 .backend(queue.transaction_status_queue.clone())
418 .build_fn(apalis_transaction_status_handler);
419
420 let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
423 .layer(ErrorHandlingLayer::new())
424 .enable_tracing()
425 .catch_panic()
426 .retry(
427 RetryPolicy::retries(QueueType::StatusCheck.max_retries())
428 .with_backoff(create_backoff_from_config(STATUS_EVM_BACKOFF)?.make_backoff()),
429 )
430 .concurrency(ServerConfig::get_worker_concurrency(
431 QueueType::StatusCheckEvm.concurrency_env_key(),
432 QueueType::StatusCheckEvm.default_concurrency(),
433 ))
434 .data(app_state.clone())
435 .backend(queue.transaction_status_queue_evm.clone())
436 .build_fn(apalis_transaction_status_evm_handler);
437
438 let transaction_status_queue_worker_stellar =
441 WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
442 .layer(ErrorHandlingLayer::new())
443 .enable_tracing()
444 .catch_panic()
445 .retry(
446 RetryPolicy::retries(QueueType::StatusCheckStellar.max_retries()).with_backoff(
447 create_backoff_from_config(STATUS_STELLAR_BACKOFF)?.make_backoff(),
448 ),
449 )
450 .concurrency(ServerConfig::get_worker_concurrency(
451 QueueType::StatusCheckStellar.concurrency_env_key(),
452 QueueType::StatusCheckStellar.default_concurrency(),
453 ))
454 .data(app_state.clone())
455 .backend(queue.transaction_status_queue_stellar.clone())
456 .build_fn(apalis_transaction_status_stellar_handler);
457
458 let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
459 .layer(ErrorHandlingLayer::new())
460 .enable_tracing()
461 .catch_panic()
462 .retry(
463 RetryPolicy::retries(QueueType::Notification.max_retries())
464 .with_backoff(create_backoff_from_config(NOTIFICATION_BACKOFF)?.make_backoff()),
465 )
466 .concurrency(ServerConfig::get_worker_concurrency(
467 QueueType::Notification.concurrency_env_key(),
468 QueueType::Notification.default_concurrency(),
469 ))
470 .data(app_state.clone())
471 .backend(queue.notification_queue.clone())
472 .build_fn(apalis_notification_handler);
473
474 let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
475 .layer(ErrorHandlingLayer::new())
476 .enable_tracing()
477 .catch_panic()
478 .retry(
479 RetryPolicy::retries(QueueType::TokenSwapRequest.max_retries()).with_backoff(
480 create_backoff_from_config(TOKEN_SWAP_REQUEST_BACKOFF)?.make_backoff(),
481 ),
482 )
483 .concurrency(ServerConfig::get_worker_concurrency(
484 QueueType::TokenSwapRequest.concurrency_env_key(),
485 QueueType::TokenSwapRequest.default_concurrency(),
486 ))
487 .data(app_state.clone())
488 .backend(queue.token_swap_request_queue.clone())
489 .build_fn(apalis_token_swap_request_handler);
490
491 let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
492 .layer(ErrorHandlingLayer::new())
493 .enable_tracing()
494 .catch_panic()
495 .retry(
496 RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
497 .with_backoff(create_backoff_from_config(TX_CLEANUP_BACKOFF)?.make_backoff()),
498 )
499 .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) .data(app_state.clone())
501 .backend(CronStream::new(
502 apalis_cron::Schedule::from_str(TRANSACTION_CLEANUP_CRON_SCHEDULE)?,
503 ))
504 .build_fn(apalis_transaction_cleanup_handler);
505
506 let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
507 .layer(ErrorHandlingLayer::new())
508 .enable_tracing()
509 .catch_panic()
510 .retry(
511 RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
512 .with_backoff(create_backoff_from_config(SYSTEM_CLEANUP_BACKOFF)?.make_backoff()),
513 )
514 .concurrency(1)
515 .data(app_state.clone())
516 .backend(CronStream::new(apalis_cron::Schedule::from_str(
517 SYSTEM_CLEANUP_CRON_SCHEDULE,
518 )?))
519 .build_fn(apalis_system_cleanup_handler);
520
521 let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
522 .layer(ErrorHandlingLayer::new())
523 .enable_tracing()
524 .catch_panic()
525 .retry(
526 RetryPolicy::retries(QueueType::RelayerHealthCheck.max_retries())
527 .with_backoff(create_backoff_from_config(RELAYER_HEALTH_BACKOFF)?.make_backoff()),
528 )
529 .concurrency(ServerConfig::get_worker_concurrency(
530 QueueType::RelayerHealthCheck.concurrency_env_key(),
531 QueueType::RelayerHealthCheck.default_concurrency(),
532 ))
533 .data(app_state.clone())
534 .backend(queue.relayer_health_check_queue.clone())
535 .build_fn(apalis_relayer_health_check_handler);
536
537 let monitor = Monitor::new()
538 .register(transaction_request_queue_worker)
539 .register(transaction_submission_queue_worker)
540 .register(transaction_status_queue_worker)
541 .register(transaction_status_queue_worker_evm)
542 .register(transaction_status_queue_worker_stellar)
543 .register(notification_queue_worker)
544 .register(token_swap_request_queue_worker)
545 .register(transaction_cleanup_queue_worker)
546 .register(system_cleanup_queue_worker)
547 .register(relayer_health_check_worker)
548 .on_event(monitor_handle_event)
549 .shutdown_timeout(Duration::from_millis(5000));
550
551 let monitor_future = monitor.run_with_signal(async {
552 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
553 .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
554 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
555 .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
556
557 debug!("Workers monitor started");
558
559 tokio::select! {
560 _ = sigint.recv() => debug!("Received SIGINT."),
561 _ = sigterm.recv() => debug!("Received SIGTERM."),
562 };
563
564 debug!("Workers monitor shutting down");
565
566 Ok(())
567 });
568 tokio::spawn(async move {
569 if let Err(e) = monitor_future.await {
570 error!(error = %e, "monitor error");
571 }
572 });
573 debug!("Workers monitor shutdown complete");
574
575 Ok(())
576}
577
578pub async fn initialize_redis_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
581 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
582) -> Result<()>
583where
584 J: JobProducerTrait + Send + Sync + 'static,
585 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
586 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
587 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
588 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
589 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
590 TCR: TransactionCounterTrait + Send + Sync + 'static,
591 PR: PluginRepositoryTrait + Send + Sync + 'static,
592 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
593{
594 let active_relayers = app_state.relayer_repository.list_active().await?;
595 let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
596
597 if relayers_with_swap_enabled.is_empty() {
598 debug!("No relayers with swap enabled");
599 return Ok(());
600 }
601 info!(
602 "Found {} relayers with swap enabled",
603 relayers_with_swap_enabled.len()
604 );
605
606 let mut workers = Vec::new();
607
608 let swap_backoff = create_backoff_from_config(TOKEN_SWAP_CRON_BACKOFF)?.make_backoff();
609
610 for relayer in relayers_with_swap_enabled {
611 debug!(relayer = ?relayer, "found relayer with swap enabled");
612
613 let (cron_schedule, network_type) = match &relayer.policies {
614 RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
615 Some(config) => match config.cron_schedule {
616 Some(schedule) => (schedule, "solana".to_string()),
617 None => {
618 debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
619 continue;
620 }
621 },
622 None => {
623 debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
624 continue;
625 }
626 },
627 RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
628 Some(config) => match config.cron_schedule {
629 Some(schedule) => (schedule, "stellar".to_string()),
630 None => {
631 debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
632 continue;
633 }
634 },
635 None => {
636 debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
637 continue;
638 }
639 },
640 RelayerNetworkPolicy::Evm(_) => {
641 debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
642 continue;
643 }
644 };
645
646 let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
647 Ok(schedule) => schedule,
648 Err(e) => {
649 error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
650 continue;
651 }
652 };
653
654 let worker = WorkerBuilder::new(format!(
656 "{}-swap-schedule-{}",
657 network_type,
658 relayer.id.clone()
659 ))
660 .layer(ErrorHandlingLayer::new())
661 .enable_tracing()
662 .catch_panic()
663 .retry(
664 RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
665 .with_backoff(swap_backoff.clone()),
666 )
667 .concurrency(1)
668 .data(relayer.id.clone())
669 .data(app_state.clone())
670 .backend(CronStream::new(calendar_schedule))
671 .build_fn(apalis_token_swap_cron_handler);
672
673 workers.push(worker);
674 debug!(
675 relayer_id = %relayer.id,
676 network_type = %network_type,
677 "Created worker for relayer with swap enabled"
678 );
679 }
680
681 let mut monitor = Monitor::new()
682 .on_event(monitor_handle_event)
683 .shutdown_timeout(Duration::from_millis(5000));
684
685 for worker in workers {
687 monitor = monitor.register(worker);
688 }
689
690 let monitor_future = monitor.run_with_signal(async {
691 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
692 .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
693 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
694 .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
695
696 debug!("Swap Monitor started");
697
698 tokio::select! {
699 _ = sigint.recv() => debug!("Received SIGINT."),
700 _ = sigterm.recv() => debug!("Received SIGTERM."),
701 };
702
703 debug!("Swap Monitor shutting down");
704
705 Ok(())
706 });
707 tokio::spawn(async move {
708 if let Err(e) = monitor_future.await {
709 error!(error = %e, "monitor error");
710 }
711 });
712 Ok(())
713}
714
715fn monitor_handle_event(e: Worker<Event>) {
716 let worker_id = e.id();
717 match e.inner() {
718 Event::Engage(task_id) => {
719 debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
720 }
721 Event::Error(e) => {
722 error!(worker_id = %worker_id, error = %e, "worker encountered an error");
723 }
724 Event::Exit => {
725 debug!(worker_id = %worker_id, "worker exited");
726 }
727 Event::Idle => {
728 debug!(worker_id = %worker_id, "worker is idle");
729 }
730 Event::Start => {
731 debug!(worker_id = %worker_id, "worker started");
732 }
733 Event::Stop => {
734 debug!(worker_id = %worker_id, "worker stopped");
735 }
736 _ => {}
737 }
738}
739
740#[cfg(test)]
741mod tests {
742 use super::*;
743 use crate::queues::retry_config::{
744 NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, STATUS_GENERIC_BACKOFF,
745 STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF, TOKEN_SWAP_CRON_BACKOFF,
746 TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF, TX_SUBMISSION_BACKOFF,
747 };
748
749 #[test]
752 fn test_create_backoff_with_valid_parameters() {
753 let result = create_backoff(200, 5000, 0.99);
754 assert!(
755 result.is_ok(),
756 "Should create backoff with valid parameters"
757 );
758 }
759
760 #[test]
761 fn test_create_backoff_with_zero_initial() {
762 let result = create_backoff(0, 5000, 0.99);
763 assert!(
764 result.is_ok(),
765 "Should handle zero initial delay (edge case)"
766 );
767 }
768
769 #[test]
770 fn test_create_backoff_with_equal_initial_and_max() {
771 let result = create_backoff(1000, 1000, 0.5);
772 assert!(result.is_ok(), "Should handle equal initial and max delays");
773 }
774
775 #[test]
776 fn test_create_backoff_with_zero_jitter() {
777 let result = create_backoff(500, 5000, 0.0);
778 assert!(result.is_ok(), "Should handle zero jitter");
779 }
780
781 #[test]
782 fn test_create_backoff_with_max_jitter() {
783 let result = create_backoff(500, 5000, 1.0);
784 assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
785 }
786
787 #[test]
788 fn test_create_backoff_with_small_values() {
789 let result = create_backoff(1, 10, 0.5);
790 assert!(result.is_ok(), "Should handle very small delay values");
791 }
792
793 #[test]
794 fn test_create_backoff_with_large_values() {
795 let result = create_backoff(10000, 60000, 0.99);
796 assert!(result.is_ok(), "Should handle large delay values");
797 }
798
799 #[test]
800 fn test_create_backoff_from_config_profiles() {
801 let profiles = [
802 TX_REQUEST_BACKOFF,
803 TX_SUBMISSION_BACKOFF,
804 STATUS_GENERIC_BACKOFF,
805 STATUS_EVM_BACKOFF,
806 STATUS_STELLAR_BACKOFF,
807 NOTIFICATION_BACKOFF,
808 TOKEN_SWAP_REQUEST_BACKOFF,
809 TX_CLEANUP_BACKOFF,
810 SYSTEM_CLEANUP_BACKOFF,
811 RELAYER_HEALTH_BACKOFF,
812 TOKEN_SWAP_CRON_BACKOFF,
813 ];
814
815 for cfg in profiles {
816 let result = create_backoff_from_config(cfg);
817 assert!(
818 result.is_ok(),
819 "backoff profile should be constructible: {:?}",
820 cfg
821 );
822 }
823 }
824
825 #[test]
826 fn test_create_backoff_from_config_produces_usable_backoff() {
827 let profiles = [
828 TX_REQUEST_BACKOFF,
829 TX_SUBMISSION_BACKOFF,
830 STATUS_GENERIC_BACKOFF,
831 STATUS_EVM_BACKOFF,
832 STATUS_STELLAR_BACKOFF,
833 NOTIFICATION_BACKOFF,
834 TOKEN_SWAP_REQUEST_BACKOFF,
835 TX_CLEANUP_BACKOFF,
836 SYSTEM_CLEANUP_BACKOFF,
837 RELAYER_HEALTH_BACKOFF,
838 TOKEN_SWAP_CRON_BACKOFF,
839 ];
840
841 for cfg in profiles {
842 let mut maker = create_backoff_from_config(cfg).unwrap();
843 let _backoff = maker.make_backoff();
845 }
846 }
847
848 #[test]
849 fn test_create_backoff_with_initial_greater_than_max_errors() {
850 let result = create_backoff(10000, 100, 0.5);
851 assert!(
852 result.is_err(),
853 "initial > max should be rejected by ExponentialBackoffMaker"
854 );
855 }
856
857 #[test]
860 fn test_all_backoff_configs_have_valid_initial_le_max() {
861 let profiles: &[(&str, RetryBackoffConfig)] = &[
862 ("TX_REQUEST", TX_REQUEST_BACKOFF),
863 ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
864 ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
865 ("STATUS_EVM", STATUS_EVM_BACKOFF),
866 ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
867 ("NOTIFICATION", NOTIFICATION_BACKOFF),
868 ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
869 ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
870 ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
871 ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
872 ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
873 ];
874
875 for (name, cfg) in profiles {
876 assert!(
877 cfg.initial_ms <= cfg.max_ms,
878 "{name}: initial_ms ({}) must be <= max_ms ({})",
879 cfg.initial_ms,
880 cfg.max_ms
881 );
882 }
883 }
884
885 #[test]
886 fn test_all_backoff_configs_have_valid_jitter_range() {
887 let profiles: &[(&str, RetryBackoffConfig)] = &[
888 ("TX_REQUEST", TX_REQUEST_BACKOFF),
889 ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
890 ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
891 ("STATUS_EVM", STATUS_EVM_BACKOFF),
892 ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
893 ("NOTIFICATION", NOTIFICATION_BACKOFF),
894 ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
895 ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
896 ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
897 ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
898 ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
899 ];
900
901 for (name, cfg) in profiles {
902 assert!(
903 (0.0..=1.0).contains(&cfg.jitter),
904 "{name}: jitter ({}) must be in [0.0, 1.0]",
905 cfg.jitter
906 );
907 }
908 }
909
910 #[test]
911 fn test_all_backoff_configs_have_positive_initial_ms() {
912 let profiles: &[(&str, RetryBackoffConfig)] = &[
913 ("TX_REQUEST", TX_REQUEST_BACKOFF),
914 ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
915 ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
916 ("STATUS_EVM", STATUS_EVM_BACKOFF),
917 ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
918 ("NOTIFICATION", NOTIFICATION_BACKOFF),
919 ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
920 ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
921 ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
922 ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
923 ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
924 ];
925
926 for (name, cfg) in profiles {
927 assert!(
928 cfg.initial_ms > 0,
929 "{name}: initial_ms must be positive, got {}",
930 cfg.initial_ms
931 );
932 }
933 }
934
935 #[test]
938 fn test_worker_name_constants_are_nonempty() {
939 let names = [
940 TRANSACTION_REQUEST,
941 TRANSACTION_SENDER,
942 TRANSACTION_STATUS_CHECKER,
943 TRANSACTION_STATUS_CHECKER_EVM,
944 TRANSACTION_STATUS_CHECKER_STELLAR,
945 NOTIFICATION_SENDER,
946 TOKEN_SWAP_REQUEST,
947 TRANSACTION_CLEANUP,
948 RELAYER_HEALTH_CHECK,
949 SYSTEM_CLEANUP,
950 ];
951
952 for name in &names {
953 assert!(!name.is_empty(), "Worker name constant must not be empty");
954 }
955 }
956
957 #[test]
958 fn test_worker_name_constants_are_unique() {
959 let names = [
960 TRANSACTION_REQUEST,
961 TRANSACTION_SENDER,
962 TRANSACTION_STATUS_CHECKER,
963 TRANSACTION_STATUS_CHECKER_EVM,
964 TRANSACTION_STATUS_CHECKER_STELLAR,
965 NOTIFICATION_SENDER,
966 TOKEN_SWAP_REQUEST,
967 TRANSACTION_CLEANUP,
968 RELAYER_HEALTH_CHECK,
969 SYSTEM_CLEANUP,
970 ];
971
972 for (i, a) in names.iter().enumerate() {
973 for (j, b) in names.iter().enumerate() {
974 if i != j {
975 assert_ne!(
976 a, b,
977 "Worker names must be unique: '{}' at index {} and {}",
978 a, i, j
979 );
980 }
981 }
982 }
983 }
984
985 #[test]
986 fn test_worker_names_match_concurrency_env_keys() {
987 assert_eq!(
991 TRANSACTION_REQUEST,
992 QueueType::TransactionRequest.concurrency_env_key()
993 );
994 assert_eq!(
995 TRANSACTION_SENDER,
996 QueueType::TransactionSubmission.concurrency_env_key()
997 );
998 assert_eq!(
999 TRANSACTION_STATUS_CHECKER,
1000 QueueType::StatusCheck.concurrency_env_key()
1001 );
1002 assert_eq!(
1003 TRANSACTION_STATUS_CHECKER_EVM,
1004 QueueType::StatusCheckEvm.concurrency_env_key()
1005 );
1006 assert_eq!(
1007 TRANSACTION_STATUS_CHECKER_STELLAR,
1008 QueueType::StatusCheckStellar.concurrency_env_key()
1009 );
1010 assert_eq!(
1011 NOTIFICATION_SENDER,
1012 QueueType::Notification.concurrency_env_key()
1013 );
1014 assert_eq!(
1015 TOKEN_SWAP_REQUEST,
1016 QueueType::TokenSwapRequest.concurrency_env_key()
1017 );
1018 assert_eq!(
1019 RELAYER_HEALTH_CHECK,
1020 QueueType::RelayerHealthCheck.concurrency_env_key()
1021 );
1022 }
1023
1024 fn make_worker_event(event: Event) -> Worker<Event> {
1027 let worker_id = WorkerId::from_str("test-worker").unwrap();
1028 Worker::new(worker_id, event)
1029 }
1030
#[test]
fn test_monitor_handle_event_start_does_not_panic() {
    // Smoke test: handling a Start event must not panic.
    let worker = make_worker_event(Event::Start);
    monitor_handle_event(worker);
}
1035
#[test]
fn test_monitor_handle_event_engage_does_not_panic() {
    // Smoke test: handling an Engage event for a fresh task id must not panic.
    monitor_handle_event(make_worker_event(Event::Engage(TaskId::new())));
}
1041
#[test]
fn test_monitor_handle_event_idle_does_not_panic() {
    // Smoke test: handling an Idle event must not panic.
    let worker = make_worker_event(Event::Idle);
    monitor_handle_event(worker);
}
1046
#[test]
fn test_monitor_handle_event_error_does_not_panic() {
    // Smoke test: handling an Error event carrying a boxed string error must
    // not panic.
    let boxed = Box::<dyn std::error::Error + Send + Sync>::from(String::from("test error"));
    let worker = make_worker_event(Event::Error(boxed));
    monitor_handle_event(worker);
}
1052
#[test]
fn test_monitor_handle_event_stop_does_not_panic() {
    // Smoke test: handling a Stop event must not panic.
    let worker = make_worker_event(Event::Stop);
    monitor_handle_event(worker);
}
1057
#[test]
fn test_monitor_handle_event_exit_does_not_panic() {
    // Smoke test: handling an Exit event must not panic.
    let worker = make_worker_event(Event::Exit);
    monitor_handle_event(worker);
}
1062
#[test]
fn test_monitor_handle_event_custom_does_not_panic() {
    // Smoke test: handling a Custom event with an arbitrary payload must not panic.
    let custom = Event::Custom(String::from("test-custom"));
    monitor_handle_event(make_worker_event(custom));
}
1067
1068 fn pickup_sample_count(queue_type: &str) -> u64 {
1071 crate::metrics::QUEUE_PICKUP_LATENCY
1072 .with_label_values(&[queue_type, "redis"])
1073 .get_sample_count()
1074 }
1075
#[test]
fn test_observe_redis_pickup_latency_records_on_first_attempt() {
    let queue = "test-pickup-first-attempt";
    let initial = pickup_sample_count(queue);

    // First attempt, no `available_at`: exactly one sample must be recorded.
    let enqueued_at = chrono::Utc::now().timestamp();
    observe_redis_pickup_latency(1, None, &enqueued_at.to_string(), queue);

    assert_eq!(pickup_sample_count(queue), initial + 1);
}
1086
#[test]
fn test_observe_redis_pickup_latency_skips_retry_attempts() {
    let queue = "test-pickup-skip-retry";
    let initial = pickup_sample_count(queue);

    let ts = chrono::Utc::now().timestamp().to_string();
    // Attempts other than the first must leave the histogram untouched.
    for attempt in [2, 99] {
        observe_redis_pickup_latency(attempt, None, &ts, queue);
    }

    assert_eq!(pickup_sample_count(queue), initial);
}
1098
#[test]
fn test_observe_redis_pickup_latency_prefers_available_at_over_timestamp() {
    let queue = "test-pickup-prefers-available-at";
    // Each test uses its own queue label, so concurrently running tests do not
    // disturb these counters.
    let histogram = crate::metrics::QUEUE_PICKUP_LATENCY.with_label_values(&[queue, "redis"]);
    let sum_before = histogram.get_sample_sum();
    let count_before = histogram.get_sample_count();

    let now = chrono::Utc::now().timestamp();
    let available_at = now.to_string();
    let stale_timestamp = (now - 3600).to_string();

    observe_redis_pickup_latency(1, Some(&available_at), &stale_timestamp, queue);

    // Without this, the latency check below would pass vacuously if no sample
    // were recorded at all.
    assert_eq!(histogram.get_sample_count(), count_before + 1);
    let delta = histogram.get_sample_sum() - sum_before;
    assert!(
        delta < 5.0,
        "expected near-zero latency when available_at is now, got {delta}"
    );
}
1120
#[test]
fn test_observe_redis_pickup_latency_falls_back_to_timestamp_when_available_at_absent() {
    let queue = "test-pickup-fallback-timestamp";
    let histogram = crate::metrics::QUEUE_PICKUP_LATENCY.with_label_values(&[queue, "redis"]);
    let sum_before = histogram.get_sample_sum();
    let count_before = histogram.get_sample_count();

    // The timestamp is an hour old. With "now" the call would be identical to
    // the first-attempt test and could not show that the timestamp is the
    // baseline; a stale value makes the fallback visible in the recorded sum.
    let stale_timestamp = (chrono::Utc::now().timestamp() - 3600).to_string();
    observe_redis_pickup_latency(1, None, &stale_timestamp, queue);

    assert_eq!(histogram.get_sample_count(), count_before + 1);
    let delta = histogram.get_sample_sum() - sum_before;
    assert!(
        delta > 1800.0,
        "expected latency measured from the stale timestamp, got {delta}"
    );
}
1131
#[test]
fn test_observe_redis_pickup_latency_clamps_negative_skew_to_zero() {
    let queue = "test-pickup-clamps-negative";
    let histogram = crate::metrics::QUEUE_PICKUP_LATENCY.with_label_values(&[queue, "redis"]);
    let (sum_before, count_before) = (histogram.get_sample_sum(), histogram.get_sample_count());

    // A baseline an hour in the future simulates clock skew. A sample is still
    // recorded, but its value must be 0 rather than negative.
    let one_hour_ahead = chrono::Utc::now().timestamp() + 3600;
    observe_redis_pickup_latency(1, None, &one_hour_ahead.to_string(), queue);

    assert_eq!(histogram.get_sample_count(), count_before + 1);
    let delta_sum = histogram.get_sample_sum() - sum_before;
    assert!(
        delta_sum.abs() < f64::EPSILON,
        "expected 0 latency for future baseline, got {delta_sum}"
    );
}
1151
#[test]
fn test_observe_redis_pickup_latency_skips_when_baseline_unparsable() {
    let queue = "test-pickup-unparsable";
    let initial = pickup_sample_count(queue);

    // A non-numeric baseline cannot produce a latency, so nothing may be recorded.
    observe_redis_pickup_latency(1, None, "not-a-number", queue);

    let after = pickup_sample_count(queue);
    assert_eq!(after, initial);
}
1161}