Not yet functional start to pubsub

master
Mike Goelzer 6 years ago
parent 470dda7784
commit 46d39a629d
No known key found for this signature in database
GPG Key ID: EDAC46A37751AD6D

1
.gitignore vendored

@ -2,3 +2,4 @@ dht-interop
pubsub-interop
content-dht-provide-find/js-dht-test/node_modules/
util/private-key-gen
pubsub/js/node_modules

@ -19,14 +19,10 @@ Note that the node ID of `dht-interop` is always `Qm...6aJ9oRuEzWa` because it i
**Second terminal:** run the command printed out by dht-interop, replacing 127.0.0.1 with the IP of the server where dht-interop is listening. Example:
First time only:
```
cd content-dht-provide-find/js-dht-test
npm install
```
Running the Node.js program:
```
cd content-dht-provide-find/js-dht-test
npm install # first time only
node js-dht-test/index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6aJ9oRuEzWa
```
@ -60,7 +56,8 @@ This peer, which is not in bootstrapper mode, creates a node, subscribes to the
**Third terminal**: Create a JS peer to connect to bootstrap and publish on topic
```
cd pubsub/js
npm install # first time only
node index.js /ip4/127.0.0.1/tcp/9876/ipfs/QmehVYruznbyDZuHBV4vEHESpDevMoAovET6aJ9oRuEzWa
```
_Acknowledgements: @jhiesey for DHT (content & peer routing) JS+Go interop, @stebalien for PubSub_

@ -0,0 +1,107 @@
'use strict'
const libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const PeerInfo = require('peer-info')
const FloodSub = require('libp2p-floodsub')
const CID = require('cids')
const KadDHT = require('libp2p-kad-dht')
const defaultsDeep = require('@nodeutils/defaults-deep')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
var fsub;
class MyBundle extends libp2p {
constructor (_options) {
const defaults = {
modules: {
transport: [ TCP ],
streamMuxer: [ Mplex ],
connEncryption: [ SECIO ],
// // we add the DHT module that will enable Peer and Content Routing
// dht: KadDHT
},
config: {
// dht: {
// kBucketSize: 20
// },
// EXPERIMENTAL: {
// dht: true
// }
}
}
super(defaultsDeep(_options, defaults))
}
}
function createNode (callback) {
let node
waterfall([
(cb) => PeerInfo.create(cb),
(peerInfo, cb) => {
peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
node = new MyBundle({
peerInfo
})
node.start(cb)
}
], (err) => callback(err, node))
}
parallel([
(cb) => createNode(cb),
], (err, nodes) => {
if (err) { throw err }
const node1 = nodes[0]
const bootstrapAddr = process.argv[2]
console.log('Connecting to:', bootstrapAddr)
parallel([
(cb) => node1.dial(bootstrapAddr, cb),
// Set up of the cons might take time
(cb) => setTimeout(cb, 3000)
], (err) => { if (err) { throw err }
setTimeout( function() {
console.log("My ID: " + node1.peerInfo.id._idB58String)
fsub = new FloodSub(node1)
console.log("Initialized fsub\n")
fsub.start((err) => {
if (err) {
console.log('Error:', err)
}
fsub.on('libp2p-demo-chat', (data) => {
console.log(">>> Got some data:")
console.log(" From: " + data.from);
console.log(" Data: " + data.data);
console.log(" SeqNo: " + data.seqno);
console.log(" topicIDs: " + data.topicIDs);
console.log(">>> END -- data\n")
})
fsub.subscribe('libp2p-demo-chat')
fsub.publish('libp2p-demo-chat', new Buffer('Hello from JS (0)'))
})
}, 3000)
})
})
var i = 0
function pubsubloop() {
i = i + 1
var s = new Buffer('Hello from JS ('+i+')')
fsub.publish('libp2p-demo-chat',s)
}
setInterval(pubsubloop,10000);

@ -0,0 +1 @@
../peer-id/src/bin.js

@ -0,0 +1 @@
../pem-jwk/bin/pem-jwk.js

@ -0,0 +1 @@
../rsa-unpack/bin/cmd.js

@ -0,0 +1 @@
../semver/bin/semver

@ -0,0 +1 @@
../sha.js/bin.js

1
pubsub/js/node_modules/.bin/uuid generated vendored

@ -0,0 +1 @@
../uuid/bin/uuid

@ -0,0 +1,23 @@
{
"name": "js-dht-test",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"async": "^2.6.1",
"cids": "^0.5.3",
"libp2p": "^0.23.1",
"libp2p-kad-dht": "^0.10.2",
"libp2p-mplex": "^0.8.0",
"libp2p-secio": "^0.10.0",
"libp2p-tcp": "^0.12.1",
"peer-info": "^0.14.1",
"libp2p-floodsub": "^0.15.0"
}
}

@ -6,10 +6,12 @@ import (
"fmt"
"io/ioutil"
"os"
_ "time"
// ipfsaddr "github.com/ipfs/go-ipfs-addr"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-crypto"
@ -18,8 +20,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
//var ho h.Host
var ho host.Host
//var dhtPtr *dht.IpfsDHT
var TopicName string = "libp2p-demo-chat"
func parseArgs() (bool, string) {
@ -39,6 +43,14 @@ func parseArgs() (bool, string) {
return bBootstrap, privKeyFilePath
}
func handleConn(conn inet.Conn) {
ctx := context.Background()
h := ho
fmt.Printf("<NOTICE> Got connection from %v\n", conn.RemoteMultiaddr().String())
_ = h
_ = ctx
}
func main() {
ctx := context.Background()
@ -149,6 +161,8 @@ func main() {
}
}()
host.Network().SetConnHandler(handleConn)
if bBootstrap {
fmt.Println("Bootstrapper running. Ctrl+C to exit.")
for true {

Loading…
Cancel
Save