Merge pull request #4 from mgoelzer/rust-mvp

Rust mvp
master
Mike Goelzer 6 years ago committed by GitHub
commit d88a293383
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -32,7 +32,9 @@ node js-dht-test/index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESp
**Directory**: `pubsub`
**What it demonstrates**: Two Go nodes are created and run a chat server using a shared PubSub topic. **TODO**: Should be a Go node and a JS node, once I get the two Go nodes version working.
**What it demonstrates**: Two Go peers, one JS peer, and one Rust peer are all created and run a chat server using a shared PubSub topic. Typing text in any peer sends it to all the other peers.
(**TODO**: eliminate centralized bootstrapper; any peer should be able to bootstrap from any other peer and peers should be able to start in any order)
**First terminal**: Create the bootstrapper node
@ -61,8 +63,20 @@ node index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6a
This JS peer will fire off a hello message every few seconds, which the other two subscribing nodes can see.
If you return to the second terminal and type a message, the bootstrapper and JS peers will both print that message.
**Fourth terminal**: Createa a Rust peer to connect to the bootstrap node and then subscribe and publish on the topic:
```
cd pubsub/rust
cargo run /ip4/0.0.0.0/tcp/9879
# Wait for it to start up
/dial /ip4/127.0.0.1/tcp/9876
# Now type any message to publish
```
The Rust peer listens on the CLI-specified port (9879 in the above example), and then the `/dial` command causes it to dial out to the boostrap host. (TODO: rust-libp2p#471) It is now subscribed to the same topic as the other peers.
If you return to the second, third or fourth terminals and type a message, the bootstrapper and the other 2 peers will all print your message.
In short, you have a chat app on a private libp2p network.
In short, you have a chat app on a private libp2p network using PubSub.
_Acknowledgements: @jhiesey for DHT (content & peer routing) JS+Go interop, @stebalien for PubSub_

@ -12,6 +12,9 @@ const defaultsDeep = require('@nodeutils/defaults-deep')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
// "RDE...tao" is the hash of "libp2p-chat-demo" (for compat with Rust)
const topicName = "RDEpsjSPrAZF9JCK5REt3tao"
class MyBundle extends libp2p {
constructor(_options) {
const defaults = {
@ -54,13 +57,13 @@ waterfall([
fsub.start(cb)
},
(cb) => {
fsub.on('libp2p-demo-chat', (data) => {
fsub.on(topicName, (data) => {
const peerIdStr = data.from
const peerIdTruncdStr = peerIdStr.substr(0,2) + "*" + peerIdStr.substr(peerIdStr.length-6,6)
const messageStr = data.data
console.log("<peer " + peerIdTruncdStr + ">: " + messageStr)
})
fsub.subscribe('libp2p-demo-chat')
fsub.subscribe(topicName)
node.dial(bootstrapAddr, cb)
}
@ -78,5 +81,5 @@ let i = 0
function pubsubloop() {
i = i + 1
var s = new Buffer('Hello from JS (' + i + ')')
fsub.publish('libp2p-demo-chat', s)
fsub.publish(topicName, s)
}

@ -24,7 +24,8 @@ var ho host.Host
//var dhtPtr *dht.IpfsDHT
var TopicName string = "libp2p-demo-chat"
//var TopicName string = "libp2p-demo-chat"
var TopicName string = "RDEpsjSPrAZF9JCK5REt3tao"
func parseArgs() (bool, string) {
usage := fmt.Sprintf("Usage: %s [-b] [PRIVATE_KEY]\n\n-b is bootstrap mode (creates DHT)\nPRIVATE_KEY is the path to a private key like '../util/private_key.bin'\n", os.Args[0])
@ -161,8 +162,16 @@ func main() {
}
}()
host.Network().SetConnHandler(handleConn)
// SetConnHandler() should not normally be called. Instead,
// use Notify() and pass it a functioon.
// The problem with SetConnHandler() is that it takes control
// of the connection.
//host.Network().SetConnHandler(handleConn)
host.Network().Notify(&inet.NotifyBundle{
ConnectedF: func(n inet.Network, c inet.Conn) {
fmt.Println("Got a connection:", c.RemotePeer())
},
})
if bBootstrap {
fmt.Println("Bootstrapper running. Ctrl+C to exit.")
for true {

@ -0,0 +1,2 @@
Cargo.lock
target/

@ -0,0 +1,17 @@
[package]
name = "floodsub-example"
version = "0.0.1"
authors = ["Rust Libp2p <libp2p@libp2p.io>"]
[dependencies]
bytes = "0.4"
futures = "0.1"
tokio-codec = "0.1"
tokio-io = "0.1"
tokio-current-thread = "0.1"
tokio-stdin = "0.1"
env_logger = "0.5.4"
rand = "0.4"
libp2p = { git = "https://github.com/libp2p/rust-libp2p" }
# To use local rust-libp2p source
#libp2p = { path = "../rust-libp2p" }

@ -0,0 +1,8 @@
SRC = src/main.rs
BIN = $(SRC:.rs=)
all: $(SRC)
cargo build
run:
RUST_BACKTRACE=1 cargo run

@ -0,0 +1,160 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
extern crate bytes;
extern crate env_logger;
extern crate futures;
extern crate libp2p;
extern crate rand;
extern crate tokio_current_thread;
extern crate tokio_io;
extern crate tokio_stdin;
use futures::Stream;
use futures::future::Future;
use std::{env, mem};
use libp2p::core::{either::EitherOutput, upgrade};
use libp2p::core::{Multiaddr, Transport, PublicKey};
use libp2p::peerstore::PeerId;
use libp2p::tcp::TcpConfig;
use libp2p::websocket::WsConfig;
fn main() {
env_logger::init();
// Determine which address to listen to.
let listen_addr = env::args()
.nth(1)
.unwrap_or("/ip4/0.0.0.0/tcp/10050".to_owned());
// We start by creating a `TcpConfig` that indicates that we want TCP/IP.
let transport = TcpConfig::new()
// In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP.
// The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be
// used for the underlying multiaddress.
.or_transport(WsConfig::new(TcpConfig::new()))
// On top of TCP/IP, we will use either the plaintext protocol or the secio protocol,
// depending on which one the remote supports.
.with_upgrade({
let plain_text = upgrade::PlainTextConfig;
let secio = {
let private_key = include_bytes!("test-rsa-private-key.pk8");
let public_key = include_bytes!("test-rsa-public-key.der").to_vec();
libp2p::secio::SecioConfig::new(
libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap()
)
};
upgrade::or(
upgrade::map(plain_text, |pt| EitherOutput::First(pt)),
upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream))
)
})
// On top of plaintext or secio, we will use the multiplex protocol.
.with_upgrade(libp2p::mplex::MplexConfig::new())
// The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);
// We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply secio and multiplex on top
// of any opened stream.
// We now prepare the protocol that we are going to negotiate with nodes that open a connection
// or substream to our server.
let my_id = {
let key = (0..2048).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
PeerId::from_public_key(PublicKey::Rsa(key))
};
let (floodsub_upgrade, floodsub_rx) = libp2p::floodsub::FloodSubUpgrade::new(my_id);
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and
// outgoing connections for us.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(floodsub_upgrade.clone()),
|socket, _| {
println!("Successfully negotiated protocol");
socket
},
);
let address = swarm_controller
.listen_on(listen_addr.parse().expect("invalid multiaddr"))
.expect("unsupported multiaddr");
println!("Now listening on {:?}", address);
let topic = libp2p::floodsub::TopicBuilder::new("libp2p-demo-chat").build();
let floodsub_ctl = libp2p::floodsub::FloodSubController::new(&floodsub_upgrade);
floodsub_ctl.subscribe(&topic);
let floodsub_rx = floodsub_rx.for_each(|msg| {
if let Ok(msg) = String::from_utf8(msg.data) {
println!("< {}", msg);
}
Ok(())
});
let stdin = {
let mut buffer = Vec::new();
tokio_stdin::spawn_stdin_stream_unbounded().for_each(move |msg| {
if msg != b'\r' && msg != b'\n' {
buffer.push(msg);
return Ok(());
} else if buffer.is_empty() {
return Ok(());
}
let msg = String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap();
if msg.starts_with("/dial ") {
let target: Multiaddr = msg[6..].parse().unwrap();
println!("*Dialing {}*", target);
swarm_controller
.dial(
target,
transport.clone().with_upgrade(floodsub_upgrade.clone()),
)
.unwrap();
} else {
floodsub_ctl.publish(&topic, msg.into_bytes());
}
Ok(())
})
};
let final_fut = swarm_future
.for_each(|_| Ok(()))
.select(floodsub_rx)
.map(|_| ())
.map_err(|e| e.0)
.select(stdin.map_err(|_| unreachable!()))
.map(|_| ())
.map_err(|e| e.0);
tokio_current_thread::block_on_all(final_fut).unwrap();
}
Loading…
Cancel
Save