diff --git a/src/p2p/network.rs b/src/p2p/network.rs index f918869..29fe148 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -359,7 +359,7 @@ impl Network { } State::SendLoop => { let stream = peer.get_stream(); - registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); + registry.reregister(stream, event.token(), Interest::WRITABLE | Interest::READABLE).unwrap(); peer.set_state(State::SendLoop); } State::Twin => { @@ -461,6 +461,7 @@ impl Network { let answer = match message { Message::Hand { app_version, origin, version, public, rand_id } => { if app_version.starts_with("0.6") { + info!("Banning peer with version {}", &app_version); return State::Banned; } if self.peers.is_our_own_connect(&rand_id) { @@ -495,6 +496,7 @@ impl Network { return State::Twin; } if app_version.starts_with("0.6") { + info!("Banning peer with version {}", &app_version); return State::Banned; } let nodes = self.peers.get_peers_active_count(); diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index 862af8e..f9fe406 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -1,11 +1,10 @@ -use std::collections::HashMap; use std::net::SocketAddr; +use std::time::Instant; use mio::net::TcpStream; use crate::crypto::Chacha; use crate::p2p::State; -use crate::Block; #[derive(Debug)] pub struct Peer { @@ -17,11 +16,11 @@ pub struct Peer { inbound: bool, public: bool, active: bool, + last_active: Instant, reconnects: u32, received_block: u64, sent_height: u64, - cipher: Option, - fork: HashMap + cipher: Option } impl Peer { @@ -35,11 +34,11 @@ impl Peer { inbound, public: false, active: false, + last_active: Instant::now(), reconnects: 0, received_block: 0, sent_height: 0, - cipher: None, - fork: HashMap::new() + cipher: None } } @@ -133,10 +132,13 @@ impl Peer { pub fn set_active(&mut self, active: bool) { self.active = active; + if active { + self.last_active = Instant::now(); + } } pub fn active(&self) -> bool { - self.active + self.active && self.last_active.elapsed().as_secs() < 120 } pub fn reconnects(&self) -> u32 { @@ -159,14 +161,6 @@ impl Peer { self.inbound } - pub fn add_fork_block(&mut self, block: Block) { - self.fork.insert(block.index, block); - } - - pub fn get_fork(&self) -> &HashMap { - &self.fork - } - /// If loopback address then we care about ip and port. /// If regular address then we only care about the ip and ignore the port. pub fn equals(&self, addr: &SocketAddr) -> bool { diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 27e9002..31aa698 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -194,6 +194,10 @@ impl Peers { for (_, peer) in self.peers.iter() { if peer.active() { count += 1; + } else { + if !matches!(peer.get_state(), State::Connecting) { + debug!("Inactive peer from {:?} in state: {:?}", peer.get_addr(), peer.get_state()); + } } } count @@ -249,6 +253,7 @@ impl Peers { let nodes = self.get_peers_active_count(); let random_time = random::() % PING_PERIOD; + let mut stale_tokens = Vec::new(); for (token, peer) in self.peers.iter_mut() { if let State::Idle { from } = peer.get_state() { if from.elapsed().as_secs() >= PING_PERIOD + random_time { @@ -261,10 +266,23 @@ impl Peers { peer.set_state(State::message(message)); let stream = peer.get_stream(); + registry.reregister(stream, *token, Interest::WRITABLE | Interest::READABLE).unwrap(); + } + } else { + if matches!(peer.get_state(), State::Message {..}) { + if !peer.active() { + stale_tokens.push((token.clone(), peer.get_addr())); + continue; + } + let stream = peer.get_stream(); registry.reregister(stream, *token, Interest::WRITABLE).unwrap(); } } } + for (token, addr) in &stale_tokens { + info!("Closing stale peer from {}", addr); + self.close_peer(registry, token); + } // Just purging ignored/banned IPs every 10 minutes // TODO make it individual for every IP @@ -289,7 +307,7 @@ impl Peers { None => {} Some((token, peer)) => { debug!("Peer {} is behind ({}), sending ping", &peer.get_addr().ip(), peer.get_height()); - registry.reregister(peer.get_stream(), *token, Interest::WRITABLE).unwrap(); + registry.reregister(peer.get_stream(), *token, Interest::WRITABLE | Interest::READABLE).unwrap(); peer.set_state(State::message(Message::Ping { height, hash })); peer.set_sent_height(height); self.update_behind_ping_time(); @@ -336,7 +354,7 @@ impl Peers { None => {} Some((token, peer)) => { debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), height + 1); - registry.reregister(peer.get_stream(), *token, Interest::WRITABLE).unwrap(); + registry.reregister(peer.get_stream(), *token, Interest::WRITABLE | Interest::READABLE).unwrap(); peer.set_state(State::message(Message::GetBlock { index: height + 1 })); } } @@ -356,7 +374,7 @@ impl Peers { continue; } debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index); - registry.reregister(peer.get_stream(), *token, Interest::WRITABLE).unwrap(); + registry.reregister(peer.get_stream(), *token, Interest::WRITABLE | Interest::READABLE).unwrap(); peer.set_state(State::message(Message::GetBlock { index })); index += 1; if index > max_height {