From 279b3e87c381d40c2693447cee47d629da45d2ad Mon Sep 17 00:00:00 2001 From: Revertron Date: Thu, 11 Feb 2021 21:51:32 +0100 Subject: [PATCH] Implemented P2P peer exchange. Refactored project structure. --- alfis.cfg | 8 +- src/context.rs | 1 + src/p2p/message.rs | 9 +- src/p2p/mod.rs | 3 + src/p2p/network.rs | 245 +++++++++++++++++++++++------------------- src/p2p/peer.rs | 47 +++++++- src/p2p/peers.rs | 126 ++++++++++++++++++++++ src/p2p/state.rs | 22 ++++ src/webview/bulma.css | 2 +- 9 files changed, 345 insertions(+), 118 deletions(-) create mode 100644 src/p2p/peers.rs diff --git a/alfis.cfg b/alfis.cfg index 09a0eeb..a254567 100644 --- a/alfis.cfg +++ b/alfis.cfg @@ -2,8 +2,12 @@ "chain_name": "test", "version_flags": 0, "key_file": "default.key", - "listen": "127.0.0.1:4442", + "listen": "127.0.0.1:4244", + "public": true, "peers": [ - "127.0.0.1:44421" + "127.0.0.1:44421", + "127.0.0.1:10000", + "127.0.0.1:10001", + "127.0.0.1:10002" ] } \ No newline at end of file diff --git a/src/context.rs b/src/context.rs index 796ce52..c1bf4e0 100644 --- a/src/context.rs +++ b/src/context.rs @@ -52,6 +52,7 @@ pub struct Settings { pub version_flags: u32, pub key_file: String, pub listen: String, + pub public: bool, pub peers: Vec } diff --git a/src/p2p/message.rs b/src/p2p/message.rs index 3718214..3733f99 100644 --- a/src/p2p/message.rs +++ b/src/p2p/message.rs @@ -2,16 +2,17 @@ extern crate serde; extern crate serde_json; use serde::{Deserialize, Serialize}; +use crate::p2p::peer::Peer; #[derive(Debug, Serialize, Deserialize)] pub enum Message { Error, - Hand { chain: String, version: u32 }, + Hand { chain: String, version: u32, public: bool }, Shake { ok: bool, height: u64 }, Ping { height: u64 }, Pong { height: u64 }, GetPeers, - Peers, + Peers { peers: Vec }, GetBlock { index: u64 }, Block { index: u64, block: String }, } @@ -25,8 +26,8 @@ impl Message { } } - pub fn hand(chain: &str, version: u32) -> Self { - Message::Hand { chain: chain.to_owned(), version } + pub fn hand(chain: &str, version: u32, public: bool) -> Self { + Message::Hand { chain: chain.to_owned(), version, public } } pub fn shake(ok: bool, height: u64) -> Self { diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index bdf99fe..878695b 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -2,8 +2,11 @@ pub mod network; pub mod message; pub mod state; pub mod peer; +pub mod peers; pub use network::Network; pub use message::Message; pub use state::State; +pub use peer::Peer; +pub use peers::Peers; diff --git a/src/p2p/network.rs b/src/p2p/network.rs index a59a6a9..482cdb0 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -13,13 +13,13 @@ use mio::event::Event; use mio::net::{TcpListener, TcpStream}; use serde::{Deserialize, Serialize}; -use crate::{Context, Block}; -use crate::p2p::Message; -use crate::p2p::State; -use crate::p2p::peer::Peer; +use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers}; +use std::net::{SocketAddr, IpAddr, SocketAddrV4}; +use std::borrow::BorrowMut; const SERVER: Token = Token(0); -const POLL_TIMEOUT: Option = Some(Duration::from_millis(1000)); +const POLL_TIMEOUT: Option = Some(Duration::from_millis(3000)); +pub const LISTEN_PORT: u16 = 4244; pub struct Network { context: Arc> @@ -31,7 +31,7 @@ impl Network { } pub fn start(&mut self) -> Result<(), String> { - let (listen_addr, peers) = { + let (listen_addr, peers_addrs) = { let c = self.context.lock().unwrap(); (c.settings.listen.clone(), c.settings.peers.clone()) }; @@ -46,63 +46,58 @@ impl Network { poll.registry().register(&mut server, SERVER, Interest::READABLE).expect("Error registering poll"); let context = self.context.clone(); thread::spawn(move || { + // Give UI some time to appear :) + thread::sleep(Duration::from_millis(2000)); // Unique token for each incoming connection. let mut unique_token = Token(SERVER.0 + 1); - // Map of `Token` -> `TcpStream`. - let mut connections = HashMap::new(); // States of peer connections, and some data to send when sockets become writable - let mut peer_state: HashMap = HashMap::new(); + let mut peers = Peers::new(); // Starting peer connections to bootstrap nodes - for peer in peers.iter() { - match TcpStream::connect(peer.parse().expect("Error parsing peer address")) { - Ok(mut stream) => { - println!("Created connection to peer {}", &peer); - let token = next(&mut unique_token); - poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap(); - peer_state.insert(token, Peer::new(peer.clone(), State::Connecting)); - connections.insert(token, stream); - } - Err(e) => { - println!("Error connecting to peer {}: {}", &peer, e); - } - } - } + connect_peers(peers_addrs, &mut poll, &mut peers, &mut unique_token); loop { // Poll Mio for events, blocking until we get an event. poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets"); - //println!("Polling finished, got events: {}", !events.is_empty()); // Process each event. for event in events.iter() { - println!("Event for {} is {:?}", event.token().0, &event); + println!("Event for socket {} is {:?}", event.token().0, &event); // We can use the token we previously provided to `register` to determine for which socket the event is. match event.token() { SERVER => { // If this is an event for the server, it means a connection is ready to be accepted. let connection = server.accept(); match connection { - Ok((mut connection, address)) => { + Ok((mut stream, mut address)) => { + // Checking if it is an ipv4-mapped ipv6 if yes convert to ipv4 + if address.is_ipv6() { + if let IpAddr::V6(ipv6) = address.ip() { + if let Some(ipv4) = ipv6.to_ipv4() { + address = SocketAddr::V4(SocketAddrV4::new(ipv4, address.port())) + } + } + } + println!("Accepted connection from: {}", address); let token = next(&mut unique_token); - poll.registry().register(&mut connection, token, Interest::READABLE).expect("Error registering poll"); - peer_state.insert(token, Peer::new(address.to_string(), State::Connected)); - connections.insert(token, connection); + poll.registry().register(&mut stream, token, Interest::READABLE).expect("Error registering poll"); + peers.add_peer(token, Peer::new(address, stream, State::Connected, true)); } Err(_) => {} } } token => { - match connections.get_mut(&token) { - Some(connection) => { - match handle_connection_event(context.clone(), &mut peer_state, &poll.registry(), connection, &event) { + match peers.get_mut_peer(&token) { + Some(peer) => { + match handle_connection_event(context.clone(), &mut peers, &poll.registry(), &event) { Ok(result) => { if !result { - connections.remove(&token); - peer_state.remove(&token); + peers.remove_peer(&token); } } - Err(err) => {} + Err(_err) => { + peers.remove_peer(&token); + } } } None => { println!("Odd event from poll"); } @@ -110,88 +105,60 @@ impl Network { } } } + // Send pings to idle peers - for (token, peer) in peer_state.iter_mut() { - match peer.get_state() { - State::Idle { from } => { - if from.elapsed().as_secs() >= 30 { - let c = context.lock().unwrap(); - peer.set_state(State::message(Message::ping(c.blockchain.height()))); - let mut connection = connections.get_mut(&token).unwrap(); - poll.registry().reregister(connection, token.clone(), Interest::WRITABLE).unwrap(); - } - } - _ => {} - } - } + let height = { context.lock().unwrap().blockchain.height() }; + peers.send_pings(poll.registry(), height); + peers.connect_new_peers(poll.registry(), &mut unique_token); } }); Ok(()) } } -fn handle_connection_event(context: Arc>, peer_state: &mut HashMap, registry: &Registry, connection: &mut TcpStream, event: &Event) -> io::Result { - if event.is_error() { +fn handle_connection_event(context: Arc>, peers: &mut Peers, registry: &Registry, event: &Event) -> io::Result { + if event.is_error() || (event.is_read_closed() && event.is_write_closed()) { return Ok(false); } if event.is_readable() { - let data_size = match connection.read_u32::() { - Ok(size) => { size as usize } - Err(e) => { - println!("Error reading from socket! {}", e); - 0 - } + let data = { + let mut peer = peers.get_mut_peer(&event.token()).expect("Error getting peer for connection"); + let mut stream = peer.get_stream(); + read_message(&mut stream) }; - println!("Payload size is {}", data_size); - // TODO check for very big buffer, make it no more 10Mb - let mut buf = vec![0u8; data_size]; - let mut bytes_read = 0; - loop { - match connection.read(&mut buf[bytes_read..]) { - Ok(bytes) => { - bytes_read += bytes; - } - // Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation. - Err(ref err) if would_block(err) => break, - Err(ref err) if interrupted(err) => continue, - // Other errors we'll consider fatal. - Err(err) => return Err(err), - } - } - - if bytes_read == data_size { - match Message::from_bytes(buf) { + if data.is_ok() { + let data = data.unwrap(); + match Message::from_bytes(data) { Ok(message) => { println!("Got message from socket {}: {:?}", &event.token().0, &message); - let new_state = handle_message(context.clone(), message); + let new_state = handle_message(context.clone(), message, peers, &event.token()); + let mut peer = peers.get_mut_peer(&event.token()).unwrap(); + let mut stream = peer.get_stream(); match new_state { State::Message { data } => { if event.is_writable() { // TODO handle all errors and buffer data to send - send_message(connection, &data); + send_message(stream, &data); } else { - registry.reregister(connection, event.token(), Interest::WRITABLE).unwrap(); - let mut peer = peer_state.get_mut(&event.token()).unwrap(); + registry.reregister(stream, event.token(), Interest::WRITABLE).unwrap(); peer.set_state(State::Message { data }); } } State::Connecting => {} State::Connected => {} State::Idle { .. } => { - let mut peer = peer_state.get_mut(&event.token()).unwrap(); peer.set_state(State::idle()); } State::Error => {} State::Banned => {} State::Offline { .. } => { - let mut peer = peer_state.get_mut(&event.token()).unwrap(); peer.set_state(State::offline(1)); } } } - Err(_) => {} + Err(_) => { return Ok(false); } } } else { // Consider connection as unreliable @@ -200,48 +167,81 @@ fn handle_connection_event(context: Arc>, peer_state: &mut HashMa } if event.is_writable() { - println!("Socket {} is writable", event.token().0); - match peer_state.get(&event.token()) { + //println!("Socket {} is writable", event.token().0); + match peers.get_mut_peer(&event.token()) { None => {} Some(peer) => { - match peer.get_state() { + match peer.get_state().clone() { State::Connecting => { - println!("Hello needed for socket {}", event.token().0); + println!("Sending hello to socket {}", event.token().0); let data: String = { let mut c = context.lock().unwrap(); - let message = Message::Hand { chain: c.settings.chain_name.clone(), version: c.settings.version_flags }; + let message = Message::hand(&c.settings.chain_name, c.settings.version_flags, c.settings.public); serde_json::to_string(&message).unwrap() }; - send_message(connection, &data.into_bytes()); - println!("Sent hello through socket {}", event.token().0); + send_message(peer.get_stream(), &data.into_bytes()); + //println!("Sent hello through socket {}", event.token().0); } State::Message { data } => { println!("Sending data to socket {}: {}", event.token().0, &String::from_utf8(data.clone()).unwrap()); - send_message(connection, data); + send_message(peer.get_stream(), &data); } State::Connected => {} State::Idle { from } => { + println!("Odd version of pings :)"); if from.elapsed().as_secs() >= 30 { let data: String = { let mut c = context.lock().unwrap(); let message = Message::ping(c.blockchain.height()); serde_json::to_string(&message).unwrap() }; - send_message(connection, &data.into_bytes()); + send_message(peer.get_stream(), &data.into_bytes()); } } State::Error => {} State::Banned => {} State::Offline { .. } => {} } + registry.reregister(peer.get_stream(), event.token(), Interest::READABLE).unwrap(); } } - registry.reregister(connection, event.token(), Interest::READABLE).unwrap(); } Ok(true) } +fn read_message(stream: &mut &mut TcpStream) -> Result, Vec> { + let data_size = match stream.read_u32::() { + Ok(size) => { size as usize } + Err(e) => { + println!("Error reading from socket! {}", e); + 0 + } + }; + println!("Payload size is {}", data_size); + + // TODO check for very big buffer, make it no more 10Mb + let mut buf = vec![0u8; data_size]; + let mut bytes_read = 0; + loop { + match stream.read(&mut buf[bytes_read..]) { + Ok(bytes) => { + bytes_read += bytes; + } + // Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation. + Err(ref err) if would_block(err) => break, + Err(ref err) if interrupted(err) => continue, + // Other errors we'll consider fatal. + Err(err) => return Err(buf), + } + } + if buf.len() == data_size { + Ok(buf) + } else { + Err(buf) + } +} + fn send_message(connection: &mut TcpStream, data: &Vec) { // TODO handle errors connection.write_u32::(data.len() as u32); @@ -249,11 +249,17 @@ fn send_message(connection: &mut TcpStream, data: &Vec) { connection.flush(); } -fn handle_message(context: Arc>, message: Message) -> State { +fn handle_message(context: Arc>, message: Message, peers: &mut Peers, token: &Token) -> State { + let my_height = { + let context = context.lock().unwrap(); + context.blockchain.height() + }; match message { - Message::Hand { chain, version } => { + Message::Hand { chain, version, public } => { let context = context.lock().unwrap(); if chain == context.settings.chain_name && version == context.settings.version_flags { + let mut peer = peers.get_mut_peer(token).unwrap(); + peer.set_public(public); State::message(Message::shake(true, context.blockchain.height())) } else { State::Error @@ -261,11 +267,10 @@ fn handle_message(context: Arc>, message: Message) -> State { } Message::Shake { ok, height } => { if ok { - let context = context.lock().unwrap(); - if height > context.blockchain.height() { - State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 }) + if height > my_height { + State::message(Message::GetBlock { index: my_height + 1u64 }) } else { - State::idle() + State::message(Message::GetPeers) } } else { State::Error @@ -273,23 +278,27 @@ fn handle_message(context: Arc>, message: Message) -> State { } Message::Error => { State::Error } Message::Ping { height } => { - let context = context.lock().unwrap(); - if height > context.blockchain.height() { - State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 }) + if height > my_height { + State::message(Message::GetBlock { index: my_height + 1u64 }) } else { - State::message(Message::pong(context.blockchain.height())) + State::message(Message::pong(my_height)) } } Message::Pong { height } => { - let context = context.lock().unwrap(); - if height > context.blockchain.height() { - State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 }) + if height > my_height { + State::message(Message::GetBlock { index: my_height + 1u64 }) } else { State::idle() } } - Message::GetPeers => { State::Error } - Message::Peers => { State::Error } + Message::GetPeers => { + let peer = peers.get_peer(token).unwrap(); + State::message(Message::Peers { peers: peers.get_peers_for_exchange(&peer.get_addr()) }) + } + Message::Peers { peers: new_peers } => { + peers.add_peers_from_exchange(new_peers); + State::idle() + } Message::GetBlock { index } => { let context = context.lock().unwrap(); match context.blockchain.get_block(index) { @@ -314,7 +323,27 @@ fn handle_message(context: Arc>, message: Message) -> State { } } -fn next(current: &mut Token) -> Token { +/// Connecting to configured (bootstrap) peers +fn connect_peers(peers_addrs: Vec, poll: &mut Poll, peers: &mut Peers, unique_token: &mut Token) { + for peer in peers_addrs.iter() { + let addr: SocketAddr = peer.parse().expect(&format!("Error parsing peer address {}", &peer)); + match TcpStream::connect(addr.clone()) { + Ok(mut stream) => { + println!("Created connection to peer {}", &peer); + let token = next(unique_token); + poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap(); + let mut peer = Peer::new(addr, stream, State::Connecting, false); + peer.set_public(true); + peers.add_peer(token, peer); + } + Err(e) => { + println!("Error connecting to peer {}: {}", &peer, e); + } + } + } +} + +pub(crate) fn next(current: &mut Token) -> Token { let next = current.0; current.0 += 1; Token(next) diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index 57c98a8..7bb1cb5 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -1,13 +1,28 @@ use crate::p2p::State; +use std::net::SocketAddr; +use mio::net::TcpStream; +use std::sync::RwLock; +#[derive(Debug)] pub struct Peer { - addr: String, + addr: SocketAddr, + stream: TcpStream, state: State, + inbound: bool, + public: bool, } impl Peer { - pub fn new(addr: String, state: State) -> Self { - Peer { addr, state } + pub fn new(addr: SocketAddr, stream: TcpStream, state: State, inbound: bool) -> Self { + Peer { addr, stream, state, inbound, public: false } + } + + pub fn get_addr(&self) -> SocketAddr { + self.addr.clone() + } + + pub fn get_stream(&mut self) -> &mut TcpStream { + &mut self.stream } pub fn get_state(&self) -> &State { @@ -17,4 +32,30 @@ impl Peer { pub fn set_state(&mut self, state: State) { self.state = state; } + + pub fn is_public(&self) -> bool { + self.public + } + + pub fn set_public(&mut self, public: bool) { + self.public = public; + } + + pub fn active(&self) -> bool { + self.state.active() + } + + pub fn disabled(&self) -> bool { + self.state.disabled() + } + + pub fn equals(&self, addr: &SocketAddr) -> bool { + /// If loopback address then we care about ip and port. + /// If regular address then we only care about the ip and ignore the port. + if self.addr.ip().is_loopback() { + self.addr == *addr + } else { + self.addr.ip() == addr.ip() + } + } } \ No newline at end of file diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs new file mode 100644 index 0000000..d31b39b --- /dev/null +++ b/src/p2p/peers.rs @@ -0,0 +1,126 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use mio::{Token, Interest, Registry}; +use mio::net::TcpStream; +use crate::p2p::{Peer, State, Message}; +use crate::p2p::network::LISTEN_PORT; +use crate::p2p::network::next; +use rand::random; + +pub struct Peers { + peers: HashMap, + new_peers: Vec +} + +const PING_PERIOD: u64 = 30; + +impl Peers { + pub fn new() -> Self { + Peers { peers: HashMap::new(), new_peers: Vec::new() } + } + + pub fn add_peer(&mut self, token: Token, peer: Peer) { + self.peers.insert(token, peer); + } + + pub fn get_peer(&self, token: &Token) -> Option<&Peer> { + self.peers.get(token) + } + + pub fn get_mut_peer(&mut self, token: &Token) -> Option<&mut Peer> { + self.peers.get_mut(token) + } + + pub fn remove_peer(&mut self, token: &Token) -> Option { + self.peers.remove(token) + } + + pub fn add_peers_from_exchange(&mut self, peers: Vec) { + println!("Got peers: {:?}", &peers); + // TODO make it return error if these peers are wrong and seem like an attack + for peer in peers.iter() { + let addr: SocketAddr = peer.parse().expect(&format!("Error parsing peer {}", peer)); + if addr.ip().is_loopback() { + continue; // Return error in future + } + let mut found = false; + for (_token, p) in self.peers.iter() { + if p.equals(&addr) { + found = true; + break; + } + } + if found { + continue; + } + self.new_peers.push(addr); + } + } + + pub fn get_peers_for_exchange(&self, peer_address: &SocketAddr) -> Vec { + let mut result: Vec = Vec::new(); + for (_, peer) in self.peers.iter() { + if peer.equals(peer_address) { + continue; + } + if peer.is_public() { + result.push(SocketAddr::new(peer.get_addr().ip(), LISTEN_PORT).to_string()); + } + } + result + } + + pub fn skip_peer_connection(&self, addr: &SocketAddr) -> bool { + for (_, peer) in self.peers.iter() { + if peer.equals(addr) && (!peer.is_public() || peer.active() || peer.disabled()) { + return true; + } + } + false + } + + pub fn send_pings(&mut self, registry: &Registry, height: u64) { + for (token, mut peer) in self.peers.iter_mut() { + match peer.get_state() { + State::Idle { from } => { + if from.elapsed().as_secs() >= PING_PERIOD { + // Sometimes we check for new peers instead of pinging + let random: u8 = random(); + let message = if random < 16 { + Message::GetPeers + } else { + Message::ping(height) + }; + + peer.set_state(State::message(message)); + let mut stream = peer.get_stream(); + registry.reregister(stream, token.clone(), Interest::WRITABLE).unwrap(); + } + } + _ => {} + } + } + } + + pub fn connect_new_peers(&mut self, registry: &Registry, unique_token: &mut Token) { + if self.new_peers.is_empty() { + return; + } + for addr in self.new_peers.iter() { + match TcpStream::connect(addr.clone()) { + Ok(mut stream) => { + println!("Created connection to peer {}", &addr); + let token = next(unique_token); + registry.register(&mut stream, token, Interest::WRITABLE).unwrap(); + let mut peer = Peer::new(addr.clone(), stream, State::Connecting, false); + peer.set_public(true); + self.peers.insert(token, peer); + } + Err(e) => { + println!("Error connecting to peer {}: {}", &addr, e); + } + } + } + self.new_peers.clear(); + } +} \ No newline at end of file diff --git a/src/p2p/state.rs b/src/p2p/state.rs index 34a9a53..a171180 100644 --- a/src/p2p/state.rs +++ b/src/p2p/state.rs @@ -1,6 +1,7 @@ use std::time::Instant; use crate::p2p::Message; +#[derive(Debug, Clone, PartialEq)] pub enum State { Connecting, Connected, @@ -35,4 +36,25 @@ impl State { let response = serde_json::to_string(&message).unwrap(); State::Message {data: Vec::from(response.as_bytes()) } } + + pub fn active(&self) -> bool { + match self { + State::Connecting => { true } + State::Connected => { true } + State::Idle { .. } => { true } + State::Message { .. } => { true } + _ => { false } + } + } + + pub fn disabled(&self) -> bool { + match self { + State::Error => { true } + State::Banned => { true } + State::Offline { from, attempts } => { + from.elapsed().as_secs() < 60 // We check offline peers to become online every 5 minutes + } + _ => { false } + } + } } diff --git a/src/webview/bulma.css b/src/webview/bulma.css index 9eb983f..4b0bb65 100644 --- a/src/webview/bulma.css +++ b/src/webview/bulma.css @@ -10831,7 +10831,7 @@ label.panel-block:hover { } /*# sourceMappingURL=bulma.css.map */ -// TODO move to another file +/* TODO move to another file */ .container { margin: 10pt; } \ No newline at end of file