diff --git a/Cargo.toml b/Cargo.toml index 4c22ab7..45c8441 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ chrono = "0.4.9" rand = "0.7.2" sqlite = "0.25.3" uuid = { version = "0.8.2", features = ["serde", "v4"] } +mio = { version = "0.7", features = ["os-poll", "net"] } [build-dependencies] winres = "0.1" diff --git a/alfis.cfg b/alfis.cfg index 8f56749..09a0eeb 100644 --- a/alfis.cfg +++ b/alfis.cfg @@ -1,5 +1,9 @@ { "chain_name": "test", "version_flags": 0, - "key_file": "default.key" + "key_file": "default.key", + "listen": "127.0.0.1:4442", + "peers": [ + "127.0.0.1:44421" + ] } \ No newline at end of file diff --git a/src/block.rs b/src/blockchain/block.rs similarity index 98% rename from src/block.rs rename to src/blockchain/block.rs index aaae5c3..191655c 100644 --- a/src/block.rs +++ b/src/blockchain/block.rs @@ -3,7 +3,6 @@ extern crate serde_json; extern crate num_bigint; extern crate num_traits; -use super::*; use std::fmt::Debug; use chrono::Utc; use serde::{Serialize, Deserialize}; @@ -12,6 +11,7 @@ use num_traits::One; use crypto::sha2::Sha256; use crypto::digest::Digest; use crate::keys::Bytes; +use crate::Transaction; #[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] pub struct Block { diff --git a/src/blockchain.rs b/src/blockchain/blockchain.rs similarity index 86% rename from src/blockchain.rs rename to src/blockchain/blockchain.rs index 6c919c0..a6184cd 100644 --- a/src/blockchain.rs +++ b/src/blockchain/blockchain.rs @@ -1,6 +1,7 @@ -use crate::{Block, Transaction, Bytes, Keystore}; use chrono::Utc; -use sqlite::{Connection, State, Readable, Statement, Error}; +use sqlite::{Connection, Error, Readable, State, Statement}; + +use crate::{Block, Bytes, Keystore, Transaction}; const DB_NAME: &str = "blockchain.db"; @@ -112,6 +113,31 @@ impl Blockchain { statement.next().expect("Error adding transaction to DB"); } + pub fn get_block(&self, index: u64) -> Option { + match self.db.prepare("SELECT * FROM blocks WHERE id=? LIMIT 1;") { + Ok(mut statement) => { + statement.bind(1, index as i64); + while statement.next().unwrap() == State::Row { + return match Self::get_block_from_statement(&mut statement) { + None => { + println!("Something wrong with block in DB!"); + None + } + Some(block) => { + println!("Loaded block: {:?}", &block); + Some(block) + } + } + } + None + } + Err(_) => { + println!("Can't find block {}", index); + None + } + } + } + pub fn is_domain_available(&self, domain: &str, keystore: &Keystore) -> bool { if domain.is_empty() { return false; @@ -146,10 +172,19 @@ impl Blockchain { true } - pub fn get_last_block(&self) -> Option { + pub fn last_block(&self) -> Option { self.last_block.clone() } + pub fn height(&self) -> u64 { + match self.last_block { + None => { 0u64 } + Some(ref block) => { + block.index + } + } + } + /*pub fn check(&self) -> bool { let mut prev_block = None; for block in self.blocks.iter() { @@ -163,6 +198,7 @@ impl Blockchain { }*/ fn check_block(&self, block: &Block, prev_block: &Option) -> bool { + // TODO check if it is already stored, or its height is more than we need if !Self::check_block_hash(block) { return false; } @@ -194,4 +230,4 @@ impl Blockchain { let data = serde_json::to_string(©).unwrap(); Block::hash(data.as_bytes()) == block.hash } -} \ No newline at end of file +} diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs new file mode 100644 index 0000000..545461f --- /dev/null +++ b/src/blockchain/mod.rs @@ -0,0 +1,7 @@ +pub mod transaction; +pub mod block; +pub mod blockchain; + +pub use transaction::Transaction; +pub use block::Block; +pub use blockchain::Blockchain; \ No newline at end of file diff --git a/src/transaction.rs b/src/blockchain/transaction.rs similarity index 100% rename from src/transaction.rs rename to src/blockchain/transaction.rs diff --git a/src/context.rs b/src/context.rs index bf3dfa0..796ce52 100644 --- a/src/context.rs +++ b/src/context.rs @@ -50,7 +50,9 @@ impl Context { pub struct Settings { pub chain_name: String, pub version_flags: u32, - pub key_file: String + pub key_file: String, + pub listen: String, + pub peers: Vec } impl Settings { diff --git a/src/lib.rs b/src/lib.rs index fb3be52..1dcbd3a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,20 @@ -mod block; -pub use crate::block::Block; -mod blockchain; +pub use blockchain::block::Block; +pub use blockchain::transaction::Transaction; + pub use crate::blockchain::Blockchain; -pub mod transaction; -pub use crate::transaction::Transaction; -pub mod utils; -pub use crate::utils::*; -pub mod simplebus; -pub use crate::simplebus::*; -pub mod keys; -pub use crate::keys::Keystore; +pub use crate::context::Context; +pub use crate::context::Settings; pub use crate::keys::Bytes; +pub use crate::keys::Keystore; +pub use crate::simplebus::*; +pub use crate::utils::*; + +mod blockchain; +pub mod utils; +pub mod simplebus; +pub mod keys; pub mod miner; pub mod context; pub mod event; +pub mod p2p; -pub use crate::context::Context; -pub use crate::context::Settings; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5531017..e696749 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use web_view::*; use alfis::{Block, Blockchain, Bytes, Context, Keystore, Settings, Transaction}; use alfis::event::Event; use alfis::miner::Miner; +use alfis::p2p::Network; extern crate serde; extern crate serde_json; @@ -37,6 +38,9 @@ fn main() { miner_obj.start_mining_thread(); let miner: Arc> = Arc::new(Mutex::new(miner_obj)); + let mut network = Network::new(context.clone()); + network.start(); + create_genesis_if_needed(&context, &miner); run_interface(context.clone(), miner.clone()); } @@ -44,17 +48,17 @@ fn main() { fn create_genesis_if_needed(context: &Arc>, miner: &Arc>) { // TODO check settings and if there is no mention of bootstrap nodes, generate genesis block let context_guard = context.lock().unwrap(); - if context_guard.get_blockchain().get_last_block().is_none() { + if context_guard.get_blockchain().last_block().is_none() { // If blockchain is empty, we are going to mine a Genesis block create_genesis(miner.clone(), GENESIS_ZONE, &context_guard.get_keystore(), GENESIS_ZONE_DIFFICULTY); } } fn run_interface(context: Arc>, miner: Arc>) { - let file_content = include_str!("index.html"); - let mut styles= inline_style(include_str!("bulma.css")); - styles.push_str(&inline_style(include_str!("loader.css"))); - let scripts = inline_script(include_str!("scripts.js")); + let file_content = include_str!("webview/index.html"); + let mut styles= inline_style(include_str!("webview/bulma.css")); + styles.push_str(&inline_style(include_str!("webview/loader.css"))); + let scripts = inline_script(include_str!("webview/scripts.js")); let html = Content::Html(file_content.to_owned().replace("{styles}", &styles).replace("{scripts}", &scripts)); web_view::builder() diff --git a/src/miner.rs b/src/miner.rs index 2ed8755..b8491a9 100644 --- a/src/miner.rs +++ b/src/miner.rs @@ -110,7 +110,7 @@ impl Miner { } // Get last block for mining - let last_block = { context.lock().unwrap().blockchain.get_last_block() }; + let last_block = { context.lock().unwrap().blockchain.last_block() }; match last_block { None => { println!("Mining genesis block"); diff --git a/src/p2p/message.rs b/src/p2p/message.rs new file mode 100644 index 0000000..3718214 --- /dev/null +++ b/src/p2p/message.rs @@ -0,0 +1,47 @@ +extern crate serde; +extern crate serde_json; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum Message { + Error, + Hand { chain: String, version: u32 }, + Shake { ok: bool, height: u64 }, + Ping { height: u64 }, + Pong { height: u64 }, + GetPeers, + Peers, + GetBlock { index: u64 }, + Block { index: u64, block: String }, +} + +impl Message { + pub fn from_bytes(bytes: Vec) -> Result { + let text = String::from_utf8(bytes).unwrap_or(String::from("Error{}")); + match serde_json::from_str(&text) { + Ok(cmd) => Ok(cmd), + Err(_) => Err(()) + } + } + + pub fn hand(chain: &str, version: u32) -> Self { + Message::Hand { chain: chain.to_owned(), version } + } + + pub fn shake(ok: bool, height: u64) -> Self { + Message::Shake { ok, height } + } + + pub fn ping(height: u64) -> Self { + Message::Ping { height } + } + + pub fn pong(height: u64) -> Self { + Message::Pong { height } + } + + pub fn block(height: u64, str: String) -> Self { + Message::Block { index: height, block: str } + } +} diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs new file mode 100644 index 0000000..bdf99fe --- /dev/null +++ b/src/p2p/mod.rs @@ -0,0 +1,9 @@ +pub mod network; +pub mod message; +pub mod state; +pub mod peer; + +pub use network::Network; +pub use message::Message; +pub use state::State; + diff --git a/src/p2p/network.rs b/src/p2p/network.rs new file mode 100644 index 0000000..a59a6a9 --- /dev/null +++ b/src/p2p/network.rs @@ -0,0 +1,329 @@ +extern crate serde; +extern crate serde_json; + +use std::{io, thread}; +use std::collections::HashMap; +use std::io::{Read, Write}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use mio::{Events, Interest, Poll, Registry, Token}; +use mio::event::Event; +use mio::net::{TcpListener, TcpStream}; +use serde::{Deserialize, Serialize}; + +use crate::{Context, Block}; +use crate::p2p::Message; +use crate::p2p::State; +use crate::p2p::peer::Peer; + +const SERVER: Token = Token(0); +const POLL_TIMEOUT: Option = Some(Duration::from_millis(1000)); + +pub struct Network { + context: Arc> +} + +impl Network { + pub fn new(context: Arc>) -> Self { + Network { context } + } + + pub fn start(&mut self) -> Result<(), String> { + let (listen_addr, peers) = { + let c = self.context.lock().unwrap(); + (c.settings.listen.clone(), c.settings.peers.clone()) + }; + + // Starting server socket + let addr = listen_addr.parse().expect("Error parsing listen address"); + let mut server = TcpListener::bind(addr).expect("Can't bind to address"); + println!("Started node listener on {}", server.local_addr().unwrap()); + + let mut events = Events::with_capacity(64); + let mut poll = Poll::new().expect("Unable to create poll"); + poll.registry().register(&mut server, SERVER, Interest::READABLE).expect("Error registering poll"); + let context = self.context.clone(); + thread::spawn(move || { + // Unique token for each incoming connection. + let mut unique_token = Token(SERVER.0 + 1); + // Map of `Token` -> `TcpStream`. + let mut connections = HashMap::new(); + // States of peer connections, and some data to send when sockets become writable + let mut peer_state: HashMap = HashMap::new(); + // Starting peer connections to bootstrap nodes + for peer in peers.iter() { + match TcpStream::connect(peer.parse().expect("Error parsing peer address")) { + Ok(mut stream) => { + println!("Created connection to peer {}", &peer); + let token = next(&mut unique_token); + poll.registry().register(&mut stream, token, Interest::WRITABLE).unwrap(); + peer_state.insert(token, Peer::new(peer.clone(), State::Connecting)); + connections.insert(token, stream); + } + Err(e) => { + println!("Error connecting to peer {}: {}", &peer, e); + } + } + } + + loop { + // Poll Mio for events, blocking until we get an event. + poll.poll(&mut events, POLL_TIMEOUT).expect("Error polling sockets"); + //println!("Polling finished, got events: {}", !events.is_empty()); + + // Process each event. + for event in events.iter() { + println!("Event for {} is {:?}", event.token().0, &event); + // We can use the token we previously provided to `register` to determine for which socket the event is. + match event.token() { + SERVER => { + // If this is an event for the server, it means a connection is ready to be accepted. + let connection = server.accept(); + match connection { + Ok((mut connection, address)) => { + println!("Accepted connection from: {}", address); + let token = next(&mut unique_token); + poll.registry().register(&mut connection, token, Interest::READABLE).expect("Error registering poll"); + peer_state.insert(token, Peer::new(address.to_string(), State::Connected)); + connections.insert(token, connection); + } + Err(_) => {} + } + } + token => { + match connections.get_mut(&token) { + Some(connection) => { + match handle_connection_event(context.clone(), &mut peer_state, &poll.registry(), connection, &event) { + Ok(result) => { + if !result { + connections.remove(&token); + peer_state.remove(&token); + } + } + Err(err) => {} + } + } + None => { println!("Odd event from poll"); } + } + } + } + } + // Send pings to idle peers + for (token, peer) in peer_state.iter_mut() { + match peer.get_state() { + State::Idle { from } => { + if from.elapsed().as_secs() >= 30 { + let c = context.lock().unwrap(); + peer.set_state(State::message(Message::ping(c.blockchain.height()))); + let mut connection = connections.get_mut(&token).unwrap(); + poll.registry().reregister(connection, token.clone(), Interest::WRITABLE).unwrap(); + } + } + _ => {} + } + } + } + }); + Ok(()) + } +} + +fn handle_connection_event(context: Arc>, peer_state: &mut HashMap, registry: &Registry, connection: &mut TcpStream, event: &Event) -> io::Result { + if event.is_error() { + return Ok(false); + } + + if event.is_readable() { + let data_size = match connection.read_u32::() { + Ok(size) => { size as usize } + Err(e) => { + println!("Error reading from socket! {}", e); + 0 + } + }; + println!("Payload size is {}", data_size); + + // TODO check for very big buffer, make it no more 10Mb + let mut buf = vec![0u8; data_size]; + let mut bytes_read = 0; + loop { + match connection.read(&mut buf[bytes_read..]) { + Ok(bytes) => { + bytes_read += bytes; + } + // Would block "errors" are the OS's way of saying that the connection is not actually ready to perform this I/O operation. + Err(ref err) if would_block(err) => break, + Err(ref err) if interrupted(err) => continue, + // Other errors we'll consider fatal. + Err(err) => return Err(err), + } + } + + if bytes_read == data_size { + match Message::from_bytes(buf) { + Ok(message) => { + println!("Got message from socket {}: {:?}", &event.token().0, &message); + let new_state = handle_message(context.clone(), message); + match new_state { + State::Message { data } => { + if event.is_writable() { + // TODO handle all errors and buffer data to send + send_message(connection, &data); + } else { + registry.reregister(connection, event.token(), Interest::WRITABLE).unwrap(); + let mut peer = peer_state.get_mut(&event.token()).unwrap(); + peer.set_state(State::Message { data }); + } + } + State::Connecting => {} + State::Connected => {} + State::Idle { .. } => { + let mut peer = peer_state.get_mut(&event.token()).unwrap(); + peer.set_state(State::idle()); + } + State::Error => {} + State::Banned => {} + State::Offline { .. } => { + let mut peer = peer_state.get_mut(&event.token()).unwrap(); + peer.set_state(State::offline(1)); + } + } + } + Err(_) => {} + } + } else { + // Consider connection as unreliable + return Ok(false); + } + } + + if event.is_writable() { + println!("Socket {} is writable", event.token().0); + match peer_state.get(&event.token()) { + None => {} + Some(peer) => { + match peer.get_state() { + State::Connecting => { + println!("Hello needed for socket {}", event.token().0); + let data: String = { + let mut c = context.lock().unwrap(); + let message = Message::Hand { chain: c.settings.chain_name.clone(), version: c.settings.version_flags }; + serde_json::to_string(&message).unwrap() + }; + send_message(connection, &data.into_bytes()); + println!("Sent hello through socket {}", event.token().0); + } + State::Message { data } => { + println!("Sending data to socket {}: {}", event.token().0, &String::from_utf8(data.clone()).unwrap()); + send_message(connection, data); + } + State::Connected => {} + State::Idle { from } => { + if from.elapsed().as_secs() >= 30 { + let data: String = { + let mut c = context.lock().unwrap(); + let message = Message::ping(c.blockchain.height()); + serde_json::to_string(&message).unwrap() + }; + send_message(connection, &data.into_bytes()); + } + } + State::Error => {} + State::Banned => {} + State::Offline { .. } => {} + } + } + } + registry.reregister(connection, event.token(), Interest::READABLE).unwrap(); + } + + Ok(true) +} + +fn send_message(connection: &mut TcpStream, data: &Vec) { + // TODO handle errors + connection.write_u32::(data.len() as u32); + connection.write_all(&data).expect("Error writing to socket"); + connection.flush(); +} + +fn handle_message(context: Arc>, message: Message) -> State { + match message { + Message::Hand { chain, version } => { + let context = context.lock().unwrap(); + if chain == context.settings.chain_name && version == context.settings.version_flags { + State::message(Message::shake(true, context.blockchain.height())) + } else { + State::Error + } + } + Message::Shake { ok, height } => { + if ok { + let context = context.lock().unwrap(); + if height > context.blockchain.height() { + State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 }) + } else { + State::idle() + } + } else { + State::Error + } + } + Message::Error => { State::Error } + Message::Ping { height } => { + let context = context.lock().unwrap(); + if height > context.blockchain.height() { + State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 }) + } else { + State::message(Message::pong(context.blockchain.height())) + } + } + Message::Pong { height } => { + let context = context.lock().unwrap(); + if height > context.blockchain.height() { + State::message(Message::GetBlock { index: context.blockchain.height() + 1u64 }) + } else { + State::idle() + } + } + Message::GetPeers => { State::Error } + Message::Peers => { State::Error } + Message::GetBlock { index } => { + let context = context.lock().unwrap(); + match context.blockchain.get_block(index) { + Some(block) => State::message(Message::block(block.index, serde_json::to_string(&block).unwrap())), + None => State::Error + } + } + Message::Block { index, block } => { + let block: Block = match serde_json::from_str(&block) { + Ok(block) => block, + Err(_) => return State::Error + }; + // TODO check if the block is good + let context = context.clone(); + thread::spawn(move || { + let mut context = context.lock().unwrap(); + context.blockchain.add_block(block); + context.bus.post(crate::event::Event::BlockchainChanged) + }); + State::idle() + } + } +} + +fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) +} + +fn would_block(err: &io::Error) -> bool { + err.kind() == io::ErrorKind::WouldBlock +} + +fn interrupted(err: &io::Error) -> bool { + err.kind() == io::ErrorKind::Interrupted +} diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs new file mode 100644 index 0000000..57c98a8 --- /dev/null +++ b/src/p2p/peer.rs @@ -0,0 +1,20 @@ +use crate::p2p::State; + +pub struct Peer { + addr: String, + state: State, +} + +impl Peer { + pub fn new(addr: String, state: State) -> Self { + Peer { addr, state } + } + + pub fn get_state(&self) -> &State { + &self.state + } + + pub fn set_state(&mut self, state: State) { + self.state = state; + } +} \ No newline at end of file diff --git a/src/p2p/state.rs b/src/p2p/state.rs new file mode 100644 index 0000000..34a9a53 --- /dev/null +++ b/src/p2p/state.rs @@ -0,0 +1,38 @@ +use std::time::Instant; +use crate::p2p::Message; + +pub enum State { + Connecting, + Connected, + Idle { from: Instant }, + Message { data: Vec }, + Error, + Banned, + Offline { from: Instant, attempts: usize }, +} + +impl State { + pub fn idle() -> Self { + Self::Idle { from: Instant::now() } + } + + pub fn offline(attempts: usize) -> Self { + Self::Offline { attempts, from: Instant::now() } + } + + pub fn still_offline(state: Self) -> Self { + match state { + State::Offline { attempts, from } => { + Self::Offline { attempts: attempts + 1, from } + } + _ => { + Self::Offline { attempts: 1, from: Instant::now() } + } + } + } + + pub fn message(message: Message) -> Self { + let response = serde_json::to_string(&message).unwrap(); + State::Message {data: Vec::from(response.as_bytes()) } + } +} diff --git a/src/styles.css b/src/styles.css deleted file mode 100644 index ffa8faa..0000000 --- a/src/styles.css +++ /dev/null @@ -1,75 +0,0 @@ -html { - background: #202020; - background-attachment: fixed; -} -html, body { - width: 100%; - height: 100%; - margin: 0; - padding: 0; - background: linear-gradient(0deg, #404040, #090909); - background-attachment: fixed; - overflow: hidden; -} -h1, h2, h3 { color: #dddddd; } -p { color: #dddddd; } -a { color: #dddddd; } - /* Style the tab */ -.tab { - overflow: hidden; - border: 0px; - background: linear-gradient(180deg, #404040, #202020); -} - -/* Style the buttons that are used to open the tab content */ -.tab button { - background: inherit; - color: #dddddd; - float: left; - border: none; - outline: none; - cursor: pointer; - padding: 8px 16px; - transition: 0.3s; -} - -/* Change background color of buttons on hover */ -.tab button:hover { - background: linear-gradient(180deg, #808080, #202020); -} - -/* Create an active/current tablink class */ -.tab button.active { - background: linear-gradient(180deg, #606060, #202020); -} - -/* Style the tab content */ -.tabcontent { - display: none; - padding: 12px 12px; - border: 0px; - border-top: none; - height: 100%; -} - -button { - border: none; - outline: none; - color: #dddddd; - padding: 8px 8px; - background: linear-gradient(180deg, #404040, #202020); -} - -.tabcontent button { - min-width: 200px; -} - -.footer { - position: fixed; - left: 0; - bottom: 0; - width: 100%; - background-color: #404040; - color: #dddddd; - padding: 6px 12px; -} \ No newline at end of file diff --git a/src/bulma.css b/src/webview/bulma.css similarity index 100% rename from src/bulma.css rename to src/webview/bulma.css diff --git a/src/index.html b/src/webview/index.html similarity index 100% rename from src/index.html rename to src/webview/index.html diff --git a/src/loader.css b/src/webview/loader.css similarity index 100% rename from src/loader.css rename to src/webview/loader.css diff --git a/src/scripts.js b/src/webview/scripts.js similarity index 100% rename from src/scripts.js rename to src/webview/scripts.js