@@ -24,8 +24,8 @@ use crate::{
24
24
} ;
25
25
use actix_web:: { http:: header:: ContentType , web, HttpRequest , HttpResponse , Responder } ;
26
26
use bytes:: Bytes ;
27
+ use chrono:: Utc ;
27
28
use http:: StatusCode ;
28
- use rand:: distributions:: DistString ;
29
29
use serde_json:: Error as SerdeError ;
30
30
31
31
pub async fn list ( req : HttpRequest ) -> Result < impl Responder , FiltersError > {
@@ -48,48 +48,57 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, FiltersError> {
48
48
return Ok ( ( web:: Json ( filter) , StatusCode :: OK ) ) ;
49
49
}
50
50
51
- Err ( FiltersError :: Metadata ( "Filter Not Found " ) )
51
+ Err ( FiltersError :: Metadata ( "Filter does not exist " ) )
52
52
}
53
53
54
54
pub async fn post ( body : Bytes ) -> Result < impl Responder , PostError > {
55
- let filter: Filter = serde_json:: from_slice ( & body) ?;
56
- let filter_id = rand:: distributions:: Alphanumeric . sample_string ( & mut rand:: thread_rng ( ) , 10 ) ;
57
- let user_id = & filter. user_id ;
58
- let stream_name = & filter. stream_name ;
59
- let mut cloned_filter = filter. clone ( ) ;
60
- cloned_filter. filter_id = Some ( filter_id. clone ( ) ) ;
61
- cloned_filter. version = Some ( CURRENT_FILTER_VERSION . to_string ( ) ) ;
62
- FILTERS . update ( & cloned_filter) ;
63
-
64
- let path = filter_path ( user_id, stream_name, & format ! ( "{}.json" , filter_id) ) ;
55
+ let mut filter: Filter = serde_json:: from_slice ( & body) ?;
56
+ let filter_id = format ! (
57
+ "{}.{}.{}" ,
58
+ & filter. user_id,
59
+ & filter. stream_name,
60
+ Utc :: now( ) . timestamp_millis( )
61
+ ) ;
62
+ filter. filter_id = Some ( filter_id. clone ( ) ) ;
63
+ filter. version = Some ( CURRENT_FILTER_VERSION . to_string ( ) ) ;
64
+ FILTERS . update ( & filter) ;
65
+
66
+ let path = filter_path (
67
+ & filter. user_id ,
68
+ & filter. stream_name ,
69
+ & format ! ( "{}.json" , filter_id) ,
70
+ ) ;
65
71
66
72
let store = CONFIG . storage ( ) . get_object_store ( ) ;
67
- let filter_bytes = serde_json:: to_vec ( & cloned_filter ) ?;
73
+ let filter_bytes = serde_json:: to_vec ( & filter ) ?;
68
74
store. put_object ( & path, Bytes :: from ( filter_bytes) ) . await ?;
69
75
70
- Ok ( ( web:: Json ( cloned_filter ) , StatusCode :: OK ) )
76
+ Ok ( ( web:: Json ( filter ) , StatusCode :: OK ) )
71
77
}
72
78
73
79
pub async fn update ( req : HttpRequest , body : Bytes ) -> Result < HttpResponse , PostError > {
74
80
let filter_id = req
75
81
. match_info ( )
76
82
. get ( "filter_id" )
77
83
. ok_or ( FiltersError :: Metadata ( "No Filter Id Provided" ) ) ?;
78
- let filter = FILTERS
79
- . get_filter ( filter_id)
80
- . ok_or ( FiltersError :: Metadata ( "Filter Not Found" ) ) ?;
81
- let user_id = & filter. user_id ;
82
- let stream_name = & filter. stream_name ;
83
-
84
- let mut cloned_filter: Filter = serde_json:: from_slice ( & body) ?;
85
- cloned_filter. filter_id = Some ( filter_id. to_string ( ) ) ;
86
- cloned_filter. version = Some ( CURRENT_FILTER_VERSION . to_string ( ) ) ;
87
- FILTERS . update ( & cloned_filter) ;
84
+ if FILTERS . get_filter ( filter_id) . is_none ( ) {
85
+ return Err ( PostError :: FiltersError ( FiltersError :: Metadata (
86
+ "Filter does not exist" ,
87
+ ) ) ) ;
88
+ }
89
+ let mut filter: Filter = serde_json:: from_slice ( & body) ?;
90
+ filter. filter_id = Some ( filter_id. to_string ( ) ) ;
91
+ filter. version = Some ( CURRENT_FILTER_VERSION . to_string ( ) ) ;
92
+ FILTERS . update ( & filter) ;
88
93
89
- let path = filter_path ( user_id, stream_name, & format ! ( "{}.json" , filter_id) ) ;
94
+ let path = filter_path (
95
+ & filter. user_id ,
96
+ & filter. stream_name ,
97
+ & format ! ( "{}.json" , filter_id) ,
98
+ ) ;
90
99
91
100
let store = CONFIG . storage ( ) . get_object_store ( ) ;
92
- let filter_bytes = serde_json:: to_vec ( & cloned_filter ) ?;
101
+ let filter_bytes = serde_json:: to_vec ( & filter ) ?;
93
102
store. put_object ( & path, Bytes :: from ( filter_bytes) ) . await ?;
94
103
95
104
Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -102,11 +111,13 @@ pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
102
111
. ok_or ( FiltersError :: Metadata ( "No Filter Id Provided" ) ) ?;
103
112
let filter = FILTERS
104
113
. get_filter ( filter_id)
105
- . ok_or ( FiltersError :: Metadata ( "Filter Not Found" ) ) ?;
106
- let stream_name = & filter. stream_name ;
107
- let user_id = & filter. user_id ;
114
+ . ok_or ( FiltersError :: Metadata ( "Filter does not exist" ) ) ?;
108
115
109
- let path = filter_path ( user_id, stream_name, & format ! ( "{}.json" , filter_id) ) ;
116
+ let path = filter_path (
117
+ & filter. user_id ,
118
+ & filter. stream_name ,
119
+ & format ! ( "{}.json" , filter_id) ,
120
+ ) ;
110
121
let store = CONFIG . storage ( ) . get_object_store ( ) ;
111
122
store. delete_object ( & path) . await ?;
112
123
0 commit comments