Merge pull request #3 from mgoelzer/pubsub-mvp

Pubsub mvp
master
Mike Goelzer 6 years ago committed by GitHub
commit b3bbc810fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

2
.gitignore vendored

@ -1,3 +1,5 @@
dht-interop
pubsub-interop
content-dht-provide-find/js-dht-test/node_modules/
util/private-key-gen
pubsub/js/node_modules

@ -19,14 +19,10 @@ Note that the node ID of `dht-interop` is always `Qm...6aJ9oRuEzWa` because it i
**Second terminal:** run the command printed out by dht-interop, replacing 127.0.0.1 with the IP of the server where dht-interop is listening. Example:
First time only:
```
cd js-dht-test
npm install
```
Running the Node.js program:
```
cd content-dht-provide-find/js-dht-test
npm install # first time only
node js-dht-test/index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6aJ9oRuEzWa
```
@ -38,6 +34,35 @@ node js-dht-test/index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESp
**What it demonstrates**: Two Go nodes are created and run a chat server using a shared PubSub topic. **TODO**: Should be a Go node and a JS node, once I get the two Go nodes version working.
TODO: show how to run
**First terminal**: Create the bootstrapper node
```
cd pubsub
./pubsub-interop -b ../util/private_key.bin.bootstrapper.Wa
```
The bootstrapper creates a new libp2p node, subscribes to the shared topic string, spawns a go routine to emit any publishes to that topic, and then waits forever.
**Second terminal**: Create a go peer to connect to bootstrapper and publish on the topic
```
cd pubsub
./pubsub-interop ../util/private_key.bin.peer.Sk
```
This peer, which is not in bootstrapper mode, creates a node, subscribes to the shared topic string, spawns the same go routine, and then loops forever requesting user input and publishing each line to the topic.
**Third terminal**: Create a JS peer to connect to bootstrap and publish on topic
```
cd pubsub/js
npm install # first time only
node index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6aJ9oRuEzWa
```
This JS peer will fire off a hello message every few seconds, which the other two subscribing nodes can see.
If you return to the second terminal and type a message, the bootstrapper and JS peers will both print that message.
In short, you have a chat app on a private libp2p network.
_Acknowledgements: @jhiesey for DHT (content & peer routing) JS+Go interop, @stebalien for PubSub_

@ -0,0 +1,11 @@
SRC = pubsub-interop.go
BIN = $(SRC:.go=)
DHT_SERVER = libp2p-bootstrap.goelzer.io
all: $(SRC)
go build -o $(BIN) $(SRC)
install:
-ssh $(DHT_SERVER) killall -9 dht-interop
scp $(SRC:.go=) $(DHT_SERVER):~/
ssh $(DHT_SERVER) ./$(BIN)

@ -0,0 +1,82 @@
'use strict'
const libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const PeerInfo = require('peer-info')
const FloodSub = require('libp2p-floodsub')
const CID = require('cids')
const KadDHT = require('libp2p-kad-dht')
const defaultsDeep = require('@nodeutils/defaults-deep')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
class MyBundle extends libp2p {
constructor(_options) {
const defaults = {
modules: {
transport: [TCP],
streamMuxer: [Mplex],
connEncryption: [SECIO],
},
}
super(defaultsDeep(_options, defaults))
}
}
function createNode(callback) {
let node
waterfall([
(cb) => PeerInfo.create(cb),
(peerInfo, cb) => {
peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
node = new MyBundle({
peerInfo
})
node.start(cb)
}
], (err) => callback(err, node))
}
let fsub;
let node;
const bootstrapAddr = process.argv[2];
waterfall([
(cb) => createNode(cb),
(node_, cb) => {
node = node_
console.log("My ID: " + node.peerInfo.id._idB58String)
fsub = new FloodSub(node)
fsub.start(cb)
},
(cb) => {
fsub.on('libp2p-demo-chat', (data) => {
const peerIdStr = data.from
const peerIdTruncdStr = peerIdStr.substr(0,2) + "*" + peerIdStr.substr(peerIdStr.length-6,6)
const messageStr = data.data
console.log("<peer " + peerIdTruncdStr + ">: " + messageStr)
})
fsub.subscribe('libp2p-demo-chat')
node.dial(bootstrapAddr, cb)
}
], (err) => {
if (err) {
console.log('Error:', err)
throw err
}
console.log("Connected to:", bootstrapAddr)
setInterval(pubsubloop, 3000);
})
let i = 0
function pubsubloop() {
i = i + 1
var s = new Buffer('Hello from JS (' + i + ')')
fsub.publish('libp2p-demo-chat', s)
}

@ -0,0 +1 @@
../peer-id/src/bin.js

@ -0,0 +1 @@
../pem-jwk/bin/pem-jwk.js

@ -0,0 +1 @@
../rsa-unpack/bin/cmd.js

@ -0,0 +1 @@
../semver/bin/semver

@ -0,0 +1 @@
../sha.js/bin.js

1
pubsub/js/node_modules/.bin/uuid generated vendored

@ -0,0 +1 @@
../uuid/bin/uuid

@ -0,0 +1,23 @@
{
"name": "js-dht-test",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"async": "^2.6.1",
"cids": "^0.5.3",
"libp2p": "^0.23.1",
"libp2p-kad-dht": "^0.10.2",
"libp2p-mplex": "^0.8.0",
"libp2p-secio": "^0.10.0",
"libp2p-tcp": "^0.12.1",
"peer-info": "^0.14.1",
"libp2p-floodsub": "^0.15.0"
}
}

@ -0,0 +1,190 @@
package main
import (
"bufio"
"context"
"fmt"
"io/ioutil"
"os"
_ "time"
// ipfsaddr "github.com/ipfs/go-ipfs-addr"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-floodsub"
ma "github.com/multiformats/go-multiaddr"
)
var ho host.Host
//var dhtPtr *dht.IpfsDHT
var TopicName string = "libp2p-demo-chat"
func parseArgs() (bool, string) {
usage := fmt.Sprintf("Usage: %s [-b] [PRIVATE_KEY]\n\n-b is bootstrap mode (creates DHT)\nPRIVATE_KEY is the path to a private key like '../util/private_key.bin'\n", os.Args[0])
var bBootstrap bool = false
var privKeyFilePath string
var args []string = os.Args[1:]
if (len(args) == 0) || (len(args) > 2) {
fmt.Printf("Error: wrong number of arguments\n\n%s", usage)
os.Exit(1)
}
if args[0] == "-b" {
bBootstrap = true
args = args[1:]
}
privKeyFilePath = args[0]
return bBootstrap, privKeyFilePath
}
func handleConn(conn inet.Conn) {
ctx := context.Background()
h := ho
fmt.Printf("<NOTICE> Got connection from %v\n", conn.RemoteMultiaddr().String())
_ = h
_ = ctx
}
func main() {
ctx := context.Background()
bBootstrap, privKeyFilePath := parseArgs()
fmt.Printf("Starting up in ")
if bBootstrap {
fmt.Printf("bootstrapper mode")
} else {
fmt.Printf("peer mode")
}
fmt.Printf(" with private key '%s'\n", privKeyFilePath)
//
// Read the private key
//
var privBytes []byte
privBytes, err := ioutil.ReadFile(privKeyFilePath)
if err != nil {
fmt.Println("ioutil.ReadFile: failed: %v", err)
panic(err)
}
var priv crypto.PrivKey
priv, err = crypto.UnmarshalPrivateKey(privBytes)
if err != nil {
fmt.Println("crypto.UnmarshalPrivateKey: failed: %v", err)
panic(err)
}
//
// Construct our libp2p host
//
var host host.Host
if bBootstrap {
host, err = libp2p.New(ctx,
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9876"),
libp2p.Identity(priv),
)
} else {
host, err = libp2p.New(ctx,
libp2p.Identity(priv),
)
}
if err != nil {
fmt.Println("libp2p.New: failed: %v", err)
panic(err)
}
// ho = host
//fmt.Printf("To connect, run:\n")
//fmt.Printf("node js-dht-test/index.js %s/ipfs/%s\n", host.Addrs()[0], host.ID().Pretty())
//
// Construct a floodsub instance for this host
//
fsub, err := floodsub.NewFloodSub(ctx, host)
if err != nil {
fmt.Println("Error (floodsub.NewFloodSub): %v", err)
panic(err)
}
//
// If we are the bootstrap node, don't try to connec to any peers.
// Else: try to connect to the bootstrap node.
//
const bootstrapAddrIP4Str string = "127.0.0.1"
if !bBootstrap {
var bootstrapMultiAddr ma.Multiaddr
var pinfo *peerstore.PeerInfo
bootstrapMultiAddrStr := fmt.Sprintf("/ip4/%s/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6aJ9oRuEzWa", bootstrapAddrIP4Str)
fmt.Printf("bootstrapping to '%s'...\n", bootstrapMultiAddrStr)
bootstrapMultiAddr, err := ma.NewMultiaddr(bootstrapMultiAddrStr)
if err != nil {
fmt.Println("Error (ma.NewMultiaddr): %v", err)
panic(err)
}
pinfo, err = peerstore.InfoFromP2pAddr(bootstrapMultiAddr)
if err != nil {
fmt.Println("Error (ma.NewMultiaddr): %v", err)
panic(err)
}
if err := host.Connect(ctx, *pinfo); err != nil {
fmt.Println("bootstrapping to peer failed: ", err)
}
}
//
// Subscribe to the topic and wait for messages published on that topic
//
sub, err := fsub.Subscribe(TopicName)
if err != nil {
fmt.Println("Error (fsub.Subscribe): %v", err)
panic(err)
}
// Go and listen for messages from them, and print them to the screen
go func() {
for {
msg, err := sub.Next(ctx)
if err != nil {
fmt.Println("Error (sub.Next): %v", err)
panic(err)
}
fmt.Printf("%s: %s\n", msg.GetFrom(), string(msg.GetData()))
}
}()
host.Network().SetConnHandler(handleConn)
if bBootstrap {
fmt.Println("Bootstrapper running. Ctrl+C to exit.")
for true {
}
} else {
// Now, wait for input from the user, and send that out!
fmt.Println("Type something and hit enter to send to other subscribers:")
scan := bufio.NewScanner(os.Stdin)
for scan.Scan() {
if err := fsub.Publish(TopicName, scan.Bytes()); err != nil {
panic(err)
}
}
}
// //
// // Construct a DHT for peer discovery if we are the bootstrap node.
// // Else: construct a DHT client for peer discovery and connect to bootstrap node.
// //
// d, err := dht.New(ctx, host, dhtopts.Client(false))
// if err != nil {
// panic(err)
// }
// dhtPtr = d
}
Loading…
Cancel
Save