1use crate::{
7 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
8 domain::{get_network_relayer, Relayer},
9 jobs::{handle_result, Job, JobProducerTrait, RelayerHealthCheck},
10 models::{
11 produce_relayer_enabled_payload, DefaultAppState, DisabledReason, NetworkRepoModel,
12 NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
13 TransactionRepoModel,
14 },
15 observability::request_id::set_request_id,
16 queues::{HandlerError, WorkerContext},
17 repositories::{
18 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
19 Repository, TransactionCounterTrait, TransactionRepository,
20 },
21 utils::calculate_scheduled_timestamp,
22};
23use actix_web::web::ThinData;
24use eyre::Result;
25use std::time::Duration;
26use tracing::{debug, info, instrument, warn};
27
28#[instrument(
53 level = "debug",
54 skip(job, app_state),
55 fields(
56 request_id = ?job.request_id,
57 job_id = %job.message_id,
58 job_type = %job.job_type.to_string(),
59 attempt = %ctx.attempt,
60 relayer_id = %job.data.relayer_id,
61 task_id = %ctx.task_id,
62 )
63)]
64pub async fn relayer_health_check_handler(
65 job: Job<RelayerHealthCheck>,
66 app_state: ThinData<DefaultAppState>,
67 ctx: WorkerContext,
68) -> Result<(), HandlerError> {
69 if let Some(request_id) = job.request_id.clone() {
70 set_request_id(request_id);
71 }
72
73 relayer_health_check_handler_impl(job, app_state, ctx).await
74}
75
76#[allow(clippy::type_complexity)]
78async fn relayer_health_check_handler_impl<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
79 job: Job<RelayerHealthCheck>,
80 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
81 ctx: WorkerContext,
82) -> Result<(), HandlerError>
83where
84 J: JobProducerTrait + Send + Sync + 'static,
85 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
86 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
87 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
88 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
89 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
90 TCR: TransactionCounterTrait + Send + Sync + 'static,
91 PR: PluginRepositoryTrait + Send + Sync + 'static,
92 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
93{
94 let result = check_and_reenable_relayer(job.data, &app_state).await;
95 handle_result(
96 result,
97 &ctx,
98 "relayer_health_check",
99 WORKER_DEFAULT_MAXIMUM_RETRIES,
100 )
101}
102
103async fn check_and_reenable_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
104 data: RelayerHealthCheck,
105 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
106) -> Result<()>
107where
108 J: JobProducerTrait + Send + Sync + 'static,
109 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
110 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
111 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
112 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
113 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
114 TCR: TransactionCounterTrait + Send + Sync + 'static,
115 PR: PluginRepositoryTrait + Send + Sync + 'static,
116 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
117{
118 let relayer_id = data.relayer_id.clone();
119
120 debug!(
121 relayer_id = %relayer_id,
122 retry_count = data.retry_count,
123 "Running health check"
124 );
125
126 if let Some(metadata) = &data.metadata {
130 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
131 .await
132 .map_err(|e| eyre::eyre!("Failed to get relayer for targeted action: {}", e))?;
133
134 match relayer_service.handle_health_action(metadata).await {
135 Ok(true) => return Ok(()),
136 Ok(false) => { }
137 Err(e) => return Err(eyre::eyre!("Targeted health action failed: {}", e)),
138 }
139 }
140
141 let relayer = app_state
143 .relayer_repository
144 .get_by_id(relayer_id.clone())
145 .await
146 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
147
148 if !relayer.system_disabled {
149 info!(
150 relayer_id = %relayer_id,
151 "Relayer is not disabled, skipping health check"
152 );
153 return Ok(());
154 }
155
156 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
158 .await
159 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
160
161 match relayer_service.check_health().await {
163 Ok(_) => {
164 info!(
166 relayer_id = %relayer_id,
167 retry_count = data.retry_count,
168 "Health checks passed, re-enabling relayer"
169 );
170
171 let enabled_relayer = app_state
173 .relayer_repository
174 .enable_relayer(relayer_id.clone())
175 .await
176 .map_err(|e| eyre::eyre!("Failed to enable relayer: {}", e))?;
177
178 if let Some(notification_id) = &enabled_relayer.notification_id {
180 app_state
181 .job_producer
182 .produce_send_notification_job(
183 produce_relayer_enabled_payload(
184 notification_id,
185 &enabled_relayer,
186 data.retry_count,
187 ),
188 None,
189 )
190 .await
191 .map_err(|e| eyre::eyre!("Failed to send notification: {}", e))?;
192
193 info!(
194 relayer_id = %relayer_id,
195 notification_id = %notification_id,
196 "Sent relayer recovery notification"
197 );
198 }
199
200 Ok(())
201 }
202 Err(failures) => {
203 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
205 DisabledReason::RpcValidationFailed("Unknown error".to_string())
206 });
207
208 warn!(
209 relayer_id = %relayer_id,
210 retry_count = data.retry_count,
211 reason = %reason,
212 "Health checks failed, scheduling retry"
213 );
214
215 let should_update = match &relayer.disabled_reason {
218 Some(old_reason) => !old_reason.same_variant(&reason),
219 None => true, };
221
222 if should_update {
223 debug!(
224 relayer_id = %relayer_id,
225 old_reason = ?relayer.disabled_reason,
226 new_reason = %reason,
227 "Disabled reason variant has changed, updating"
228 );
229
230 app_state
231 .relayer_repository
232 .disable_relayer(relayer_id.clone(), reason.clone())
233 .await
234 .map_err(|e| eyre::eyre!("Failed to update disabled reason: {}", e))?;
235 } else {
236 debug!(
237 relayer_id = %relayer_id,
238 reason = %reason,
239 "Disabled reason variant unchanged, skipping update"
240 );
241 }
242
243 let delay = calculate_backoff_delay(data.retry_count);
245
246 debug!(
247 relayer_id = %relayer_id,
248 next_retry = data.retry_count + 1,
249 delay_seconds = delay.as_secs(),
250 "Scheduling next health check attempt"
251 );
252
253 app_state
255 .job_producer
256 .produce_relayer_health_check_job(
257 RelayerHealthCheck::with_retry_count(relayer_id, data.retry_count + 1),
258 Some(calculate_scheduled_timestamp(delay.as_secs() as i64)),
259 )
260 .await
261 .map_err(|e| eyre::eyre!("Failed to schedule retry: {}", e))?;
262
263 Ok(())
264 }
265 }
266}
267
/// Maps a retry counter to the wait before the next health-check attempt.
///
/// Retries 0 through 3 wait 10, 20, 30 and 45 seconds respectively; every
/// later retry waits 60 seconds.
fn calculate_backoff_delay(retry_count: u32) -> Duration {
    /// Delays, in seconds, for the first retries (indexed by retry count).
    const STEP_SECS: [u64; 4] = [10, 20, 30, 45];
    /// Delay, in seconds, once the stepped schedule is exhausted.
    const CAP_SECS: u64 = 60;

    // Any index past the table (including very large counters) yields the cap.
    let seconds = STEP_SECS
        .get(retry_count as usize)
        .copied()
        .unwrap_or(CAP_SECS);
    Duration::from_secs(seconds)
}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::models::{
291 DisabledReason, NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
292 };
293
/// The backoff schedule is 10/20/30/45 seconds and then a flat 60 seconds.
#[test]
fn test_calculate_backoff_delay() {
    // (retry count, expected delay in seconds)
    let expectations = [
        (0, 10),
        (1, 20),
        (2, 30),
        (3, 45),
        (4, 60),
        (10, 60),
        (100, 60),
    ];

    for (retry_count, seconds) in expectations {
        assert_eq!(
            calculate_backoff_delay(retry_count),
            Duration::from_secs(seconds)
        );
    }
}
304
/// `new` starts at retry 0; `with_retry_count` keeps the supplied counter.
#[test]
fn test_relayer_health_check_creation() {
    let relayer_id = "test-relayer";

    let fresh = RelayerHealthCheck::new(relayer_id.to_string());
    assert_eq!(fresh.relayer_id, "test-relayer");
    assert_eq!(fresh.retry_count, 0);

    let retried = RelayerHealthCheck::with_retry_count(relayer_id.to_string(), 3);
    assert_eq!(retried.relayer_id, "test-relayer");
    assert_eq!(retried.retry_count, 3);
}
316
317 fn create_disabled_relayer(id: &str) -> RelayerRepoModel {
318 RelayerRepoModel {
319 id: id.to_string(),
320 name: format!("Relayer {id}"),
321 network: "sepolia".to_string(),
322 paused: false,
323 network_type: NetworkType::Evm,
324 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
325 include_revert_data: None,
326 gas_price_cap: None,
327 whitelist_receivers: None,
328 eip1559_pricing: Some(false),
329 private_transactions: Some(false),
330 min_balance: Some(0),
331 gas_limit_estimation: Some(false),
332 }),
333 signer_id: "test-signer".to_string(),
334 address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
335 notification_id: Some("test-notification".to_string()),
336 system_disabled: true,
337 disabled_reason: Some(DisabledReason::RpcValidationFailed(
338 "RPC unavailable".to_string(),
339 )),
340 custom_rpc_urls: None,
341 }
342 }
343
/// Checks the payload constructors and the delay used for a late retry.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_health_check_data_structure() {
    let health_check = RelayerHealthCheck::new("test-relayer".to_string());
    assert_eq!(health_check.relayer_id, "test-relayer");
    assert_eq!(health_check.retry_count, 0);

    let health_check_retry =
        RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 5);
    assert_eq!(health_check_retry.retry_count, 5);

    // Retry 5 is past the stepped part of the schedule, so the cap applies.
    let expected_delay = calculate_backoff_delay(5);
    assert_eq!(expected_delay, Duration::from_secs(60));
}
360
361 #[tokio::test]
363 async fn test_relayer_health_check_handler_impl_exits_on_enabled() {
364 use crate::jobs::MockJobProducerTrait;
365 use crate::models::AppState;
366 use crate::repositories::{
367 ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
368 PluginRepositoryStorage, RelayerRepositoryStorage, Repository, SignerRepositoryStorage,
369 TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
370 };
371 use std::sync::Arc;
372
373 let mock_job_producer = MockJobProducerTrait::new();
375
376 let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
378
379 let mut relayer = create_disabled_relayer("test-handler-enabled");
381 relayer.system_disabled = false;
382 relayer.disabled_reason = None;
383 relayer_repo.create(relayer).await.unwrap();
384
385 let app_state = actix_web::web::ThinData(AppState {
387 relayer_repository: relayer_repo,
388 transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
389 signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
390 notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
391 network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
392 transaction_counter_store: Arc::new(
393 TransactionCounterRepositoryStorage::new_in_memory(),
394 ),
395 job_producer: Arc::new(mock_job_producer),
396 plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
397 api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
398 });
399
400 let health_check = RelayerHealthCheck::new("test-handler-enabled".to_string());
402 let job = Job::new(crate::jobs::JobType::RelayerHealthCheck, health_check);
403 let ctx = WorkerContext::new(1, "test-task".into());
404
405 let result = relayer_health_check_handler_impl(job, app_state, ctx).await;
407
408 assert!(result.is_ok());
410 }
411
/// Delays grow strictly over the first five retries and then stay at 60s.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_relayer_health_check_backoff_progression() {
    let delays: Vec<Duration> = (0..6).map(calculate_backoff_delay).collect();

    assert_eq!(delays[0], Duration::from_secs(10));
    assert_eq!(delays[1], Duration::from_secs(20));
    assert_eq!(delays[2], Duration::from_secs(30));
    assert_eq!(delays[3], Duration::from_secs(45));
    assert_eq!(delays[4], Duration::from_secs(60));
    assert_eq!(delays[5], Duration::from_secs(60));

    // Adjacent pairs (0,1) .. (3,4): each step must be strictly larger.
    for pair in delays[..5].windows(2) {
        assert!(pair[0] < pair[1], "Delay should increase with retry count");
    }

    assert_eq!(delays[4], delays[5], "Delay should cap at 60 seconds");
}
436
437 #[tokio::test]
438 async fn test_disabled_reason_is_preserved() {
439 use crate::repositories::RelayerRepositoryStorage;
441 let repo = RelayerRepositoryStorage::new_in_memory();
442
443 let relayer = create_disabled_relayer("test-relayer-2");
444 let disabled_reason = relayer.disabled_reason.clone();
445
446 repo.create(relayer).await.unwrap();
447
448 let retrieved = repo.get_by_id("test-relayer-2".to_string()).await.unwrap();
450
451 assert!(retrieved.system_disabled);
452 assert_eq!(retrieved.disabled_reason, disabled_reason);
453
454 if let Some(reason) = &retrieved.disabled_reason {
456 let description = reason.description();
457 assert!(description.contains("RPC"));
458 }
459 }
460
461 #[tokio::test]
462 async fn test_check_and_reenable_relayer_exits_early_if_not_disabled() {
463 use crate::jobs::MockJobProducerTrait;
464 use crate::models::AppState;
465 use crate::repositories::{
466 ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
467 PluginRepositoryStorage, RelayerRepositoryStorage, Repository, SignerRepositoryStorage,
468 TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
469 };
470 use std::sync::Arc;
471
472 let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
474
475 let mut relayer = create_disabled_relayer("test-check-enabled");
477 relayer.system_disabled = false;
478 relayer.disabled_reason = None;
479 relayer_repo.create(relayer).await.unwrap();
480
481 let mock_job_producer = MockJobProducerTrait::new();
483
484 let app_state = AppState {
486 relayer_repository: relayer_repo.clone(),
487 transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
488 signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
489 notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
490 network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
491 transaction_counter_store: Arc::new(
492 TransactionCounterRepositoryStorage::new_in_memory(),
493 ),
494 job_producer: Arc::new(mock_job_producer),
495 plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
496 api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
497 };
498
499 let health_check = RelayerHealthCheck::new("test-check-enabled".to_string());
501
502 let thin_app_state = actix_web::web::ThinData(app_state);
504
505 let result = check_and_reenable_relayer(health_check, &thin_app_state).await;
507
508 assert!(result.is_ok());
510
511 let retrieved = relayer_repo
513 .get_by_id("test-check-enabled".to_string())
514 .await
515 .unwrap();
516 assert!(!retrieved.system_disabled);
517 assert!(retrieved.disabled_reason.is_none());
518 }
519
/// `same_variant` ignores the message payload and compares `Multiple`
/// reasons by the variants they contain.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_check_and_reenable_variant_comparison() {
    use crate::models::DisabledReason;

    // Same variant, different message.
    let rpc_a = DisabledReason::RpcValidationFailed("Error A".to_string());
    let rpc_b = DisabledReason::RpcValidationFailed("Error B".to_string());
    assert!(rpc_a.same_variant(&rpc_b));

    // Different variants.
    let nonce = DisabledReason::NonceSyncFailed("Error".to_string());
    assert!(!rpc_a.same_variant(&nonce));

    // `Multiple` with the same inner variants.
    let rpc_and_nonce = DisabledReason::Multiple(vec![
        DisabledReason::RpcValidationFailed("A".to_string()),
        DisabledReason::NonceSyncFailed("B".to_string()),
    ]);
    let rpc_and_nonce_other_msgs = DisabledReason::Multiple(vec![
        DisabledReason::RpcValidationFailed("C".to_string()),
        DisabledReason::NonceSyncFailed("D".to_string()),
    ]);
    assert!(rpc_and_nonce.same_variant(&rpc_and_nonce_other_msgs));

    // `Multiple` with a differing inner variant.
    let rpc_and_balance = DisabledReason::Multiple(vec![
        DisabledReason::RpcValidationFailed("A".to_string()),
        DisabledReason::BalanceCheckFailed("B".to_string()),
    ]);
    assert!(!rpc_and_nonce.same_variant(&rpc_and_balance));
}
552
/// Boundary checks for the backoff schedule: first retry, a very large retry
/// count, and growth below the 60-second cap.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_backoff_delay_calculation_edge_cases() {
    let cap = Duration::from_secs(60);

    assert_eq!(calculate_backoff_delay(0), Duration::from_secs(10));
    assert_eq!(calculate_backoff_delay(100), cap);

    // Below the cap every delay must exceed the previous one; from the cap
    // onwards it must stay exactly at the cap.
    let mut prev_delay = Duration::from_secs(0);
    for retry in 0..10 {
        let delay = calculate_backoff_delay(retry);
        if delay < cap {
            assert!(delay > prev_delay, "Retry {retry}: delay should increase");
        } else {
            assert_eq!(delay, cap, "Retry {retry}: should cap at 60s");
        }
        prev_delay = delay;
    }
}
583
/// `from_health_failures` yields `None` for no failures, the matching single
/// variant for one failure, and `Multiple` (in input order) for several.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_disabled_reason_from_health_failures() {
    use crate::models::{DisabledReason, HealthCheckFailure};

    // No failures: no reason.
    let empty_result = DisabledReason::from_health_failures(vec![]);
    assert!(empty_result.is_none());

    // One failure: the corresponding single variant carrying the message.
    let single_failure = vec![HealthCheckFailure::RpcValidationFailed(
        "RPC down".to_string(),
    )];
    match DisabledReason::from_health_failures(single_failure) {
        Some(DisabledReason::RpcValidationFailed(msg)) => assert_eq!(msg, "RPC down"),
        _ => panic!("Expected RpcValidationFailed variant"),
    }

    // Several failures: a `Multiple` reason preserving the input order.
    let multiple_failures = vec![
        HealthCheckFailure::RpcValidationFailed("RPC error".to_string()),
        HealthCheckFailure::NonceSyncFailed("Nonce error".to_string()),
    ];
    match DisabledReason::from_health_failures(multiple_failures) {
        Some(DisabledReason::Multiple(reasons)) => {
            assert_eq!(reasons.len(), 2);
            assert!(matches!(reasons[0], DisabledReason::RpcValidationFailed(_)));
            assert!(matches!(reasons[1], DisabledReason::NonceSyncFailed(_)));
        }
        _ => panic!("Expected Multiple variant"),
    }
}
621
/// A follow-up payload carries `retry_count + 1`, and the delay for the next
/// retry is never shorter than the current one.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_relayer_health_check_retry_count_increments() {
    let cap = Duration::from_secs(60);

    // A fixed-size array is enough here; no heap allocation is needed.
    for retry_count in [0, 1, 2, 5, 10] {
        let health_check =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count);
        assert_eq!(health_check.retry_count, retry_count);

        let next_health_check =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count + 1);
        assert_eq!(next_health_check.retry_count, retry_count + 1);

        let current_delay = calculate_backoff_delay(retry_count);
        let next_delay = calculate_backoff_delay(retry_count + 1);

        if current_delay < cap {
            assert!(next_delay >= current_delay);
        } else {
            assert_eq!(next_delay, cap);
        }
    }
}
650
651 #[tokio::test]
652 async fn test_repository_enable_disable_operations() {
653 use crate::models::DisabledReason;
654 use crate::repositories::{RelayerRepositoryStorage, Repository};
655
656 let repo = RelayerRepositoryStorage::new_in_memory();
657
658 let mut relayer = create_disabled_relayer("test-enable-disable");
660 relayer.system_disabled = false;
661 relayer.disabled_reason = None;
662 repo.create(relayer).await.unwrap();
663
664 let reason = DisabledReason::RpcValidationFailed("Test error".to_string());
666 let disabled = repo
667 .disable_relayer("test-enable-disable".to_string(), reason.clone())
668 .await
669 .unwrap();
670
671 assert!(disabled.system_disabled);
672 assert_eq!(disabled.disabled_reason, Some(reason));
673
674 let enabled = repo
676 .enable_relayer("test-enable-disable".to_string())
677 .await
678 .unwrap();
679
680 assert!(!enabled.system_disabled);
681 assert!(enabled.disabled_reason.is_none());
682
683 let retrieved = repo
685 .get_by_id("test-enable-disable".to_string())
686 .await
687 .unwrap();
688 assert!(!retrieved.system_disabled);
689 assert!(retrieved.disabled_reason.is_none());
690 }
691
/// `safe_description` must not echo the detail string stored in a reason and
/// must still produce non-empty text.
///
/// Nothing here awaits, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_disabled_reason_safe_description() {
    use crate::models::DisabledReason;

    // A fixed-size array is enough here; no heap allocation is needed.
    let reasons = [
        DisabledReason::NonceSyncFailed("Error with API key abc123".to_string()),
        DisabledReason::RpcValidationFailed(
            "RPC error: http://secret-rpc.com:8545".to_string(),
        ),
        DisabledReason::BalanceCheckFailed("Balance: 1.5 ETH at address 0x123...".to_string()),
    ];
    // Fragments of the detail strings above that must never appear in output.
    let sensitive_fragments = ["abc123", "http://", "0x123", "1.5 ETH"];

    for reason in reasons {
        let safe_desc = reason.safe_description();

        for fragment in sensitive_fragments {
            assert!(!safe_desc.contains(fragment));
        }

        assert!(!safe_desc.is_empty());
    }

    let multiple = DisabledReason::Multiple(vec![
        DisabledReason::RpcValidationFailed("Secret RPC info".to_string()),
        DisabledReason::NonceSyncFailed("Secret nonce info".to_string()),
    ]);

    let safe_desc = multiple.safe_description();
    assert!(!safe_desc.contains("Secret"));
    assert!(safe_desc.contains("RPC endpoint validation failed"));
    assert!(safe_desc.contains("Nonce synchronization failed"));
}
729}