diff --git a/alfis.cfg b/alfis.cfg index 4de36d3..771681d 100644 --- a/alfis.cfg +++ b/alfis.cfg @@ -2,7 +2,7 @@ "chain_name": "test", "origin": "", "version": 0, - "key_file": "default3.key", + "key_file": "default.key", "listen": "127.0.0.1:4244", "public": true, "peers": [ diff --git a/src/blockchain/block.rs b/src/blockchain/block.rs index 5a4f0ef..be33495 100644 --- a/src/blockchain/block.rs +++ b/src/blockchain/block.rs @@ -4,10 +4,7 @@ extern crate num_bigint; extern crate num_traits; use std::fmt::Debug; -use chrono::Utc; use serde::{Serialize, Deserialize}; -use num_bigint::BigUint; -use num_traits::One; use crypto::sha2::Sha256; use crypto::digest::Digest; use crate::keys::Bytes; diff --git a/src/blockchain/blockchain.rs b/src/blockchain/blockchain.rs index e5e0b2b..58f12c0 100644 --- a/src/blockchain/blockchain.rs +++ b/src/blockchain/blockchain.rs @@ -1,12 +1,11 @@ -use chrono::Utc; -use sqlite::{Connection, Error, Readable, State, Statement}; +use sqlite::{Connection, State, Statement}; -use crate::{Block, Bytes, Keystore, Transaction}; +use crate::{Block, Bytes, Keystore, Transaction, Settings}; const DB_NAME: &str = "blockchain.db"; pub struct Blockchain { - origin: String, + origin: Bytes, pub version: u32, pub blocks: Vec, last_block: Option, @@ -14,7 +13,10 @@ pub struct Blockchain { } impl Blockchain { - pub fn new(origin: String, version: u32) -> Self { + pub fn new(settings: &Settings) -> Self { + let origin = settings.get_origin(); + let version = settings.version; + let db = sqlite::open(DB_NAME).expect("Unable to open blockchain DB"); let mut blockchain = Blockchain{ origin, version, blocks: Vec::new(), last_block: None, db}; blockchain.init_db(); @@ -59,62 +61,65 @@ impl Blockchain { } } - pub fn add_block(&mut self, block: Block) { - if self.check_block(&block, &self.last_block) { - println!("Adding block:\n{:?}", &block); - self.blocks.push(block.clone()); - self.last_block = Some(block.clone()); - let transaction = block.transaction.clone(); + pub fn add_block(&mut self, block: Block) -> Result<(), &str> { + if !self.check_block(&block, &self.last_block) { + println!("Bad block found, ignoring:\n{:?}", &block); + return Err("Bad block found, ignoring"); + } + println!("Adding block:\n{:?}", &block); + self.blocks.push(block.clone()); + self.last_block = Some(block.clone()); + let transaction = block.transaction.clone(); - { - // Adding block to DB - let mut statement = self.db.prepare("INSERT INTO blocks (\ + { + // Adding block to DB + let mut statement = self.db.prepare("INSERT INTO blocks (\ id, timestamp, version, difficulty, random,\ nonce, 'transaction', prev_block_hash, hash)\ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);").unwrap(); - statement.bind(1, block.index as i64); - statement.bind(2, block.timestamp as i64); - statement.bind(3, block.version as i64); - statement.bind(4, block.difficulty as i64); - statement.bind(5, block.random as i64); - statement.bind(6, block.nonce as i64); - match &transaction { - None => { statement.bind(7, ""); } - Some(transaction) => { - statement.bind(7, transaction.to_string().as_ref() as &str); - } - } - statement.bind(8, block.prev_block_hash.as_bytes()); - statement.bind(9, block.hash.as_bytes()); - statement.next().expect("Error adding block to DB"); - } - + statement.bind(1, block.index as i64).expect("Error in bind"); + statement.bind(2, block.timestamp as i64).expect("Error in bind"); + statement.bind(3, block.version as i64).expect("Error in bind"); + statement.bind(4, block.difficulty as i64).expect("Error in bind"); + statement.bind(5, block.random as i64).expect("Error in bind"); + statement.bind(6, block.nonce as i64).expect("Error in bind"); match &transaction { - None => {} + None => { statement.bind(7, "").expect("Error in bind"); } Some(transaction) => { - self.add_transaction(transaction); + statement.bind(7, transaction.to_string().as_ref() as &str).expect("Error in bind"); } } - } else { - println!("Bad block found, ignoring:\n{:?}", &block); + statement.bind(8, block.prev_block_hash.as_bytes()).expect("Error in bind"); + statement.bind(9, block.hash.as_bytes()).expect("Error in bind"); + statement.next().expect("Error adding block to DB"); + } + + match &transaction { + None => { + Err("Error adding transaction!") + } + Some(transaction) => { + self.add_transaction(transaction); + Ok(()) + } } } fn add_transaction(&mut self, t: &Transaction) { let mut statement = self.db.prepare("INSERT INTO transactions (identity, confirmation, method, data, pub_key, signature) VALUES (?, ?, ?, ?, ?, ?)").unwrap(); - statement.bind(1, t.identity.as_bytes()); - statement.bind(2, t.confirmation.as_bytes()); - statement.bind(3, t.method.as_ref() as &str); - statement.bind(4, t.data.as_ref() as &str); - statement.bind(5, t.pub_key.as_bytes()); - statement.bind(6, t.signature.as_bytes()); + statement.bind(1, t.identity.as_bytes()).expect("Error in bind"); + statement.bind(2, t.confirmation.as_bytes()).expect("Error in bind"); + statement.bind(3, t.method.as_ref() as &str).expect("Error in bind"); + statement.bind(4, t.data.as_ref() as &str).expect("Error in bind"); + statement.bind(5, t.pub_key.as_bytes()).expect("Error in bind"); + statement.bind(6, t.signature.as_bytes()).expect("Error in bind"); statement.next().expect("Error adding transaction to DB"); } pub fn get_block(&self, index: u64) -> Option { match self.db.prepare("SELECT * FROM blocks WHERE id=? LIMIT 1;") { Ok(mut statement) => { - statement.bind(1, index as i64); + statement.bind(1, index as i64).expect("Error in bind"); while statement.next().unwrap() == State::Row { return match Self::get_block_from_statement(&mut statement) { None => { @@ -142,7 +147,7 @@ impl Blockchain { } let identity_hash = Transaction::hash_identity(domain); let mut statement = self.db.prepare("SELECT pub_key FROM transactions WHERE identity = ? ORDER BY id DESC LIMIT 1;").unwrap(); - statement.bind(1, identity_hash.as_bytes()); + statement.bind(1, identity_hash.as_bytes()).expect("Error in bind"); while let State::Row = statement.next().unwrap() { let pub_key = Bytes::from_bytes(statement.read::>(0).unwrap().as_slice()); if !pub_key.eq(&keystore.get_public()) { @@ -159,7 +164,7 @@ impl Blockchain { // Checking for available zone, for this domain let identity_hash = Transaction::hash_identity(parts.first().unwrap()); let mut statement = self.db.prepare("SELECT identity FROM transactions WHERE identity = ? ORDER BY id DESC LIMIT 1;").unwrap(); - statement.bind(1, identity_hash.as_bytes()); + statement.bind(1, identity_hash.as_bytes()).expect("Error in bind"); while let State::Row = statement.next().unwrap() { // If there is such a zone return true; @@ -196,15 +201,36 @@ impl Blockchain { }*/ fn check_block(&self, block: &Block, prev_block: &Option) -> bool { - // TODO check if it is already stored, or its height is more than we need - if !Self::check_block_hash(block) { + if !check_block_hash(block) { + println!("{:?} has wrong hash! Ignoring!", &block); return false; } - if prev_block.is_none() { - return true; + // TODO make transaction not Optional + let transaction = block.transaction.as_ref().unwrap(); + if !check_transaction_signature(&transaction) { + println!("{:?} has wrong signature! Ignoring block!", &transaction); + return false; } + match prev_block { + None => { + if block.index != 0 { + return false; + } - return block.prev_block_hash == prev_block.as_ref().unwrap().hash; + if self.origin.is_zero() && block.index > 0 { + panic!("Error adding block {} without origin! Please, fill in origin in config!", block.index); + } + + self.origin.is_zero() || block.hash.eq(&self.origin) + } + Some(prev) => { + if block.index != prev.index + 1 { + println!("Discarding block with index {} as not needed now", block.index); + return false; + } + block.prev_block_hash.eq(&prev.hash) + } + } } fn get_block_from_statement(statement: &mut Statement) -> Option { @@ -219,12 +245,18 @@ impl Blockchain { let hash = Bytes::from_bytes(statement.read::>(8).unwrap().as_slice()); Some(Block::from_all_params(index, timestamp, version, difficulty, random, nonce, prev_block_hash, hash, transaction)) } - - pub fn check_block_hash(block: &Block) -> bool { - // We need to clear Hash value to rehash it without it for check :( - let mut copy: Block = block.clone(); - copy.hash = Bytes::default(); - let data = serde_json::to_string(©).unwrap(); - Block::hash(data.as_bytes()) == block.hash - } +} + +pub fn check_block_hash(block: &Block) -> bool { + let mut copy: Block = block.clone(); + copy.hash = Bytes::default(); + let data = serde_json::to_string(©).unwrap(); + Block::hash(data.as_bytes()) == block.hash +} + +pub fn check_transaction_signature(transaction: &Transaction) -> bool { + let mut copy = transaction.clone(); + copy.signature = Bytes::zero64(); + let data = copy.get_bytes(); + Keystore::check(data.as_slice(), copy.pub_key.as_bytes(), transaction.signature.as_bytes()) } diff --git a/src/blockchain/transaction.rs b/src/blockchain/transaction.rs index 8d623e7..1d12e43 100644 --- a/src/blockchain/transaction.rs +++ b/src/blockchain/transaction.rs @@ -85,12 +85,12 @@ impl fmt::Debug for Transaction { impl Serialize for Transaction { fn serialize(&self, serializer: S) -> Result<::Ok, ::Error> where S: Serializer { let mut structure = serializer.serialize_struct("Transaction", 6).unwrap(); - structure.serialize_field("identity", &self.identity); - structure.serialize_field("confirmation", &self.confirmation); - structure.serialize_field("method", &self.method); - structure.serialize_field("data", &self.data); - structure.serialize_field("pub_key", &self.pub_key); - structure.serialize_field("signature", &self.signature); + structure.serialize_field("identity", &self.identity)?; + structure.serialize_field("confirmation", &self.confirmation)?; + structure.serialize_field("method", &self.method)?; + structure.serialize_field("data", &self.data)?; + structure.serialize_field("pub_key", &self.pub_key)?; + structure.serialize_field("signature", &self.signature)?; structure.end() } } \ No newline at end of file diff --git a/src/context.rs b/src/context.rs index 4fe0354..8e2b21f 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,8 +1,6 @@ use crate::{Keystore, Blockchain, Bus, Bytes}; use crate::event::Event; -use std::collections::HashMap; -use serde::{Serialize, Deserialize, Serializer, Deserializer}; -use serde::de::Error; +use serde::{Serialize, Deserialize}; use std::fs::File; use std::io::Read; @@ -65,7 +63,7 @@ impl Settings { match File::open(file_name) { Ok(mut file) => { let mut text = String::new(); - file.read_to_string(&mut text); + file.read_to_string(&mut text).unwrap(); let loaded = serde_json::from_str(&text); return if loaded.is_ok() { Some(loaded.unwrap()) @@ -78,6 +76,9 @@ impl Settings { } pub fn get_origin(&self) -> Bytes { + if self.origin.eq("") { + return Bytes::zero32(); + } let origin = crate::from_hex(&self.origin).expect("Wrong origin in settings"); Bytes::from_bytes(origin.as_slice()) } diff --git a/src/keys.rs b/src/keys.rs index 48fe3b2..9094079 100644 --- a/src/keys.rs +++ b/src/keys.rs @@ -7,7 +7,7 @@ use rand::{thread_rng, Rng}; use std::fmt; use std::fs; use std::fs::File; -use std::io::{Read, Write}; +use std::io::Write; use std::path::Path; use serde::export::fmt::Error; use serde::{Serialize, Deserialize, Serializer, Deserializer}; @@ -26,7 +26,7 @@ pub struct Keystore { impl Keystore { pub fn new() -> Self { - let mut buf = [0u8; 64]; + let mut buf = [0u8; 32]; let mut rng = thread_rng(); rng.fill(&mut buf); let (private, public) = keypair(&buf); @@ -50,11 +50,11 @@ impl Keystore { } //TODO Implement error conditions - pub fn save(&self, filename: &str, password: &str) { + pub fn save(&self, filename: &str, _password: &str) { match File::create(Path::new(filename)) { Ok(mut f) => { //TODO implement key encryption - f.write_all(&self.seed); + f.write_all(&self.seed).expect("Error saving keystore"); } Err(_) => { println!("Error saving key file!"); } } @@ -69,10 +69,10 @@ impl Keystore { } pub fn sign(&self, message: &[u8]) -> [u8; 64] { - signature(message, &self.private_key.data) + signature(message, self.private_key.data.as_slice()) } - pub fn check(&self, message: &[u8], public_key: &[u8], signature: &[u8]) -> bool { + pub fn check(message: &[u8], public_key: &[u8], signature: &[u8]) -> bool { verify(message, public_key, signature) } @@ -117,7 +117,7 @@ impl Bytes { /// Returns a byte slice of the hash contents. pub fn as_bytes(&self) -> &[u8] { - &self.data + self.data.as_slice() } pub fn zero32() -> Self { @@ -189,3 +189,12 @@ impl<'dd> Deserialize<'dd> for Bytes { deserializer.deserialize_str(BytesVisitor) } } + +#[test] +pub fn test_signature() { + let keystore: Keystore = Keystore::new(); + let data = b"{ identity: 178135D209C697625E3EC71DA5C760382E54936F824EE5083908DA66B14ECE18,\ + confirmation: A4A0AFECD1A511825226F0D3437C6C6BDAE83554040AA7AEB49DEFEAB0AE9EA4 }"; + let signature = keystore.sign(data); + assert!(Keystore::check(data, keystore.get_public().as_bytes(), &signature), "Wrong signature!") +} diff --git a/src/main.rs b/src/main.rs index 51249b3..50fe737 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,15 @@ #![windows_subsystem = "windows"] extern crate web_view; -use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; -use rand::{Rng, RngCore}; -use serde::{Deserialize, Serialize}; +use rand::RngCore; +use serde::{Deserialize}; use web_view::*; -use alfis::{Block, Blockchain, Bytes, Context, Keystore, Settings, Transaction}; +use alfis::{Blockchain, Bytes, Context, Keystore, Settings, Transaction}; use alfis::event::Event; use alfis::miner::Miner; use alfis::p2p::Network; @@ -31,7 +30,7 @@ fn main() { None => { generate_key(KEYSTORE_DIFFICULTY, Arc::new(AtomicBool::new(true))).expect("Could not load or generate keypair") } Some(keystore) => { keystore } }; - let blockchain: Blockchain = Blockchain::new(settings.origin.clone(), settings.version); + let blockchain: Blockchain = Blockchain::new(&settings); let context: Arc> = Arc::new(Mutex::new(Context::new(settings, keystore, blockchain))); let mut miner_obj = Miner::new(context.clone()); @@ -39,7 +38,7 @@ fn main() { let miner: Arc> = Arc::new(Mutex::new(miner_obj)); let mut network = Network::new(context.clone()); - network.start(); + network.start().expect("Error starting network component"); create_genesis_if_needed(&context, &miner); run_interface(context.clone(), miner.clone()); @@ -76,8 +75,8 @@ fn run_interface(context: Arc>, miner: Arc>) { println!("Command {}", arg); match serde_json::from_str(arg).unwrap() { Loaded => { - web_view.eval("showMiningIndicator(false);"); - let mut handle = web_view.handle(); + web_view.eval("showMiningIndicator(false);").expect("Error evaluating!"); + let handle = web_view.handle(); let mut c = context.lock().unwrap(); c.bus.register(move |_uuid, e| { println!("Got event from bus {:?}", &e); @@ -89,9 +88,9 @@ fn run_interface(context: Arc>, miner: Arc>) { _ => { false } }; handle.dispatch(move |web_view| { - web_view.eval(&format!("showMiningIndicator({});", visible)); + web_view.eval(&format!("showMiningIndicator({});", visible)).expect("Error evaluating!"); return WVResult::Ok(()); - }); + }).expect("Error dispatching!"); true }); } @@ -112,11 +111,11 @@ fn run_interface(context: Arc>, miner: Arc>) { CheckDomain { name} => { let c = context.lock().unwrap(); let available = c.get_blockchain().is_domain_available(&name, &c.get_keystore()); - web_view.eval(&format!("domainAvailable({})", available)); + web_view.eval(&format!("domainAvailable({})", available)).expect("Error evaluating!"); } CreateDomain { name, records, tags } => { let keystore = { - let mut guard = context.lock().unwrap(); + let guard = context.lock().unwrap(); guard.get_keystore() }; create_domain(miner.clone(), name, records, &keystore); @@ -152,7 +151,7 @@ fn create_domain>(miner: Arc>, name: S, data: S, ke println!("Generating domain {}", name); //let rec_vector: Vec = records.into().trim().split("\n").map(|s| s.trim()).map(String::from).collect(); //let tags_vector: Vec = tags.into().trim().split(",").map(|s| s.trim()).map(String::from).collect(); - let mut transaction = { create_transaction(keystore, name, "domain".into(), data.into()) }; + let transaction = { create_transaction(keystore, name, "domain".into(), data.into()) }; let mut miner_guard = miner.lock().unwrap(); miner_guard.add_transaction(transaction); } @@ -168,10 +167,10 @@ fn create_transaction>(keystore: &Keystore, name: S, method: S, } fn create_key(context: Arc>, filename: &str, password: &str) { - let mut mining = Arc::new(AtomicBool::new(true)); + let mining = Arc::new(AtomicBool::new(true)); + { context.lock().unwrap().bus.post(Event::KeyGeneratorStarted); } for _ in 0..num_cpus::get() { let context = context.clone(); - { context.lock().unwrap().bus.post(Event::KeyGeneratorStarted); } let filename= filename.to_owned(); let password= password.to_owned(); let mining = mining.clone(); @@ -201,7 +200,7 @@ fn create_key(context: Arc>, filename: &str, password: &str) { fn generate_key(difficulty: usize, mining: Arc) -> Option { let mut rng = rand::thread_rng(); - let mut buf = [0u8; 64]; + let mut buf = [0u8; 32]; loop { rng.fill_bytes(&mut buf); let keystore = Keystore::from_bytes(&buf); @@ -213,7 +212,6 @@ fn generate_key(difficulty: usize, mining: Arc) -> Option return None; } } - None } #[derive(Deserialize)] diff --git a/src/miner.rs b/src/miner.rs index 2431431..8e378ed 100644 --- a/src/miner.rs +++ b/src/miner.rs @@ -8,15 +8,12 @@ use crypto::digest::Digest; use crypto::sha2::Sha256; use num_cpus; -use crate::{Block, Bytes, Context, hash_is_good, Keystore, Transaction}; +use crate::{Block, Bytes, Context, hash_is_good, Transaction}; use crate::event::Event; pub struct Miner { context: Arc>, - keystore: Keystore, - version: u32, transactions: Arc>>, - last_block: Option, running: Arc, mining: Arc, cond_var: Arc @@ -24,13 +21,9 @@ pub struct Miner { impl Miner { pub fn new(context: Arc>) -> Self { - let c = context.lock().unwrap(); Miner { context: context.clone(), - keystore: c.keystore.clone(), - version: c.settings.version, transactions: Arc::new(Mutex::new(Vec::new())), - last_block: c.blockchain.blocks.last().cloned(), running: Arc::new(AtomicBool::new(false)), mining: Arc::new(AtomicBool::new(false)), cond_var: Arc::new(Condvar::new()) @@ -65,19 +58,17 @@ impl Miner { let mut lock = transactions.lock().unwrap(); if lock.len() > 0 { - println!("Starting to mine some transaction"); + println!("Got new transaction to mine"); let transaction = lock.remove(0); mining.store(true, Ordering::Relaxed); Miner::mine_internal(context.clone(), transactions.clone(), transaction, mining.clone(), cond_var.clone()); } else { - println!("Waiting for transactions"); - cond_var.wait(lock); - println!("Got notified on new transaction"); + let _ = cond_var.wait(lock).expect("Error in wait lock!"); } } }); let mining = self.mining.clone(); - self.context.lock().unwrap().bus.register(move |uuid, e| { + self.context.lock().unwrap().bus.register(move |_uuid, e| { if e == Event::ActionStopMining { mining.store(false, Ordering::Relaxed); } @@ -90,13 +81,11 @@ impl Miner { } fn mine_internal(context: Arc>, transactions: Arc>>, mut transaction: Transaction, mining: Arc, cond_var: Arc) { - let mut last_block_time = 0i64; - let mut version= 0u32; - { + let version= { let mut c = context.lock().unwrap(); c.bus.post(Event::MinerStarted); - version = c.settings.version; - } + c.settings.version + }; let block = { if transaction.signature.is_zero() { // Signing it with private key from Keystore @@ -114,7 +103,6 @@ impl Miner { Block::new(0, Utc::now().timestamp(), version, Bytes::zero32(), Some(transaction.clone())) }, Some(block) => { - last_block_time = block.timestamp; // Creating a block with that signed transaction Block::new(block.index + 1, Utc::now().timestamp(), version, block.hash.clone(), Some(transaction.clone())) }, @@ -134,11 +122,10 @@ impl Miner { let cond_var = cond_var.clone(); thread::spawn(move || { live_threads.fetch_add(1, Ordering::Relaxed); - let mut count = 0u32; - match find_hash(&mut Sha256::new(), block, last_block_time, mining.clone()) { + match find_hash(&mut Sha256::new(), block, mining.clone()) { None => { println!("Mining did not find suitable hash or was stopped"); - count = live_threads.fetch_sub(1, Ordering::Relaxed); + let count = live_threads.fetch_sub(1, Ordering::Relaxed); // If this is the last thread, but mining was not stopped by another thread if count == 0 && mining.load(Ordering::Relaxed) { // If all threads came empty with mining we return transaction to the queue @@ -148,9 +135,8 @@ impl Miner { } }, Some(block) => { - count = live_threads.fetch_sub(1, Ordering::Relaxed); let mut context = context.lock().unwrap(); - context.blockchain.add_block(block); + context.blockchain.add_block(block).expect("Error adding fresh mined block!"); context.bus.post(Event::MinerStopped); mining.store(false, Ordering::Relaxed); }, @@ -160,22 +146,16 @@ impl Miner { } } -fn find_hash(digest: &mut dyn Digest, mut block: Block, prev_block_time: i64, running: Arc) -> Option { +fn find_hash(digest: &mut dyn Digest, mut block: Block, running: Arc) -> Option { let mut buf: [u8; 32] = [0; 32]; block.random = rand::random(); println!("Mining block {}", serde_json::to_string(&block).unwrap()); - //let start_difficulty = block.difficulty; for nonce in 0..std::u64::MAX { if !running.load(Ordering::Relaxed) { return None; } block.timestamp = Utc::now().timestamp(); block.nonce = nonce; - // if nonce % 1000 == 0 { - // println!("Nonce {}", nonce); - // } - // TODO uncomment for real run - //block.difficulty = start_difficulty + get_time_difficulty(prev_block_time, block.timestamp); digest.reset(); digest.input(serde_json::to_string(&block).unwrap().as_bytes()); @@ -186,13 +166,4 @@ fn find_hash(digest: &mut dyn Digest, mut block: Block, prev_block_time: i64, ru } } None -} - -fn get_time_difficulty(prev_time: i64, now: i64) -> usize { - let diff = now - prev_time; - if diff < 900_000 { - (900_000 as usize - diff as usize) / 60_000 - } else { - 0 - } } \ No newline at end of file diff --git a/src/p2p/message.rs b/src/p2p/message.rs index c5d841f..8309d72 100644 --- a/src/p2p/message.rs +++ b/src/p2p/message.rs @@ -2,7 +2,6 @@ extern crate serde; extern crate serde_json; use serde::{Deserialize, Serialize}; -use crate::p2p::peer::Peer; #[derive(Debug, Serialize, Deserialize)] pub enum Message { diff --git a/src/p2p/network.rs b/src/p2p/network.rs index 553b22d..34a331b 100644 --- a/src/p2p/network.rs +++ b/src/p2p/network.rs @@ -2,7 +2,6 @@ extern crate serde; extern crate serde_json; use std::{io, thread}; -use std::collections::HashMap; use std::io::{Read, Write}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -11,11 +10,9 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use mio::{Events, Interest, Poll, Registry, Token}; use mio::event::Event; use mio::net::{TcpListener, TcpStream}; -use serde::{Deserialize, Serialize}; use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers}; use std::net::{SocketAddr, IpAddr, SocketAddrV4}; -use std::borrow::BorrowMut; const SERVER: Token = Token(0); const POLL_TIMEOUT: Option = Some(Duration::from_millis(3000)); @@ -88,7 +85,7 @@ impl Network { } token => { match peers.get_mut_peer(&token) { - Some(peer) => { + Some(_peer) => { match handle_connection_event(context.clone(), &mut peers, &poll.registry(), &event) { Ok(result) => { if !result { @@ -123,7 +120,7 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi if event.is_readable() { let data = { - let mut peer = peers.get_mut_peer(&event.token()).expect("Error getting peer for connection"); + let peer = peers.get_mut_peer(&event.token()).expect("Error getting peer for connection"); let mut stream = peer.get_stream(); read_message(&mut stream) }; @@ -134,8 +131,8 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi Ok(message) => { println!("Got message from socket {}: {:?}", &event.token().0, &message); let new_state = handle_message(context.clone(), message, peers, &event.token()); - let mut peer = peers.get_mut_peer(&event.token()).unwrap(); - let mut stream = peer.get_stream(); + let peer = peers.get_mut_peer(&event.token()).unwrap(); + let stream = peer.get_stream(); match new_state { State::Message { data } => { if event.is_writable() { @@ -154,7 +151,7 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi State::Error => {} State::Banned => {} State::Offline { .. } => { - peer.set_state(State::offline(1)); + peer.set_state(State::offline()); } } } @@ -175,7 +172,7 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi State::Connecting => { println!("Sending hello to socket {}", event.token().0); let data: String = { - let mut c = context.lock().unwrap(); + let c = context.lock().unwrap(); let message = Message::hand(&c.settings.origin, c.settings.version, c.settings.public); serde_json::to_string(&message).unwrap() }; @@ -191,7 +188,7 @@ fn handle_connection_event(context: Arc>, peers: &mut Peers, regi println!("Odd version of pings :)"); if from.elapsed().as_secs() >= 30 { let data: String = { - let mut c = context.lock().unwrap(); + let c = context.lock().unwrap(); let message = Message::ping(c.blockchain.height()); serde_json::to_string(&message).unwrap() }; @@ -232,7 +229,7 @@ fn read_message(stream: &mut &mut TcpStream) -> Result, Vec> { Err(ref err) if would_block(err) => break, Err(ref err) if interrupted(err) => continue, // Other errors we'll consider fatal. - Err(err) => return Err(buf), + Err(_) => return Err(buf), } } if buf.len() == data_size { @@ -244,29 +241,30 @@ fn read_message(stream: &mut &mut TcpStream) -> Result, Vec> { fn send_message(connection: &mut TcpStream, data: &Vec) { // TODO handle errors - connection.write_u32::(data.len() as u32); + connection.write_u32::(data.len() as u32).expect("Error sending message"); connection.write_all(&data).expect("Error writing to socket"); - connection.flush(); + connection.flush().expect("Error sending message"); } fn handle_message(context: Arc>, message: Message, peers: &mut Peers, token: &Token) -> State { - let my_height = { + let (my_height, my_origin, my_version) = { let context = context.lock().unwrap(); - context.blockchain.height() + (context.blockchain.height(), &context.settings.origin.clone(), context.settings.version) }; match message { - Message::Hand { origin: origin, version, public } => { - let context = context.lock().unwrap(); - if origin == context.settings.origin && version == context.settings.version { - let mut peer = peers.get_mut_peer(token).unwrap(); + Message::Hand { origin, version, public } => { + if origin.eq(my_origin) && version == my_version { + let peer = peers.get_mut_peer(token).unwrap(); peer.set_public(public); - State::message(Message::shake(&context.settings.origin, context.settings.version, true, context.blockchain.height())) + State::message(Message::shake(&origin, version, true, my_height)) } else { State::Error } } Message::Shake { origin, version, ok, height } => { - // TODO check origin and version for compatibility + if origin.ne(my_origin) || version != my_version { + return State::Error; + } if ok { if height > my_height { State::message(Message::GetBlock { index: my_height + 1u64 }) @@ -308,6 +306,7 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe } } Message::Block { index, block } => { + println!("Received block {}", index); let block: Block = match serde_json::from_str(&block) { Ok(block) => block, Err(_) => return State::Error @@ -316,8 +315,10 @@ fn handle_message(context: Arc>, message: Message, peers: &mut Pe let context = context.clone(); thread::spawn(move || { let mut context = context.lock().unwrap(); - context.blockchain.add_block(block); - context.bus.post(crate::event::Event::BlockchainChanged) + match context.blockchain.add_block(block) { + Ok(_) => { context.bus.post(crate::event::Event::BlockchainChanged); } + Err(_) => { println!("Error adding received block"); } + } }); State::idle() } diff --git a/src/p2p/peer.rs b/src/p2p/peer.rs index 7bb1cb5..6f6c4c9 100644 --- a/src/p2p/peer.rs +++ b/src/p2p/peer.rs @@ -1,7 +1,6 @@ use crate::p2p::State; use std::net::SocketAddr; use mio::net::TcpStream; -use std::sync::RwLock; #[derive(Debug)] pub struct Peer { @@ -49,9 +48,9 @@ impl Peer { self.state.disabled() } + /// If loopback address then we care about ip and port. + /// If regular address then we only care about the ip and ignore the port. pub fn equals(&self, addr: &SocketAddr) -> bool { - /// If loopback address then we care about ip and port. - /// If regular address then we only care about the ip and ignore the port. if self.addr.ip().is_loopback() { self.addr == *addr } else { diff --git a/src/p2p/peers.rs b/src/p2p/peers.rs index d31b39b..c079b97 100644 --- a/src/p2p/peers.rs +++ b/src/p2p/peers.rs @@ -80,7 +80,7 @@ impl Peers { } pub fn send_pings(&mut self, registry: &Registry, height: u64) { - for (token, mut peer) in self.peers.iter_mut() { + for (token, peer) in self.peers.iter_mut() { match peer.get_state() { State::Idle { from } => { if from.elapsed().as_secs() >= PING_PERIOD { @@ -93,7 +93,7 @@ impl Peers { }; peer.set_state(State::message(message)); - let mut stream = peer.get_stream(); + let stream = peer.get_stream(); registry.reregister(stream, token.clone(), Interest::WRITABLE).unwrap(); } } diff --git a/src/p2p/state.rs b/src/p2p/state.rs index a171180..ef169f7 100644 --- a/src/p2p/state.rs +++ b/src/p2p/state.rs @@ -9,7 +9,7 @@ pub enum State { Message { data: Vec }, Error, Banned, - Offline { from: Instant, attempts: usize }, + Offline { from: Instant }, } impl State { @@ -17,19 +17,8 @@ impl State { Self::Idle { from: Instant::now() } } - pub fn offline(attempts: usize) -> Self { - Self::Offline { attempts, from: Instant::now() } - } - - pub fn still_offline(state: Self) -> Self { - match state { - State::Offline { attempts, from } => { - Self::Offline { attempts: attempts + 1, from } - } - _ => { - Self::Offline { attempts: 1, from: Instant::now() } - } - } + pub fn offline() -> Self { + Self::Offline { from: Instant::now() } } pub fn message(message: Message) -> Self { @@ -51,7 +40,7 @@ impl State { match self { State::Error => { true } State::Banned => { true } - State::Offline { from, attempts } => { + State::Offline { from} => { from.elapsed().as_secs() < 60 // We check offline peers to become online every 5 minutes } _ => { false } diff --git a/src/simplebus.rs b/src/simplebus.rs index 063b9be..c0ed4dd 100644 --- a/src/simplebus.rs +++ b/src/simplebus.rs @@ -1,4 +1,3 @@ -use crate::event::Event; use uuid::Uuid; use std::collections::HashMap; @@ -11,7 +10,7 @@ impl Bus { Bus { listeners: HashMap::new() } } - pub fn register(&mut self, mut closure: F) -> Uuid where F: FnMut(&Uuid, T) -> bool + Send + Sync + 'static { + pub fn register(&mut self, closure: F) -> Uuid where F: FnMut(&Uuid, T) -> bool + Send + Sync + 'static { let uuid = Uuid::new_v4(); self.listeners.insert(uuid.clone(), Box::new(closure)); uuid @@ -30,7 +29,6 @@ impl Bus { mod tests { use std::sync::{Arc, Mutex}; - use std::sync::atomic::{AtomicI32, Ordering}; use std::thread; use std::time::Duration; @@ -39,8 +37,8 @@ mod tests { #[test] fn test1() { - let mut string = Arc::new(Mutex::new(String::from("start"))); - let mut bus = Arc::new(Mutex::new(Bus::new())); + let string = Arc::new(Mutex::new(String::from("start"))); + let bus = Arc::new(Mutex::new(Bus::new())); let string_copy = string.clone(); { bus.lock().unwrap().register(move |_uuid, e| { @@ -56,7 +54,7 @@ mod tests { bus2.lock().unwrap().post(Event::BlockchainChanged); }); - let mut guard = string.lock().unwrap(); + let guard = string.lock().unwrap(); thread::sleep(Duration::from_millis(100)); println!("string = {}", &guard); }