Fixed stale connections.

pull/295/head
Revertron 2 years ago
parent de4b9ac65e
commit 3949bd046b

@ -359,7 +359,7 @@ impl Network {
}
State::SendLoop => {
let stream = peer.get_stream();
registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap();
registry.reregister(stream, event.token(), Interest::WRITABLE | Interest::READABLE).unwrap();
peer.set_state(State::SendLoop);
}
State::Twin => {
@ -461,6 +461,7 @@ impl Network {
let answer = match message {
Message::Hand { app_version, origin, version, public, rand_id } => {
if app_version.starts_with("0.6") {
info!("Banning peer with version {}", &app_version);
return State::Banned;
}
if self.peers.is_our_own_connect(&rand_id) {
@ -495,6 +496,7 @@ impl Network {
return State::Twin;
}
if app_version.starts_with("0.6") {
info!("Banning peer with version {}", &app_version);
return State::Banned;
}
let nodes = self.peers.get_peers_active_count();

@ -1,11 +1,10 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Instant;
use mio::net::TcpStream;
use crate::crypto::Chacha;
use crate::p2p::State;
use crate::Block;
#[derive(Debug)]
pub struct Peer {
@ -17,11 +16,11 @@ pub struct Peer {
inbound: bool,
public: bool,
active: bool,
last_active: Instant,
reconnects: u32,
received_block: u64,
sent_height: u64,
cipher: Option<Chacha>,
fork: HashMap<u64, Block>
cipher: Option<Chacha>
}
impl Peer {
@ -35,11 +34,11 @@ impl Peer {
inbound,
public: false,
active: false,
last_active: Instant::now(),
reconnects: 0,
received_block: 0,
sent_height: 0,
cipher: None,
fork: HashMap::new()
cipher: None
}
}
@ -133,10 +132,13 @@ impl Peer {
pub fn set_active(&mut self, active: bool) {
self.active = active;
if active {
self.last_active = Instant::now();
}
}
pub fn active(&self) -> bool {
self.active
self.active && self.last_active.elapsed().as_secs() < 120
}
pub fn reconnects(&self) -> u32 {
@ -159,14 +161,6 @@ impl Peer {
self.inbound
}
pub fn add_fork_block(&mut self, block: Block) {
self.fork.insert(block.index, block);
}
pub fn get_fork(&self) -> &HashMap<u64, Block> {
&self.fork
}
/// If loopback address then we care about ip and port.
/// If regular address then we only care about the ip and ignore the port.
pub fn equals(&self, addr: &SocketAddr) -> bool {

@ -194,6 +194,10 @@ impl Peers {
for (_, peer) in self.peers.iter() {
if peer.active() {
count += 1;
} else {
if !matches!(peer.get_state(), State::Connecting) {
debug!("Inactive peer from {:?} in state: {:?}", peer.get_addr(), peer.get_state());
}
}
}
count
@ -249,6 +253,7 @@ impl Peers {
let nodes = self.get_peers_active_count();
let random_time = random::<u64>() % PING_PERIOD;
let mut stale_tokens = Vec::new();
for (token, peer) in self.peers.iter_mut() {
if let State::Idle { from } = peer.get_state() {
if from.elapsed().as_secs() >= PING_PERIOD + random_time {
@ -261,10 +266,23 @@ impl Peers {
peer.set_state(State::message(message));
let stream = peer.get_stream();
registry.reregister(stream, *token, Interest::WRITABLE | Interest::READABLE).unwrap();
}
} else {
if matches!(peer.get_state(), State::Message {..}) {
if !peer.active() {
stale_tokens.push((token.clone(), peer.get_addr()));
continue;
}
let stream = peer.get_stream();
registry.reregister(stream, *token, Interest::WRITABLE).unwrap();
}
}
}
for (token, addr) in &stale_tokens {
info!("Closing stale peer from {}", addr);
self.close_peer(registry, token);
}
// Just purging ignored/banned IPs every 10 minutes
// TODO make it individual for every IP
@ -289,7 +307,7 @@ impl Peers {
None => {}
Some((token, peer)) => {
debug!("Peer {} is behind ({}), sending ping", &peer.get_addr().ip(), peer.get_height());
registry.reregister(peer.get_stream(), *token, Interest::WRITABLE).unwrap();
registry.reregister(peer.get_stream(), *token, Interest::WRITABLE | Interest::READABLE).unwrap();
peer.set_state(State::message(Message::Ping { height, hash }));
peer.set_sent_height(height);
self.update_behind_ping_time();
@ -336,7 +354,7 @@ impl Peers {
None => {}
Some((token, peer)) => {
debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), height + 1);
registry.reregister(peer.get_stream(), *token, Interest::WRITABLE).unwrap();
registry.reregister(peer.get_stream(), *token, Interest::WRITABLE | Interest::READABLE).unwrap();
peer.set_state(State::message(Message::GetBlock { index: height + 1 }));
}
}
@ -356,7 +374,7 @@ impl Peers {
continue;
}
debug!("Peer {} is higher than we are, requesting block {}", &peer.get_addr().ip(), index);
registry.reregister(peer.get_stream(), *token, Interest::WRITABLE).unwrap();
registry.reregister(peer.get_stream(), *token, Interest::WRITABLE | Interest::READABLE).unwrap();
peer.set_state(State::message(Message::GetBlock { index }));
index += 1;
if index > max_height {

Loading…
Cancel
Save