Fixed block-spam over the p2p if there is no consensus.

pull/272/head v0.7.5
Revertron 2 years ago
parent a3b262159f
commit ef5cef290e

2
Cargo.lock generated

@ -84,7 +84,7 @@ dependencies = [
[[package]] [[package]]
name = "alfis" name = "alfis"
version = "0.7.4" version = "0.7.5"
dependencies = [ dependencies = [
"base64", "base64",
"bincode", "bincode",

@ -1,6 +1,6 @@
[package] [package]
name = "alfis" name = "alfis"
version = "0.7.4" version = "0.7.5"
authors = ["Revertron <alfis@revertron.com>"] authors = ["Revertron <alfis@revertron.com>"]
edition = "2021" edition = "2021"
build = "build.rs" build = "build.rs"

@ -25,7 +25,7 @@ use crate::commons::*;
use crate::crypto::Chacha; use crate::crypto::Chacha;
use crate::eventbus::{post, register}; use crate::eventbus::{post, register};
use crate::p2p::{Message, Peer, Peers, State}; use crate::p2p::{Message, Peer, Peers, State};
use crate::{Block, Context}; use crate::{Block, Bytes, Context};
const SERVER: Token = Token(0); const SERVER: Token = Token(0);
@ -82,6 +82,7 @@ impl Network {
let mut old_keys = 0i64; let mut old_keys = 0i64;
let mut old_nodes = 0usize; let mut old_nodes = 0usize;
let mut old_banned = 0usize; let mut old_banned = 0usize;
let mut seen_blocks = HashSet::new();
loop { loop {
if self.peers.get_peers_count() == 0 && bootstrap_timer.elapsed().as_secs() > 60 { if self.peers.get_peers_count() == 0 && bootstrap_timer.elapsed().as_secs() > 60 {
warn!("Restarting swarm connections..."); warn!("Restarting swarm connections...");
@ -145,7 +146,7 @@ impl Network {
} }
} }
token => { token => {
if !self.handle_connection_event(poll.registry(), event) { if !self.handle_connection_event(poll.registry(), event, &mut seen_blocks) {
let _ = self.peers.close_peer(poll.registry(), &token); let _ = self.peers.close_peer(poll.registry(), &token);
let blocks = self.context.lock().unwrap().chain.get_height(); let blocks = self.context.lock().unwrap().chain.get_height();
let keys = self.context.lock().unwrap().chain.get_users_count(); let keys = self.context.lock().unwrap().chain.get_users_count();
@ -199,6 +200,7 @@ impl Network {
warn!("Last network events time {} seconds ago", elapsed); warn!("Last network events time {} seconds ago", elapsed);
} }
log_timer = Instant::now(); log_timer = Instant::now();
seen_blocks.clear();
} }
if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 5 { if nodes < MAX_NODES && connect_timer.elapsed().as_secs() >= 5 {
self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only); self.peers.connect_new_peers(poll.registry(), &mut self.token, yggdrasil_only);
@ -219,13 +221,13 @@ impl Network {
} }
} }
fn handle_connection_event(&mut self, registry: &Registry, event: &Event) -> bool { fn handle_connection_event(&mut self, registry: &Registry, event: &Event, seen_blocks: &mut HashSet<Bytes>) -> bool {
if event.is_error() || (event.is_read_closed() && event.is_write_closed()) { if event.is_error() || (event.is_read_closed() && event.is_write_closed()) {
return false; return false;
} }
if event.is_readable() { if event.is_readable() {
return self.process_readable(registry, event); return self.process_readable(registry, event, seen_blocks);
} }
if event.is_writable() { if event.is_writable() {
@ -235,7 +237,7 @@ impl Network {
true true
} }
fn process_readable(&mut self, registry: &Registry, event: &Event) -> bool { fn process_readable(&mut self, registry: &Registry, event: &Event, seen_blocks: &mut HashSet<Bytes>) -> bool {
let data = { let data = {
let token = event.token(); let token = event.token();
match self.peers.get_mut_peer(&token) { match self.peers.get_mut_peer(&token) {
@ -328,7 +330,7 @@ impl Network {
match Message::from_bytes(data) { match Message::from_bytes(data) {
Ok(message) => { Ok(message) => {
//let m = format!("{:?}", &message); //let m = format!("{:?}", &message);
let new_state = self.handle_message(message, &event.token()); let new_state = self.handle_message(message, &event.token(), seen_blocks);
let peer = self.peers.get_mut_peer(&event.token()).unwrap(); let peer = self.peers.get_mut_peer(&event.token()).unwrap();
//debug!("Got message from {}: {:?}", &peer.get_addr(), &m); //debug!("Got message from {}: {:?}", &peer.get_addr(), &m);
let stream = peer.get_stream(); let stream = peer.get_stream();
@ -448,7 +450,7 @@ impl Network {
true true
} }
fn handle_message(&mut self, message: Message, token: &Token) -> State { fn handle_message(&mut self, message: Message, token: &Token, seen_blocks: &mut HashSet<Bytes>) -> State {
let (my_height, my_hash, my_origin, my_version, me_public) = { let (my_height, my_hash, my_origin, my_version, me_public) = {
let context = self.context.lock().unwrap(); let context = self.context.lock().unwrap();
// TODO cache it somewhere // TODO cache it somewhere
@ -514,6 +516,9 @@ impl Network {
let peer = self.peers.get_mut_peer(token).unwrap(); let peer = self.peers.get_mut_peer(token).unwrap();
peer.set_height(height); peer.set_height(height);
peer.set_active(true); peer.set_active(true);
if seen_blocks.contains(&hash) {
return State::message(Message::pong(my_height, my_hash));
}
if peer.is_higher(my_height) { if peer.is_higher(my_height) {
let mut context = self.context.lock().unwrap(); let mut context = self.context.lock().unwrap();
context.chain.update_max_height(height); context.chain.update_max_height(height);
@ -532,6 +537,9 @@ impl Network {
let peer = self.peers.get_mut_peer(token).unwrap(); let peer = self.peers.get_mut_peer(token).unwrap();
peer.set_height(height); peer.set_height(height);
peer.set_active(true); peer.set_active(true);
if seen_blocks.contains(&hash) {
return State::idle();
}
if peer.is_higher(my_height) { if peer.is_higher(my_height) {
let mut context = self.context.lock().unwrap(); let mut context = self.context.lock().unwrap();
context.chain.update_max_height(height); context.chain.update_max_height(height);
@ -585,7 +593,11 @@ impl Network {
return State::Banned; return State::Banned;
} }
info!("Received block {} with hash {:?}", block.index, &block.hash); info!("Received block {} with hash {:?}", block.index, &block.hash);
self.handle_block(token, block) if !seen_blocks.contains(&block.hash) {
self.handle_block(token, block, seen_blocks)
} else {
State::idle()
}
} }
Message::Twin => State::Twin, Message::Twin => State::Twin,
Message::Loop => State::Loop Message::Loop => State::Loop
@ -593,7 +605,8 @@ impl Network {
answer answer
} }
fn handle_block(&mut self, token: &Token, block: Block) -> State { fn handle_block(&mut self, token: &Token, block: Block, seen_blocks: &mut HashSet<Bytes>) -> State {
seen_blocks.insert(block.hash.clone());
let peers_count = self.peers.get_peers_active_count(); let peers_count = self.peers.get_peers_active_count();
let peer = self.peers.get_mut_peer(token).unwrap(); let peer = self.peers.get_mut_peer(token).unwrap();
peer.set_received_block(block.index); peer.set_received_block(block.index);
@ -652,16 +665,14 @@ impl Network {
debug!("Got forked block {} with hash {:?}", block.index, block.hash); debug!("Got forked block {} with hash {:?}", block.index, block.hash);
// If we are very much behind of blockchain // If we are very much behind of blockchain
let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height; let lagged = block.index == context.chain.get_height() && block.index + LIMITED_CONFIDENCE_DEPTH <= max_height;
let last_block = context.chain.last_block().unwrap(); let our_block = context.chain.get_block(block.index).unwrap();
if block.is_better_than(&last_block) || lagged { if block.is_better_than(&our_block) || lagged {
context.chain.replace_block(block).expect("Error replacing block with fork"); context.chain.replace_block(block).expect("Error replacing block with fork");
let index = context.chain.get_height(); let index = context.chain.get_height();
post(crate::event::Event::BlockchainChanged { index }); post(crate::event::Event::BlockchainChanged { index });
} else { } else {
debug!("Fork in not better than our block, dropping."); debug!("Fork in not better than our block, dropping.");
if let Some(block) = context.chain.get_block(block.index) { return State::message(Message::block(our_block.index, our_block.as_bytes()));
return State::message(Message::block(block.index, block.as_bytes()));
}
} }
} }
} }

Loading…
Cancel
Save