1mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29 models::UpdateRelayerRequest,
30 models::{
31 DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32 },
33 repositories::{PaginatedResult, Repository},
34 utils::RedisConnections,
35};
36use async_trait::async_trait;
37use deadpool_redis::Pool;
38use std::sync::Arc;
39
/// Relayer-specific repository interface.
///
/// Extends the generic [`Repository`] contract (entities of type
/// [`RelayerRepoModel`], keyed by `String` ids) with relayer-oriented queries
/// and mutations. Implementors must be `Send + Sync`.
#[async_trait]
pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
    /// Lists the relayers the implementation considers active.
    ///
    /// NOTE(review): the exact "active" criterion lives in the implementors and
    /// is not visible from this trait — confirm there.
    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
    /// Lists the relayers selected by `signer_id`.
    ///
    /// NOTE(review): presumably those whose signer matches `signer_id` — confirm
    /// against the implementors.
    async fn list_by_signer_id(
        &self,
        signer_id: &str,
    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
    /// Lists the relayers selected by `notification_id`.
    ///
    /// NOTE(review): presumably those whose notification matches
    /// `notification_id` — confirm against the implementors.
    async fn list_by_notification_id(
        &self,
        notification_id: &str,
    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
    /// Applies the fields carried by `update` to the relayer identified by `id`
    /// and returns the resulting model.
    async fn partial_update(
        &self,
        id: String,
        update: UpdateRelayerRequest,
    ) -> Result<RelayerRepoModel, RepositoryError>;
    /// Enables the relayer identified by `relayer_id` and returns the resulting model.
    async fn enable_relayer(&self, relayer_id: String)
    -> Result<RelayerRepoModel, RepositoryError>;
    /// Disables the relayer identified by `relayer_id`, recording `reason`, and
    /// returns the resulting model.
    async fn disable_relayer(
        &self,
        relayer_id: String,
        reason: DisabledReason,
    ) -> Result<RelayerRepoModel, RepositoryError>;
    /// Replaces the network policy of the relayer identified by `id` with
    /// `policy` and returns the resulting model.
    async fn update_policy(
        &self,
        id: String,
        policy: RelayerNetworkPolicy,
    ) -> Result<RelayerRepoModel, RepositoryError>;
    /// Reports whether the backing store is persistent.
    fn is_persistent_storage(&self) -> bool;

    /// Returns the connection pool and key prefix of the backing store, if any.
    ///
    /// The default implementation returns `None`; implementors that have a
    /// pool to expose are expected to override it.
    fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
        None
    }
}
85
// Test-only mock generated by `mockall`. It mirrors every method of both
// `Repository<RelayerRepoModel, String>` and `RelayerRepository`, so the
// signatures below must be kept in sync with those traits.
#[cfg(test)]
mockall::mock! {
    pub RelayerRepository {}

    // Generic CRUD surface.
    #[async_trait]
    impl Repository<RelayerRepoModel, String> for RelayerRepository {
        async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
        async fn update(&self, id: String, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
        async fn count(&self) -> Result<usize, RepositoryError>;
        async fn has_entries(&self) -> Result<bool, RepositoryError>;
        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
    }

    // Relayer-specific surface. `connection_info` is listed explicitly even
    // though the trait provides a default body, so the mock exposes it too.
    #[async_trait]
    impl RelayerRepository for RelayerRepository {
        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_by_signer_id(&self, signer_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn list_by_notification_id(&self, notification_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
        async fn partial_update(&self, id: String, update: UpdateRelayerRequest) -> Result<RelayerRepoModel, RepositoryError>;
        async fn enable_relayer(&self, relayer_id: String) -> Result<RelayerRepoModel, RepositoryError>;
        async fn disable_relayer(&self, relayer_id: String, reason: DisabledReason) -> Result<RelayerRepoModel, RepositoryError>;
        async fn update_policy(&self, id: String, policy: RelayerNetworkPolicy) -> Result<RelayerRepoModel, RepositoryError>;
        fn is_persistent_storage(&self) -> bool;
        fn connection_info(&self) -> Option<(Arc<Pool>, String)>;
    }
}
116
/// Relayer repository that dispatches every call to one concrete backend.
///
/// The variant is chosen at construction time (`new_in_memory` / `new_redis`);
/// the trait implementations in this file forward each method to the wrapped
/// repository.
#[derive(Debug, Clone)]
pub enum RelayerRepositoryStorage {
    /// Backed by [`InMemoryRelayerRepository`]; reported as non-persistent.
    InMemory(InMemoryRelayerRepository),
    /// Backed by [`RedisRelayerRepository`]; reported as persistent.
    Redis(RedisRelayerRepository),
}
123
124impl RelayerRepositoryStorage {
125 pub fn new_in_memory() -> Self {
126 Self::InMemory(InMemoryRelayerRepository::new())
127 }
128
129 pub fn new_redis(
130 connections: Arc<RedisConnections>,
131 key_prefix: String,
132 ) -> Result<Self, RepositoryError> {
133 Ok(Self::Redis(RedisRelayerRepository::new(
134 connections,
135 key_prefix,
136 )?))
137 }
138
139 pub fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
149 match self {
150 RelayerRepositoryStorage::InMemory(_) => None,
151 RelayerRepositoryStorage::Redis(repo) => {
152 Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
153 }
154 }
155 }
156}
157
158impl Default for RelayerRepositoryStorage {
159 fn default() -> Self {
160 Self::new_in_memory()
161 }
162}
163
164#[async_trait]
165impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
166 async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
167 match self {
168 RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
169 RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
170 }
171 }
172
173 async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
174 match self {
175 RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
176 RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
177 }
178 }
179
180 async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
181 match self {
182 RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
183 RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
184 }
185 }
186
187 async fn list_paginated(
188 &self,
189 query: PaginationQuery,
190 ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
191 match self {
192 RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
193 RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
194 }
195 }
196
197 async fn update(
198 &self,
199 id: String,
200 entity: RelayerRepoModel,
201 ) -> Result<RelayerRepoModel, RepositoryError> {
202 match self {
203 RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
204 RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
205 }
206 }
207
208 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
209 match self {
210 RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
211 RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
212 }
213 }
214
215 async fn count(&self) -> Result<usize, RepositoryError> {
216 match self {
217 RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
218 RelayerRepositoryStorage::Redis(repo) => repo.count().await,
219 }
220 }
221
222 async fn has_entries(&self) -> Result<bool, RepositoryError> {
223 match self {
224 RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
225 RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
226 }
227 }
228
229 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
230 match self {
231 RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
232 RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
233 }
234 }
235}
236
237#[async_trait]
238impl RelayerRepository for RelayerRepositoryStorage {
239 async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
240 match self {
241 RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
242 RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
243 }
244 }
245
246 async fn list_by_signer_id(
247 &self,
248 signer_id: &str,
249 ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
250 match self {
251 RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
252 RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
253 }
254 }
255
256 async fn list_by_notification_id(
257 &self,
258 notification_id: &str,
259 ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
260 match self {
261 RelayerRepositoryStorage::InMemory(repo) => {
262 repo.list_by_notification_id(notification_id).await
263 }
264 RelayerRepositoryStorage::Redis(repo) => {
265 repo.list_by_notification_id(notification_id).await
266 }
267 }
268 }
269
270 async fn partial_update(
271 &self,
272 id: String,
273 update: UpdateRelayerRequest,
274 ) -> Result<RelayerRepoModel, RepositoryError> {
275 match self {
276 RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
277 RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
278 }
279 }
280
281 async fn enable_relayer(
282 &self,
283 relayer_id: String,
284 ) -> Result<RelayerRepoModel, RepositoryError> {
285 match self {
286 RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
287 RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
288 }
289 }
290
291 async fn disable_relayer(
292 &self,
293 relayer_id: String,
294 reason: DisabledReason,
295 ) -> Result<RelayerRepoModel, RepositoryError> {
296 match self {
297 RelayerRepositoryStorage::InMemory(repo) => {
298 repo.disable_relayer(relayer_id, reason).await
299 }
300 RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
301 }
302 }
303
304 async fn update_policy(
305 &self,
306 id: String,
307 policy: RelayerNetworkPolicy,
308 ) -> Result<RelayerRepoModel, RepositoryError> {
309 match self {
310 RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
311 RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
312 }
313 }
314
315 fn is_persistent_storage(&self) -> bool {
316 match self {
317 RelayerRepositoryStorage::InMemory(_) => false,
318 RelayerRepositoryStorage::Redis(_) => true,
319 }
320 }
321
322 fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
323 match self {
324 RelayerRepositoryStorage::InMemory(_) => None,
325 RelayerRepositoryStorage::Redis(repo) => {
326 Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
327 }
328 }
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};

    /// Builds a minimal, unpaused, non-disabled EVM relayer fixture with the given id.
    fn create_test_relayer(id: String) -> RelayerRepoModel {
        RelayerRepoModel {
            // `name` is built before `id` is moved so no clone is needed.
            name: format!("Relayer {}", id),
            id,
            network: "TestNet".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
                include_revert_data: None,
                min_balance: Some(0),
                gas_limit_estimation: Some(true),
                gas_price_cap: None,
                whitelist_receivers: None,
                eip1559_pricing: Some(false),
                private_transactions: Some(false),
            }),
            signer_id: "test".to_string(),
            address: "0x".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        }
    }

    /// Exercises the generic CRUD surface through the in-memory variant.
    #[actix_web::test]
    async fn test_in_memory_repository_impl() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("test-relayer".to_string());

        // Create
        let created = impl_repo.create(relayer.clone()).await.unwrap();
        assert_eq!(created.id, relayer.id);

        // Read back by id
        let retrieved = impl_repo
            .get_by_id("test-relayer".to_string())
            .await
            .unwrap();
        assert_eq!(retrieved.id, relayer.id);

        // The repository was created by this test, so it holds exactly one entry.
        let all_relayers = impl_repo.list_all().await.unwrap();
        assert_eq!(all_relayers.len(), 1);

        let count = impl_repo.count().await.unwrap();
        assert_eq!(count, 1);

        // Update
        let mut updated_relayer = relayer.clone();
        updated_relayer.name = "Updated Name".to_string();
        let updated = impl_repo
            .update(relayer.id.clone(), updated_relayer)
            .await
            .unwrap();
        assert_eq!(updated.name, "Updated Name");

        // Delete, then confirm the entry is gone
        impl_repo.delete_by_id(relayer.id.clone()).await.unwrap();
        let get_result = impl_repo.get_by_id("test-relayer".to_string()).await;
        assert!(get_result.is_err());
    }

    /// Exercises the relayer-specific surface through the in-memory variant.
    #[actix_web::test]
    async fn test_relayer_repository_trait_methods() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("test-relayer".to_string());

        impl_repo.create(relayer.clone()).await.unwrap();

        // The single fixture relayer must be the only active one.
        let active_relayers = impl_repo.list_active().await.unwrap();
        assert_eq!(active_relayers.len(), 1);

        // Partial update
        let update = UpdateRelayerRequest {
            paused: Some(true),
            ..Default::default()
        };
        let updated = impl_repo
            .partial_update(relayer.id.clone(), update)
            .await
            .unwrap();
        assert!(updated.paused);

        // Disable records both the flag and the reason
        let disabled = impl_repo
            .disable_relayer(
                relayer.id.clone(),
                DisabledReason::BalanceCheckFailed("Test disable reason".to_string()),
            )
            .await
            .unwrap();
        assert!(disabled.system_disabled);
        assert_eq!(
            disabled.disabled_reason,
            Some(DisabledReason::BalanceCheckFailed(
                "Test disable reason".to_string()
            ))
        );

        // Enable clears both again
        let enabled = impl_repo.enable_relayer(relayer.id.clone()).await.unwrap();
        assert!(!enabled.system_disabled);
        assert_eq!(enabled.disabled_reason, None);

        // Policy replacement
        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
            include_revert_data: None,
            min_balance: Some(1000000000000000000),
            gas_limit_estimation: Some(true),
            gas_price_cap: Some(50_000_000_000),
            whitelist_receivers: None,
            eip1559_pricing: Some(true),
            private_transactions: Some(false),
        });
        let policy_updated = impl_repo
            .update_policy(relayer.id.clone(), new_policy)
            .await
            .unwrap();

        if let RelayerNetworkPolicy::Evm(evm_policy) = policy_updated.policies {
            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
            assert_eq!(evm_policy.eip1559_pricing, Some(true));
        } else {
            panic!("Expected EVM policy");
        }
    }

    /// `new_in_memory` must produce the `InMemory` variant.
    #[actix_web::test]
    async fn test_create_repository_in_memory() {
        let result = RelayerRepositoryStorage::new_in_memory();

        assert!(matches!(result, RelayerRepositoryStorage::InMemory(_)));
    }

    /// Pagination metadata reflects the query and the number of stored entries.
    #[actix_web::test]
    async fn test_pagination() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer1 = create_test_relayer("test-relayer-1".to_string());
        let relayer2 = create_test_relayer("test-relayer-2".to_string());

        impl_repo.create(relayer1).await.unwrap();
        impl_repo.create(relayer2).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };

        let result = impl_repo.list_paginated(query).await.unwrap();
        // Exactly the two relayers created above live in this repository.
        assert_eq!(result.total, 2);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 10);
    }

    /// Deleting removes the entry; deleting an unknown id is an error.
    #[actix_web::test]
    async fn test_delete_relayer() {
        let impl_repo = RelayerRepositoryStorage::new_in_memory();
        let relayer = create_test_relayer("delete-test".to_string());

        impl_repo.create(relayer).await.unwrap();

        impl_repo
            .delete_by_id("delete-test".to_string())
            .await
            .unwrap();

        // The lookup must fail specifically with `NotFound`.
        let get_result = impl_repo.get_by_id("delete-test".to_string()).await;
        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));

        // Deleting something that never existed is reported as an error.
        let delete_result = impl_repo.delete_by_id("nonexistent".to_string()).await;
        assert!(delete_result.is_err());
    }

    /// `has_entries` tracks creation and deletion.
    #[actix_web::test]
    async fn test_has_entries() {
        let repo = InMemoryRelayerRepository::new();
        assert!(!repo.has_entries().await.unwrap());

        let relayer = create_test_relayer("test".to_string());

        repo.create(relayer.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.delete_by_id(relayer.id).await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    /// `drop_all_entries` empties the repository.
    #[actix_web::test]
    async fn test_drop_all_entries() {
        let repo = InMemoryRelayerRepository::new();
        let relayer = create_test_relayer("test".to_string());

        repo.create(relayer).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    /// The inherent `connection_info` yields nothing for the in-memory backend.
    #[tokio::test]
    async fn test_connection_info_returns_none_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        assert!(storage.connection_info().is_none());
    }

    /// The in-memory backend is not persistent.
    #[tokio::test]
    async fn test_is_persistent_storage_returns_false_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        assert!(!storage.is_persistent_storage());
    }

    /// The trait-level `connection_info` (called through `dyn RelayerRepository`)
    /// yields nothing for the in-memory backend.
    #[tokio::test]
    async fn test_trait_connection_info_returns_none_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        let trait_ref: &dyn RelayerRepository = &storage;
        assert!(trait_ref.connection_info().is_none());
    }

    /// Pins the return type of the inherent `connection_info`.
    #[tokio::test]
    async fn test_struct_connection_info_returns_none_for_in_memory() {
        let storage = RelayerRepositoryStorage::new_in_memory();

        let result: Option<(Arc<Pool>, String)> = storage.connection_info();
        assert!(result.is_none());
    }
}