diff --git a/Cargo.toml b/Cargo.toml index c7667fe..c0ec437 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ chrono = { version = "0.4.13", features = ["serde"] } rand = "0.8.3" sqlite = "0.25.3" uuid = { version = "0.8.2", features = ["serde", "v4"] } -mio = { version = "0.7", features = ["os-poll", "net"] } +mio = { version = "0.7.10", features = ["os-poll", "net"] } # for DNS from hermes derive_more = "0.99.9" diff --git a/src/blockchain/chain.rs b/src/blockchain/chain.rs index ac17770..32dcc3b 100644 --- a/src/blockchain/chain.rs +++ b/src/blockchain/chain.rs @@ -185,7 +185,7 @@ impl Chain { None } Some(block) => { - trace!("Loaded block: {:?}", &block); + //trace!("Loaded block: {:?}", &block); Some(block) } }; @@ -229,7 +229,7 @@ impl Chain { None } Some(block) => { - trace!("Got last full block: {:?}", &block); + //trace!("Got last full block: {:?}", &block); Some(block) } }; diff --git a/src/blockchain/filter.rs b/src/blockchain/filter.rs index 1b02663..8234a0b 100644 --- a/src/blockchain/filter.rs +++ b/src/blockchain/filter.rs @@ -43,12 +43,12 @@ impl DnsFilter for BlockchainFilter { let mut packet = DnsPacket::new(); packet.questions.push(DnsQuestion::new(String::from(qname), qtype)); packet.header.rescode = ResultCode::SERVFAIL; - trace!("Returning packet: {:?}", &packet); + //trace!("Returning packet: {:?}", &packet); return Some(packet); } } Some(data) => { - info!("Found data for domain {}", &search); + debug!("Found data for domain {}", &search); let mut data: DomainData = match serde_json::from_str(&data) { Err(_) => { return None; } Ok(data) => { data } @@ -136,14 +136,14 @@ impl DnsFilter for BlockchainFilter { for answer in answers { packet.answers.push(answer); } - trace!("Returning packet: {:?}", &packet); + //trace!("Returning packet: {:?}", &packet); Some(packet) } else { // Create DnsPacket let mut packet = DnsPacket::new(); packet.questions.push(DnsQuestion::new(String::from(qname), qtype)); packet.header.rescode = ResultCode::SERVFAIL; - trace!("Returning packet: {:?}", &packet); + //trace!("Returning packet: {:?}", &packet); Some(packet) } } diff --git a/src/bytes.rs b/src/bytes.rs index b5d0ca6..16f34be 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -59,7 +59,7 @@ impl Bytes { } pub fn to_string(&self) -> String { - crate::utils::to_hex(&self.data) + crate::commons::to_hex(&self.data) } pub fn get_tail_u64(&self) -> u64 { @@ -127,14 +127,14 @@ impl Deref for Bytes { impl fmt::Debug for Bytes { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.write_str(&crate::utils::to_hex(&self.data)) + fmt.write_str(&crate::commons::to_hex(&self.data)) } } impl Serialize for Bytes { fn serialize(&self, serializer: S) -> Result<::Ok, ::Error> where S: Serializer { - serializer.serialize_str(&crate::utils::to_hex(&self.data)) + serializer.serialize_str(&crate::commons::to_hex(&self.data)) } } diff --git a/src/utils.rs b/src/commons/mod.rs similarity index 84% rename from src/utils.rs rename to src/commons/mod.rs index 3b14c13..830cb41 100644 --- a/src/utils.rs +++ b/src/commons/mod.rs @@ -1,4 +1,5 @@ use std::num; +use rand::Rng; #[cfg(not(target_os = "macos"))] use thread_priority::*; @@ -79,6 +80,19 @@ fn split_n(s: &str, n: usize) -> Vec<&str> { .collect() } +/// Generates random string of given length +pub fn random_string(length: usize) -> String { + let chars: Vec = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!?".chars().collect(); + let mut rng = rand::thread_rng(); + let mut result = String::with_capacity(length); + for _ in 0..length { + let position: usize = rng.gen::() % chars.len(); + let c: char = *chars.get(position).unwrap(); + result.push(c); + } + result +} + #[cfg(test)] mod test { use crate::check_domain; diff --git a/src/lib.rs b/src/lib.rs index 10abe0e..b2e4ef8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,10 @@ pub use crate::bytes::Bytes; pub use crate::keys::Keystore; pub use crate::x_zones::ExternalZones; pub use crate::simplebus::*; -pub use crate::utils::*; +pub use crate::commons::*; pub mod blockchain; -pub mod utils; +pub mod commons; pub mod simplebus; pub mod keys; pub mod miner; diff --git a/src/main.rs b/src/main.rs index a82c54d..b002e8b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,7 +71,7 @@ fn main() { None => { SETTINGS_FILENAME.to_owned() } Some(path) => { path } }; - SimpleLogger::new().with_level(level).init().unwrap(); + SimpleLogger::new().with_level(level).with_module_level("mio::poll", LevelFilter::Warn).init().unwrap(); info!(target: LOG_TARGET_MAIN, "Starting ALFIS {}", env!("CARGO_PKG_VERSION")); let settings = Settings::load(&config_name); diff --git a/src/p2p/message.rs b/src/p2p/message.rs index 97bc8cf..3ae6fba 100644 --- a/src/p2p/message.rs +++ b/src/p2p/message.rs @@ -7,7 +7,7 @@ use crate::Bytes; #[derive(Debug, Serialize, Deserialize)] pub enum Message { Error, - Hand { origin: String, version: u32, public: bool }, + Hand { origin: String, version: u32, public: bool, #[serde(default)] rand: String }, Shake { origin: String, version: u32, ok: bool, height: u64 }, Ping { height: u64, hash: Bytes }, Pong { height: u64, hash: Bytes }, @@ -26,8 +26,8 @@ impl Message { } } - pub fn hand(origin: &str, version: u32, public: bool) -> Self { - Message::Hand { origin: origin.to_owned(), version, public } + pub fn hand(origin: &str, version: u32, public: bool, rand: &str) -> Self { + Message::Hand { origin: origin.to_owned(), version, public, rand: rand.to_owned() } } pub fn shake(origin: &str, version: u32, ok: bool, height: u64) -> Self { @@ -46,3 +46,16 @@ impl Message { Message::Block { index: height, block: str } } } + +#[cfg(test)] +mod tests { + use crate::p2p::Message; + + #[test] + pub fn test_hand() { + assert!(serde_json::from_str::("\"Error\"").is_ok()); + assert!(serde_json::from_str::("{\"Hand\":{\"origin\":\"\",\"version\":1,\"public\":false,\"rand\":\"123\"}}").is_ok()); + assert!(serde_json::from_str::("{\"Hand\":{\"origin\":\"\",\"version\":1,\"public\":false}}").is_ok()); + } + +} \ No newline at end of file diff --git a/src/p2p/network.rs b/src/p2p/network.rs index b994ca8..ecb0a29 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -13,11 +13,11 @@ use mio::net::{TcpListener, TcpStream}; #[allow(unused_imports)] use log::{trace, debug, info, warn, error}; +use std::net::{SocketAddr, IpAddr, SocketAddrV4, Shutdown}; +use std::collections::HashSet; use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers, Bytes}; -use std::net::{SocketAddr, IpAddr, SocketAddrV4, ToSocketAddrs}; use crate::blockchain::enums::BlockQuality; use crate::blockchain::CHAIN_VERSION; -use std::collections::HashSet; const SERVER: Token = Token(0); const POLL_TIMEOUT: Option = Some(Duration::from_millis(3000)); @@ -57,7 +57,7 @@ impl Network { // States of peer connections, and some data to send when sockets become writable let mut peers = Peers::new(); // Starting peer connections to bootstrap nodes - connect_peers(peers_addrs, &mut poll, &mut peers, &mut unique_token); + peers.connect_peers(peers_addrs, &poll.registry(), &mut unique_token); loop { // Poll Mio for events, blocking until we get an event. @@ -65,7 +65,7 @@ impl Network { // Process each event. for event in events.iter() { - trace!("Event for socket {} is {:?}", event.token().0, &event); + //trace!("Event for socket {} is {:?}", event.token().0, &event); // We can use the token we previously provided to `register` to determine for which socket the event is. match event.token() { SERVER => { @@ -82,10 +82,18 @@ impl Network { } } - info!("Accepted connection from: {}", address); - let token = next(&mut unique_token); - poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll"); - peers.add_peer(token, Peer::new(address, stream, State::Connected, true)); + // If connection is from the same IP and not from loopback we ignore it to avoid connection loops + let local_ip = stream.local_addr().unwrap_or("0.0.0.0:0".parse().unwrap()); + if !local_ip.ip().is_loopback() && local_ip.ip() == address.ip() { + peers.ignore_ip(&address.ip()); + stream.shutdown(Shutdown::Both); + warn!("Detected connection loop, ignoring IP: {}", &address.ip()); + } else { + info!("Accepted connection from: {} to local IP: {}", address, local_ip); + let token = next(&mut unique_token); + poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll"); + peers.add_peer(token, Peer::new(address, stream, State::Connected, true)); + } } Err(_) => {} } @@ -105,8 +113,13 @@ impl Network { // Send pings to idle peers let (height, hash) = { - let context = context.lock().unwrap(); - (context.chain.height(), context.chain.last_hash()) + let mut context = context.lock().unwrap(); + let height = context.chain.height(); + let nodes = peers.get_peers_active_count(); + if nodes > 0 { + context.bus.post(crate::event::Event::NetworkStatus { nodes, blocks: height }); + } + (height, context.chain.last_hash()) }; mine_locker_block(context.clone()); peers.send_pings(poll.registry(), height, hash); @@ -153,7 +166,9 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi State::Idle { .. } => { peer.set_state(State::idle()); } - State::Error => {} + State::Error => { + peers.ignore_peer(registry, &event.token()); + } State::Banned => {} State::Offline { .. } => { peer.set_state(State::offline()); @@ -168,7 +183,7 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi } if event.is_writable() { - trace!("Socket {} is writable", event.token().0); + //trace!("Socket {} is writable", event.token().0); match peers.get_mut_peer(&event.token()) { None => {} Some(peer) => { @@ -177,11 +192,11 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi debug!("Sending hello to {}", &peer.get_addr()); let data: String = { let c = context.lock().unwrap(); - let message = Message::hand(&c.settings.origin, CHAIN_VERSION, c.settings.public); + let message = Message::hand(&c.settings.origin, CHAIN_VERSION, c.settings.public, peer.get_rand()); serde_json::to_string(&message).unwrap() }; send_message(peer.get_stream(), &data.into_bytes()); - debug!("Sent hello to {}", &peer.get_addr()); + //debug!("Sent hello to {}", &peer.get_addr()); } State::Message { data } => { debug!("Sending data to {}: {}", &peer.get_addr(), &String::from_utf8(data.clone()).unwrap()); @@ -220,7 +235,7 @@ fn read_message(stream: &mut TcpStream) -> Result, ()> { 0 } }; - trace!("Payload size is {}", data_size); + //trace!("Payload size is {}", data_size); if data_size > MAX_PACKET_SIZE || data_size == 0 { return Err(()); } @@ -256,11 +271,10 @@ fn read_message(stream: &mut TcpStream) -> Result, ()> { } } -fn send_message(connection: &mut TcpStream, data: &Vec) { - // TODO handle errors - connection.write_u32::(data.len() as u32).expect("Error sending message"); - connection.write_all(&data).expect("Error writing to socket"); - connection.flush().expect("Error sending message"); +fn send_message(connection: &mut TcpStream, data: &Vec) -> io::Result<()> { + connection.write_u32::(data.len() as u32)?; + connection.write_all(&data)?; + connection.flush() } fn handle_message(context: Arc>, message: Message, peers: &mut Peers, token: &Token) -> State { @@ -270,14 +284,19 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe (context.chain.height(), context.chain.last_hash(), &context.settings.origin.clone(), CHAIN_VERSION) }; match message { - Message::Hand { origin, version, public } => { - if origin.eq(my_origin) && version == my_version { - let peer = peers.get_mut_peer(token).unwrap(); - peer.set_public(public); - State::message(Message::shake(&origin, version, true, my_height)) - } else { - warn!("Handshake from unsupported chain or version"); + Message::Hand { origin, version, public, rand} => { + if peers.is_our_own_connect(&rand) { + warn!("Detected loop connect"); State::Error + } else { + if origin.eq(my_origin) && version == my_version { + let peer = peers.get_mut_peer(token).unwrap(); + peer.set_public(public); + State::message(Message::shake(&origin, version, true, my_height)) + } else { + warn!("Handshake from unsupported chain or version"); + State::Error + } } } Message::Shake { origin, version, ok, height } => { @@ -456,32 +475,6 @@ fn check_block(block: &Block, prev: &Block) -> bool { prev.index == block.index - 1 && prev.hash == block.prev_block_hash } -/// Connecting to configured (bootstrap) peers -fn connect_peers(peers_addrs: Vec, poll: &mut Poll, peers: &mut Peers, unique_token: &mut Token) { - for peer in peers_addrs.iter() { - let addresses: Vec = match peer.to_socket_addrs() { - Ok(peers) => { peers.collect() } - Err(_) => { error!("Can't resolve address {}", &peer); continue; } - }; - - for addr in addresses { - match TcpStream::connect(addr.clone()) { - Ok(mut stream) => { - info!("Created connection to peer {}", &addr); - let token = next(unique_token); - poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap(); - let mut peer = Peer::new(addr, stream, State::Connecting, false); - peer.set_public(true); - peers.add_peer(token, peer); - } - Err(e) => { - error!("Error connecting to peer {}: {}", &addr, e); - } - } - } - } -} - pub(crate) fn next(current: &mut Token) -> Token { let next = current.0; current.0 += 1; diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index d3333e8..f503e78 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -2,13 +2,14 @@ use std::net::SocketAddr; use std::collections::HashMap; use mio::net::TcpStream; use crate::p2p::State; -use crate::Block; +use crate::{Block, commons}; #[derive(Debug)] pub struct Peer { addr: SocketAddr, stream: TcpStream, state: State, + rand: String, height: u64, inbound: bool, public: bool, @@ -19,7 +20,7 @@ pub struct Peer { impl Peer { pub fn new(addr: SocketAddr, stream: TcpStream, state: State, inbound: bool) -> Self { - Peer { addr, stream, state, height: 0, inbound, public: false, active: false, received_block: 0, fork: HashMap::new() } + Peer { addr, stream, state, rand: commons::random_string(6), height: 0, inbound, public: false, active: false, received_block: 0, fork: HashMap::new() } } pub fn get_addr(&self) -> SocketAddr { @@ -42,6 +43,10 @@ impl Peer { self.state = state; } + pub fn get_rand(&self) -> &str { + &self.rand + } + pub fn set_height(&mut self, height: u64) { self.height = height; } diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index 8b238a0..8aac57e 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet}; -use std::net::{SocketAddr, Shutdown}; +use std::net::{SocketAddr, IpAddr, Shutdown, ToSocketAddrs}; use mio::{Token, Interest, Registry}; use mio::net::TcpStream; use crate::p2p::{Peer, State, Message}; @@ -13,14 +13,15 @@ use crate::Bytes; pub struct Peers { peers: HashMap, - new_peers: Vec + new_peers: Vec, + ignored: HashSet } const PING_PERIOD: u64 = 30; impl Peers { pub fn new() -> Self { - Peers { peers: HashMap::new(), new_peers: Vec::new() } + Peers { peers: HashMap::new(), new_peers: Vec::new(), ignored: HashSet::new() } } pub fn add_peer(&mut self, token: Token, peer: Peer) { @@ -65,26 +66,32 @@ impl Peers { debug!("Got {} peers: {:?}", peers.len(), &peers); // TODO make it return error if these peers are wrong and seem like an attack for peer in peers.iter() { - let addr: SocketAddr = peer.parse().expect(&format!("Error parsing peer {}", peer)); + let addr: SocketAddr = match peer.parse() { + Err(_) => { + warn!("Error parsing peer {}", peer); + continue; + } + Ok(addr) => addr + }; if self.peers .iter() - .find(|(_token, peer)| peer.get_addr() == addr) + .find(|(_token, peer)| peer.get_addr().ip() == addr.ip()) .is_some() { - debug!("Skipping address from exchange: {}", &addr); + //debug!("Skipping address from exchange: {}", &addr); continue; } if self.new_peers .iter() - .find(|a| a.clone().eq(&addr)) + .find(|a| a.ip().eq(&addr.ip())) .is_some() { - debug!("Skipping address from exchange: {}", &addr); + //debug!("Skipping address from exchange: {}", &addr); continue; } if skip_addr(&addr) { - debug!("Skipping address from exchange: {}", &addr); + //debug!("Skipping address from exchange: {}", &addr); continue; // Return error in future } let mut found = false; @@ -101,6 +108,15 @@ impl Peers { } } + pub fn is_our_own_connect(&self, rand: &str) -> bool { + match self.peers.values().find(|p| p.get_rand() == rand) { + None => { false } + Some(p) => { + !p.is_inbound() + } + } + } + pub fn get_peers_for_exchange(&self, peer_address: &SocketAddr) -> Vec { let mut result: Vec = Vec::new(); for (_, peer) in self.peers.iter() { @@ -124,6 +140,28 @@ impl Peers { count } + pub fn ignore_peer(&mut self, registry: &Registry, token: &Token) { + let peer = self.peers.get_mut(token).unwrap(); + peer.set_state(State::Banned); + let ip = peer.get_addr().ip().clone(); + self.close_peer(registry, token); + self.ignored.insert(ip); + match self.peers + .iter() + .find(|(_, p)| p.get_addr().ip() == ip) + .map(|(t, _)| t.clone()) { + None => {} + Some(t) => { + self.close_peer(registry, &t); + self.peers.remove(&t); + } + } + } + + pub fn ignore_ip(&mut self, ip: &IpAddr) { + self.ignored.insert(ip.clone()); + } + pub fn skip_peer_connection(&self, addr: &SocketAddr) -> bool { for (_, peer) in self.peers.iter() { if peer.equals(addr) && (!peer.is_public() || peer.active() || peer.disabled()) { @@ -138,7 +176,8 @@ impl Peers { for (token, peer) in self.peers.iter_mut() { match peer.get_state() { State::Idle { from } => { - if from.elapsed().as_secs() >= PING_PERIOD { + let random_time = random::() % PING_PERIOD; + if from.elapsed().as_secs() >= PING_PERIOD + random_time { // Sometimes we check for new peers instead of pinging let random: u8 = random(); let message = if random < 16 { @@ -195,10 +234,10 @@ impl Peers { let addr = peer.get_addr(); match TcpStream::connect(addr.clone()) { Ok(mut stream) => { + info!("Created connection to peer {}, state = {:?}", &addr, peer.get_state()); registry.register(&mut stream, token.clone(), Interest::WRITABLE).unwrap(); peer.set_state(State::Connecting); peer.set_stream(stream); - info!("Created connection to peer {}", &addr); } Err(e) => { error!("Error connecting to peer {}: {}", &addr, e); @@ -213,6 +252,9 @@ impl Peers { return; } for addr in self.new_peers.iter() { + if self.ignored.contains(&addr.ip()) { + continue; + } match TcpStream::connect(addr.clone()) { Ok(mut stream) => { info!("Created connection to peer {}", &addr); @@ -229,6 +271,32 @@ impl Peers { } self.new_peers.clear(); } + + /// Connecting to configured (bootstrap) peers + pub fn connect_peers(&mut self, peers_addrs: Vec, registry: &Registry, unique_token: &mut Token) { + for peer in peers_addrs.iter() { + let addresses: Vec = match peer.to_socket_addrs() { + Ok(peers) => { peers.collect() } + Err(_) => { error!("Can't resolve address {}", &peer); continue; } + }; + + for addr in addresses { + match TcpStream::connect(addr.clone()) { + Ok(mut stream) => { + info!("Created connection to peer {}", &addr); + let token = next(unique_token); + registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); + let mut peer = Peer::new(addr, stream, State::Connecting, false); + peer.set_public(true); + self.add_peer(token, peer); + } + Err(e) => { + error!("Error connecting to peer {}: {}", &addr, e); + } + } + } + } + } } fn skip_addr(addr: &SocketAddr) -> bool { diff --git a/src/web_ui.rs b/src/web_ui.rs index 3323665..30725c9 100644 --- a/src/web_ui.rs +++ b/src/web_ui.rs @@ -165,7 +165,7 @@ fn action_loaded(context: &Arc>, web_view: &mut WebView<()>) { let mut status = Status::new(); let mut c = context.lock().unwrap(); c.bus.register(move |_uuid, e| { - debug!("Got event from bus {:?}", &e); + //debug!("Got event from bus {:?}", &e); let eval = match e { Event::KeyCreated { path, public, hash } | Event::KeyLoaded { path, public, hash } | @@ -213,7 +213,7 @@ fn action_loaded(context: &Arc>, web_view: &mut WebView<()>) { }; if !eval.is_empty() { - debug!("Evaluating {}", &eval); + //debug!("Evaluating {}", &eval); handle.dispatch(move |web_view| { web_view.eval(&eval.replace("\\", "\\\\")) }).expect("Error dispatching!"); diff --git a/src/webview/index.html b/src/webview/index.html index f9c626d..125b06f 100644 --- a/src/webview/index.html +++ b/src/webview/index.html @@ -338,7 +338,7 @@
-
Connecting and syncing...
+
Connecting...