Imported jhiesey's DHT interop code

jhiesey-dht-interop-closure-instead-of-globals
Mike Goelzer 6 years ago
parent d78794be23
commit 0c8d70120e
No known key found for this signature in database
GPG Key ID: EDAC46A37751AD6D

3
.gitignore vendored

@ -1 +1,2 @@
go-peer
dht-interop
js-dht-test/node_modules/

@ -1,4 +1,4 @@
SRC = go-peer.go
SRC = dht-interop.go
BIN = $(SRC:.go=)
DHT_SERVER = libp2p-bootstrap.goelzer.io
@ -7,4 +7,4 @@ all: $(SRC)
install:
scp $(SRC:.go=) $(DHT_SERVER):~/
ssh $(DHT_SERVER) ./$(BIN) --bootstrap
ssh $(DHT_SERVER) ./$(BIN)

@ -1,5 +0,0 @@
# libp2p-demo-go-js-rust
Simple libp2p demo chat app for later use as training material
*WIP*

@ -0,0 +1,15 @@
0. Help me fix node "Error: Cannot find module 'libp2p'"
Answer:
cd js-dht-test
npm install
2. Let's run it on bootstrap box (go) and locally (js)
WORKS!
3. Explain to me how you generated peer CIDs
One is the hash of a pubsub topic
The other is just mine
All this is example does is verify that two peers can find each other's content

@ -0,0 +1,94 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/libp2p/go-libp2p"
h "github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/opts"
"github.com/libp2p/go-libp2p-net"
"github.com/ipfs/go-cid"
)
var ho h.Host
var dhtPtr *dht.IpfsDHT
func handleConn(conn net.Conn) {
ctx := context.Background()
d := *dhtPtr
provideCid, err := cid.Decode("zb2rhXqLbdjpXnJG99QsjM6Nc6xaDKgEr2FfugDJynE7H2NR6")
if err != nil {
panic(err)
}
findCid, err := cid.Decode("QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL")
if err != nil {
panic(err)
}
time.Sleep(5 * time.Second)
// First, announce ourselves as participating in this topic
fmt.Println("announcing ourselves...")
tctx, _ := context.WithTimeout(ctx, time.Second*10)
if err := d.Provide(tctx, provideCid, true); err != nil {
panic(err)
}
fmt.Printf("Local node %s is providing %s\n", ho.ID().Pretty(), provideCid)
// Now, look for others who have announced
fmt.Println("searching for other peers...")
tctx, _ = context.WithTimeout(ctx, time.Second*10)
providers, err := d.FindProviders(tctx, findCid)
if err != nil {
panic(err)
}
if len(providers) != 0 {
provider := providers[0]
fmt.Printf("Remote node %s is providing %s\n", provider.ID.Pretty(), findCid)
time.Sleep(5 * time.Second)
os.Exit(0)
} else {
fmt.Printf("no remote providers!\n")
}
}
func main() {
ctx := context.Background()
//
// Set up a libp2p host.
//
host, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9876"))
if err != nil {
fmt.Println("libp2p.New: failed: %v",err)
panic(err)
}
host.Network().SetConnHandler(handleConn)
ho = host
fmt.Printf("To connect, run:\n")
fmt.Printf("node js-dht-test/index.js %s/ipfs/%s\n", host.Addrs()[0], host.ID().Pretty())
//
// Construct a DHT for discovery.
//
d, err := dht.New(ctx, host, dhtopts.Client(false) )
if err != nil {
panic(err)
}
dhtPtr = d
select {}
}

@ -1,250 +0,0 @@
package main
import (
"bufio"
"context"
"fmt"
"os"
"time"
"strings"
"net"
"github.com/multiformats/go-multihash"
"github.com/libp2p/go-floodsub"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/opts"
_ "github.com/libp2p/go-libp2p-peerstore"
"github.com/ipfs/go-cid"
// "github.com/ipfs/go-datastore"
_ "github.com/ipfs/go-ipfs-addr"
)
const bootstrapHostname string = "libp2p-bootstrap.goelzer.io"
func usage() {
fmt.Printf("Usage: %s [OPTIONS]\n\n", os.Args[0]);
fmt.Printf(" --bootstrapper Start a DHT; initial peer only.\n");
fmt.Printf(" --peer=HOST Connect to IP to join the libp2p swarm\n");
//fmt.Printf(" --port=N Listen and connect on N\n");
fmt.Printf(" --help Display this message\n");
fmt.Printf("Note that --bootstrapper and --peer are mutually exclusive.\n");
fmt.Printf("\nThis program demonstrates a libp2p swarm. The first node\n");
fmt.Printf("is started with `--bootstrapper`. Add a second node with\n");
fmt.Printf("`--peer=IP_OF_FIRST_NODE`.\n\n");
}
func printMutuallyExclusiveErrorAndDie() {
fmt.Printf("Error: --bootstrapper and --peer are mutually exclusive\n\n");
os.Exit(1);
}
func parseArgs(isBootstrap *bool, peerAddrStr *string) {
*isBootstrap = false;
*peerAddrStr = "";
for _,arg := range os.Args[1:] {
if (arg == "--help") {
// --help = print usage and die
usage();
os.Exit(1);
} else if (arg == "--bootstrapper") {
// Bootstrap mode '--bootstrapper' => we create a DHT
if (*peerAddrStr != "") {
printMutuallyExclusiveErrorAndDie();
}
*isBootstrap = true;
} else if (strings.HasPrefix(arg,"--peer=")) {
// Peer mode: won't create a DHT but instead connect to peer IP
if (*isBootstrap == true) {
printMutuallyExclusiveErrorAndDie();
}
*peerAddrStr = arg[7:];
} else {
fmt.Printf("Invalid argument: '%s'\n\n",arg);
os.Exit(1);
}
}
}
func hostToIpStr(hostname string) string {
IPAddr, err := net.ResolveIPAddr("ip", hostname)
if err != nil {
fmt.Println("Unable to resolve '%s'", hostname)
os.Exit(1)
}
return IPAddr.String()
}
func main() {
var isBootstrap bool;
var peerAddrStr string;
parseArgs(&isBootstrap, &peerAddrStr);
ctx := context.Background()
//
// Set up a libp2p host.
//
host, err := libp2p.New(ctx, libp2p.Defaults, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/5555") )
if err != nil {
fmt.Println("libp2p.New: failed: %v",err)
panic(err)
}
//fmt.Println("My adress: /ip4/159.89.221.55/tcp/5555/ipfs/%s", host.ID().Pretty() )
PubSubTopicName := "libp2p-go-js-rust-chat"
_ = PubSubTopicName //TODO
//
// Construct ourselves a pubsub instance using that libp2p host.
//
fsub, err := floodsub.NewFloodSub(ctx, host)
if err != nil {
panic(err)
}
_ = fsub //TODO
//
// Construct a DHT for discovery (client only for peer mode)
//
var Dht *dht.IpfsDHT;
if (isBootstrap) {
fmt.Println("Bootstrap Mode");
var err error
Dht, err = dht.New(ctx, host, dhtopts.Client(false) )
if err != nil {
panic(err)
}
fmt.Println("Success: host id '%s'",host.ID().Pretty() );
} else {
s := peerAddrStr;
peerAddrStr = hostToIpStr(s);
fmt.Printf("Peer Mode (peer address = '%s' ('%s')\n",s,peerAddrStr);
var err error
Dht, err = dht.New(ctx, host, dhtopts.Client(true) )
if err != nil {
panic(err)
}
bootstrapPeers := []string{
//TODO: this is wrong; need to know bootstrap host's pretty ID,
// not the pretty ID of ourselves!
fmt.Sprintf("/ip4/159.89.221.55/tcp/5555/ipfs/%s", host.ID().Pretty()),
}
fmt.Println("bootstrapping...")
for _, addr := range bootstrapPeers {
iaddr, _ := ipfsaddr.ParseString(addr)
pinfo, _ := peerstore.InfoFromP2pAddr(iaddr.Multiaddr())
if err := host.Connect(ctx, *pinfo); err != nil {
fmt.Println("bootstrapping to peer failed: ", err)
}
}
}
_ = Dht
// These are the IPFS bootstrap nodes:
//
// bootstrapPeers := []string{
// "/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
// "/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",
// "/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",
// "/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",
// "/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",
// }
//// bootstrapPeers := []string{
//// fmt.Sprintf("/ip4/159.89.221.55/tcp/5555/ipfs/%s", host.ID().Pretty()),
//// }
//
// fmt.Println("bootstrapping...")
// for _, addr := range bootstrapPeers {
// iaddr, _ := ipfsaddr.ParseString(addr)
//
// pinfo, _ := peerstore.InfoFromP2pAddr(iaddr.Multiaddr())
//
// if err := host.Connect(ctx, *pinfo); err != nil {
// fmt.Println("bootstrapping to peer failed: ", err)
// }
// }
// Using the sha256 of our "topic" as our rendezvous value
c, _ := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum([]byte(PubSubTopicName))
// First, announce ourselves as participating in this topic
fmt.Println("announcing ourselves...")
tctx, _ := context.WithTimeout(ctx, time.Second*10)
if err := Dht.Provide(tctx, c, true); err != nil {
panic(err)
}
// // Now, look for others who have announced
// fmt.Println("searching for other peers...")
// tctx, _ = context.WithTimeout(ctx, time.Second*10)
// peers, err := dht.FindProviders(tctx, c)
// if err != nil {
// panic(err)
// }
// fmt.Printf("Found %d peers!\n", len(peers))
//
// // Now connect to them!
// for _, p := range peers {
// if p.ID == host.ID() {
// // No sense connecting to ourselves
// continue
// }
//
// tctx, _ := context.WithTimeout(ctx, time.Second*5)
// if err := host.Connect(tctx, p); err != nil {
// fmt.Println("failed to connect to peer: ", err)
// }
// }
//
// fmt.Println("bootstrapping and discovery complete!")
//
sub, err := fsub.Subscribe(PubSubTopicName)
if err != nil {
panic(err)
}
// Go and listen for messages from them, and print them to the screen
go func() {
for {
msg, err := sub.Next(ctx)
if err != nil {
panic(err)
}
fmt.Printf("%s: %s\n", msg.GetFrom(), string(msg.GetData()))
}
}()
// Now, wait for input from the user, and send that out!
fmt.Println("Type something and hit enter to send:")
scan := bufio.NewScanner(os.Stdin)
for scan.Scan() {
if err := fsub.Publish(PubSubTopicName, scan.Bytes()); err != nil {
panic(err)
}
}
}

@ -0,0 +1,96 @@
'use strict'
const libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const PeerInfo = require('peer-info')
const CID = require('cids')
const KadDHT = require('libp2p-kad-dht')
const defaultsDeep = require('@nodeutils/defaults-deep')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
class MyBundle extends libp2p {
constructor (_options) {
const defaults = {
modules: {
transport: [ TCP ],
streamMuxer: [ Mplex ],
connEncryption: [ SECIO ],
// we add the DHT module that will enable Peer and Content Routing
dht: KadDHT
},
config: {
dht: {
kBucketSize: 20
},
EXPERIMENTAL: {
dht: true
}
}
}
super(defaultsDeep(_options, defaults))
}
}
function createNode (callback) {
let node
waterfall([
(cb) => PeerInfo.create(cb),
(peerInfo, cb) => {
peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
node = new MyBundle({
peerInfo
})
node.start(cb)
}
], (err) => callback(err, node))
}
parallel([
(cb) => createNode(cb),
], (err, nodes) => {
if (err) { throw err }
const node1 = nodes[0]
const bootstrapAddr = process.argv[2]
console.log('Connecting to:', bootstrapAddr)
parallel([
(cb) => node1.dial(bootstrapAddr, cb),
// Set up of the cons might take time
(cb) => setTimeout(cb, 300)
], (err) => {
if (err) { throw err }
const provideCid = new CID('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL')
const findCid = new CID('zb2rhXqLbdjpXnJG99QsjM6Nc6xaDKgEr2FfugDJynE7H2NR6')
node1.contentRouting.provide(provideCid, (err) => {
if (err) { throw err }
console.log('Local node %s is providing %s', node1.peerInfo.id.toB58String(), provideCid.toBaseEncodedString())
setTimeout(() => {
node1.contentRouting.findProviders(findCid, 10000, (err, providers) => {
if (err) { throw err }
if (providers.length !== 0) {
const provider = providers[0]
// console.log(provider)
console.log('Remote node %s is providing %s', provider.id.toB58String(), findCid.toBaseEncodedString())
setTimeout(() => {
process.exit(0)
}, 5000)
} else {
console.log('No remote providers found!')
}
})
}, 5000)
})
})
})

@ -0,0 +1,22 @@
{
"name": "js-dht-test",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"async": "^2.6.1",
"cids": "^0.5.3",
"libp2p": "^0.23.1",
"libp2p-kad-dht": "^0.10.2",
"libp2p-mplex": "^0.8.0",
"libp2p-secio": "^0.10.0",
"libp2p-tcp": "^0.12.1",
"peer-info": "^0.14.1"
}
}
Loading…
Cancel
Save