@ -11,6 +11,7 @@ use bcrypt::{verify};
use std ::str ::FromStr ;
use diesel ::PgConnection ;
use failure ::Error ;
use std ::time ::{ SystemTime } ;
use { Crud , Joinable , Likeable , Followable , Bannable , Saveable , establish_connection , naive_now , naive_from_unix , SortType , SearchType , has_slurs , remove_slurs } ;
use actions ::community ::* ;
@ -25,9 +26,14 @@ use actions::user_view::*;
use actions ::moderator_views ::* ;
use actions ::moderator ::* ;
const RATE_LIMIT_MESSAGES : i32 = 30 ;
const RATE_LIMIT_PER_SECOND : i32 = 60 ;
const RATE_LIMIT_REGISTER_MESSAGES : i32 = 1 ;
const RATE_LIMIT_REGISTER_PER_SECOND : i32 = 60 ;
#[ derive(EnumString,ToString,Debug) ]
pub enum UserOperation {
Login , Register , CreateCommunity , CreatePost , ListCommunities , ListCategories , GetPost , GetCommunity , CreateComment , EditComment , SaveComment , CreateCommentLike , GetPosts , CreatePostLike , EditPost , SavePost , EditCommunity , FollowCommunity , GetFollowedCommunities , GetUserDetails , GetReplies , GetModlog , BanFromCommunity , AddModToCommunity , CreateSite , EditSite , GetSite , AddAdmin , BanUser , Search
Login , Register , CreateCommunity , CreatePost , ListCommunities , ListCategories , GetPost , GetCommunity , CreateComment , EditComment , SaveComment , CreateCommentLike , GetPosts , CreatePostLike , EditPost , SavePost , EditCommunity , FollowCommunity , GetFollowedCommunities , GetUserDetails , GetReplies , GetModlog , BanFromCommunity , AddModToCommunity , CreateSite , EditSite , GetSite , AddAdmin , BanUser , Search , MarkAllAsRead
}
#[ derive(Fail, Debug) ]
@ -48,12 +54,14 @@ pub struct WSMessage(pub String);
#[ rtype(usize) ]
pub struct Connect {
pub addr : Recipient < WSMessage > ,
pub ip : String ,
}
/// Session is disconnected
#[ derive(Message) ]
pub struct Disconnect {
pub id : usize ,
pub ip : String ,
}
/// Send message to specific room
@ -219,6 +227,7 @@ pub struct EditComment {
creator_id : i32 ,
post_id : i32 ,
removed : Option < bool > ,
deleted : Option < bool > ,
reason : Option < String > ,
read : Option < bool > ,
auth : String
@ -268,6 +277,7 @@ pub struct EditPost {
url : Option < String > ,
body : Option < String > ,
removed : Option < bool > ,
deleted : Option < bool > ,
locked : Option < bool > ,
reason : Option < String > ,
auth : String
@ -288,6 +298,7 @@ pub struct EditCommunity {
description : Option < String > ,
category_id : i32 ,
removed : Option < bool > ,
deleted : Option < bool > ,
reason : Option < String > ,
expires : Option < i64 > ,
auth : String
@ -320,6 +331,7 @@ pub struct GetUserDetails {
limit : Option < i64 > ,
community_id : Option < i32 > ,
saved_only : bool ,
auth : Option < String > ,
}
#[ derive(Serialize, Deserialize) ]
@ -478,10 +490,27 @@ pub struct SearchResponse {
posts : Vec < PostView > ,
}
#[ derive(Serialize, Deserialize) ]
pub struct MarkAllAsRead {
auth : String
}
#[ derive(Debug) ]
pub struct RateLimitBucket {
last_checked : SystemTime ,
allowance : f64
}
pub struct SessionInfo {
pub addr : Recipient < WSMessage > ,
pub ip : String ,
}
/// `ChatServer` manages chat rooms and responsible for coordinating chat
/// session. implementation is super primitive
pub struct ChatServer {
sessions : HashMap < usize , Recipient < WSMessage > > , // A map from generated random ID to session addr
sessions : HashMap < usize , SessionInfo > , // A map from generated random ID to session addr
rate_limits : HashMap < String , RateLimitBucket > ,
rooms : HashMap < i32 , HashSet < usize > > , // A map from room / post name to set of connectionIDs
rng : ThreadRng ,
}
@ -493,6 +522,7 @@ impl Default for ChatServer {
ChatServer {
sessions : HashMap ::new ( ) ,
rate_limits : HashMap ::new ( ) ,
rooms : rooms ,
rng : rand ::thread_rng ( ) ,
}
@ -505,8 +535,8 @@ impl ChatServer {
if let Some ( sessions ) = self . rooms . get ( & room ) {
for id in sessions {
if * id ! = skip_id {
if let Some ( addr ) = self . sessions . get ( id ) {
let _ = addr. do_send ( WSMessage ( message . to_owned ( ) ) ) ;
if let Some ( info ) = self . sessions . get ( id ) {
let _ = info. addr. do_send ( WSMessage ( message . to_owned ( ) ) ) ;
}
}
}
@ -531,8 +561,51 @@ impl ChatServer {
Ok ( ( ) )
}
fn check_rate_limit_register ( & mut self , addr : usize ) -> Result < ( ) , Error > {
self . check_rate_limit_full ( addr , RATE_LIMIT_REGISTER_MESSAGES , RATE_LIMIT_REGISTER_PER_SECOND )
}
fn check_rate_limit ( & mut self , addr : usize ) -> Result < ( ) , Error > {
self . check_rate_limit_full ( addr , RATE_LIMIT_MESSAGES , RATE_LIMIT_PER_SECOND )
}
fn check_rate_limit_full ( & mut self , addr : usize , rate : i32 , per : i32 ) -> Result < ( ) , Error > {
if let Some ( info ) = self . sessions . get ( & addr ) {
if let Some ( rate_limit ) = self . rate_limits . get_mut ( & info . ip ) {
// The initial value
if rate_limit . allowance = = - 2 f64 {
rate_limit . allowance = rate as f64 ;
} ;
let current = SystemTime ::now ( ) ;
let time_passed = current . duration_since ( rate_limit . last_checked ) ? . as_secs ( ) as f64 ;
rate_limit . last_checked = current ;
rate_limit . allowance + = time_passed * ( rate as f64 / per as f64 ) ;
if rate_limit . allowance > rate as f64 {
rate_limit . allowance = rate as f64 ;
}
if rate_limit . allowance < 1.0 {
println! ( "Rate limited IP: {}, time_passed: {}, allowance: {}" , & info . ip , time_passed , rate_limit . allowance ) ;
Err ( ErrorMessage {
op : "Rate Limit" . to_string ( ) ,
message : format ! ( "Too many requests. {} per {} seconds" , rate , per ) ,
} ) ?
} else {
rate_limit . allowance - = 1.0 ;
Ok ( ( ) )
}
} else {
Ok ( ( ) )
}
} else {
Ok ( ( ) )
}
}
}
/// Make actor from `ChatServer`
impl Actor for ChatServer {
/// We are going to use simple Context, we just need ability to communicate
@ -546,14 +619,30 @@ impl Actor for ChatServer {
impl Handler < Connect > for ChatServer {
type Result = usize ;
fn handle ( & mut self , msg : Connect , _ : & mut Context < Self > ) -> Self ::Result {
fn handle ( & mut self , msg : Connect , _ ctx : & mut Context < Self > ) -> Self ::Result {
// notify all users in same room
// self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
// register session with random id
let id = self . rng . gen ::< usize > ( ) ;
self . sessions . insert ( id , msg . addr ) ;
println! ( "{} joined" , & msg . ip ) ;
self . sessions . insert ( id , SessionInfo {
addr : msg . addr ,
ip : msg . ip . to_owned ( ) ,
} ) ;
if self . rate_limits . get ( & msg . ip ) . is_none ( ) {
self . rate_limits . insert ( msg . ip , RateLimitBucket {
last_checked : SystemTime ::now ( ) ,
allowance : - 2 f64 ,
} ) ;
}
// for (k,v) in &self.rate_limits {
// println!("{}: {:?}", k,v);
// }
// auto join session to Main room
// self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
@ -563,6 +652,7 @@ impl Handler<Connect> for ChatServer {
}
}
/// Handler for Disconnect message.
impl Handler < Disconnect > for ChatServer {
type Result = ( ) ;
@ -728,6 +818,10 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str
let search : Search = serde_json ::from_str ( data ) ? ;
search . perform ( chat , msg . id )
} ,
UserOperation ::MarkAllAsRead = > {
let mark_all_as_read : MarkAllAsRead = serde_json ::from_str ( data ) ? ;
mark_all_as_read . perform ( chat , msg . id )
} ,
}
}
@ -781,10 +875,12 @@ impl Perform for Register {
fn op_type ( & self ) -> UserOperation {
UserOperation ::Register
}
fn perform ( & self , _ chat: & mut ChatServer , _ addr: usize ) -> Result < String , Error > {
fn perform ( & self , chat: & mut ChatServer , addr: usize ) -> Result < String , Error > {
let conn = establish_connection ( ) ;
chat . check_rate_limit_register ( addr ) ? ;
// Make sure passwords match
if & self . password ! = & self . password_verify {
return Err ( self . error ( "Passwords do not match." ) ) ?
@ -871,10 +967,12 @@ impl Perform for CreateCommunity {
UserOperation ::CreateCommunity
}
fn perform ( & self , _ chat: & mut ChatServer , _ addr: usize ) -> Result < String , Error > {
fn perform ( & self , chat: & mut ChatServer , addr: usize ) -> Result < String , Error > {
let conn = establish_connection ( ) ;
chat . check_rate_limit_register ( addr ) ? ;
let claims = match Claims ::decode ( & self . auth ) {
Ok ( claims ) = > claims . claims ,
Err ( _e ) = > {
@ -903,6 +1001,7 @@ impl Perform for CreateCommunity {
category_id : self . category_id ,
creator_id : user_id ,
removed : None ,
deleted : None ,
updated : None ,
} ;
@ -1016,10 +1115,12 @@ impl Perform for CreatePost {
UserOperation ::CreatePost
}
fn perform ( & self , _ chat: & mut ChatServer , _ addr: usize ) -> Result < String , Error > {
fn perform ( & self , chat: & mut ChatServer , addr: usize ) -> Result < String , Error > {
let conn = establish_connection ( ) ;
chat . check_rate_limit_register ( addr ) ? ;
let claims = match Claims ::decode ( & self . auth ) {
Ok ( claims ) = > claims . claims ,
Err ( _e ) = > {
@ -1051,6 +1152,7 @@ impl Perform for CreatePost {
community_id : self . community_id ,
creator_id : user_id ,
removed : None ,
deleted : None ,
locked : None ,
updated : None
} ;
@ -1227,6 +1329,8 @@ impl Perform for CreateComment {
let conn = establish_connection ( ) ;
chat . check_rate_limit ( addr ) ? ;
let claims = match Claims ::decode ( & self . auth ) {
Ok ( claims ) = > claims . claims ,
Err ( _e ) = > {
@ -1255,6 +1359,7 @@ impl Perform for CreateComment {
post_id : self . post_id ,
creator_id : user_id ,
removed : None ,
deleted : None ,
read : None ,
updated : None
} ;
@ -1371,9 +1476,10 @@ impl Perform for EditComment {
post_id : self . post_id ,
creator_id : self . creator_id ,
removed : self . removed . to_owned ( ) ,
deleted : self . deleted . to_owned ( ) ,
read : self . read . to_owned ( ) ,
updated : if self . read . is_some ( ) { orig_comment . updated } else { Some ( naive_now ( ) ) }
} ;
} ;
let _updated_comment = match Comment ::update ( & conn , self . edit_id , & comment_form ) {
Ok ( comment ) = > comment ,
@ -1483,6 +1589,8 @@ impl Perform for CreateCommentLike {
let conn = establish_connection ( ) ;
chat . check_rate_limit ( addr ) ? ;
let claims = match Claims ::decode ( & self . auth ) {
Ok ( claims ) = > claims . claims ,
Err ( _e ) = > {
@ -1611,10 +1719,12 @@ impl Perform for CreatePostLike {
UserOperation ::CreatePostLike
}
fn perform ( & self , _ chat: & mut ChatServer , _ addr: usize ) -> Result < String , Error > {
fn perform ( & self , chat: & mut ChatServer , addr: usize ) -> Result < String , Error > {
let conn = establish_connection ( ) ;
chat . check_rate_limit ( addr ) ? ;
let claims = match Claims ::decode ( & self . auth ) {
Ok ( claims ) = > claims . claims ,
Err ( _e ) = > {
@ -1734,6 +1844,7 @@ impl Perform for EditPost {
creator_id : self . creator_id . to_owned ( ) ,
community_id : self . community_id ,
removed : self . removed . to_owned ( ) ,
deleted : self . deleted . to_owned ( ) ,
locked : self . locked . to_owned ( ) ,
updated : Some ( naive_now ( ) )
} ;
@ -1899,6 +2010,7 @@ impl Perform for EditCommunity {
category_id : self . category_id . to_owned ( ) ,
creator_id : user_id ,
removed : self . removed . to_owned ( ) ,
deleted : self . deleted . to_owned ( ) ,
updated : Some ( naive_now ( ) )
} ;
@ -2051,6 +2163,19 @@ impl Perform for GetUserDetails {
let conn = establish_connection ( ) ;
let user_id : Option < i32 > = match & self . auth {
Some ( auth ) = > {
match Claims ::decode ( & auth ) {
Ok ( claims ) = > {
let user_id = claims . claims . id ;
Some ( user_id )
}
Err ( _e ) = > None
}
}
None = > None
} ;
//TODO add save
let sort = SortType ::from_str ( & self . sort ) ? ;
@ -2081,7 +2206,7 @@ impl Perform for GetUserDetails {
self . community_id ,
Some ( user_details_id ) ,
None ,
None ,
user_id ,
self . saved_only ,
false ,
self . page ,
@ -2103,7 +2228,7 @@ impl Perform for GetUserDetails {
None ,
Some ( user_details_id ) ,
None ,
None ,
user_id ,
self . saved_only ,
self . page ,
self . limit ) ?
@ -2663,7 +2788,7 @@ impl Perform for Search {
} ,
SearchType ::Comments = > {
comments = CommentView ::list ( & conn ,
& sort ,
& sort ,
None ,
None ,
Some ( self . q . to_owned ( ) ) ,
@ -2685,7 +2810,7 @@ impl Perform for Search {
self . page ,
self . limit ) ? ;
comments = CommentView ::list ( & conn ,
& sort ,
& sort ,
None ,
None ,
Some ( self . q . to_owned ( ) ) ,
@ -2709,3 +2834,57 @@ impl Perform for Search {
)
}
}
impl Perform for MarkAllAsRead {
fn op_type ( & self ) -> UserOperation {
UserOperation ::MarkAllAsRead
}
fn perform ( & self , _chat : & mut ChatServer , _addr : usize ) -> Result < String , Error > {
let conn = establish_connection ( ) ;
let claims = match Claims ::decode ( & self . auth ) {
Ok ( claims ) = > claims . claims ,
Err ( _e ) = > {
return Err ( self . error ( "Not logged in." ) ) ?
}
} ;
let user_id = claims . id ;
let replies = ReplyView ::get_replies ( & conn , user_id , & SortType ::New , true , Some ( 1 ) , Some ( 999 ) ) ? ;
for reply in & replies {
let comment_form = CommentForm {
content : reply . to_owned ( ) . content ,
parent_id : reply . to_owned ( ) . parent_id ,
post_id : reply . to_owned ( ) . post_id ,
creator_id : reply . to_owned ( ) . creator_id ,
removed : None ,
deleted : None ,
read : Some ( true ) ,
updated : reply . to_owned ( ) . updated
} ;
let _updated_comment = match Comment ::update ( & conn , reply . id , & comment_form ) {
Ok ( comment ) = > comment ,
Err ( _e ) = > {
return Err ( self . error ( "Couldn't update Comment" ) ) ?
}
} ;
}
let replies = ReplyView ::get_replies ( & conn , user_id , & SortType ::New , true , Some ( 1 ) , Some ( 999 ) ) ? ;
Ok (
serde_json ::to_string (
& GetRepliesResponse {
op : self . op_type ( ) . to_string ( ) ,
replies : replies ,
}
) ?
)
}
}