From 707109f5754c21485ef7ed2018911421d8876e8c Mon Sep 17 00:00:00 2001 From: Mike Goelzer Date: Wed, 12 Sep 2018 09:04:22 -0700 Subject: [PATCH 1/3] Copied over floodsub example --- pubsub/rust/.gitignore | 2 + pubsub/rust/Cargo.toml | 17 +++ pubsub/rust/Makefile.not_used | 8 ++ pubsub/rust/src/main.rs | 160 +++++++++++++++++++++++ pubsub/rust/src/test-rsa-private-key.pk8 | Bin 0 -> 1219 bytes pubsub/rust/src/test-rsa-public-key.der | Bin 0 -> 294 bytes 6 files changed, 187 insertions(+) create mode 100644 pubsub/rust/.gitignore create mode 100644 pubsub/rust/Cargo.toml create mode 100644 pubsub/rust/Makefile.not_used create mode 100644 pubsub/rust/src/main.rs create mode 100644 pubsub/rust/src/test-rsa-private-key.pk8 create mode 100644 pubsub/rust/src/test-rsa-public-key.der diff --git a/pubsub/rust/.gitignore b/pubsub/rust/.gitignore new file mode 100644 index 0000000..1e7caa9 --- /dev/null +++ b/pubsub/rust/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target/ diff --git a/pubsub/rust/Cargo.toml b/pubsub/rust/Cargo.toml new file mode 100644 index 0000000..2d77d4a --- /dev/null +++ b/pubsub/rust/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "floodsub-example" +version = "0.0.1" +authors = ["Rust Libp2p "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +tokio-codec = "0.1" +tokio-io = "0.1" +tokio-current-thread = "0.1" +tokio-stdin = "0.1" +env_logger = "0.5.4" +rand = "0.4" +libp2p = { git = "https://github.com/libp2p/rust-libp2p" } +# To use local rust-libp2p source +#libp2p = { path = "../rust-libp2p" } diff --git a/pubsub/rust/Makefile.not_used b/pubsub/rust/Makefile.not_used new file mode 100644 index 0000000..32b1c84 --- /dev/null +++ b/pubsub/rust/Makefile.not_used @@ -0,0 +1,8 @@ +SRC = src/main.rs +BIN = $(SRC:.rs=) + +all: $(SRC) + cargo build + +run: + RUST_BACKTRACE=1 cargo run diff --git a/pubsub/rust/src/main.rs b/pubsub/rust/src/main.rs new file mode 100644 index 0000000..44cc5be --- /dev/null +++ b/pubsub/rust/src/main.rs @@ -0,0 +1,160 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bytes; +extern crate env_logger; +extern crate futures; +extern crate libp2p; +extern crate rand; +extern crate tokio_current_thread; +extern crate tokio_io; +extern crate tokio_stdin; + +use futures::Stream; +use futures::future::Future; +use std::{env, mem}; +use libp2p::core::{either::EitherOutput, upgrade}; +use libp2p::core::{Multiaddr, Transport, PublicKey}; +use libp2p::peerstore::PeerId; +use libp2p::tcp::TcpConfig; +use libp2p::websocket::WsConfig; + +fn main() { + env_logger::init(); + + // Determine which address to listen to. + let listen_addr = env::args() + .nth(1) + .unwrap_or("/ip4/0.0.0.0/tcp/10050".to_owned()); + + // We start by creating a `TcpConfig` that indicates that we want TCP/IP. + let transport = TcpConfig::new() + // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. + // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be + // used for the underlying multiaddress. + .or_transport(WsConfig::new(TcpConfig::new())) + + // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, + // depending on which one the remote supports. + .with_upgrade({ + let plain_text = upgrade::PlainTextConfig; + + let secio = { + let private_key = include_bytes!("test-rsa-private-key.pk8"); + let public_key = include_bytes!("test-rsa-public-key.der").to_vec(); + libp2p::secio::SecioConfig::new( + libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap() + ) + }; + + upgrade::or( + upgrade::map(plain_text, |pt| EitherOutput::First(pt)), + upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream)) + ) + }) + + // On top of plaintext or secio, we will use the multiplex protocol. + .with_upgrade(libp2p::mplex::MplexConfig::new()) + // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a + // `Transport` because the output of the upgrade is not a stream but a controller for + // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into + // a `Transport`. + .map(|val, _| ((), val)) + .into_connection_reuse() + .map(|((), val), _| val); + + // We now have a `transport` variable that can be used either to dial nodes or listen to + // incoming connections, and that will automatically apply secio and multiplex on top + // of any opened stream. + + // We now prepare the protocol that we are going to negotiate with nodes that open a connection + // or substream to our server. + let my_id = { + let key = (0..2048).map(|_| rand::random::()).collect::>(); + PeerId::from_public_key(PublicKey::Rsa(key)) + }; + + let (floodsub_upgrade, floodsub_rx) = libp2p::floodsub::FloodSubUpgrade::new(my_id); + + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and + // outgoing connections for us. + let (swarm_controller, swarm_future) = libp2p::core::swarm( + transport.clone().with_upgrade(floodsub_upgrade.clone()), + |socket, _| { + println!("Successfully negotiated protocol"); + socket + }, + ); + + let address = swarm_controller + .listen_on(listen_addr.parse().expect("invalid multiaddr")) + .expect("unsupported multiaddr"); + println!("Now listening on {:?}", address); + + let topic = libp2p::floodsub::TopicBuilder::new("libp2p-demo-chat").build(); + + let floodsub_ctl = libp2p::floodsub::FloodSubController::new(&floodsub_upgrade); + floodsub_ctl.subscribe(&topic); + + let floodsub_rx = floodsub_rx.for_each(|msg| { + if let Ok(msg) = String::from_utf8(msg.data) { + println!("< {}", msg); + } + Ok(()) + }); + + let stdin = { + let mut buffer = Vec::new(); + tokio_stdin::spawn_stdin_stream_unbounded().for_each(move |msg| { + if msg != b'\r' && msg != b'\n' { + buffer.push(msg); + return Ok(()); + } else if buffer.is_empty() { + return Ok(()); + } + + let msg = String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap(); + if msg.starts_with("/dial ") { + let target: Multiaddr = msg[6..].parse().unwrap(); + println!("*Dialing {}*", target); + swarm_controller + .dial( + target, + transport.clone().with_upgrade(floodsub_upgrade.clone()), + ) + .unwrap(); + } else { + floodsub_ctl.publish(&topic, msg.into_bytes()); + } + + Ok(()) + }) + }; + + let final_fut = swarm_future + .for_each(|_| Ok(())) + .select(floodsub_rx) + .map(|_| ()) + .map_err(|e| e.0) + .select(stdin.map_err(|_| unreachable!())) + .map(|_| ()) + .map_err(|e| e.0); + tokio_current_thread::block_on_all(final_fut).unwrap(); +} diff --git a/pubsub/rust/src/test-rsa-private-key.pk8 b/pubsub/rust/src/test-rsa-private-key.pk8 new file mode 100644 index 0000000000000000000000000000000000000000..452b7af11377ca1ab27401dc29b616ef383bae72 GIT binary patch literal 1219 zcmV;!1U&mNf&{+;0RS)!1_>&LNQUrsW5^Br2+u}0)hbn0K;(V^2bVN z^6JnD0u>N`%LEsu=I@2}mws$CbWZw13NOPfhRl(fuf3{^%zE zD<5MhzfrHEn>LrHpH`#5@t7bD8o-i^V=Z;bf&k)}fueF0QupYyHZRMH-uoYB#V%=N zP5Q+;IGb9t&-r`kVG&K-9bm~s`9D}dMdRgdzdCitb$gkA$N51Fe#0l^@v)BOi=pF! z`Ee_Vz+JC$&DjIIlc`C|K9#QE8LTjRz{;TxU0W)pfj@U~wc)_w(^C2{o?Rir)+30o zR2gNwrrG`fccEYmqUfxDN}|cJc!?HM;Mby!x^aOVzSRI+m}i|$69NMP009Dm0RTb? zWm%&A0V0z$x7K_|mXHEb`;!bfnuv44sQ)ihEVaJ_*9^e z`_Uz!2bc2bA_)plRLd)vZh*u zqaF!7WvpOc@2Q4m;}isE24<*f((VOG(2WS-8{^b@!Q|$h?w!1sMshmNeumGeJt?R( z^JelR{z+T^kR;KJ8xi#F@JhN8zK5m^^X?D z(FeGrDk+4fdJTs*j;f0FkaJ!?cj$D@iO#Ee5>61+=hO1 zeFCSr6T)Xnl#9T4hUB%qvK5|615cI3Ms;J8XTP-}L{F>I*{ENia^=mQaYm^ZmSY=~}B_sjk(cKnl}2vkQJzY(GH!h_WJV#ExlBBF?MyN#H|yG5Z36 zfdHS|G=S4qRA+@;Ro?vd5N;!;{e)P)lZ}gA}fdHS(jQkNk$Uy_0jtgnPEOu>zSC&I2 z0q>-J_7Jj_5XdEvQ8)gXH2kmtO5oRwN&7!ebDW7At+Mw^knOjkiVjd*4dD5zS$p1; z{ui_P!?rM=V@our1ArXI^38s-z&Q@%ik>o?U%xoQbbJm=5!+q5rYUeBL1F{f>v_I% zKxYDhfdI?^vg%7f{7?uV0L(D$=Uk}TOx#P}#7fUFJw)onjI-GyOsQ6JP>Uv* hv60PPFuRS$Ty2a9Lxa2c4}sY^u8x&NYgA)~zP3@lPz3-0 literal 0 HcmV?d00001 diff --git a/pubsub/rust/src/test-rsa-public-key.der b/pubsub/rust/src/test-rsa-public-key.der new file mode 100644 index 0000000000000000000000000000000000000000..9e62c93ec1938e6e003e97d46d595414be26460c GIT binary patch literal 294 zcmV+>0ondAf&n5h4F(A+hDe6@4FLfG1potr0S^E$f&mHwf&l>l!*J^I$4Y1N>d*-S z6%c;Q1Q({}?;Z;rN9D3rUM+P2KG5e0NG@T=mA+iGf6Hx8P4D;7{U~7m=qCRwA7d!L zQLmz#HkYTLR-?f2m>>-rz> Date: Wed, 12 Sep 2018 16:56:01 -0700 Subject: [PATCH 2/3] Added instructions for rust --- README.md | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3c0aa77..d7bd3a0 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ node js-dht-test/index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESp **Directory**: `pubsub` -**What it demonstrates**: Two Go nodes are created and run a chat server using a shared PubSub topic. **TODO**: Should be a Go node and a JS node, once I get the two Go nodes version working. +**What it demonstrates**: Two Go peers, one JS peer, and one Rust peer are all created and run a chat server using a shared PubSub topic. Typing text in any peer sends it to all the other peers. + +(**TODO**: eliminate centralized bootstrapper; any peer should be able to bootstrap from any other peer and peers should be able to start in any order) **First terminal**: Create the bootstrapper node @@ -61,8 +63,20 @@ node index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6a This JS peer will fire off a hello message every few seconds, which the other two subscribing nodes can see. -If you return to the second terminal and type a message, the bootstrapper and JS peers will both print that message. +**Fourth terminal**: Createa a Rust peer to connect to the bootstrap node and then subscribe and publish on the topic: + +``` +cd pubsub/rust +cargo run /ip4/0.0.0.0/tcp/9879 +# Wait for it to start up +/dial /ip4/127.0.0.1/tcp/9876 +# Now type any message to publish +``` + +The Rust peer listens on the CLI-specified port (9879 in the above example), and then the `/dial` command causes it to dial out to the boostrap host. (TODO: rust-libp2p#471) It is now subscribed to the same topic as the other peers. + +If you return to the second, third or fourth terminals and type a message, the bootstrapper and the other 2 peers will all print your message. -In short, you have a chat app on a private libp2p network. +In short, you have a chat app on a private libp2p network using PubSub. _Acknowledgements: @jhiesey for DHT (content & peer routing) JS+Go interop, @stebalien for PubSub_ From 8f17eb5934cf67fa3b4ba13a377275ccaf8b5770 Mon Sep 17 00:00:00 2001 From: Mike Goelzer Date: Fri, 14 Sep 2018 04:40:30 -0700 Subject: [PATCH 3/3] Hashed topic strings to make rust work --- pubsub/js/index.js | 9 ++++++--- pubsub/pubsub-interop.go | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pubsub/js/index.js b/pubsub/js/index.js index c74fd50..ccfe64d 100644 --- a/pubsub/js/index.js +++ b/pubsub/js/index.js @@ -12,6 +12,9 @@ const defaultsDeep = require('@nodeutils/defaults-deep') const waterfall = require('async/waterfall') const parallel = require('async/parallel') +// "RDE...tao" is the hash of "libp2p-chat-demo" (for compat with Rust) +const topicName = "RDEpsjSPrAZF9JCK5REt3tao" + class MyBundle extends libp2p { constructor(_options) { const defaults = { @@ -54,13 +57,13 @@ waterfall([ fsub.start(cb) }, (cb) => { - fsub.on('libp2p-demo-chat', (data) => { + fsub.on(topicName, (data) => { const peerIdStr = data.from const peerIdTruncdStr = peerIdStr.substr(0,2) + "*" + peerIdStr.substr(peerIdStr.length-6,6) const messageStr = data.data console.log(": " + messageStr) }) - fsub.subscribe('libp2p-demo-chat') + fsub.subscribe(topicName) node.dial(bootstrapAddr, cb) } @@ -78,5 +81,5 @@ let i = 0 function pubsubloop() { i = i + 1 var s = new Buffer('Hello from JS (' + i + ')') - fsub.publish('libp2p-demo-chat', s) + fsub.publish(topicName, s) } \ No newline at end of file diff --git a/pubsub/pubsub-interop.go b/pubsub/pubsub-interop.go index 2a7fa9a..d9abde1 100644 --- a/pubsub/pubsub-interop.go +++ b/pubsub/pubsub-interop.go @@ -24,7 +24,8 @@ var ho host.Host //var dhtPtr *dht.IpfsDHT -var TopicName string = "libp2p-demo-chat" +//var TopicName string = "libp2p-demo-chat" +var TopicName string = "RDEpsjSPrAZF9JCK5REt3tao" func parseArgs() (bool, string) { usage := fmt.Sprintf("Usage: %s [-b] [PRIVATE_KEY]\n\n-b is bootstrap mode (creates DHT)\nPRIVATE_KEY is the path to a private key like '../util/private_key.bin'\n", os.Args[0]) @@ -161,8 +162,16 @@ func main() { } }() - host.Network().SetConnHandler(handleConn) - + // SetConnHandler() should not normally be called. Instead, + // use Notify() and pass it a functioon. + // The problem with SetConnHandler() is that it takes control + // of the connection. + //host.Network().SetConnHandler(handleConn) + host.Network().Notify(&inet.NotifyBundle{ + ConnectedF: func(n inet.Network, c inet.Conn) { + fmt.Println("Got a connection:", c.RemotePeer()) + }, + }) if bBootstrap { fmt.Println("Bootstrapper running. Ctrl+C to exit.") for true {