1use std::sync::Arc;
28
29use crate::{
30 constants::{
31 transactions::PENDING_TRANSACTION_STATUSES, EVM_SMALLEST_UNIT_NAME,
32 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
33 },
34 domain::{
35 relayer::{Relayer, RelayerError},
36 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
37 SignTransactionRequest, SignTypedDataRequest,
38 },
39 jobs::{
40 JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionSend,
41 TransactionStatusCheck,
42 },
43 models::{
44 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
45 EvmNetwork, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel,
46 NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest, NetworkType,
47 PaginationQuery, RelayerRepoModel, RelayerStatus, RepositoryError, RpcErrorCodes,
48 TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
49 },
50 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
51 services::{
52 provider::{EvmProvider, EvmProviderTrait},
53 signer::{DataSignerTrait, EvmSigner},
54 TransactionCounterService, TransactionCounterServiceTrait,
55 },
56 utils::calculate_scheduled_timestamp,
57};
58use async_trait::async_trait;
59use eyre::Result;
60use tracing::{debug, error, info, instrument, warn};
61
62use super::{create_error_response, create_success_response, EvmTransactionValidator};
63use crate::utils::{map_provider_error, sanitize_error_description};
64
65#[allow(dead_code)]
66pub struct EvmRelayer<P, RR, NR, TR, J, S, TCS>
67where
68 P: EvmProviderTrait + Send + Sync,
69 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
72 J: JobProducerTrait + Send + Sync + 'static,
73 S: DataSignerTrait + Send + Sync + 'static,
74{
75 pub(super) relayer: RelayerRepoModel,
76 pub(super) signer: S,
77 pub(super) network: EvmNetwork,
78 pub(super) provider: P,
79 pub(super) relayer_repository: Arc<RR>,
80 pub(super) network_repository: Arc<NR>,
81 pub(super) transaction_repository: Arc<TR>,
82 pub(super) job_producer: Arc<J>,
83 pub(super) transaction_counter_service: Arc<TCS>,
84}
85
86#[allow(clippy::too_many_arguments)]
87impl<P, RR, NR, TR, J, S, TCS> EvmRelayer<P, RR, NR, TR, J, S, TCS>
88where
89 P: EvmProviderTrait + Send + Sync,
90 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
91 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
92 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
93 J: JobProducerTrait + Send + Sync + 'static,
94 S: DataSignerTrait + Send + Sync + 'static,
95 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
96{
97 pub fn new(
114 relayer: RelayerRepoModel,
115 signer: S,
116 provider: P,
117 network: EvmNetwork,
118 relayer_repository: Arc<RR>,
119 network_repository: Arc<NR>,
120 transaction_repository: Arc<TR>,
121 transaction_counter_service: Arc<TCS>,
122 job_producer: Arc<J>,
123 ) -> Result<Self, RelayerError> {
124 Ok(Self {
125 relayer,
126 signer,
127 network,
128 provider,
129 relayer_repository,
130 network_repository,
131 transaction_repository,
132 transaction_counter_service,
133 job_producer,
134 })
135 }
136
137 #[instrument(
143 level = "debug",
144 skip(self),
145 fields(
146 request_id = ?crate::observability::request_id::get_request_id(),
147 relayer_id = %self.relayer.id,
148 )
149 )]
150 async fn validate_rpc(&self) -> Result<(), RelayerError> {
151 self.provider
152 .health_check()
153 .await
154 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
155
156 Ok(())
157 }
158
159 #[instrument(
169 level = "debug",
170 skip(self, transaction),
171 fields(
172 request_id = ?crate::observability::request_id::get_request_id(),
173 relayer_id = %self.relayer.id,
174 tx_id = %transaction.id,
175 )
176 )]
177 async fn cancel_transaction_via_job(
178 &self,
179 transaction: TransactionRepoModel,
180 ) -> Result<(), RelayerError> {
181 let cancel_job = TransactionSend::cancel(
182 transaction.id.clone(),
183 transaction.relayer_id.clone(),
184 "Cancelled via delete_pending_transactions".to_string(),
185 );
186
187 self.job_producer
188 .produce_submit_transaction_job(cancel_job, None)
189 .await
190 .map_err(RelayerError::from)?;
191
192 Ok(())
193 }
194}
195
196pub type DefaultEvmRelayer<J, T, RR, NR, TCR> =
198 EvmRelayer<EvmProvider, RR, NR, T, J, EvmSigner, TransactionCounterService<TCR>>;
199
200#[async_trait]
201impl<P, RR, NR, TR, J, S, TCS> Relayer for EvmRelayer<P, RR, NR, TR, J, S, TCS>
202where
203 P: EvmProviderTrait + Send + Sync,
204 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
205 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
206 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
207 J: JobProducerTrait + Send + Sync + 'static,
208 S: DataSignerTrait + Send + Sync + 'static,
209 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
210{
211 #[instrument(
221 level = "debug",
222 skip(self, network_transaction),
223 fields(
224 request_id = ?crate::observability::request_id::get_request_id(),
225 relayer_id = %self.relayer.id,
226 network_type = ?self.relayer.network_type,
227 )
228 )]
229 async fn process_transaction_request(
230 &self,
231 network_transaction: NetworkTransactionRequest,
232 ) -> Result<TransactionRepoModel, RelayerError> {
233 let network_model = self
234 .network_repository
235 .get_by_name(NetworkType::Evm, &self.relayer.network)
236 .await?
237 .ok_or_else(|| {
238 RelayerError::NetworkConfiguration(format!(
239 "Network {} not found",
240 self.relayer.network
241 ))
242 })?;
243 let transaction =
244 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
245
246 self.transaction_repository
247 .create(transaction.clone())
248 .await
249 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
250
251 if let Err(e) = self
255 .job_producer
256 .produce_check_transaction_status_job(
257 TransactionStatusCheck::new(
258 transaction.id.clone(),
259 transaction.relayer_id.clone(),
260 crate::models::NetworkType::Evm,
261 ),
262 Some(calculate_scheduled_timestamp(
263 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
264 )),
265 )
266 .await
267 {
268 error!(
270 relayer_id = %self.relayer.id,
271 transaction_id = %transaction.id,
272 error = %e,
273 "Status check queue push failed - marking transaction as failed"
274 );
275 if let Err(update_err) = self
276 .transaction_repository
277 .partial_update(
278 transaction.id.clone(),
279 TransactionUpdateRequest {
280 status: Some(TransactionStatus::Failed),
281 status_reason: Some("Queue unavailable".to_string()),
282 ..Default::default()
283 },
284 )
285 .await
286 {
287 warn!(
288 relayer_id = %self.relayer.id,
289 transaction_id = %transaction.id,
290 error = %update_err,
291 "Failed to mark transaction as failed after queue push failure"
292 );
293 }
294 return Err(e.into());
295 }
296
297 self.job_producer
300 .produce_transaction_request_job(
301 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
302 None,
303 )
304 .await?;
305
306 Ok(transaction)
307 }
308
309 #[instrument(
315 level = "debug",
316 skip(self),
317 fields(
318 request_id = ?crate::observability::request_id::get_request_id(),
319 relayer_id = %self.relayer.id,
320 )
321 )]
322 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
323 let balance: u128 = self
324 .provider
325 .get_balance(&self.relayer.address)
326 .await
327 .map_err(|e| RelayerError::ProviderError(e.to_string()))?
328 .try_into()
329 .map_err(|_| {
330 RelayerError::ProviderError("Failed to convert balance to u128".to_string())
331 })?;
332
333 Ok(BalanceResponse {
334 balance,
335 unit: EVM_SMALLEST_UNIT_NAME.to_string(),
336 })
337 }
338
339 #[instrument(
345 level = "debug",
346 skip(self),
347 fields(
348 request_id = ?crate::observability::request_id::get_request_id(),
349 relayer_id = %self.relayer.id,
350 )
351 )]
352 async fn get_status(&self) -> Result<RelayerStatus, RelayerError> {
353 let relayer_model = &self.relayer;
354
355 let nonce = self
357 .transaction_counter_service
358 .get()
359 .await
360 .ok()
361 .flatten()
362 .unwrap_or(0);
363 let nonce_str = nonce.to_string();
364
365 let balance_response = self.get_balance().await?;
366
367 let pending_transactions_count = self
369 .transaction_repository
370 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
371 .await
372 .map_err(RelayerError::from)?;
373
374 let last_confirmed_transaction_timestamp = self
376 .transaction_repository
377 .find_by_status_paginated(
378 &relayer_model.id,
379 &[TransactionStatus::Confirmed],
380 PaginationQuery {
381 page: 1,
382 per_page: 1,
383 },
384 false, )
386 .await
387 .map_err(RelayerError::from)?
388 .items
389 .into_iter()
390 .next()
391 .and_then(|tx| tx.confirmed_at);
392
393 Ok(RelayerStatus::Evm {
394 balance: balance_response.balance.to_string(),
395 pending_transactions_count,
396 last_confirmed_transaction_timestamp,
397 system_disabled: relayer_model.system_disabled,
398 paused: relayer_model.paused,
399 nonce: nonce_str,
400 })
401 }
402
403 #[instrument(
410 level = "debug",
411 skip(self),
412 fields(
413 request_id = ?crate::observability::request_id::get_request_id(),
414 relayer_id = %self.relayer.id,
415 )
416 )]
417 async fn delete_pending_transactions(
418 &self,
419 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
420 let pending_statuses = [
421 TransactionStatus::Pending,
422 TransactionStatus::Sent,
423 TransactionStatus::Submitted,
424 ];
425
426 let pending_transactions = self
428 .transaction_repository
429 .find_by_status(&self.relayer.id, &pending_statuses[..])
430 .await
431 .map_err(RelayerError::from)?;
432
433 let transaction_count = pending_transactions.len();
434
435 if transaction_count == 0 {
436 info!(
437 relayer_id = %self.relayer.id,
438 "no pending transactions found for relayer"
439 );
440 return Ok(DeletePendingTransactionsResponse {
441 queued_for_cancellation_transaction_ids: vec![],
442 failed_to_queue_transaction_ids: vec![],
443 total_processed: 0,
444 });
445 }
446
447 info!(
448 relayer_id = %self.relayer.id,
449 transaction_count = %transaction_count,
450 "processing pending transactions for relayer"
451 );
452
453 let mut cancelled_transaction_ids = Vec::new();
454 let mut failed_transaction_ids = Vec::new();
455
456 for transaction in pending_transactions {
458 match self.cancel_transaction_via_job(transaction.clone()).await {
459 Ok(_) => {
460 cancelled_transaction_ids.push(transaction.id.clone());
461 info!(
462 tx_id = %transaction.id,
463 relayer_id = %self.relayer.id,
464 status = ?transaction.status,
465 "initiated cancellation for transaction"
466 );
467 }
468 Err(e) => {
469 failed_transaction_ids.push(transaction.id.clone());
470 warn!(
471 tx_id = %transaction.id,
472 relayer_id = %self.relayer.id,
473 error = %e,
474 "failed to cancel transaction"
475 );
476 }
477 }
478 }
479
480 let total_processed = cancelled_transaction_ids.len() + failed_transaction_ids.len();
481
482 debug!(
483 queued_for_cancellation = %cancelled_transaction_ids.len(),
484 failed_to_queue = %failed_transaction_ids.len(),
485 "completed processing pending transactions for relayer"
486 );
487
488 Ok(DeletePendingTransactionsResponse {
489 queued_for_cancellation_transaction_ids: cancelled_transaction_ids,
490 failed_to_queue_transaction_ids: failed_transaction_ids,
491 total_processed: total_processed as u32,
492 })
493 }
494
495 #[instrument(
505 level = "debug",
506 skip(self, request),
507 fields(
508 request_id = ?crate::observability::request_id::get_request_id(),
509 relayer_id = %self.relayer.id,
510 )
511 )]
512 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
513 let result = self.signer.sign_data(request).await?;
514
515 Ok(result)
516 }
517
518 #[instrument(
528 level = "debug",
529 skip(self, request),
530 fields(
531 request_id = ?crate::observability::request_id::get_request_id(),
532 relayer_id = %self.relayer.id,
533 )
534 )]
535 async fn sign_typed_data(
536 &self,
537 request: SignTypedDataRequest,
538 ) -> Result<SignDataResponse, RelayerError> {
539 let result = self.signer.sign_typed_data(request).await?;
540
541 Ok(result)
542 }
543
544 #[instrument(
554 level = "debug",
555 skip(self, request),
556 fields(
557 request_id = ?crate::observability::request_id::get_request_id(),
558 relayer_id = %self.relayer.id,
559 )
560 )]
561 async fn rpc(
562 &self,
563 request: JsonRpcRequest<NetworkRpcRequest>,
564 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
565 let evm_request = match request.params {
566 NetworkRpcRequest::Evm(evm_req) => evm_req,
567 _ => {
568 return Ok(create_error_response(
569 request.id,
570 RpcErrorCodes::INVALID_PARAMS,
571 "Invalid params",
572 "Expected EVM network request",
573 ))
574 }
575 };
576
577 let (method, params_json) = match evm_request {
579 crate::models::EvmRpcRequest::RawRpcRequest { method, params } => (method, params),
580 };
581
582 match self.provider.raw_request_dyn(&method, params_json).await {
584 Ok(result_value) => Ok(create_success_response(request.id, result_value)),
585 Err(provider_error) => {
586 tracing::error!(
588 error = %provider_error,
589 "RPC provider error occurred"
590 );
591 let (error_code, error_message) = map_provider_error(&provider_error);
592 let sanitized_description = sanitize_error_description(&provider_error);
593 Ok(create_error_response(
594 request.id,
595 error_code,
596 error_message,
597 &sanitized_description,
598 ))
599 }
600 }
601 }
602
603 #[instrument(
609 level = "debug",
610 skip(self),
611 fields(
612 request_id = ?crate::observability::request_id::get_request_id(),
613 relayer_id = %self.relayer.id,
614 )
615 )]
616 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
617 let policy = self.relayer.policies.get_evm_policy();
618 EvmTransactionValidator::init_balance_validation(
619 &self.relayer.address,
620 &policy,
621 &self.provider,
622 )
623 .await
624 .map_err(|e| RelayerError::InsufficientBalanceError(e.to_string()))?;
625
626 Ok(())
627 }
628
629 #[instrument(
635 level = "debug",
636 skip(self),
637 fields(
638 request_id = ?crate::observability::request_id::get_request_id(),
639 relayer_id = %self.relayer.id,
640 )
641 )]
642 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
643 debug!("running health checks");
644
645 let nonce_sync_result = self.sync_nonce().await;
646 let validate_rpc_result = self.validate_rpc().await;
647 let validate_min_balance_result = self.validate_min_balance().await;
648
649 let failures: Vec<HealthCheckFailure> = vec![
651 nonce_sync_result
652 .err()
653 .map(|e| HealthCheckFailure::NonceSyncFailed(e.to_string())),
654 validate_rpc_result
655 .err()
656 .map(|e| HealthCheckFailure::RpcValidationFailed(e.to_string())),
657 validate_min_balance_result
658 .err()
659 .map(|e| HealthCheckFailure::BalanceCheckFailed(e.to_string())),
660 ]
661 .into_iter()
662 .flatten()
663 .collect();
664
665 if failures.is_empty() {
666 info!("all health checks passed");
667 Ok(())
668 } else {
669 warn!("health checks failed: {:?}", failures);
670 Err(failures)
671 }
672 }
673
674 #[instrument(
675 level = "debug",
676 skip(self),
677 fields(
678 request_id = ?crate::observability::request_id::get_request_id(),
679 relayer_id = %self.relayer.id,
680 )
681 )]
682 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
683 debug!("initializing EVM relayer");
684
685 match self.check_health().await {
686 Ok(_) => {
687 if self.relayer.system_disabled {
689 self.relayer_repository
691 .enable_relayer(self.relayer.id.clone())
692 .await?;
693 }
694 Ok(())
695 }
696 Err(failures) => {
697 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
699 DisabledReason::RpcValidationFailed("Unknown error".to_string())
700 });
701
702 warn!(reason = %reason, "disabling relayer");
703 let updated_relayer = self
704 .relayer_repository
705 .disable_relayer(self.relayer.id.clone(), reason.clone())
706 .await?;
707
708 if let Some(notification_id) = &self.relayer.notification_id {
710 self.job_producer
711 .produce_send_notification_job(
712 produce_relayer_disabled_payload(
713 notification_id,
714 &updated_relayer,
715 &reason.safe_description(),
716 ),
717 None,
718 )
719 .await?;
720 }
721
722 self.job_producer
724 .produce_relayer_health_check_job(
725 RelayerHealthCheck::new(self.relayer.id.clone()),
726 Some(calculate_scheduled_timestamp(10)),
727 )
728 .await?;
729
730 Ok(())
731 }
732 }
733 }
734
735 #[instrument(
736 level = "debug",
737 skip(self, _request),
738 fields(
739 request_id = ?crate::observability::request_id::get_request_id(),
740 relayer_id = %self.relayer.id,
741 )
742 )]
743 async fn sign_transaction(
744 &self,
745 _request: &SignTransactionRequest,
746 ) -> Result<SignTransactionExternalResponse, RelayerError> {
747 Err(RelayerError::NotSupported(
748 "Transaction signing not supported for EVM".to_string(),
749 ))
750 }
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use crate::models::RpcConfig;
757 use crate::{
758 config::{EvmNetworkConfig, NetworkConfigCommon},
759 jobs::MockJobProducerTrait,
760 models::{
761 EvmRpcRequest, EvmRpcResult, JsonRpcId, NetworkRepoModel, NetworkType,
762 RelayerEvmPolicy, RelayerNetworkPolicy, RepositoryError, SignerError,
763 TransactionStatus, U256,
764 },
765 repositories::{MockNetworkRepository, MockRelayerRepository, MockTransactionRepository},
766 services::{
767 provider::{MockEvmProviderTrait, ProviderError},
768 MockTransactionCounterServiceTrait,
769 },
770 };
771 use mockall::predicate::*;
772 use std::future::ready;
773
774 mockall::mock! {
775 pub DataSigner {}
776
777 #[async_trait]
778 impl DataSignerTrait for DataSigner {
779 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, SignerError>;
780 async fn sign_typed_data(&self, request: SignTypedDataRequest) -> Result<SignDataResponse, SignerError>;
781 }
782 }
783
784 fn create_test_evm_network() -> EvmNetwork {
785 EvmNetwork {
786 network: "mainnet".to_string(),
787 rpc_urls: vec![RpcConfig::new(
788 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
789 )],
790 explorer_urls: None,
791 average_blocktime_ms: 12000,
792 is_testnet: false,
793 tags: vec!["mainnet".to_string()],
794 chain_id: 1,
795 required_confirmations: 1,
796 features: vec!["eip1559".to_string()],
797 symbol: "ETH".to_string(),
798 gas_price_cache: None,
799 }
800 }
801
802 fn create_test_network_repo_model() -> NetworkRepoModel {
803 let config = EvmNetworkConfig {
804 common: NetworkConfigCommon {
805 network: "mainnet".to_string(),
806 from: None,
807 rpc_urls: Some(vec![crate::models::RpcConfig::new(
808 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
809 )]),
810 explorer_urls: None,
811 average_blocktime_ms: Some(12000),
812 is_testnet: Some(false),
813 tags: Some(vec!["mainnet".to_string()]),
814 },
815 chain_id: Some(1),
816 required_confirmations: Some(1),
817 features: Some(vec!["eip1559".to_string()]),
818 symbol: Some("ETH".to_string()),
819 gas_price_cache: None,
820 };
821
822 NetworkRepoModel::new_evm(config)
823 }
824
825 fn create_test_relayer() -> RelayerRepoModel {
826 RelayerRepoModel {
827 id: "test-relayer-id".to_string(),
828 name: "Test Relayer".to_string(),
829 network: "mainnet".to_string(), address: "0xSender".to_string(),
831 paused: false,
832 system_disabled: false,
833 signer_id: "test-signer-id".to_string(),
834 notification_id: Some("test-notification-id".to_string()),
835 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
836 include_revert_data: None,
837 min_balance: Some(100000000000000000u128), whitelist_receivers: Some(vec!["0xRecipient".to_string()]),
839 gas_price_cap: Some(100000000000), eip1559_pricing: Some(true),
841 private_transactions: Some(false),
842 gas_limit_estimation: Some(true),
843 }),
844 network_type: NetworkType::Evm,
845 custom_rpc_urls: None,
846 ..Default::default()
847 }
848 }
849
850 fn setup_mocks() -> (
851 MockEvmProviderTrait,
852 MockRelayerRepository,
853 MockNetworkRepository,
854 MockTransactionRepository,
855 MockJobProducerTrait,
856 MockDataSigner,
857 MockTransactionCounterServiceTrait,
858 ) {
859 (
860 MockEvmProviderTrait::new(),
861 MockRelayerRepository::new(),
862 MockNetworkRepository::new(),
863 MockTransactionRepository::new(),
864 MockJobProducerTrait::new(),
865 MockDataSigner::new(),
866 MockTransactionCounterServiceTrait::new(),
867 )
868 }
869
870 #[tokio::test]
871 async fn test_get_balance() {
872 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
873 setup_mocks();
874 let relayer_model = create_test_relayer();
875
876 provider
877 .expect_get_balance()
878 .with(eq("0xSender"))
879 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64))))); let relayer = EvmRelayer::new(
882 relayer_model,
883 signer,
884 provider,
885 create_test_evm_network(),
886 Arc::new(relayer_repo),
887 Arc::new(network_repo),
888 Arc::new(tx_repo),
889 Arc::new(counter),
890 Arc::new(job_producer),
891 )
892 .unwrap();
893
894 let balance = relayer.get_balance().await.unwrap();
895 assert_eq!(balance.balance, 1000000000000000000u128);
896 assert_eq!(balance.unit, EVM_SMALLEST_UNIT_NAME);
897 }
898
899 #[tokio::test]
900 async fn test_process_transaction_request() {
901 let (
902 provider,
903 relayer_repo,
904 mut network_repo,
905 mut tx_repo,
906 mut job_producer,
907 signer,
908 counter,
909 ) = setup_mocks();
910 let relayer_model = create_test_relayer();
911
912 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
913 to: Some("0xRecipient".to_string()),
914 value: U256::from(1000000000000000000u64),
915 data: Some("0xData".to_string()),
916 gas_limit: Some(21000),
917 gas_price: Some(20000000000),
918 max_fee_per_gas: None,
919 max_priority_fee_per_gas: None,
920 speed: None,
921 valid_until: None,
922 });
923
924 network_repo
925 .expect_get_by_name()
926 .with(eq(NetworkType::Evm), eq("mainnet"))
927 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
928
929 tx_repo.expect_create().returning(Ok);
930 job_producer
931 .expect_produce_transaction_request_job()
932 .returning(|_, _| Box::pin(ready(Ok(()))));
933 job_producer
934 .expect_produce_check_transaction_status_job()
935 .returning(|_, _| Box::pin(ready(Ok(()))));
936
937 let relayer = EvmRelayer::new(
938 relayer_model,
939 signer,
940 provider,
941 create_test_evm_network(),
942 Arc::new(relayer_repo),
943 Arc::new(network_repo),
944 Arc::new(tx_repo),
945 Arc::new(counter),
946 Arc::new(job_producer),
947 )
948 .unwrap();
949
950 let result = relayer.process_transaction_request(network_tx).await;
951 assert!(result.is_ok());
952 }
953
954 #[tokio::test]
955 async fn test_process_transaction_request_status_check_failure_returns_error() {
956 let (
957 provider,
958 relayer_repo,
959 mut network_repo,
960 mut tx_repo,
961 mut job_producer,
962 signer,
963 counter,
964 ) = setup_mocks();
965 let relayer_model = create_test_relayer();
966
967 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
968 to: Some("0xRecipient".to_string()),
969 value: U256::from(1000000000000000000u64),
970 data: Some("0xData".to_string()),
971 gas_limit: Some(21000),
972 gas_price: Some(20000000000),
973 max_fee_per_gas: None,
974 max_priority_fee_per_gas: None,
975 speed: None,
976 valid_until: None,
977 });
978
979 network_repo
980 .expect_get_by_name()
981 .with(eq(NetworkType::Evm), eq("mainnet"))
982 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
983
984 tx_repo.expect_create().returning(Ok);
985 tx_repo
987 .expect_partial_update()
988 .returning(|_, _| Ok(TransactionRepoModel::default()));
989
990 job_producer
992 .expect_produce_check_transaction_status_job()
993 .returning(|_, _| {
994 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
995 "Failed to queue job".to_string(),
996 ))))
997 });
998
999 let relayer = EvmRelayer::new(
1003 relayer_model,
1004 signer,
1005 provider,
1006 create_test_evm_network(),
1007 Arc::new(relayer_repo),
1008 Arc::new(network_repo),
1009 Arc::new(tx_repo),
1010 Arc::new(counter),
1011 Arc::new(job_producer),
1012 )
1013 .unwrap();
1014
1015 let result = relayer.process_transaction_request(network_tx).await;
1016 assert!(result.is_err());
1017 }
1018
1019 #[tokio::test]
1020 async fn test_process_transaction_request_status_check_failure_marks_tx_failed() {
1021 let (
1022 provider,
1023 relayer_repo,
1024 mut network_repo,
1025 mut tx_repo,
1026 mut job_producer,
1027 signer,
1028 counter,
1029 ) = setup_mocks();
1030 let relayer_model = create_test_relayer();
1031
1032 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
1033 to: Some("0xRecipient".to_string()),
1034 value: U256::from(1000000000000000000u64),
1035 data: Some("0xData".to_string()),
1036 gas_limit: Some(21000),
1037 gas_price: Some(20000000000),
1038 max_fee_per_gas: None,
1039 max_priority_fee_per_gas: None,
1040 speed: None,
1041 valid_until: None,
1042 });
1043
1044 network_repo
1045 .expect_get_by_name()
1046 .with(eq(NetworkType::Evm), eq("mainnet"))
1047 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
1048
1049 tx_repo.expect_create().returning(Ok);
1050
1051 tx_repo
1053 .expect_partial_update()
1054 .withf(|_tx_id, update| {
1055 update.status == Some(TransactionStatus::Failed)
1056 && update.status_reason == Some("Queue unavailable".to_string())
1057 })
1058 .returning(|_, _| Ok(TransactionRepoModel::default()));
1059
1060 job_producer
1061 .expect_produce_check_transaction_status_job()
1062 .returning(|_, _| {
1063 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1064 "Redis timeout".to_string(),
1065 ))))
1066 });
1067
1068 let relayer = EvmRelayer::new(
1069 relayer_model,
1070 signer,
1071 provider,
1072 create_test_evm_network(),
1073 Arc::new(relayer_repo),
1074 Arc::new(network_repo),
1075 Arc::new(tx_repo),
1076 Arc::new(counter),
1077 Arc::new(job_producer),
1078 )
1079 .unwrap();
1080
1081 let result = relayer.process_transaction_request(network_tx).await;
1082 assert!(result.is_err());
1083 }
1085
1086 #[tokio::test]
1087 async fn test_validate_min_balance_sufficient() {
1088 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1089 setup_mocks();
1090 let relayer_model = create_test_relayer();
1091
1092 provider
1093 .expect_get_balance()
1094 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); let relayer = EvmRelayer::new(
1097 relayer_model,
1098 signer,
1099 provider,
1100 create_test_evm_network(),
1101 Arc::new(relayer_repo),
1102 Arc::new(network_repo),
1103 Arc::new(tx_repo),
1104 Arc::new(counter),
1105 Arc::new(job_producer),
1106 )
1107 .unwrap();
1108
1109 let result = relayer.validate_min_balance().await;
1110 assert!(result.is_ok());
1111 }
1112
1113 #[tokio::test]
1114 async fn test_validate_min_balance_insufficient() {
1115 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1116 setup_mocks();
1117 let relayer_model = create_test_relayer();
1118
1119 provider
1120 .expect_get_balance()
1121 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); let relayer = EvmRelayer::new(
1124 relayer_model,
1125 signer,
1126 provider,
1127 create_test_evm_network(),
1128 Arc::new(relayer_repo),
1129 Arc::new(network_repo),
1130 Arc::new(tx_repo),
1131 Arc::new(counter),
1132 Arc::new(job_producer),
1133 )
1134 .unwrap();
1135
1136 let result = relayer.validate_min_balance().await;
1137 assert!(matches!(
1138 result,
1139 Err(RelayerError::InsufficientBalanceError(_))
1140 ));
1141 }
1142
1143 #[tokio::test]
1144 async fn test_sync_nonce() {
1145 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1146 setup_mocks();
1147 let relayer_model = create_test_relayer();
1148
1149 provider
1150 .expect_get_transaction_count()
1151 .returning(|_| Box::pin(ready(Ok(42u64))));
1152
1153 counter
1154 .expect_set()
1155 .returning(|_nonce| Box::pin(ready(Ok(()))));
1156
1157 counter
1158 .expect_get()
1159 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1160
1161 let relayer = EvmRelayer::new(
1162 relayer_model,
1163 signer,
1164 provider,
1165 create_test_evm_network(),
1166 Arc::new(relayer_repo),
1167 Arc::new(network_repo),
1168 Arc::new(tx_repo),
1169 Arc::new(counter),
1170 Arc::new(job_producer),
1171 )
1172 .unwrap();
1173
1174 let result = relayer.sync_nonce().await;
1175 assert!(result.is_ok());
1176 }
1177
1178 #[tokio::test]
1179 async fn test_sync_nonce_lower_on_chain_nonce() {
1180 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1181 setup_mocks();
1182 let relayer_model = create_test_relayer();
1183
1184 provider
1185 .expect_get_transaction_count()
1186 .returning(|_| Box::pin(ready(Ok(40u64))));
1187
1188 counter
1189 .expect_set()
1190 .with(eq(42u64))
1191 .returning(|_nonce| Box::pin(ready(Ok(()))));
1192
1193 counter
1194 .expect_get()
1195 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1196
1197 let relayer = EvmRelayer::new(
1198 relayer_model,
1199 signer,
1200 provider,
1201 create_test_evm_network(),
1202 Arc::new(relayer_repo),
1203 Arc::new(network_repo),
1204 Arc::new(tx_repo),
1205 Arc::new(counter),
1206 Arc::new(job_producer),
1207 )
1208 .unwrap();
1209
1210 let result = relayer.sync_nonce().await;
1211 assert!(result.is_ok());
1212 }
1213
1214 #[tokio::test]
1215 async fn test_sync_nonce_lower_transaction_counter_nonce() {
1216 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1217 setup_mocks();
1218 let relayer_model = create_test_relayer();
1219
1220 provider
1221 .expect_get_transaction_count()
1222 .returning(|_| Box::pin(ready(Ok(42u64))));
1223
1224 counter
1225 .expect_set()
1226 .with(eq(42u64))
1227 .returning(|_nonce| Box::pin(ready(Ok(()))));
1228
1229 counter
1230 .expect_get()
1231 .returning(|| Box::pin(ready(Ok(Some(40u64)))));
1232
1233 let relayer = EvmRelayer::new(
1234 relayer_model,
1235 signer,
1236 provider,
1237 create_test_evm_network(),
1238 Arc::new(relayer_repo),
1239 Arc::new(network_repo),
1240 Arc::new(tx_repo),
1241 Arc::new(counter),
1242 Arc::new(job_producer),
1243 )
1244 .unwrap();
1245
1246 let result = relayer.sync_nonce().await;
1247 assert!(result.is_ok());
1248 }
1249
1250 #[tokio::test]
1251 async fn test_validate_rpc() {
1252 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1253 setup_mocks();
1254 let relayer_model = create_test_relayer();
1255
1256 provider
1257 .expect_health_check()
1258 .returning(|| Box::pin(ready(Ok(true))));
1259
1260 let relayer = EvmRelayer::new(
1261 relayer_model,
1262 signer,
1263 provider,
1264 create_test_evm_network(),
1265 Arc::new(relayer_repo),
1266 Arc::new(network_repo),
1267 Arc::new(tx_repo),
1268 Arc::new(counter),
1269 Arc::new(job_producer),
1270 )
1271 .unwrap();
1272
1273 let result = relayer.validate_rpc().await;
1274 assert!(result.is_ok());
1275 }
1276
1277 #[tokio::test]
1278 async fn test_get_status_success() {
1279 let (
1280 mut provider,
1281 relayer_repo,
1282 network_repo,
1283 mut tx_repo,
1284 job_producer,
1285 signer,
1286 mut counter,
1287 ) = setup_mocks();
1288 let relayer_model = create_test_relayer();
1289
1290 counter
1292 .expect_get()
1293 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1294 .once();
1295 provider
1296 .expect_get_balance()
1297 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1298 .once();
1299
1300 tx_repo
1302 .expect_count_by_status()
1303 .withf(|relayer_id, statuses| {
1304 relayer_id == "test-relayer-id"
1305 && statuses
1306 == [
1307 TransactionStatus::Pending,
1308 TransactionStatus::Sent,
1309 TransactionStatus::Submitted,
1310 ]
1311 })
1312 .returning(|_, _| Ok(0u64))
1313 .once();
1314
1315 let latest_confirmed_tx = TransactionRepoModel {
1317 id: "tx1".to_string(),
1318 relayer_id: relayer_model.id.clone(),
1319 status: TransactionStatus::Confirmed,
1320 confirmed_at: Some("2023-01-01T12:00:00Z".to_string()),
1321 ..TransactionRepoModel::default()
1322 };
1323 let relayer_id_clone = relayer_model.id.clone();
1324 tx_repo
1325 .expect_find_by_status_paginated()
1326 .withf(move |relayer_id, statuses, query, oldest_first| {
1327 *relayer_id == relayer_id_clone
1328 && statuses == [TransactionStatus::Confirmed]
1329 && query.page == 1
1330 && query.per_page == 1
1331 && !(*oldest_first)
1332 })
1333 .returning(move |_, _, _, _| {
1334 Ok(crate::repositories::PaginatedResult {
1335 items: vec![latest_confirmed_tx.clone()],
1336 total: 1,
1337 page: 1,
1338 per_page: 1,
1339 })
1340 })
1341 .once();
1342
1343 let relayer = EvmRelayer::new(
1344 relayer_model.clone(),
1345 signer,
1346 provider,
1347 create_test_evm_network(),
1348 Arc::new(relayer_repo),
1349 Arc::new(network_repo),
1350 Arc::new(tx_repo),
1351 Arc::new(counter),
1352 Arc::new(job_producer),
1353 )
1354 .unwrap();
1355
1356 let status = relayer.get_status().await.unwrap();
1357
1358 match status {
1359 RelayerStatus::Evm {
1360 balance,
1361 pending_transactions_count,
1362 last_confirmed_transaction_timestamp,
1363 system_disabled,
1364 paused,
1365 nonce,
1366 } => {
1367 assert_eq!(balance, "1000000000000000000");
1368 assert_eq!(pending_transactions_count, 0);
1369 assert_eq!(
1370 last_confirmed_transaction_timestamp,
1371 Some("2023-01-01T12:00:00Z".to_string())
1372 );
1373 assert_eq!(system_disabled, relayer_model.system_disabled);
1374 assert_eq!(paused, relayer_model.paused);
1375 assert_eq!(nonce, "10");
1376 }
1377 _ => panic!("Expected EVM RelayerStatus"),
1378 }
1379 }
1380
1381 #[tokio::test]
1382 async fn test_get_status_provider_nonce_error() {
1383 let (
1384 mut provider,
1385 relayer_repo,
1386 network_repo,
1387 mut tx_repo,
1388 job_producer,
1389 signer,
1390 mut counter,
1391 ) = setup_mocks();
1392 let relayer_model = create_test_relayer();
1393
1394 counter
1396 .expect_get()
1397 .returning(|| Box::pin(ready(Ok(None))))
1398 .once();
1399 provider
1400 .expect_get_balance()
1401 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1402 .once();
1403
1404 tx_repo
1406 .expect_count_by_status()
1407 .returning(|_, _| Ok(0u64))
1408 .once();
1409
1410 tx_repo
1412 .expect_find_by_status_paginated()
1413 .withf(|_relayer_id, statuses, query, oldest_first| {
1414 statuses == [TransactionStatus::Confirmed]
1415 && query.page == 1
1416 && query.per_page == 1
1417 && !(*oldest_first)
1418 })
1419 .returning(|_, _, _, _| {
1420 Ok(crate::repositories::PaginatedResult {
1421 items: vec![],
1422 total: 0,
1423 page: 1,
1424 per_page: 1,
1425 })
1426 })
1427 .once();
1428
1429 let relayer = EvmRelayer::new(
1430 relayer_model.clone(),
1431 signer,
1432 provider,
1433 create_test_evm_network(),
1434 Arc::new(relayer_repo),
1435 Arc::new(network_repo),
1436 Arc::new(tx_repo),
1437 Arc::new(counter),
1438 Arc::new(job_producer),
1439 )
1440 .unwrap();
1441
1442 let status = relayer.get_status().await.unwrap();
1444 match status {
1445 RelayerStatus::Evm { nonce, .. } => {
1446 assert_eq!(nonce, "0");
1447 }
1448 _ => panic!("Expected Evm status"),
1449 }
1450 }
1451
1452 #[tokio::test]
1453 async fn test_get_status_repository_pending_error() {
1454 let (
1455 mut provider,
1456 relayer_repo,
1457 network_repo,
1458 mut tx_repo,
1459 job_producer,
1460 signer,
1461 mut counter,
1462 ) = setup_mocks();
1463 let relayer_model = create_test_relayer();
1464
1465 counter
1467 .expect_get()
1468 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1469 .once();
1470 provider
1471 .expect_get_balance()
1472 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1473
1474 tx_repo
1475 .expect_count_by_status()
1476 .withf(|relayer_id, statuses| {
1477 relayer_id == "test-relayer-id"
1478 && statuses
1479 == [
1480 TransactionStatus::Pending,
1481 TransactionStatus::Sent,
1482 TransactionStatus::Submitted,
1483 ]
1484 })
1485 .returning(|_, _| Err(RepositoryError::Unknown("DB down".to_string())))
1486 .once();
1487
1488 let relayer = EvmRelayer::new(
1489 relayer_model.clone(),
1490 signer,
1491 provider,
1492 create_test_evm_network(),
1493 Arc::new(relayer_repo),
1494 Arc::new(network_repo),
1495 Arc::new(tx_repo),
1496 Arc::new(counter),
1497 Arc::new(job_producer),
1498 )
1499 .unwrap();
1500
1501 let result = relayer.get_status().await;
1502 assert!(result.is_err());
1503 match result.err().unwrap() {
1504 RelayerError::NetworkConfiguration(msg) => assert!(msg.contains("DB down")),
1506 _ => panic!("Expected NetworkConfiguration error for repo failure"),
1507 }
1508 }
1509
1510 #[tokio::test]
1511 async fn test_get_status_no_confirmed_transactions() {
1512 let (
1513 mut provider,
1514 relayer_repo,
1515 network_repo,
1516 mut tx_repo,
1517 job_producer,
1518 signer,
1519 mut counter,
1520 ) = setup_mocks();
1521 let relayer_model = create_test_relayer();
1522
1523 counter
1525 .expect_get()
1526 .returning(|| Box::pin(ready(Ok(Some(10u64)))));
1527 provider
1528 .expect_get_balance()
1529 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1530 provider
1531 .expect_health_check()
1532 .returning(|| Box::pin(ready(Ok(true))));
1533
1534 tx_repo
1536 .expect_count_by_status()
1537 .withf(|relayer_id, statuses| {
1538 relayer_id == "test-relayer-id"
1539 && statuses
1540 == [
1541 TransactionStatus::Pending,
1542 TransactionStatus::Sent,
1543 TransactionStatus::Submitted,
1544 ]
1545 })
1546 .returning(|_, _| Ok(0u64))
1547 .once();
1548
1549 let relayer_id_clone = relayer_model.id.clone();
1551 tx_repo
1552 .expect_find_by_status_paginated()
1553 .withf(move |relayer_id, statuses, query, oldest_first| {
1554 *relayer_id == relayer_id_clone
1555 && statuses == [TransactionStatus::Confirmed]
1556 && query.page == 1
1557 && query.per_page == 1
1558 && !(*oldest_first)
1559 })
1560 .returning(|_, _, _, _| {
1561 Ok(crate::repositories::PaginatedResult {
1562 items: vec![],
1563 total: 0,
1564 page: 1,
1565 per_page: 1,
1566 })
1567 })
1568 .once();
1569
1570 let relayer = EvmRelayer::new(
1571 relayer_model.clone(),
1572 signer,
1573 provider,
1574 create_test_evm_network(),
1575 Arc::new(relayer_repo),
1576 Arc::new(network_repo),
1577 Arc::new(tx_repo),
1578 Arc::new(counter),
1579 Arc::new(job_producer),
1580 )
1581 .unwrap();
1582
1583 let status = relayer.get_status().await.unwrap();
1584 match status {
1585 RelayerStatus::Evm {
1586 balance,
1587 pending_transactions_count,
1588 last_confirmed_transaction_timestamp,
1589 system_disabled,
1590 paused,
1591 nonce,
1592 } => {
1593 assert_eq!(balance, "1000000000000000000");
1594 assert_eq!(pending_transactions_count, 0);
1595 assert_eq!(last_confirmed_transaction_timestamp, None);
1596 assert_eq!(system_disabled, relayer_model.system_disabled);
1597 assert_eq!(paused, relayer_model.paused);
1598 assert_eq!(nonce, "10");
1599 }
1600 _ => panic!("Expected EVM RelayerStatus"),
1601 }
1602 }
1603
1604 #[tokio::test]
1605 async fn test_cancel_transaction_via_job_success() {
1606 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1607 setup_mocks();
1608 let relayer_model = create_test_relayer();
1609
1610 let test_transaction = TransactionRepoModel {
1611 id: "test-tx-id".to_string(),
1612 relayer_id: relayer_model.id.clone(),
1613 status: TransactionStatus::Pending,
1614 ..TransactionRepoModel::default()
1615 };
1616
1617 job_producer
1618 .expect_produce_submit_transaction_job()
1619 .withf(|job, delay| {
1620 matches!(job.command, crate::jobs::TransactionCommand::Cancel { ref reason }
1621 if job.transaction_id == "test-tx-id"
1622 && job.relayer_id == "test-relayer-id"
1623 && reason == "Cancelled via delete_pending_transactions")
1624 && delay.is_none()
1625 })
1626 .returning(|_, _| Box::pin(ready(Ok(()))))
1627 .once();
1628
1629 let relayer = EvmRelayer::new(
1630 relayer_model,
1631 signer,
1632 provider,
1633 create_test_evm_network(),
1634 Arc::new(relayer_repo),
1635 Arc::new(network_repo),
1636 Arc::new(tx_repo),
1637 Arc::new(counter),
1638 Arc::new(job_producer),
1639 )
1640 .unwrap();
1641
1642 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1643 assert!(result.is_ok());
1644 }
1645
1646 #[tokio::test]
1647 async fn test_cancel_transaction_via_job_failure() {
1648 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1649 setup_mocks();
1650 let relayer_model = create_test_relayer();
1651
1652 let test_transaction = TransactionRepoModel {
1653 id: "test-tx-id".to_string(),
1654 relayer_id: relayer_model.id.clone(),
1655 status: TransactionStatus::Pending,
1656 ..TransactionRepoModel::default()
1657 };
1658
1659 job_producer
1660 .expect_produce_submit_transaction_job()
1661 .returning(|_, _| {
1662 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1663 "Queue is full".to_string(),
1664 ))))
1665 })
1666 .once();
1667
1668 let relayer = EvmRelayer::new(
1669 relayer_model,
1670 signer,
1671 provider,
1672 create_test_evm_network(),
1673 Arc::new(relayer_repo),
1674 Arc::new(network_repo),
1675 Arc::new(tx_repo),
1676 Arc::new(counter),
1677 Arc::new(job_producer),
1678 )
1679 .unwrap();
1680
1681 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1682 assert!(result.is_err());
1683 match result.err().unwrap() {
1684 RelayerError::QueueError(_) => (),
1685 _ => panic!("Expected QueueError"),
1686 }
1687 }
1688
1689 #[tokio::test]
1690 async fn test_delete_pending_transactions_no_pending() {
1691 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1692 setup_mocks();
1693 let relayer_model = create_test_relayer();
1694
1695 tx_repo
1696 .expect_find_by_status()
1697 .withf(|relayer_id, statuses| {
1698 relayer_id == "test-relayer-id"
1699 && statuses
1700 == [
1701 TransactionStatus::Pending,
1702 TransactionStatus::Sent,
1703 TransactionStatus::Submitted,
1704 ]
1705 })
1706 .returning(|_, _| Ok(vec![]))
1707 .once();
1708
1709 let relayer = EvmRelayer::new(
1710 relayer_model,
1711 signer,
1712 provider,
1713 create_test_evm_network(),
1714 Arc::new(relayer_repo),
1715 Arc::new(network_repo),
1716 Arc::new(tx_repo),
1717 Arc::new(counter),
1718 Arc::new(job_producer),
1719 )
1720 .unwrap();
1721
1722 let result = relayer.delete_pending_transactions().await.unwrap();
1723 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1724 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1725 assert_eq!(result.total_processed, 0);
1726 }
1727
1728 #[tokio::test]
1729 async fn test_delete_pending_transactions_all_successful() {
1730 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1731 setup_mocks();
1732 let relayer_model = create_test_relayer();
1733
1734 let pending_transactions = vec![
1735 TransactionRepoModel {
1736 id: "tx1".to_string(),
1737 relayer_id: relayer_model.id.clone(),
1738 status: TransactionStatus::Pending,
1739 ..TransactionRepoModel::default()
1740 },
1741 TransactionRepoModel {
1742 id: "tx2".to_string(),
1743 relayer_id: relayer_model.id.clone(),
1744 status: TransactionStatus::Sent,
1745 ..TransactionRepoModel::default()
1746 },
1747 TransactionRepoModel {
1748 id: "tx3".to_string(),
1749 relayer_id: relayer_model.id.clone(),
1750 status: TransactionStatus::Submitted,
1751 ..TransactionRepoModel::default()
1752 },
1753 ];
1754
1755 tx_repo
1756 .expect_find_by_status()
1757 .withf(|relayer_id, statuses| {
1758 relayer_id == "test-relayer-id"
1759 && statuses
1760 == [
1761 TransactionStatus::Pending,
1762 TransactionStatus::Sent,
1763 TransactionStatus::Submitted,
1764 ]
1765 })
1766 .returning(move |_, _| Ok(pending_transactions.clone()))
1767 .once();
1768
1769 job_producer
1770 .expect_produce_submit_transaction_job()
1771 .returning(|_, _| Box::pin(ready(Ok(()))))
1772 .times(3);
1773
1774 let relayer = EvmRelayer::new(
1775 relayer_model,
1776 signer,
1777 provider,
1778 create_test_evm_network(),
1779 Arc::new(relayer_repo),
1780 Arc::new(network_repo),
1781 Arc::new(tx_repo),
1782 Arc::new(counter),
1783 Arc::new(job_producer),
1784 )
1785 .unwrap();
1786
1787 let result = relayer.delete_pending_transactions().await.unwrap();
1788 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 3);
1789 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1790 assert_eq!(result.total_processed, 3);
1791
1792 let expected_ids = vec!["tx1", "tx2", "tx3"];
1793 for id in expected_ids {
1794 assert!(result
1795 .queued_for_cancellation_transaction_ids
1796 .contains(&id.to_string()));
1797 }
1798 }
1799
1800 #[tokio::test]
1801 async fn test_delete_pending_transactions_partial_failures() {
1802 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1803 setup_mocks();
1804 let relayer_model = create_test_relayer();
1805
1806 let pending_transactions = vec![
1807 TransactionRepoModel {
1808 id: "tx1".to_string(),
1809 relayer_id: relayer_model.id.clone(),
1810 status: TransactionStatus::Pending,
1811 ..TransactionRepoModel::default()
1812 },
1813 TransactionRepoModel {
1814 id: "tx2".to_string(),
1815 relayer_id: relayer_model.id.clone(),
1816 status: TransactionStatus::Sent,
1817 ..TransactionRepoModel::default()
1818 },
1819 TransactionRepoModel {
1820 id: "tx3".to_string(),
1821 relayer_id: relayer_model.id.clone(),
1822 status: TransactionStatus::Submitted,
1823 ..TransactionRepoModel::default()
1824 },
1825 ];
1826
1827 tx_repo
1828 .expect_find_by_status()
1829 .withf(|relayer_id, statuses| {
1830 relayer_id == "test-relayer-id"
1831 && statuses
1832 == [
1833 TransactionStatus::Pending,
1834 TransactionStatus::Sent,
1835 TransactionStatus::Submitted,
1836 ]
1837 })
1838 .returning(move |_, _| Ok(pending_transactions.clone()))
1839 .once();
1840
1841 job_producer
1843 .expect_produce_submit_transaction_job()
1844 .returning(|_, _| Box::pin(ready(Ok(()))))
1845 .times(1);
1846 job_producer
1847 .expect_produce_submit_transaction_job()
1848 .returning(|_, _| {
1849 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1850 "Queue is full".to_string(),
1851 ))))
1852 })
1853 .times(1);
1854 job_producer
1855 .expect_produce_submit_transaction_job()
1856 .returning(|_, _| Box::pin(ready(Ok(()))))
1857 .times(1);
1858
1859 let relayer = EvmRelayer::new(
1860 relayer_model,
1861 signer,
1862 provider,
1863 create_test_evm_network(),
1864 Arc::new(relayer_repo),
1865 Arc::new(network_repo),
1866 Arc::new(tx_repo),
1867 Arc::new(counter),
1868 Arc::new(job_producer),
1869 )
1870 .unwrap();
1871
1872 let result = relayer.delete_pending_transactions().await.unwrap();
1873 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 2);
1874 assert_eq!(result.failed_to_queue_transaction_ids.len(), 1);
1875 assert_eq!(result.total_processed, 3);
1876 }
1877
1878 #[tokio::test]
1879 async fn test_delete_pending_transactions_repository_error() {
1880 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1881 setup_mocks();
1882 let relayer_model = create_test_relayer();
1883
1884 tx_repo
1885 .expect_find_by_status()
1886 .withf(|relayer_id, statuses| {
1887 relayer_id == "test-relayer-id"
1888 && statuses
1889 == [
1890 TransactionStatus::Pending,
1891 TransactionStatus::Sent,
1892 TransactionStatus::Submitted,
1893 ]
1894 })
1895 .returning(|_, _| {
1896 Err(RepositoryError::Unknown(
1897 "Database connection failed".to_string(),
1898 ))
1899 })
1900 .once();
1901
1902 let relayer = EvmRelayer::new(
1903 relayer_model,
1904 signer,
1905 provider,
1906 create_test_evm_network(),
1907 Arc::new(relayer_repo),
1908 Arc::new(network_repo),
1909 Arc::new(tx_repo),
1910 Arc::new(counter),
1911 Arc::new(job_producer),
1912 )
1913 .unwrap();
1914
1915 let result = relayer.delete_pending_transactions().await;
1916 assert!(result.is_err());
1917 match result.err().unwrap() {
1918 RelayerError::NetworkConfiguration(msg) => {
1919 assert!(msg.contains("Database connection failed"))
1920 }
1921 _ => panic!("Expected NetworkConfiguration error for repository failure"),
1922 }
1923 }
1924
1925 #[tokio::test]
1926 async fn test_delete_pending_transactions_all_failures() {
1927 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1928 setup_mocks();
1929 let relayer_model = create_test_relayer();
1930
1931 let pending_transactions = vec![
1932 TransactionRepoModel {
1933 id: "tx1".to_string(),
1934 relayer_id: relayer_model.id.clone(),
1935 status: TransactionStatus::Pending,
1936 ..TransactionRepoModel::default()
1937 },
1938 TransactionRepoModel {
1939 id: "tx2".to_string(),
1940 relayer_id: relayer_model.id.clone(),
1941 status: TransactionStatus::Sent,
1942 ..TransactionRepoModel::default()
1943 },
1944 ];
1945
1946 tx_repo
1947 .expect_find_by_status()
1948 .withf(|relayer_id, statuses| {
1949 relayer_id == "test-relayer-id"
1950 && statuses
1951 == [
1952 TransactionStatus::Pending,
1953 TransactionStatus::Sent,
1954 TransactionStatus::Submitted,
1955 ]
1956 })
1957 .returning(move |_, _| Ok(pending_transactions.clone()))
1958 .once();
1959
1960 job_producer
1961 .expect_produce_submit_transaction_job()
1962 .returning(|_, _| {
1963 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1964 "Queue is full".to_string(),
1965 ))))
1966 })
1967 .times(2);
1968
1969 let relayer = EvmRelayer::new(
1970 relayer_model,
1971 signer,
1972 provider,
1973 create_test_evm_network(),
1974 Arc::new(relayer_repo),
1975 Arc::new(network_repo),
1976 Arc::new(tx_repo),
1977 Arc::new(counter),
1978 Arc::new(job_producer),
1979 )
1980 .unwrap();
1981
1982 let result = relayer.delete_pending_transactions().await.unwrap();
1983 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1984 assert_eq!(result.failed_to_queue_transaction_ids.len(), 2);
1985 assert_eq!(result.total_processed, 2);
1986
1987 let expected_failed_ids = vec!["tx1", "tx2"];
1988 for id in expected_failed_ids {
1989 assert!(result
1990 .failed_to_queue_transaction_ids
1991 .contains(&id.to_string()));
1992 }
1993 }
1994
1995 #[tokio::test]
1996 async fn test_rpc_eth_get_balance() {
1997 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1998 setup_mocks();
1999 let relayer_model = create_test_relayer();
2000
2001 provider
2002 .expect_raw_request_dyn()
2003 .withf(|method, params| {
2004 method == "eth_getBalance"
2005 && params.as_str()
2006 == Some(r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#)
2007 })
2008 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0xde0b6b3a7640000")) }));
2009
2010 let relayer = EvmRelayer::new(
2011 relayer_model,
2012 signer,
2013 provider,
2014 create_test_evm_network(),
2015 Arc::new(relayer_repo),
2016 Arc::new(network_repo),
2017 Arc::new(tx_repo),
2018 Arc::new(counter),
2019 Arc::new(job_producer),
2020 )
2021 .unwrap();
2022
2023 let request = JsonRpcRequest {
2024 jsonrpc: "2.0".to_string(),
2025 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2026 method: "eth_getBalance".to_string(),
2027 params: serde_json::Value::String(
2028 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2029 ),
2030 }),
2031 id: Some(JsonRpcId::Number(1)),
2032 };
2033
2034 let response = relayer.rpc(request).await.unwrap();
2035 assert!(response.error.is_none());
2036 assert!(response.result.is_some());
2037
2038 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2039 assert_eq!(result, serde_json::json!("0xde0b6b3a7640000")); }
2041 }
2042
2043 #[tokio::test]
2044 async fn test_rpc_eth_block_number() {
2045 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2046 setup_mocks();
2047 let relayer_model = create_test_relayer();
2048
2049 provider
2050 .expect_raw_request_dyn()
2051 .withf(|method, params| method == "eth_blockNumber" && params.as_str() == Some("[]"))
2052 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x3039")) }));
2053
2054 let relayer = EvmRelayer::new(
2055 relayer_model,
2056 signer,
2057 provider,
2058 create_test_evm_network(),
2059 Arc::new(relayer_repo),
2060 Arc::new(network_repo),
2061 Arc::new(tx_repo),
2062 Arc::new(counter),
2063 Arc::new(job_producer),
2064 )
2065 .unwrap();
2066
2067 let request = JsonRpcRequest {
2068 jsonrpc: "2.0".to_string(),
2069 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2070 method: "eth_blockNumber".to_string(),
2071 params: serde_json::Value::String("[]".to_string()),
2072 }),
2073 id: Some(JsonRpcId::Number(1)),
2074 };
2075
2076 let response = relayer.rpc(request).await.unwrap();
2077 assert!(response.error.is_none());
2078 assert!(response.result.is_some());
2079
2080 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2081 assert_eq!(result, serde_json::json!("0x3039")); }
2083 }
2084
2085 #[tokio::test]
2086 async fn test_rpc_unsupported_method() {
2087 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2088 setup_mocks();
2089 let relayer_model = create_test_relayer();
2090
2091 provider
2092 .expect_raw_request_dyn()
2093 .withf(|method, _| method == "eth_unsupportedMethod")
2094 .returning(|_, _| {
2095 Box::pin(async {
2096 Err(ProviderError::Other(
2097 "Unsupported method: eth_unsupportedMethod".to_string(),
2098 ))
2099 })
2100 });
2101
2102 let relayer = EvmRelayer::new(
2103 relayer_model,
2104 signer,
2105 provider,
2106 create_test_evm_network(),
2107 Arc::new(relayer_repo),
2108 Arc::new(network_repo),
2109 Arc::new(tx_repo),
2110 Arc::new(counter),
2111 Arc::new(job_producer),
2112 )
2113 .unwrap();
2114
2115 let request = JsonRpcRequest {
2116 jsonrpc: "2.0".to_string(),
2117 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2118 method: "eth_unsupportedMethod".to_string(),
2119 params: serde_json::Value::String("[]".to_string()),
2120 }),
2121 id: Some(JsonRpcId::Number(1)),
2122 };
2123
2124 let response = relayer.rpc(request).await.unwrap();
2125 assert!(response.result.is_none());
2126 assert!(response.error.is_some());
2127
2128 let error = response.error.unwrap();
2129 assert_eq!(error.code, -32603); }
2131
2132 #[tokio::test]
2133 async fn test_rpc_invalid_params() {
2134 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2135 setup_mocks();
2136 let relayer_model = create_test_relayer();
2137
2138 provider
2139 .expect_raw_request_dyn()
2140 .withf(|method, params| method == "eth_getBalance" && params.as_str() == Some("[]"))
2141 .returning(|_, _| {
2142 Box::pin(async {
2143 Err(ProviderError::Other(
2144 "Missing address parameter".to_string(),
2145 ))
2146 })
2147 });
2148
2149 let relayer = EvmRelayer::new(
2150 relayer_model,
2151 signer,
2152 provider,
2153 create_test_evm_network(),
2154 Arc::new(relayer_repo),
2155 Arc::new(network_repo),
2156 Arc::new(tx_repo),
2157 Arc::new(counter),
2158 Arc::new(job_producer),
2159 )
2160 .unwrap();
2161
2162 let request = JsonRpcRequest {
2163 jsonrpc: "2.0".to_string(),
2164 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2165 method: "eth_getBalance".to_string(),
2166 params: serde_json::Value::String("[]".to_string()), }),
2168 id: Some(JsonRpcId::Number(1)),
2169 };
2170
2171 let response = relayer.rpc(request).await.unwrap();
2172 assert!(response.result.is_none());
2173 assert!(response.error.is_some());
2174
2175 let error = response.error.unwrap();
2176 assert_eq!(error.code, -32603); }
2178
2179 #[tokio::test]
2180 async fn test_rpc_non_evm_request() {
2181 let (provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2182 setup_mocks();
2183 let relayer_model = create_test_relayer();
2184
2185 let relayer = EvmRelayer::new(
2186 relayer_model,
2187 signer,
2188 provider,
2189 create_test_evm_network(),
2190 Arc::new(relayer_repo),
2191 Arc::new(network_repo),
2192 Arc::new(tx_repo),
2193 Arc::new(counter),
2194 Arc::new(job_producer),
2195 )
2196 .unwrap();
2197
2198 let request = JsonRpcRequest {
2199 jsonrpc: "2.0".to_string(),
2200 params: NetworkRpcRequest::Solana(crate::models::SolanaRpcRequest::GetSupportedTokens(
2201 crate::models::SolanaGetSupportedTokensRequestParams {},
2202 )),
2203 id: Some(JsonRpcId::Number(1)),
2204 };
2205
2206 let response = relayer.rpc(request).await.unwrap();
2207 assert!(response.result.is_none());
2208 assert!(response.error.is_some());
2209
2210 let error = response.error.unwrap();
2211 assert_eq!(error.code, -32602); }
2213
2214 #[tokio::test]
2215 async fn test_rpc_raw_request_with_array_params() {
2216 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2217 setup_mocks();
2218 let relayer_model = create_test_relayer();
2219
2220 provider
2221 .expect_raw_request_dyn()
2222 .withf(|method, params| {
2223 method == "eth_getTransactionByHash"
2224 && params.as_array().is_some_and(|arr| {
2225 arr.len() == 1 && arr[0].as_str() == Some("0x1234567890abcdef")
2226 })
2227 })
2228 .returning(|_, _| {
2229 Box::pin(async {
2230 Ok(serde_json::json!({
2231 "hash": "0x1234567890abcdef",
2232 "blockNumber": "0x1",
2233 "gasUsed": "0x5208"
2234 }))
2235 })
2236 });
2237
2238 let relayer = EvmRelayer::new(
2239 relayer_model,
2240 signer,
2241 provider,
2242 create_test_evm_network(),
2243 Arc::new(relayer_repo),
2244 Arc::new(network_repo),
2245 Arc::new(tx_repo),
2246 Arc::new(counter),
2247 Arc::new(job_producer),
2248 )
2249 .unwrap();
2250
2251 let request = JsonRpcRequest {
2252 jsonrpc: "2.0".to_string(),
2253 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2254 method: "eth_getTransactionByHash".to_string(),
2255 params: serde_json::json!(["0x1234567890abcdef"]),
2256 }),
2257 id: Some(JsonRpcId::Number(42)),
2258 };
2259
2260 let response = relayer.rpc(request).await.unwrap();
2261 assert!(response.error.is_none());
2262 assert!(response.result.is_some());
2263 assert_eq!(response.id, Some(JsonRpcId::Number(42)));
2264
2265 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2266 assert!(result.get("hash").is_some());
2267 assert!(result.get("blockNumber").is_some());
2268 }
2269 }
2270
2271 #[tokio::test]
2272 async fn test_rpc_raw_request_with_object_params() {
2273 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2274 setup_mocks();
2275 let relayer_model = create_test_relayer();
2276
2277 provider
2278 .expect_raw_request_dyn()
2279 .withf(|method, params| {
2280 method == "eth_call"
2281 && params
2282 .as_object()
2283 .is_some_and(|obj| obj.contains_key("to") && obj.contains_key("data"))
2284 })
2285 .returning(|_, _| {
2286 Box::pin(async {
2287 Ok(serde_json::json!(
2288 "0x0000000000000000000000000000000000000000000000000000000000000001"
2289 ))
2290 })
2291 });
2292
2293 let relayer = EvmRelayer::new(
2294 relayer_model,
2295 signer,
2296 provider,
2297 create_test_evm_network(),
2298 Arc::new(relayer_repo),
2299 Arc::new(network_repo),
2300 Arc::new(tx_repo),
2301 Arc::new(counter),
2302 Arc::new(job_producer),
2303 )
2304 .unwrap();
2305
2306 let request = JsonRpcRequest {
2307 jsonrpc: "2.0".to_string(),
2308 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2309 method: "eth_call".to_string(),
2310 params: serde_json::json!({
2311 "to": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
2312 "data": "0x70a08231000000000000000000000000742d35cc6634c0532925a3b844bc454e4438f44e"
2313 }),
2314 }),
2315 id: Some(JsonRpcId::Number(123)),
2316 };
2317
2318 let response = relayer.rpc(request).await.unwrap();
2319 assert!(response.error.is_none());
2320 assert!(response.result.is_some());
2321 assert_eq!(response.id, Some(JsonRpcId::Number(123)));
2322 }
2323
2324 #[tokio::test]
2325 async fn test_rpc_generic_request_with_empty_params() {
2326 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2327 setup_mocks();
2328 let relayer_model = create_test_relayer();
2329
2330 provider
2331 .expect_raw_request_dyn()
2332 .withf(|method, params| method == "net_version" && params.as_str() == Some("[]"))
2333 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("1")) }));
2334
2335 let relayer = EvmRelayer::new(
2336 relayer_model,
2337 signer,
2338 provider,
2339 create_test_evm_network(),
2340 Arc::new(relayer_repo),
2341 Arc::new(network_repo),
2342 Arc::new(tx_repo),
2343 Arc::new(counter),
2344 Arc::new(job_producer),
2345 )
2346 .unwrap();
2347
2348 let request = JsonRpcRequest {
2349 jsonrpc: "2.0".to_string(),
2350 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2351 method: "net_version".to_string(),
2352 params: serde_json::Value::String("[]".to_string()),
2353 }),
2354 id: Some(JsonRpcId::Number(999)),
2355 };
2356
2357 let response = relayer.rpc(request).await.unwrap();
2358 assert!(response.error.is_none());
2359 assert!(response.result.is_some());
2360 assert_eq!(response.id, Some(JsonRpcId::Number(999)));
2361 }
2362
2363 #[tokio::test]
2364 async fn test_rpc_provider_invalid_address_error() {
2365 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2366 setup_mocks();
2367 let relayer_model = create_test_relayer();
2368
2369 provider.expect_raw_request_dyn().returning(|_, _| {
2370 Box::pin(async {
2371 Err(ProviderError::InvalidAddress(
2372 "Invalid address format".to_string(),
2373 ))
2374 })
2375 });
2376
2377 let relayer = EvmRelayer::new(
2378 relayer_model,
2379 signer,
2380 provider,
2381 create_test_evm_network(),
2382 Arc::new(relayer_repo),
2383 Arc::new(network_repo),
2384 Arc::new(tx_repo),
2385 Arc::new(counter),
2386 Arc::new(job_producer),
2387 )
2388 .unwrap();
2389
2390 let request = JsonRpcRequest {
2391 jsonrpc: "2.0".to_string(),
2392 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2393 method: "eth_getBalance".to_string(),
2394 params: serde_json::Value::String(r#"["invalid_address", "latest"]"#.to_string()),
2395 }),
2396 id: Some(JsonRpcId::Number(1)),
2397 };
2398
2399 let response = relayer.rpc(request).await.unwrap();
2400 assert!(response.result.is_none());
2401 assert!(response.error.is_some());
2402
2403 let error = response.error.unwrap();
2404 assert_eq!(error.code, -32602); }
2406
2407 #[tokio::test]
2408 async fn test_rpc_provider_network_configuration_error() {
2409 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2410 setup_mocks();
2411 let relayer_model = create_test_relayer();
2412
2413 provider.expect_raw_request_dyn().returning(|_, _| {
2414 Box::pin(async {
2415 Err(ProviderError::NetworkConfiguration(
2416 "Network not reachable".to_string(),
2417 ))
2418 })
2419 });
2420
2421 let relayer = EvmRelayer::new(
2422 relayer_model,
2423 signer,
2424 provider,
2425 create_test_evm_network(),
2426 Arc::new(relayer_repo),
2427 Arc::new(network_repo),
2428 Arc::new(tx_repo),
2429 Arc::new(counter),
2430 Arc::new(job_producer),
2431 )
2432 .unwrap();
2433
2434 let request = JsonRpcRequest {
2435 jsonrpc: "2.0".to_string(),
2436 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2437 method: "eth_chainId".to_string(),
2438 params: serde_json::Value::String("[]".to_string()),
2439 }),
2440 id: Some(JsonRpcId::Number(2)),
2441 };
2442
2443 let response = relayer.rpc(request).await.unwrap();
2444 assert!(response.result.is_none());
2445 assert!(response.error.is_some());
2446
2447 let error = response.error.unwrap();
2448 assert_eq!(error.code, -33004); }
2450
2451 #[tokio::test]
2452 async fn test_rpc_provider_timeout_error() {
2453 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2454 setup_mocks();
2455 let relayer_model = create_test_relayer();
2456
2457 provider
2458 .expect_raw_request_dyn()
2459 .returning(|_, _| Box::pin(async { Err(ProviderError::Timeout) }));
2460
2461 let relayer = EvmRelayer::new(
2462 relayer_model,
2463 signer,
2464 provider,
2465 create_test_evm_network(),
2466 Arc::new(relayer_repo),
2467 Arc::new(network_repo),
2468 Arc::new(tx_repo),
2469 Arc::new(counter),
2470 Arc::new(job_producer),
2471 )
2472 .unwrap();
2473
2474 let request = JsonRpcRequest {
2475 jsonrpc: "2.0".to_string(),
2476 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2477 method: "eth_blockNumber".to_string(),
2478 params: serde_json::json!([]),
2479 }),
2480 id: Some(JsonRpcId::Number(3)),
2481 };
2482
2483 let response = relayer.rpc(request).await.unwrap();
2484 assert!(response.result.is_none());
2485 assert!(response.error.is_some());
2486
2487 let error = response.error.unwrap();
2488 assert_eq!(error.code, -33000); }
2490
2491 #[tokio::test]
2492 async fn test_rpc_provider_rate_limited_error() {
2493 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2494 setup_mocks();
2495 let relayer_model = create_test_relayer();
2496
2497 provider
2498 .expect_raw_request_dyn()
2499 .returning(|_, _| Box::pin(async { Err(ProviderError::RateLimited) }));
2500
2501 let relayer = EvmRelayer::new(
2502 relayer_model,
2503 signer,
2504 provider,
2505 create_test_evm_network(),
2506 Arc::new(relayer_repo),
2507 Arc::new(network_repo),
2508 Arc::new(tx_repo),
2509 Arc::new(counter),
2510 Arc::new(job_producer),
2511 )
2512 .unwrap();
2513
2514 let request = JsonRpcRequest {
2515 jsonrpc: "2.0".to_string(),
2516 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2517 method: "eth_getBalance".to_string(),
2518 params: serde_json::Value::String(
2519 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2520 ),
2521 }),
2522 id: Some(JsonRpcId::Number(4)),
2523 };
2524
2525 let response = relayer.rpc(request).await.unwrap();
2526 assert!(response.result.is_none());
2527 assert!(response.error.is_some());
2528
2529 let error = response.error.unwrap();
2530 assert_eq!(error.code, -33001); }
2532
2533 #[tokio::test]
2534 async fn test_rpc_provider_bad_gateway_error() {
2535 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2536 setup_mocks();
2537 let relayer_model = create_test_relayer();
2538
2539 provider
2540 .expect_raw_request_dyn()
2541 .returning(|_, _| Box::pin(async { Err(ProviderError::BadGateway) }));
2542
2543 let relayer = EvmRelayer::new(
2544 relayer_model,
2545 signer,
2546 provider,
2547 create_test_evm_network(),
2548 Arc::new(relayer_repo),
2549 Arc::new(network_repo),
2550 Arc::new(tx_repo),
2551 Arc::new(counter),
2552 Arc::new(job_producer),
2553 )
2554 .unwrap();
2555
2556 let request = JsonRpcRequest {
2557 jsonrpc: "2.0".to_string(),
2558 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2559 method: "eth_gasPrice".to_string(),
2560 params: serde_json::json!([]),
2561 }),
2562 id: Some(JsonRpcId::Number(5)),
2563 };
2564
2565 let response = relayer.rpc(request).await.unwrap();
2566 assert!(response.result.is_none());
2567 assert!(response.error.is_some());
2568
2569 let error = response.error.unwrap();
2570 assert_eq!(error.code, -33002); }
2572
2573 #[tokio::test]
2574 async fn test_rpc_provider_request_error() {
2575 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2576 setup_mocks();
2577 let relayer_model = create_test_relayer();
2578
2579 provider.expect_raw_request_dyn().returning(|_, _| {
2580 Box::pin(async {
2581 Err(ProviderError::RequestError {
2582 error: "Bad request".to_string(),
2583 status_code: 400,
2584 })
2585 })
2586 });
2587
2588 let relayer = EvmRelayer::new(
2589 relayer_model,
2590 signer,
2591 provider,
2592 create_test_evm_network(),
2593 Arc::new(relayer_repo),
2594 Arc::new(network_repo),
2595 Arc::new(tx_repo),
2596 Arc::new(counter),
2597 Arc::new(job_producer),
2598 )
2599 .unwrap();
2600
2601 let request = JsonRpcRequest {
2602 jsonrpc: "2.0".to_string(),
2603 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2604 method: "invalid_method".to_string(),
2605 params: serde_json::Value::String("{}".to_string()),
2606 }),
2607 id: Some(JsonRpcId::Number(6)),
2608 };
2609
2610 let response = relayer.rpc(request).await.unwrap();
2611 assert!(response.result.is_none());
2612 assert!(response.error.is_some());
2613
2614 let error = response.error.unwrap();
2615 assert_eq!(error.code, -33003); }
2617
2618 #[tokio::test]
2619 async fn test_rpc_provider_other_error() {
2620 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2621 setup_mocks();
2622 let relayer_model = create_test_relayer();
2623
2624 provider.expect_raw_request_dyn().returning(|_, _| {
2625 Box::pin(async {
2626 Err(ProviderError::Other(
2627 "Unexpected error occurred".to_string(),
2628 ))
2629 })
2630 });
2631
2632 let relayer = EvmRelayer::new(
2633 relayer_model,
2634 signer,
2635 provider,
2636 create_test_evm_network(),
2637 Arc::new(relayer_repo),
2638 Arc::new(network_repo),
2639 Arc::new(tx_repo),
2640 Arc::new(counter),
2641 Arc::new(job_producer),
2642 )
2643 .unwrap();
2644
2645 let request = JsonRpcRequest {
2646 jsonrpc: "2.0".to_string(),
2647 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2648 method: "eth_getBalance".to_string(),
2649 params: serde_json::json!(["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]),
2650 }),
2651 id: Some(JsonRpcId::Number(7)),
2652 };
2653
2654 let response = relayer.rpc(request).await.unwrap();
2655 assert!(response.result.is_none());
2656 assert!(response.error.is_some());
2657
2658 let error = response.error.unwrap();
2659 assert_eq!(error.code, -32603); }
2661
2662 #[tokio::test]
2663 async fn test_rpc_response_preserves_request_id() {
2664 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2665 setup_mocks();
2666 let relayer_model = create_test_relayer();
2667
2668 provider
2669 .expect_raw_request_dyn()
2670 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x1")) }));
2671
2672 let relayer = EvmRelayer::new(
2673 relayer_model,
2674 signer,
2675 provider,
2676 create_test_evm_network(),
2677 Arc::new(relayer_repo),
2678 Arc::new(network_repo),
2679 Arc::new(tx_repo),
2680 Arc::new(counter),
2681 Arc::new(job_producer),
2682 )
2683 .unwrap();
2684
2685 let request_id = u64::MAX;
2686 let request = JsonRpcRequest {
2687 jsonrpc: "2.0".to_string(),
2688 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2689 method: "eth_chainId".to_string(),
2690 params: serde_json::Value::String("[]".to_string()),
2691 }),
2692 id: Some(JsonRpcId::Number(request_id as i64)),
2693 };
2694
2695 let response = relayer.rpc(request).await.unwrap();
2696 assert_eq!(response.id, Some(JsonRpcId::Number(request_id as i64)));
2697 assert_eq!(response.jsonrpc, "2.0");
2698 }
2699
2700 #[tokio::test]
2701 async fn test_rpc_handles_complex_json_response() {
2702 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2703 setup_mocks();
2704 let relayer_model = create_test_relayer();
2705
2706 let complex_response = serde_json::json!({
2707 "number": "0x1b4",
2708 "hash": "0xdc0818cf78f21a8e70579cb46a43643f78291264dda342ae31049421c82d21ae",
2709 "parentHash": "0xe99e022112df268ce40b8b654759b4f39c3cc1b8c86b2f4c7da48ba6d8a6ae8b",
2710 "transactions": [
2711 {
2712 "hash": "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060",
2713 "from": "0xa7d9ddbe1f17865597fbd27ec712455208b6b76d",
2714 "to": "0xf02c1c8e6114b1dbe8937a39260b5b0a374432bb",
2715 "value": "0xf3dbb76162000"
2716 }
2717 ],
2718 "gasUsed": "0x5208"
2719 });
2720
2721 provider.expect_raw_request_dyn().returning(move |_, _| {
2722 let response = complex_response.clone();
2723 Box::pin(async move { Ok(response) })
2724 });
2725
2726 let relayer = EvmRelayer::new(
2727 relayer_model,
2728 signer,
2729 provider,
2730 create_test_evm_network(),
2731 Arc::new(relayer_repo),
2732 Arc::new(network_repo),
2733 Arc::new(tx_repo),
2734 Arc::new(counter),
2735 Arc::new(job_producer),
2736 )
2737 .unwrap();
2738
2739 let request = JsonRpcRequest {
2740 jsonrpc: "2.0".to_string(),
2741 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2742 method: "eth_getBlockByNumber".to_string(),
2743 params: serde_json::json!(["0x1b4", true]),
2744 }),
2745 id: Some(JsonRpcId::Number(8)),
2746 };
2747
2748 let response = relayer.rpc(request).await.unwrap();
2749 assert!(response.error.is_none());
2750 assert!(response.result.is_some());
2751
2752 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2753 assert!(result.get("transactions").is_some());
2754 assert!(result.get("hash").is_some());
2755 assert!(result.get("gasUsed").is_some());
2756 }
2757 }
2758
2759 #[tokio::test]
2760 async fn test_initialize_relayer_disables_when_validation_fails() {
2761 let (
2762 mut provider,
2763 mut relayer_repo,
2764 network_repo,
2765 tx_repo,
2766 mut job_producer,
2767 signer,
2768 mut counter,
2769 ) = setup_mocks();
2770 let mut relayer_model = create_test_relayer();
2771 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2773
2774 provider
2776 .expect_get_transaction_count()
2777 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
2778
2779 counter
2780 .expect_get()
2781 .returning(|| Box::pin(ready(Ok(Some(0u64)))));
2782
2783 provider
2785 .expect_get_balance()
2786 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64)))));
2787
2788 provider
2789 .expect_health_check()
2790 .returning(|| Box::pin(ready(Ok(true))));
2791
2792 let mut disabled_relayer = relayer_model.clone();
2794 disabled_relayer.system_disabled = true;
2795 relayer_repo
2796 .expect_disable_relayer()
2797 .with(eq("test-relayer-id".to_string()), always())
2798 .returning(move |_, _| Ok(disabled_relayer.clone()));
2799
2800 job_producer
2802 .expect_produce_send_notification_job()
2803 .returning(|_, _| Box::pin(ready(Ok(()))));
2804
2805 job_producer
2807 .expect_produce_relayer_health_check_job()
2808 .returning(|_, _| Box::pin(ready(Ok(()))));
2809
2810 let relayer = EvmRelayer::new(
2811 relayer_model,
2812 signer,
2813 provider,
2814 create_test_evm_network(),
2815 Arc::new(relayer_repo),
2816 Arc::new(network_repo),
2817 Arc::new(tx_repo),
2818 Arc::new(counter),
2819 Arc::new(job_producer),
2820 )
2821 .unwrap();
2822
2823 let result = relayer.initialize_relayer().await;
2824 assert!(result.is_ok());
2825 }
2826
2827 #[tokio::test]
2828 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
2829 let (
2830 mut provider,
2831 mut relayer_repo,
2832 network_repo,
2833 tx_repo,
2834 job_producer,
2835 signer,
2836 mut counter,
2837 ) = setup_mocks();
2838 let mut relayer_model = create_test_relayer();
2839 relayer_model.system_disabled = true; provider
2843 .expect_get_transaction_count()
2844 .returning(|_| Box::pin(ready(Ok(42u64))));
2845
2846 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2847
2848 counter
2849 .expect_get()
2850 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2851
2852 provider
2853 .expect_get_balance()
2854 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2857 .expect_health_check()
2858 .returning(|| Box::pin(ready(Ok(true))));
2859
2860 let mut enabled_relayer = relayer_model.clone();
2862 enabled_relayer.system_disabled = false;
2863 relayer_repo
2864 .expect_enable_relayer()
2865 .with(eq("test-relayer-id".to_string()))
2866 .returning(move |_| Ok(enabled_relayer.clone()));
2867
2868 let relayer = EvmRelayer::new(
2869 relayer_model,
2870 signer,
2871 provider,
2872 create_test_evm_network(),
2873 Arc::new(relayer_repo),
2874 Arc::new(network_repo),
2875 Arc::new(tx_repo),
2876 Arc::new(counter),
2877 Arc::new(job_producer),
2878 )
2879 .unwrap();
2880
2881 let result = relayer.initialize_relayer().await;
2882 assert!(result.is_ok());
2883 }
2884
2885 #[tokio::test]
2886 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
2887 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
2888 setup_mocks();
2889 let mut relayer_model = create_test_relayer();
2890 relayer_model.system_disabled = false; provider
2894 .expect_get_transaction_count()
2895 .returning(|_| Box::pin(ready(Ok(42u64))));
2896
2897 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2898
2899 counter
2900 .expect_get()
2901 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2902
2903 provider
2904 .expect_get_balance()
2905 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2908 .expect_health_check()
2909 .returning(|| Box::pin(ready(Ok(true))));
2910
2911 let relayer = EvmRelayer::new(
2914 relayer_model,
2915 signer,
2916 provider,
2917 create_test_evm_network(),
2918 Arc::new(relayer_repo),
2919 Arc::new(network_repo),
2920 Arc::new(tx_repo),
2921 Arc::new(counter),
2922 Arc::new(job_producer),
2923 )
2924 .unwrap();
2925
2926 let result = relayer.initialize_relayer().await;
2927 assert!(result.is_ok());
2928 }
2929
2930 #[tokio::test]
2931 async fn test_initialize_relayer_sends_notification_when_disabled() {
2932 let (
2933 mut provider,
2934 mut relayer_repo,
2935 network_repo,
2936 tx_repo,
2937 mut job_producer,
2938 signer,
2939 mut counter,
2940 ) = setup_mocks();
2941 let mut relayer_model = create_test_relayer();
2942 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2944
2945 provider
2947 .expect_get_transaction_count()
2948 .returning(|_| Box::pin(ready(Ok(42u64))));
2949
2950 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2951
2952 counter
2953 .expect_get()
2954 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2955
2956 provider
2957 .expect_get_balance()
2958 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider.expect_health_check().returning(|| {
2961 Box::pin(ready(Err(ProviderError::Other(
2962 "RPC validation failed".to_string(),
2963 ))))
2964 });
2965
2966 let mut disabled_relayer = relayer_model.clone();
2968 disabled_relayer.system_disabled = true;
2969 relayer_repo
2970 .expect_disable_relayer()
2971 .with(eq("test-relayer-id".to_string()), always())
2972 .returning(move |_, _| Ok(disabled_relayer.clone()));
2973
2974 job_producer
2976 .expect_produce_send_notification_job()
2977 .returning(|_, _| Box::pin(ready(Ok(()))));
2978
2979 job_producer
2981 .expect_produce_relayer_health_check_job()
2982 .returning(|_, _| Box::pin(ready(Ok(()))));
2983
2984 let relayer = EvmRelayer::new(
2985 relayer_model,
2986 signer,
2987 provider,
2988 create_test_evm_network(),
2989 Arc::new(relayer_repo),
2990 Arc::new(network_repo),
2991 Arc::new(tx_repo),
2992 Arc::new(counter),
2993 Arc::new(job_producer),
2994 )
2995 .unwrap();
2996
2997 let result = relayer.initialize_relayer().await;
2998 assert!(result.is_ok());
2999 }
3000
3001 #[tokio::test]
3002 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
3003 let (
3004 mut provider,
3005 mut relayer_repo,
3006 network_repo,
3007 tx_repo,
3008 mut job_producer,
3009 signer,
3010 mut counter,
3011 ) = setup_mocks();
3012 let mut relayer_model = create_test_relayer();
3013 relayer_model.system_disabled = false; relayer_model.notification_id = None; provider
3018 .expect_get_transaction_count()
3019 .returning(|_| Box::pin(ready(Ok(42u64))));
3020
3021 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
3022
3023 counter
3024 .expect_get()
3025 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
3026
3027 provider
3028 .expect_get_balance()
3029 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); provider
3032 .expect_health_check()
3033 .returning(|| Box::pin(ready(Ok(true))));
3034
3035 let mut disabled_relayer = relayer_model.clone();
3037 disabled_relayer.system_disabled = true;
3038 relayer_repo
3039 .expect_disable_relayer()
3040 .with(eq("test-relayer-id".to_string()), always())
3041 .returning(move |_, _| Ok(disabled_relayer.clone()));
3042
3043 job_producer
3046 .expect_produce_relayer_health_check_job()
3047 .returning(|_, _| Box::pin(ready(Ok(()))));
3048
3049 let relayer = EvmRelayer::new(
3050 relayer_model,
3051 signer,
3052 provider,
3053 create_test_evm_network(),
3054 Arc::new(relayer_repo),
3055 Arc::new(network_repo),
3056 Arc::new(tx_repo),
3057 Arc::new(counter),
3058 Arc::new(job_producer),
3059 )
3060 .unwrap();
3061
3062 let result = relayer.initialize_relayer().await;
3063 assert!(result.is_ok());
3064 }
3065}