diff --git a/Cargo.toml b/Cargo.toml index 5d16de7..18b8e58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "alfis" -version = "0.3.12" +version = "0.3.13" authors = ["Revertron "] edition = "2018" build = "build.rs" diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 369c23b..506344e 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -63,6 +63,7 @@ impl Network { // Starting peer connections to bootstrap nodes peers.connect_peers(peers_addrs, &poll.registry(), &mut unique_token, yggdrasil_only); + let mut peers_timer = Instant::now(); loop { // Poll Mio for events, blocking until we get an event. poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets"); @@ -131,19 +132,22 @@ impl Network { } events.clear(); - // Send pings to idle peers - let (height, hash) = { - let mut context = context.lock().unwrap(); - let height = context.chain.height(); - let nodes = peers.get_peers_active_count(); - if nodes > 0 { - context.bus.post(crate::event::Event::NetworkStatus { nodes, blocks: height }); - } - (height, context.chain.last_hash()) - }; - mine_locker_block(Arc::clone(&context)); - peers.send_pings(poll.registry(), height, hash); - peers.connect_new_peers(poll.registry(), &mut unique_token, yggdrasil_only); + if peers_timer.elapsed().as_millis() > 100 { + // Send pings to idle peers + let (height, hash) = { + let mut context = context.lock().unwrap(); + let height = context.chain.height(); + let nodes = peers.get_peers_active_count(); + if nodes > 0 { + context.bus.post(crate::event::Event::NetworkStatus { nodes, blocks: height }); + } + (height, context.chain.last_hash()) + }; + mine_locker_block(Arc::clone(&context)); + peers.send_pings(poll.registry(), height, hash); + peers.connect_new_peers(poll.registry(), &mut unique_token, yggdrasil_only); + peers_timer = Instant::now(); + } } info!("Network loop finished"); }); diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 340ee62..953c027 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -18,7 +18,7 @@ pub struct Peers { ignored: HashSet } -const PING_PERIOD: u64 = 30; +const PING_PERIOD: u64 = 60; impl Peers { pub fn new() -> Self { @@ -44,7 +44,26 @@ impl Peers { let stream = peer.get_stream(); let _ = stream.shutdown(Shutdown::Both); let _ = registry.deregister(stream); - debug!("Peer connection {} to {:?} has shut down", &token.0, &peer.get_addr()); + match peer.get_state() { + State::Connecting => { + debug!("Peer connection {} to {:?} has timed out", &token.0, &peer.get_addr()); + } + State::Connected => { + debug!("Peer connection {} to {:?} disconnected", &token.0, &peer.get_addr()); + } + State::Idle { .. } | State::Message { .. } => { + debug!("Peer connection {} to {:?} disconnected", &token.0, &peer.get_addr()); + } + State::Error => { + debug!("Peer connection {} to {:?} has shut down on error", &token.0, &peer.get_addr()); + } + State::Banned => { + debug!("Peer connection {} to {:?} has shut down, banned", &token.0, &peer.get_addr()); + } + State::Offline { .. } => { + debug!("Peer connection {} to {:?} is offline", &token.0, &peer.get_addr()); + } + } if !peer.disabled() && !peer.is_inbound() { peer.set_state(State::offline()); @@ -91,7 +110,12 @@ impl Peers { continue; } - if skip_addr(&addr) { + if self.ignored.contains(&addr.ip()) { + trace!("Skipping address from exchange: {}", &addr); + continue; + } + + if skip_private_addr(&addr) { //debug!("Skipping address from exchange: {}", &addr); continue; // Return error in future } @@ -233,8 +257,18 @@ impl Peers { } } + let mut offline_ips = Vec::new(); // Remove all peers that are offline for a long time - self.peers.retain(|_, p| { !(p.get_state().need_reconnect() && p.reconnects() >= MAX_RECONNECTS) }); + self.peers.retain(|_, p| { + let offline = p.get_state().need_reconnect() && p.reconnects() >= MAX_RECONNECTS; + if offline { + offline_ips.push(p.get_addr().ip()); + } + !offline + }); + for ip in offline_ips { + self.ignore_ip(&ip); + } for (token, peer) in self.peers.iter_mut() { if peer.get_state().need_reconnect() { @@ -286,7 +320,7 @@ impl Peers { } if let Ok(mut stream) = TcpStream::connect(addr.clone()) { let token = next(unique_token); - debug!("Trying to reconnect connection {}, to peer {}", &token.0, &addr); + debug!("Created connection {}, to peer {}", &token.0, &addr); registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false); peer.set_public(true); @@ -295,7 +329,7 @@ impl Peers { } } -fn skip_addr(addr: &SocketAddr) -> bool { +fn skip_private_addr(addr: &SocketAddr) -> bool { if addr.ip().is_loopback() { return true; }