openzeppelin_relayer/repositories/relayer/
mod.rs

1//! Relayer Repository Module
2//!
3//! This module provides the relayer repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract relayer data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete relayer configurations
10//! - **Status Management**: Enable/disable relayers and track their state
11//! - **Policy Management**: Update relayer network policies
12//! - **Partial Updates**: Support for partial relayer configuration updates
13//! - **Active Filtering**: Query for active (non-paused) relayers
14//! - **Pagination Support**: Efficient paginated listing of relayers
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryRelayerRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisRelayerRepository`]: Redis-backed storage for production environments
20//!
21
22mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29    models::UpdateRelayerRequest,
30    models::{
31        DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32    },
33    repositories::{PaginatedResult, Repository},
34    utils::RedisConnections,
35};
36use async_trait::async_trait;
37use deadpool_redis::Pool;
38use std::sync::Arc;
39
40#[async_trait]
41pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
42    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
43    async fn list_by_signer_id(
44        &self,
45        signer_id: &str,
46    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
47    async fn list_by_notification_id(
48        &self,
49        notification_id: &str,
50    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
51    async fn partial_update(
52        &self,
53        id: String,
54        update: UpdateRelayerRequest,
55    ) -> Result<RelayerRepoModel, RepositoryError>;
56    async fn enable_relayer(&self, relayer_id: String)
57        -> Result<RelayerRepoModel, RepositoryError>;
58    async fn disable_relayer(
59        &self,
60        relayer_id: String,
61        reason: DisabledReason,
62    ) -> Result<RelayerRepoModel, RepositoryError>;
63    async fn update_policy(
64        &self,
65        id: String,
66        policy: RelayerNetworkPolicy,
67    ) -> Result<RelayerRepoModel, RepositoryError>;
68    /// Returns true if this repository uses persistent storage (e.g., Redis).
69    /// Returns false for in-memory storage.
70    fn is_persistent_storage(&self) -> bool;
71
72    /// Returns connection info for distributed operations.
73    ///
74    /// This method provides access to the underlying connection and key prefix
75    /// when using persistent storage. This is useful for distributed locking and
76    /// other coordination operations that need direct storage access.
77    ///
78    /// # Returns
79    /// * `Some((pool, prefix))` - If using persistent storage (e.g., Redis)
80    /// * `None` - If using in-memory storage (default)
81    fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
82        None
83    }
84}
85
// Test-only mock of the relayer repository. `mockall::mock!` expands the
// declaration below into a `MockRelayerRepository` type implementing both traits.
#[cfg(test)]
mockall::mock! {
    pub RelayerRepository {}

    // Generic CRUD surface shared by every repository.
    #[async_trait]
    impl Repository<RelayerRepoModel, String> for RelayerRepository {
        async fn create(
            &self,
            entity: RelayerRepoModel,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_paginated(
            &self,
            query: PaginationQuery,
        ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
        async fn update(
            &self,
            id: String,
            entity: RelayerRepoModel,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
        async fn count(&self) -> Result<usize, RepositoryError>;
        async fn has_entries(&self) -> Result<bool, RepositoryError>;
        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
    }

    // Relayer-specific surface. `connection_info` is listed explicitly so tests
    // can set expectations on it instead of inheriting the trait default.
    #[async_trait]
    impl RelayerRepository for RelayerRepository {
        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_by_signer_id(
            &self,
            signer_id: &str,
        ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_by_notification_id(
            &self,
            notification_id: &str,
        ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn partial_update(
            &self,
            id: String,
            update: UpdateRelayerRequest,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn enable_relayer(
            &self,
            relayer_id: String,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn disable_relayer(
            &self,
            relayer_id: String,
            reason: DisabledReason,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        async fn update_policy(
            &self,
            id: String,
            policy: RelayerNetworkPolicy,
        ) -> Result<RelayerRepoModel, RepositoryError>;
        fn is_persistent_storage(&self) -> bool;
        fn connection_info(&self) -> Option<(Arc<Pool>, String)>;
    }
}
116
117/// Enum wrapper for different relayer repository implementations
118#[derive(Debug, Clone)]
119pub enum RelayerRepositoryStorage {
120    InMemory(InMemoryRelayerRepository),
121    Redis(RedisRelayerRepository),
122}
123
124impl RelayerRepositoryStorage {
125    pub fn new_in_memory() -> Self {
126        Self::InMemory(InMemoryRelayerRepository::new())
127    }
128
129    pub fn new_redis(
130        connections: Arc<RedisConnections>,
131        key_prefix: String,
132    ) -> Result<Self, RepositoryError> {
133        Ok(Self::Redis(RedisRelayerRepository::new(
134            connections,
135            key_prefix,
136        )?))
137    }
138
139    /// Returns connection info for distributed operations.
140    ///
141    /// This method provides access to the underlying Redis connection and key prefix
142    /// when using Redis-backed storage. This is useful for distributed locking and
143    /// other coordination operations that need direct Redis access.
144    ///
145    /// # Returns
146    /// * `Some((pool, prefix))` - If using persistent storage (e.g., Redis)
147    /// * `None` - If using in-memory storage
148    pub fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
149        match self {
150            RelayerRepositoryStorage::InMemory(_) => None,
151            RelayerRepositoryStorage::Redis(repo) => {
152                Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
153            }
154        }
155    }
156}
157
158impl Default for RelayerRepositoryStorage {
159    fn default() -> Self {
160        Self::new_in_memory()
161    }
162}
163
164#[async_trait]
165impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
166    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
167        match self {
168            RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
169            RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
170        }
171    }
172
173    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
174        match self {
175            RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
176            RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
177        }
178    }
179
180    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
181        match self {
182            RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
183            RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
184        }
185    }
186
187    async fn list_paginated(
188        &self,
189        query: PaginationQuery,
190    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
191        match self {
192            RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
193            RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
194        }
195    }
196
197    async fn update(
198        &self,
199        id: String,
200        entity: RelayerRepoModel,
201    ) -> Result<RelayerRepoModel, RepositoryError> {
202        match self {
203            RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
204            RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
205        }
206    }
207
208    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
209        match self {
210            RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
211            RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
212        }
213    }
214
215    async fn count(&self) -> Result<usize, RepositoryError> {
216        match self {
217            RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
218            RelayerRepositoryStorage::Redis(repo) => repo.count().await,
219        }
220    }
221
222    async fn has_entries(&self) -> Result<bool, RepositoryError> {
223        match self {
224            RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
225            RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
226        }
227    }
228
229    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
230        match self {
231            RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
232            RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
233        }
234    }
235}
236
237#[async_trait]
238impl RelayerRepository for RelayerRepositoryStorage {
239    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
240        match self {
241            RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
242            RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
243        }
244    }
245
246    async fn list_by_signer_id(
247        &self,
248        signer_id: &str,
249    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
250        match self {
251            RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
252            RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
253        }
254    }
255
256    async fn list_by_notification_id(
257        &self,
258        notification_id: &str,
259    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
260        match self {
261            RelayerRepositoryStorage::InMemory(repo) => {
262                repo.list_by_notification_id(notification_id).await
263            }
264            RelayerRepositoryStorage::Redis(repo) => {
265                repo.list_by_notification_id(notification_id).await
266            }
267        }
268    }
269
270    async fn partial_update(
271        &self,
272        id: String,
273        update: UpdateRelayerRequest,
274    ) -> Result<RelayerRepoModel, RepositoryError> {
275        match self {
276            RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
277            RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
278        }
279    }
280
281    async fn enable_relayer(
282        &self,
283        relayer_id: String,
284    ) -> Result<RelayerRepoModel, RepositoryError> {
285        match self {
286            RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
287            RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
288        }
289    }
290
291    async fn disable_relayer(
292        &self,
293        relayer_id: String,
294        reason: DisabledReason,
295    ) -> Result<RelayerRepoModel, RepositoryError> {
296        match self {
297            RelayerRepositoryStorage::InMemory(repo) => {
298                repo.disable_relayer(relayer_id, reason).await
299            }
300            RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
301        }
302    }
303
304    async fn update_policy(
305        &self,
306        id: String,
307        policy: RelayerNetworkPolicy,
308    ) -> Result<RelayerRepoModel, RepositoryError> {
309        match self {
310            RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
311            RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
312        }
313    }
314
315    fn is_persistent_storage(&self) -> bool {
316        match self {
317            RelayerRepositoryStorage::InMemory(_) => false,
318            RelayerRepositoryStorage::Redis(_) => true,
319        }
320    }
321
322    fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
323        match self {
324            RelayerRepositoryStorage::InMemory(_) => None,
325            RelayerRepositoryStorage::Redis(repo) => {
326                Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
327            }
328        }
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};

    /// Builds an unpaused, non-disabled EVM relayer fixture named after `id`.
    fn create_test_relayer(id: String) -> RelayerRepoModel {
        let policies = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
            include_revert_data: None,
            min_balance: Some(0),
            gas_limit_estimation: Some(true),
            gas_price_cap: None,
            whitelist_receivers: None,
            eip1559_pricing: Some(false),
            private_transactions: Some(false),
        });

        RelayerRepoModel {
            // `name` borrows `id` before `id` itself is moved into the struct.
            name: format!("Relayer {}", id),
            id,
            network: "TestNet".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies,
            signer_id: "test".to_string(),
            address: "0x".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        }
    }

    /// Walks the generic CRUD surface through the in-memory storage wrapper.
    #[actix_web::test]
    async fn test_in_memory_repository_impl() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let fixture = create_test_relayer("test-relayer".to_string());

        // create
        let created = storage.create(fixture.clone()).await.unwrap();
        assert_eq!(created.id, fixture.id);

        // get
        let fetched = storage
            .get_by_id("test-relayer".to_string())
            .await
            .unwrap();
        assert_eq!(fetched.id, fixture.id);

        // list all
        let everything = storage.list_all().await.unwrap();
        assert!(!everything.is_empty());

        // count
        let total = storage.count().await.unwrap();
        assert!(total >= 1);

        // update
        let mut renamed = fixture.clone();
        renamed.name = "Updated Name".to_string();
        let after_update = storage
            .update(fixture.id.clone(), renamed)
            .await
            .unwrap();
        assert_eq!(after_update.name, "Updated Name");

        // delete, then confirm the record is gone
        storage.delete_by_id(fixture.id.clone()).await.unwrap();
        let lookup = storage.get_by_id("test-relayer".to_string()).await;
        assert!(lookup.is_err());
    }

    /// Exercises the relayer-specific trait methods through the in-memory wrapper.
    #[actix_web::test]
    async fn test_relayer_repository_trait_methods() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let fixture = create_test_relayer("test-relayer".to_string());

        // Seed the repository.
        storage.create(fixture.clone()).await.unwrap();

        // list_active
        let active = storage.list_active().await.unwrap();
        assert!(!active.is_empty());

        // partial_update
        let pause_request = UpdateRelayerRequest {
            paused: Some(true),
            ..Default::default()
        };
        let paused = storage
            .partial_update(fixture.id.clone(), pause_request)
            .await
            .unwrap();
        assert!(paused.paused);

        // disable, then enable
        let reason = DisabledReason::BalanceCheckFailed("Test disable reason".to_string());
        let disabled = storage
            .disable_relayer(fixture.id.clone(), reason)
            .await
            .unwrap();
        assert!(disabled.system_disabled);
        assert_eq!(
            disabled.disabled_reason,
            Some(DisabledReason::BalanceCheckFailed(
                "Test disable reason".to_string()
            ))
        );

        let enabled = storage.enable_relayer(fixture.id.clone()).await.unwrap();
        assert!(!enabled.system_disabled);
        assert_eq!(enabled.disabled_reason, None);

        // update_policy
        let replacement_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
            include_revert_data: None,
            min_balance: Some(1_000_000_000_000_000_000),
            gas_limit_estimation: Some(true),
            gas_price_cap: Some(50_000_000_000),
            whitelist_receivers: None,
            eip1559_pricing: Some(true),
            private_transactions: Some(false),
        });
        let with_policy = storage
            .update_policy(fixture.id.clone(), replacement_policy)
            .await
            .unwrap();

        match with_policy.policies {
            RelayerNetworkPolicy::Evm(evm) => {
                assert_eq!(evm.gas_price_cap, Some(50_000_000_000));
                assert_eq!(evm.eip1559_pricing, Some(true));
            }
            _ => panic!("Expected EVM policy"),
        }
    }

    /// `new_in_memory` must produce the `InMemory` variant.
    #[actix_web::test]
    async fn test_create_repository_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        assert!(matches!(storage, RelayerRepositoryStorage::InMemory(_)));
    }

    /// Pagination metadata echoes the query and counts the stored relayers.
    #[actix_web::test]
    async fn test_pagination() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        for id in ["test-relayer-1", "test-relayer-2"] {
            storage
                .create(create_test_relayer(id.to_string()))
                .await
                .unwrap();
        }

        let first_page = PaginationQuery {
            page: 1,
            per_page: 10,
        };

        let page = storage.list_paginated(first_page).await.unwrap();
        assert!(page.total >= 2);
        assert_eq!(page.page, 1);
        assert_eq!(page.per_page, 10);
    }

    /// Deleting removes the record; deleting an unknown id is an error.
    #[actix_web::test]
    async fn test_delete_relayer() {
        let storage = RelayerRepositoryStorage::new_in_memory();
        let fixture = create_test_relayer("delete-test".to_string());

        // Create, then delete.
        storage.create(fixture.clone()).await.unwrap();
        storage
            .delete_by_id("delete-test".to_string())
            .await
            .unwrap();

        // The lookup must now fail specifically with `NotFound`.
        let lookup = storage.get_by_id("delete-test".to_string()).await;
        assert!(matches!(lookup, Err(RepositoryError::NotFound(_))));

        // Deleting an id that was never stored is rejected.
        let missing = storage.delete_by_id("nonexistent".to_string()).await;
        assert!(missing.is_err());
    }

    /// `has_entries` tracks creation and deletion.
    #[actix_web::test]
    async fn test_has_entries() {
        let repo = InMemoryRelayerRepository::new();
        assert!(!repo.has_entries().await.unwrap());

        let fixture = create_test_relayer("test".to_string());

        repo.create(fixture.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.delete_by_id(fixture.id.clone()).await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    /// `drop_all_entries` empties a populated repository.
    #[actix_web::test]
    async fn test_drop_all_entries() {
        let repo = InMemoryRelayerRepository::new();
        let fixture = create_test_relayer("test".to_string());

        repo.create(fixture.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    /// In-memory storage exposes no connection info.
    #[tokio::test]
    async fn test_connection_info_returns_none_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        assert!(storage.connection_info().is_none());
    }

    /// In-memory storage is not persistent.
    #[tokio::test]
    async fn test_is_persistent_storage_returns_false_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        assert!(!storage.is_persistent_storage());
    }

    /// Same check as above, but dispatched through the `RelayerRepository` trait object.
    #[tokio::test]
    async fn test_trait_connection_info_returns_none_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        let as_trait: &dyn RelayerRepository = &storage;
        assert!(as_trait.connection_info().is_none());
    }

    /// Same check as above, but via the enum's own inherent method.
    #[tokio::test]
    async fn test_struct_connection_info_returns_none_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        let info: Option<(Arc<Pool>, String)> = storage.connection_info();
        assert!(info.is_none());
    }
}