Implemented block adding check. Cleared many warnings, cleaned code.

pull/2/head
Revertron 3 years ago
parent 1331f44b0e
commit abb2455d1c

@ -2,7 +2,7 @@
"chain_name": "test",
"origin": "",
"version": 0,
"key_file": "default3.key",
"key_file": "default.key",
"listen": "127.0.0.1:4244",
"public": true,
"peers": [

@ -4,10 +4,7 @@ extern crate num_bigint;
extern crate num_traits;
use std::fmt::Debug;
use chrono::Utc;
use serde::{Serialize, Deserialize};
use num_bigint::BigUint;
use num_traits::One;
use crypto::sha2::Sha256;
use crypto::digest::Digest;
use crate::keys::Bytes;

@ -1,12 +1,11 @@
use chrono::Utc;
use sqlite::{Connection, Error, Readable, State, Statement};
use sqlite::{Connection, State, Statement};
use crate::{Block, Bytes, Keystore, Transaction};
use crate::{Block, Bytes, Keystore, Transaction, Settings};
const DB_NAME: &str = "blockchain.db";
pub struct Blockchain {
origin: String,
origin: Bytes,
pub version: u32,
pub blocks: Vec<Block>,
last_block: Option<Block>,
@ -14,7 +13,10 @@ pub struct Blockchain {
}
impl Blockchain {
pub fn new(origin: String, version: u32) -> Self {
pub fn new(settings: &Settings) -> Self {
let origin = settings.get_origin();
let version = settings.version;
let db = sqlite::open(DB_NAME).expect("Unable to open blockchain DB");
let mut blockchain = Blockchain{ origin, version, blocks: Vec::new(), last_block: None, db};
blockchain.init_db();
@ -59,62 +61,65 @@ impl Blockchain {
}
}
pub fn add_block(&mut self, block: Block) {
if self.check_block(&block, &self.last_block) {
println!("Adding block:\n{:?}", &block);
self.blocks.push(block.clone());
self.last_block = Some(block.clone());
let transaction = block.transaction.clone();
pub fn add_block(&mut self, block: Block) -> Result<(), &str> {
if !self.check_block(&block, &self.last_block) {
println!("Bad block found, ignoring:\n{:?}", &block);
return Err("Bad block found, ignoring");
}
println!("Adding block:\n{:?}", &block);
self.blocks.push(block.clone());
self.last_block = Some(block.clone());
let transaction = block.transaction.clone();
{
// Adding block to DB
let mut statement = self.db.prepare("INSERT INTO blocks (\
{
// Adding block to DB
let mut statement = self.db.prepare("INSERT INTO blocks (\
id, timestamp, version, difficulty, random,\
nonce, 'transaction', prev_block_hash, hash)\
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);").unwrap();
statement.bind(1, block.index as i64);
statement.bind(2, block.timestamp as i64);
statement.bind(3, block.version as i64);
statement.bind(4, block.difficulty as i64);
statement.bind(5, block.random as i64);
statement.bind(6, block.nonce as i64);
match &transaction {
None => { statement.bind(7, ""); }
Some(transaction) => {
statement.bind(7, transaction.to_string().as_ref() as &str);
}
}
statement.bind(8, block.prev_block_hash.as_bytes());
statement.bind(9, block.hash.as_bytes());
statement.next().expect("Error adding block to DB");
}
statement.bind(1, block.index as i64).expect("Error in bind");
statement.bind(2, block.timestamp as i64).expect("Error in bind");
statement.bind(3, block.version as i64).expect("Error in bind");
statement.bind(4, block.difficulty as i64).expect("Error in bind");
statement.bind(5, block.random as i64).expect("Error in bind");
statement.bind(6, block.nonce as i64).expect("Error in bind");
match &transaction {
None => {}
None => { statement.bind(7, "").expect("Error in bind"); }
Some(transaction) => {
self.add_transaction(transaction);
statement.bind(7, transaction.to_string().as_ref() as &str).expect("Error in bind");
}
}
} else {
println!("Bad block found, ignoring:\n{:?}", &block);
statement.bind(8, block.prev_block_hash.as_bytes()).expect("Error in bind");
statement.bind(9, block.hash.as_bytes()).expect("Error in bind");
statement.next().expect("Error adding block to DB");
}
match &transaction {
None => {
Err("Error adding transaction!")
}
Some(transaction) => {
self.add_transaction(transaction);
Ok(())
}
}
}
fn add_transaction(&mut self, t: &Transaction) {
let mut statement = self.db.prepare("INSERT INTO transactions (identity, confirmation, method, data, pub_key, signature) VALUES (?, ?, ?, ?, ?, ?)").unwrap();
statement.bind(1, t.identity.as_bytes());
statement.bind(2, t.confirmation.as_bytes());
statement.bind(3, t.method.as_ref() as &str);
statement.bind(4, t.data.as_ref() as &str);
statement.bind(5, t.pub_key.as_bytes());
statement.bind(6, t.signature.as_bytes());
statement.bind(1, t.identity.as_bytes()).expect("Error in bind");
statement.bind(2, t.confirmation.as_bytes()).expect("Error in bind");
statement.bind(3, t.method.as_ref() as &str).expect("Error in bind");
statement.bind(4, t.data.as_ref() as &str).expect("Error in bind");
statement.bind(5, t.pub_key.as_bytes()).expect("Error in bind");
statement.bind(6, t.signature.as_bytes()).expect("Error in bind");
statement.next().expect("Error adding transaction to DB");
}
pub fn get_block(&self, index: u64) -> Option<Block> {
match self.db.prepare("SELECT * FROM blocks WHERE id=? LIMIT 1;") {
Ok(mut statement) => {
statement.bind(1, index as i64);
statement.bind(1, index as i64).expect("Error in bind");
while statement.next().unwrap() == State::Row {
return match Self::get_block_from_statement(&mut statement) {
None => {
@ -142,7 +147,7 @@ impl Blockchain {
}
let identity_hash = Transaction::hash_identity(domain);
let mut statement = self.db.prepare("SELECT pub_key FROM transactions WHERE identity = ? ORDER BY id DESC LIMIT 1;").unwrap();
statement.bind(1, identity_hash.as_bytes());
statement.bind(1, identity_hash.as_bytes()).expect("Error in bind");
while let State::Row = statement.next().unwrap() {
let pub_key = Bytes::from_bytes(statement.read::<Vec<u8>>(0).unwrap().as_slice());
if !pub_key.eq(&keystore.get_public()) {
@ -159,7 +164,7 @@ impl Blockchain {
// Checking for available zone, for this domain
let identity_hash = Transaction::hash_identity(parts.first().unwrap());
let mut statement = self.db.prepare("SELECT identity FROM transactions WHERE identity = ? ORDER BY id DESC LIMIT 1;").unwrap();
statement.bind(1, identity_hash.as_bytes());
statement.bind(1, identity_hash.as_bytes()).expect("Error in bind");
while let State::Row = statement.next().unwrap() {
// If there is such a zone
return true;
@ -196,15 +201,36 @@ impl Blockchain {
}*/
fn check_block(&self, block: &Block, prev_block: &Option<Block>) -> bool {
// TODO check if it is already stored, or its height is more than we need
if !Self::check_block_hash(block) {
if !check_block_hash(block) {
println!("{:?} has wrong hash! Ignoring!", &block);
return false;
}
if prev_block.is_none() {
return true;
// TODO make transaction not Optional
let transaction = block.transaction.as_ref().unwrap();
if !check_transaction_signature(&transaction) {
println!("{:?} has wrong signature! Ignoring block!", &transaction);
return false;
}
match prev_block {
None => {
if block.index != 0 {
return false;
}
if self.origin.is_zero() && block.index > 0 {
panic!("Error adding block {} without origin! Please, fill in origin in config!", block.index);
}
return block.prev_block_hash == prev_block.as_ref().unwrap().hash;
self.origin.is_zero() || block.hash.eq(&self.origin)
}
Some(prev) => {
if block.index != prev.index + 1 {
println!("Discarding block with index {} as not needed now", block.index);
return false;
}
block.prev_block_hash.eq(&prev.hash)
}
}
}
fn get_block_from_statement(statement: &mut Statement) -> Option<Block> {
@ -219,12 +245,18 @@ impl Blockchain {
let hash = Bytes::from_bytes(statement.read::<Vec<u8>>(8).unwrap().as_slice());
Some(Block::from_all_params(index, timestamp, version, difficulty, random, nonce, prev_block_hash, hash, transaction))
}
}
pub fn check_block_hash(block: &Block) -> bool {
// We need to clear Hash value to rehash it without it for check :(
let mut copy: Block = block.clone();
copy.hash = Bytes::default();
let data = serde_json::to_string(&copy).unwrap();
Block::hash(data.as_bytes()) == block.hash
}
pub fn check_block_hash(block: &Block) -> bool {
let mut copy: Block = block.clone();
copy.hash = Bytes::default();
let data = serde_json::to_string(&copy).unwrap();
Block::hash(data.as_bytes()) == block.hash
}
pub fn check_transaction_signature(transaction: &Transaction) -> bool {
let mut copy = transaction.clone();
copy.signature = Bytes::zero64();
let data = copy.get_bytes();
Keystore::check(data.as_slice(), copy.pub_key.as_bytes(), transaction.signature.as_bytes())
}

@ -85,12 +85,12 @@ impl fmt::Debug for Transaction {
impl Serialize for Transaction {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error> where S: Serializer {
let mut structure = serializer.serialize_struct("Transaction", 6).unwrap();
structure.serialize_field("identity", &self.identity);
structure.serialize_field("confirmation", &self.confirmation);
structure.serialize_field("method", &self.method);
structure.serialize_field("data", &self.data);
structure.serialize_field("pub_key", &self.pub_key);
structure.serialize_field("signature", &self.signature);
structure.serialize_field("identity", &self.identity)?;
structure.serialize_field("confirmation", &self.confirmation)?;
structure.serialize_field("method", &self.method)?;
structure.serialize_field("data", &self.data)?;
structure.serialize_field("pub_key", &self.pub_key)?;
structure.serialize_field("signature", &self.signature)?;
structure.end()
}
}

@ -1,8 +1,6 @@
use crate::{Keystore, Blockchain, Bus, Bytes};
use crate::event::Event;
use std::collections::HashMap;
use serde::{Serialize, Deserialize, Serializer, Deserializer};
use serde::de::Error;
use serde::{Serialize, Deserialize};
use std::fs::File;
use std::io::Read;
@ -65,7 +63,7 @@ impl Settings {
match File::open(file_name) {
Ok(mut file) => {
let mut text = String::new();
file.read_to_string(&mut text);
file.read_to_string(&mut text).unwrap();
let loaded = serde_json::from_str(&text);
return if loaded.is_ok() {
Some(loaded.unwrap())
@ -78,6 +76,9 @@ impl Settings {
}
pub fn get_origin(&self) -> Bytes {
if self.origin.eq("") {
return Bytes::zero32();
}
let origin = crate::from_hex(&self.origin).expect("Wrong origin in settings");
Bytes::from_bytes(origin.as_slice())
}

@ -7,7 +7,7 @@ use rand::{thread_rng, Rng};
use std::fmt;
use std::fs;
use std::fs::File;
use std::io::{Read, Write};
use std::io::Write;
use std::path::Path;
use serde::export::fmt::Error;
use serde::{Serialize, Deserialize, Serializer, Deserializer};
@ -26,7 +26,7 @@ pub struct Keystore {
impl Keystore {
pub fn new() -> Self {
let mut buf = [0u8; 64];
let mut buf = [0u8; 32];
let mut rng = thread_rng();
rng.fill(&mut buf);
let (private, public) = keypair(&buf);
@ -50,11 +50,11 @@ impl Keystore {
}
//TODO Implement error conditions
pub fn save(&self, filename: &str, password: &str) {
pub fn save(&self, filename: &str, _password: &str) {
match File::create(Path::new(filename)) {
Ok(mut f) => {
//TODO implement key encryption
f.write_all(&self.seed);
f.write_all(&self.seed).expect("Error saving keystore");
}
Err(_) => { println!("Error saving key file!"); }
}
@ -69,10 +69,10 @@ impl Keystore {
}
pub fn sign(&self, message: &[u8]) -> [u8; 64] {
signature(message, &self.private_key.data)
signature(message, self.private_key.data.as_slice())
}
pub fn check(&self, message: &[u8], public_key: &[u8], signature: &[u8]) -> bool {
pub fn check(message: &[u8], public_key: &[u8], signature: &[u8]) -> bool {
verify(message, public_key, signature)
}
@ -117,7 +117,7 @@ impl Bytes {
/// Returns a byte slice of the hash contents.
pub fn as_bytes(&self) -> &[u8] {
&self.data
self.data.as_slice()
}
pub fn zero32() -> Self {
@ -189,3 +189,12 @@ impl<'dd> Deserialize<'dd> for Bytes {
deserializer.deserialize_str(BytesVisitor)
}
}
#[test]
pub fn test_signature() {
let keystore: Keystore = Keystore::new();
let data = b"{ identity: 178135D209C697625E3EC71DA5C760382E54936F824EE5083908DA66B14ECE18,\
confirmation: A4A0AFECD1A511825226F0D3437C6C6BDAE83554040AA7AEB49DEFEAB0AE9EA4 }";
let signature = keystore.sign(data);
assert!(Keystore::check(data, keystore.get_public().as_bytes(), &signature), "Wrong signature!")
}

@ -1,16 +1,15 @@
#![windows_subsystem = "windows"]
extern crate web_view;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use rand::{Rng, RngCore};
use serde::{Deserialize, Serialize};
use rand::RngCore;
use serde::{Deserialize};
use web_view::*;
use alfis::{Block, Blockchain, Bytes, Context, Keystore, Settings, Transaction};
use alfis::{Blockchain, Bytes, Context, Keystore, Settings, Transaction};
use alfis::event::Event;
use alfis::miner::Miner;
use alfis::p2p::Network;
@ -31,7 +30,7 @@ fn main() {
None => { generate_key(KEYSTORE_DIFFICULTY, Arc::new(AtomicBool::new(true))).expect("Could not load or generate keypair") }
Some(keystore) => { keystore }
};
let blockchain: Blockchain = Blockchain::new(settings.origin.clone(), settings.version);
let blockchain: Blockchain = Blockchain::new(&settings);
let context: Arc<Mutex<Context>> = Arc::new(Mutex::new(Context::new(settings, keystore, blockchain)));
let mut miner_obj = Miner::new(context.clone());
@ -39,7 +38,7 @@ fn main() {
let miner: Arc<Mutex<Miner>> = Arc::new(Mutex::new(miner_obj));
let mut network = Network::new(context.clone());
network.start();
network.start().expect("Error starting network component");
create_genesis_if_needed(&context, &miner);
run_interface(context.clone(), miner.clone());
@ -76,8 +75,8 @@ fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>) {
println!("Command {}", arg);
match serde_json::from_str(arg).unwrap() {
Loaded => {
web_view.eval("showMiningIndicator(false);");
let mut handle = web_view.handle();
web_view.eval("showMiningIndicator(false);").expect("Error evaluating!");
let handle = web_view.handle();
let mut c = context.lock().unwrap();
c.bus.register(move |_uuid, e| {
println!("Got event from bus {:?}", &e);
@ -89,9 +88,9 @@ fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>) {
_ => { false }
};
handle.dispatch(move |web_view| {
web_view.eval(&format!("showMiningIndicator({});", visible));
web_view.eval(&format!("showMiningIndicator({});", visible)).expect("Error evaluating!");
return WVResult::Ok(());
});
}).expect("Error dispatching!");
true
});
}
@ -112,11 +111,11 @@ fn run_interface(context: Arc<Mutex<Context>>, miner: Arc<Mutex<Miner>>) {
CheckDomain { name} => {
let c = context.lock().unwrap();
let available = c.get_blockchain().is_domain_available(&name, &c.get_keystore());
web_view.eval(&format!("domainAvailable({})", available));
web_view.eval(&format!("domainAvailable({})", available)).expect("Error evaluating!");
}
CreateDomain { name, records, tags } => {
let keystore = {
let mut guard = context.lock().unwrap();
let guard = context.lock().unwrap();
guard.get_keystore()
};
create_domain(miner.clone(), name, records, &keystore);
@ -152,7 +151,7 @@ fn create_domain<S: Into<String>>(miner: Arc<Mutex<Miner>>, name: S, data: S, ke
println!("Generating domain {}", name);
//let rec_vector: Vec<String> = records.into().trim().split("\n").map(|s| s.trim()).map(String::from).collect();
//let tags_vector: Vec<String> = tags.into().trim().split(",").map(|s| s.trim()).map(String::from).collect();
let mut transaction = { create_transaction(keystore, name, "domain".into(), data.into()) };
let transaction = { create_transaction(keystore, name, "domain".into(), data.into()) };
let mut miner_guard = miner.lock().unwrap();
miner_guard.add_transaction(transaction);
}
@ -168,10 +167,10 @@ fn create_transaction<S: Into<String>>(keystore: &Keystore, name: S, method: S,
}
fn create_key(context: Arc<Mutex<Context>>, filename: &str, password: &str) {
let mut mining = Arc::new(AtomicBool::new(true));
let mining = Arc::new(AtomicBool::new(true));
{ context.lock().unwrap().bus.post(Event::KeyGeneratorStarted); }
for _ in 0..num_cpus::get() {
let context = context.clone();
{ context.lock().unwrap().bus.post(Event::KeyGeneratorStarted); }
let filename= filename.to_owned();
let password= password.to_owned();
let mining = mining.clone();
@ -201,7 +200,7 @@ fn create_key(context: Arc<Mutex<Context>>, filename: &str, password: &str) {
fn generate_key(difficulty: usize, mining: Arc<AtomicBool>) -> Option<Keystore> {
let mut rng = rand::thread_rng();
let mut buf = [0u8; 64];
let mut buf = [0u8; 32];
loop {
rng.fill_bytes(&mut buf);
let keystore = Keystore::from_bytes(&buf);
@ -213,7 +212,6 @@ fn generate_key(difficulty: usize, mining: Arc<AtomicBool>) -> Option<Keystore>
return None;
}
}
None
}
#[derive(Deserialize)]

@ -8,15 +8,12 @@ use crypto::digest::Digest;
use crypto::sha2::Sha256;
use num_cpus;
use crate::{Block, Bytes, Context, hash_is_good, Keystore, Transaction};
use crate::{Block, Bytes, Context, hash_is_good, Transaction};
use crate::event::Event;
pub struct Miner {
context: Arc<Mutex<Context>>,
keystore: Keystore,
version: u32,
transactions: Arc<Mutex<Vec<Transaction>>>,
last_block: Option<Block>,
running: Arc<AtomicBool>,
mining: Arc<AtomicBool>,
cond_var: Arc<Condvar>
@ -24,13 +21,9 @@ pub struct Miner {
impl Miner {
pub fn new(context: Arc<Mutex<Context>>) -> Self {
let c = context.lock().unwrap();
Miner {
context: context.clone(),
keystore: c.keystore.clone(),
version: c.settings.version,
transactions: Arc::new(Mutex::new(Vec::new())),
last_block: c.blockchain.blocks.last().cloned(),
running: Arc::new(AtomicBool::new(false)),
mining: Arc::new(AtomicBool::new(false)),
cond_var: Arc::new(Condvar::new())
@ -65,19 +58,17 @@ impl Miner {
let mut lock = transactions.lock().unwrap();
if lock.len() > 0 {
println!("Starting to mine some transaction");
println!("Got new transaction to mine");
let transaction = lock.remove(0);
mining.store(true, Ordering::Relaxed);
Miner::mine_internal(context.clone(), transactions.clone(), transaction, mining.clone(), cond_var.clone());
} else {
println!("Waiting for transactions");
cond_var.wait(lock);
println!("Got notified on new transaction");
let _ = cond_var.wait(lock).expect("Error in wait lock!");
}
}
});
let mining = self.mining.clone();
self.context.lock().unwrap().bus.register(move |uuid, e| {
self.context.lock().unwrap().bus.register(move |_uuid, e| {
if e == Event::ActionStopMining {
mining.store(false, Ordering::Relaxed);
}
@ -90,13 +81,11 @@ impl Miner {
}
fn mine_internal(context: Arc<Mutex<Context>>, transactions: Arc<Mutex<Vec<Transaction>>>, mut transaction: Transaction, mining: Arc<AtomicBool>, cond_var: Arc<Condvar>) {
let mut last_block_time = 0i64;
let mut version= 0u32;
{
let version= {
let mut c = context.lock().unwrap();
c.bus.post(Event::MinerStarted);
version = c.settings.version;
}
c.settings.version
};
let block = {
if transaction.signature.is_zero() {
// Signing it with private key from Keystore
@ -114,7 +103,6 @@ impl Miner {
Block::new(0, Utc::now().timestamp(), version, Bytes::zero32(), Some(transaction.clone()))
},
Some(block) => {
last_block_time = block.timestamp;
// Creating a block with that signed transaction
Block::new(block.index + 1, Utc::now().timestamp(), version, block.hash.clone(), Some(transaction.clone()))
},
@ -134,11 +122,10 @@ impl Miner {
let cond_var = cond_var.clone();
thread::spawn(move || {
live_threads.fetch_add(1, Ordering::Relaxed);
let mut count = 0u32;
match find_hash(&mut Sha256::new(), block, last_block_time, mining.clone()) {
match find_hash(&mut Sha256::new(), block, mining.clone()) {
None => {
println!("Mining did not find suitable hash or was stopped");
count = live_threads.fetch_sub(1, Ordering::Relaxed);
let count = live_threads.fetch_sub(1, Ordering::Relaxed);
// If this is the last thread, but mining was not stopped by another thread
if count == 0 && mining.load(Ordering::Relaxed) {
// If all threads came empty with mining we return transaction to the queue
@ -148,9 +135,8 @@ impl Miner {
}
},
Some(block) => {
count = live_threads.fetch_sub(1, Ordering::Relaxed);
let mut context = context.lock().unwrap();
context.blockchain.add_block(block);
context.blockchain.add_block(block).expect("Error adding fresh mined block!");
context.bus.post(Event::MinerStopped);
mining.store(false, Ordering::Relaxed);
},
@ -160,22 +146,16 @@ impl Miner {
}
}
fn find_hash(digest: &mut dyn Digest, mut block: Block, prev_block_time: i64, running: Arc<AtomicBool>) -> Option<Block> {
fn find_hash(digest: &mut dyn Digest, mut block: Block, running: Arc<AtomicBool>) -> Option<Block> {
let mut buf: [u8; 32] = [0; 32];
block.random = rand::random();
println!("Mining block {}", serde_json::to_string(&block).unwrap());
//let start_difficulty = block.difficulty;
for nonce in 0..std::u64::MAX {
if !running.load(Ordering::Relaxed) {
return None;
}
block.timestamp = Utc::now().timestamp();
block.nonce = nonce;
// if nonce % 1000 == 0 {
// println!("Nonce {}", nonce);
// }
// TODO uncomment for real run
//block.difficulty = start_difficulty + get_time_difficulty(prev_block_time, block.timestamp);
digest.reset();
digest.input(serde_json::to_string(&block).unwrap().as_bytes());
@ -186,13 +166,4 @@ fn find_hash(digest: &mut dyn Digest, mut block: Block, prev_block_time: i64, ru
}
}
None
}
fn get_time_difficulty(prev_time: i64, now: i64) -> usize {
let diff = now - prev_time;
if diff < 900_000 {
(900_000 as usize - diff as usize) / 60_000
} else {
0
}
}

@ -2,7 +2,6 @@ extern crate serde;
extern crate serde_json;
use serde::{Deserialize, Serialize};
use crate::p2p::peer::Peer;
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {

@ -2,7 +2,6 @@ extern crate serde;
extern crate serde_json;
use std::{io, thread};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@ -11,11 +10,9 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use mio::{Events, Interest, Poll, Registry, Token};
use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use serde::{Deserialize, Serialize};
use crate::{Context, Block, p2p::Message, p2p::State, p2p::Peer, p2p::Peers};
use std::net::{SocketAddr, IpAddr, SocketAddrV4};
use std::borrow::BorrowMut;
const SERVER: Token = Token(0);
const POLL_TIMEOUT: Option<Duration> = Some(Duration::from_millis(3000));
@ -88,7 +85,7 @@ impl Network {
}
token => {
match peers.get_mut_peer(&token) {
Some(peer) => {
Some(_peer) => {
match handle_connection_event(context.clone(), &mut peers, &poll.registry(), &event) {
Ok(result) => {
if !result {
@ -123,7 +120,7 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
if event.is_readable() {
let data = {
let mut peer = peers.get_mut_peer(&event.token()).expect("Error getting peer for connection");
let peer = peers.get_mut_peer(&event.token()).expect("Error getting peer for connection");
let mut stream = peer.get_stream();
read_message(&mut stream)
};
@ -134,8 +131,8 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
Ok(message) => {
println!("Got message from socket {}: {:?}", &event.token().0, &message);
let new_state = handle_message(context.clone(), message, peers, &event.token());
let mut peer = peers.get_mut_peer(&event.token()).unwrap();
let mut stream = peer.get_stream();
let peer = peers.get_mut_peer(&event.token()).unwrap();
let stream = peer.get_stream();
match new_state {
State::Message { data } => {
if event.is_writable() {
@ -154,7 +151,7 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
State::Error => {}
State::Banned => {}
State::Offline { .. } => {
peer.set_state(State::offline(1));
peer.set_state(State::offline());
}
}
}
@ -175,7 +172,7 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
State::Connecting => {
println!("Sending hello to socket {}", event.token().0);
let data: String = {
let mut c = context.lock().unwrap();
let c = context.lock().unwrap();
let message = Message::hand(&c.settings.origin, c.settings.version, c.settings.public);
serde_json::to_string(&message).unwrap()
};
@ -191,7 +188,7 @@ fn handle_connection_event(context: Arc<Mutex<Context>>, peers: &mut Peers, regi
println!("Odd version of pings :)");
if from.elapsed().as_secs() >= 30 {
let data: String = {
let mut c = context.lock().unwrap();
let c = context.lock().unwrap();
let message = Message::ping(c.blockchain.height());
serde_json::to_string(&message).unwrap()
};
@ -232,7 +229,7 @@ fn read_message(stream: &mut &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
Err(ref err) if would_block(err) => break,
Err(ref err) if interrupted(err) => continue,
// Other errors we'll consider fatal.
Err(err) => return Err(buf),
Err(_) => return Err(buf),
}
}
if buf.len() == data_size {
@ -244,29 +241,30 @@ fn read_message(stream: &mut &mut TcpStream) -> Result<Vec<u8>, Vec<u8>> {
fn send_message(connection: &mut TcpStream, data: &Vec<u8>) {
// TODO handle errors
connection.write_u32::<BigEndian>(data.len() as u32);
connection.write_u32::<BigEndian>(data.len() as u32).expect("Error sending message");
connection.write_all(&data).expect("Error writing to socket");
connection.flush();
connection.flush().expect("Error sending message");
}
fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Peers, token: &Token) -> State {
let my_height = {
let (my_height, my_origin, my_version) = {
let context = context.lock().unwrap();
context.blockchain.height()
(context.blockchain.height(), &context.settings.origin.clone(), context.settings.version)
};
match message {
Message::Hand { origin: origin, version, public } => {
let context = context.lock().unwrap();
if origin == context.settings.origin && version == context.settings.version {
let mut peer = peers.get_mut_peer(token).unwrap();
Message::Hand { origin, version, public } => {
if origin.eq(my_origin) && version == my_version {
let peer = peers.get_mut_peer(token).unwrap();
peer.set_public(public);
State::message(Message::shake(&context.settings.origin, context.settings.version, true, context.blockchain.height()))
State::message(Message::shake(&origin, version, true, my_height))
} else {
State::Error
}
}
Message::Shake { origin, version, ok, height } => {
// TODO check origin and version for compatibility
if origin.ne(my_origin) || version != my_version {
return State::Error;
}
if ok {
if height > my_height {
State::message(Message::GetBlock { index: my_height + 1u64 })
@ -308,6 +306,7 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
}
}
Message::Block { index, block } => {
println!("Received block {}", index);
let block: Block = match serde_json::from_str(&block) {
Ok(block) => block,
Err(_) => return State::Error
@ -316,8 +315,10 @@ fn handle_message(context: Arc<Mutex<Context>>, message: Message, peers: &mut Pe
let context = context.clone();
thread::spawn(move || {
let mut context = context.lock().unwrap();
context.blockchain.add_block(block);
context.bus.post(crate::event::Event::BlockchainChanged)
match context.blockchain.add_block(block) {
Ok(_) => { context.bus.post(crate::event::Event::BlockchainChanged); }
Err(_) => { println!("Error adding received block"); }
}
});
State::idle()
}

@ -1,7 +1,6 @@
use crate::p2p::State;
use std::net::SocketAddr;
use mio::net::TcpStream;
use std::sync::RwLock;
#[derive(Debug)]
pub struct Peer {
@ -49,9 +48,9 @@ impl Peer {
self.state.disabled()
}
/// If loopback address then we care about ip and port.
/// If regular address then we only care about the ip and ignore the port.
pub fn equals(&self, addr: &SocketAddr) -> bool {
/// If loopback address then we care about ip and port.
/// If regular address then we only care about the ip and ignore the port.
if self.addr.ip().is_loopback() {
self.addr == *addr
} else {

@ -80,7 +80,7 @@ impl Peers {
}
pub fn send_pings(&mut self, registry: &Registry, height: u64) {
for (token, mut peer) in self.peers.iter_mut() {
for (token, peer) in self.peers.iter_mut() {
match peer.get_state() {
State::Idle { from } => {
if from.elapsed().as_secs() >= PING_PERIOD {
@ -93,7 +93,7 @@ impl Peers {
};
peer.set_state(State::message(message));
let mut stream = peer.get_stream();
let stream = peer.get_stream();
registry.reregister(stream, token.clone(), Interest::WRITABLE).unwrap();
}
}

@ -9,7 +9,7 @@ pub enum State {
Message { data: Vec<u8> },
Error,
Banned,
Offline { from: Instant, attempts: usize },
Offline { from: Instant },
}
impl State {
@ -17,19 +17,8 @@ impl State {
Self::Idle { from: Instant::now() }
}
pub fn offline(attempts: usize) -> Self {
Self::Offline { attempts, from: Instant::now() }
}
pub fn still_offline(state: Self) -> Self {
match state {
State::Offline { attempts, from } => {
Self::Offline { attempts: attempts + 1, from }
}
_ => {
Self::Offline { attempts: 1, from: Instant::now() }
}
}
pub fn offline() -> Self {
Self::Offline { from: Instant::now() }
}
pub fn message(message: Message) -> Self {
@ -51,7 +40,7 @@ impl State {
match self {
State::Error => { true }
State::Banned => { true }
State::Offline { from, attempts } => {
State::Offline { from} => {
from.elapsed().as_secs() < 60 // We check offline peers to become online every 5 minutes
}
_ => { false }

@ -1,4 +1,3 @@
use crate::event::Event;
use uuid::Uuid;
use std::collections::HashMap;
@ -11,7 +10,7 @@ impl<T: Clone> Bus<T> {
Bus { listeners: HashMap::new() }
}
pub fn register<F>(&mut self, mut closure: F) -> Uuid where F: FnMut(&Uuid, T) -> bool + Send + Sync + 'static {
pub fn register<F>(&mut self, closure: F) -> Uuid where F: FnMut(&Uuid, T) -> bool + Send + Sync + 'static {
let uuid = Uuid::new_v4();
self.listeners.insert(uuid.clone(), Box::new(closure));
uuid
@ -30,7 +29,6 @@ impl<T: Clone> Bus<T> {
mod tests {
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;
use std::time::Duration;
@ -39,8 +37,8 @@ mod tests {
#[test]
fn test1() {
let mut string = Arc::new(Mutex::new(String::from("start")));
let mut bus = Arc::new(Mutex::new(Bus::new()));
let string = Arc::new(Mutex::new(String::from("start")));
let bus = Arc::new(Mutex::new(Bus::new()));
let string_copy = string.clone();
{
bus.lock().unwrap().register(move |_uuid, e| {
@ -56,7 +54,7 @@ mod tests {
bus2.lock().unwrap().post(Event::BlockchainChanged);
});
let mut guard = string.lock().unwrap();
let guard = string.lock().unwrap();
thread::sleep(Duration::from_millis(100));
println!("string = {}", &guard);
}

Loading…
Cancel
Save