2021-12-24 02:49:45 +00:00
|
|
|
package database
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-12-26 20:37:28 +00:00
|
|
|
"sort"
|
2021-12-25 18:33:33 +00:00
|
|
|
"sync"
|
2021-12-24 02:49:45 +00:00
|
|
|
|
|
|
|
orbitdb "berty.tech/go-orbit-db"
|
|
|
|
"berty.tech/go-orbit-db/accesscontroller"
|
2021-12-25 18:33:33 +00:00
|
|
|
"berty.tech/go-orbit-db/events"
|
2021-12-24 02:49:45 +00:00
|
|
|
"berty.tech/go-orbit-db/iface"
|
2021-12-25 18:33:33 +00:00
|
|
|
"berty.tech/go-orbit-db/stores"
|
2021-12-24 02:49:45 +00:00
|
|
|
"berty.tech/go-orbit-db/stores/documentstore"
|
|
|
|
config "github.com/ipfs/go-ipfs-config"
|
|
|
|
icore "github.com/ipfs/interface-go-ipfs-core"
|
2021-12-25 18:33:33 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
2021-12-24 02:49:45 +00:00
|
|
|
"github.com/mitchellh/mapstructure"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
|
|
|
"github.com/mrusme/superhighway84/models"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Database struct {
|
|
|
|
ctx context.Context
|
2021-12-26 15:55:37 +00:00
|
|
|
ConnectionString string
|
2021-12-24 02:49:45 +00:00
|
|
|
URI string
|
|
|
|
Cache string
|
|
|
|
|
|
|
|
Logger *zap.Logger
|
|
|
|
IPFSNode icore.CoreAPI
|
|
|
|
OrbitDB orbitdb.OrbitDB
|
|
|
|
Store orbitdb.DocumentStore
|
2021-12-25 18:33:33 +00:00
|
|
|
StoreEventChan <-chan events.Event
|
2021-12-24 02:49:45 +00:00
|
|
|
}
|
|
|
|
|
2021-12-26 15:55:37 +00:00
|
|
|
func (db *Database) init() (error) {
|
2021-12-24 02:49:45 +00:00
|
|
|
var err error
|
|
|
|
|
|
|
|
db.OrbitDB, err = orbitdb.NewOrbitDB(db.ctx, db.IPFSNode, &orbitdb.NewOrbitDBOptions{
|
|
|
|
Directory: &db.Cache,
|
|
|
|
Logger: db.Logger,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
ac := &accesscontroller.CreateAccessControllerOptions{
|
|
|
|
Access: map[string][]string{
|
|
|
|
"write": {
|
|
|
|
"*",
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-12-26 07:02:15 +00:00
|
|
|
// addr, err := db.OrbitDB.DetermineAddress(db.ctx, db.Name, "docstore", &orbitdb.DetermineAddressOptions{})
|
|
|
|
// if err != nil {
|
|
|
|
// return err
|
|
|
|
// }
|
|
|
|
// db.URI = addr.String()
|
2021-12-24 02:49:45 +00:00
|
|
|
|
2021-12-26 07:02:15 +00:00
|
|
|
storetype := "docstore"
|
2021-12-26 15:55:37 +00:00
|
|
|
db.Store, err = db.OrbitDB.Docs(db.ctx, db.ConnectionString, &orbitdb.CreateDBOptions{
|
2021-12-24 02:49:45 +00:00
|
|
|
AccessController: ac,
|
|
|
|
StoreType: &storetype,
|
|
|
|
StoreSpecificOpts: documentstore.DefaultStoreOptsForMap("id"),
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-12-25 18:33:33 +00:00
|
|
|
db.StoreEventChan = db.Store.Subscribe(db.ctx)
|
2021-12-24 02:49:45 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-25 18:33:33 +00:00
|
|
|
func(db *Database) connectToPeers() error {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
|
peerInfos, err := config.DefaultBootstrapPeers()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Add(len(peerInfos))
|
|
|
|
for _, peerInfo := range peerInfos {
|
|
|
|
go func(peerInfo *peer.AddrInfo) {
|
|
|
|
defer wg.Done()
|
|
|
|
err := db.IPFSNode.Swarm().Connect(db.ctx, *peerInfo)
|
|
|
|
if err != nil {
|
|
|
|
db.Logger.Debug("failed to connect", zap.String("peerID", peerInfo.ID.String()), zap.Error(err))
|
|
|
|
} else {
|
|
|
|
db.Logger.Debug("connected!", zap.String("peerID", peerInfo.ID.String()))
|
|
|
|
}
|
|
|
|
}(&peerInfo)
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-24 02:49:45 +00:00
|
|
|
func NewDatabase(
|
|
|
|
ctx context.Context,
|
2021-12-26 15:55:37 +00:00
|
|
|
dbConnectionString string,
|
2021-12-24 02:49:45 +00:00
|
|
|
dbCache string,
|
|
|
|
logger *zap.Logger,
|
|
|
|
) (*Database, error) {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
db := new(Database)
|
|
|
|
db.ctx = ctx
|
2021-12-26 15:55:37 +00:00
|
|
|
db.ConnectionString = dbConnectionString
|
2021-12-24 02:49:45 +00:00
|
|
|
db.Cache = dbCache
|
|
|
|
db.Logger = logger
|
|
|
|
|
|
|
|
defaultPath, err := config.PathRoot()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := setupPlugins(defaultPath); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
db.IPFSNode, err = createNode(ctx, defaultPath)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-12-25 18:33:33 +00:00
|
|
|
return db, nil
|
|
|
|
}
|
2021-12-24 02:49:45 +00:00
|
|
|
|
2021-12-26 07:02:15 +00:00
|
|
|
func (db *Database) Connect(onReady func(address string)) (error) {
|
2021-12-25 18:33:33 +00:00
|
|
|
var err error
|
|
|
|
|
2021-12-26 07:02:15 +00:00
|
|
|
// if db.Init {
|
2021-12-24 02:49:45 +00:00
|
|
|
err = db.init()
|
|
|
|
if err != nil {
|
2021-12-25 18:33:33 +00:00
|
|
|
return err
|
2021-12-24 02:49:45 +00:00
|
|
|
}
|
2021-12-26 07:02:15 +00:00
|
|
|
// } else {
|
|
|
|
// err = db.open()
|
|
|
|
// if err != nil {
|
|
|
|
// return err
|
|
|
|
// }
|
|
|
|
// }
|
2021-12-24 02:49:45 +00:00
|
|
|
|
2021-12-25 18:33:33 +00:00
|
|
|
// go func() {
|
|
|
|
err = db.connectToPeers()
|
2021-12-24 02:49:45 +00:00
|
|
|
if err != nil {
|
|
|
|
db.Logger.Debug("failed to connect: %s", zap.Error(err))
|
|
|
|
} else {
|
|
|
|
db.Logger.Debug("connected to peer!")
|
|
|
|
}
|
2021-12-25 18:33:33 +00:00
|
|
|
// }()
|
|
|
|
|
2021-12-26 07:02:15 +00:00
|
|
|
// log.Println(db.Store.ReplicationStatus().GetBuffered())
|
|
|
|
// log.Println(db.Store.ReplicationStatus().GetQueued())
|
|
|
|
// log.Println(db.Store.ReplicationStatus().GetProgress())
|
|
|
|
|
|
|
|
db.Logger.Info("running ...")
|
|
|
|
|
2021-12-25 18:33:33 +00:00
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
for ev := range db.StoreEventChan {
|
2021-12-26 07:02:15 +00:00
|
|
|
db.Logger.Debug("got event", zap.Any("event", ev))
|
2021-12-25 18:33:33 +00:00
|
|
|
switch ev.(type) {
|
|
|
|
case *stores.EventReady:
|
2021-12-26 15:55:37 +00:00
|
|
|
db.URI = db.Store.Address().String()
|
|
|
|
onReady(db.URI)
|
2021-12-25 18:33:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
err = db.Store.Load(db.ctx, -1)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: clean up
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2021-12-24 02:49:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) Disconnect() {
|
|
|
|
db.OrbitDB.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) SubmitArticle(article *models.Article) (error) {
|
|
|
|
entity := structToMap(&article)
|
|
|
|
entity["type"] = "article"
|
|
|
|
|
|
|
|
_, err := db.Store.Put(db.ctx, entity)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) GetArticleByID(id string) (models.Article, error) {
|
|
|
|
entity, err := db.Store.Get(db.ctx, id, &iface.DocumentStoreGetOptions{CaseInsensitive: false})
|
|
|
|
if err != nil {
|
|
|
|
return models.Article{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var article models.Article
|
|
|
|
err = mapstructure.Decode(entity[0], &article)
|
|
|
|
if err != nil {
|
|
|
|
return models.Article{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return article, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (db *Database) ListArticles() ([]models.Article, error) {
|
|
|
|
var articles []models.Article
|
|
|
|
|
2021-12-26 20:37:28 +00:00
|
|
|
_, err := db.Store.Query(db.ctx, func(e interface{})(bool, error) {
|
2021-12-24 02:49:45 +00:00
|
|
|
entity := e.(map[string]interface{})
|
|
|
|
if entity["type"] == "article" {
|
2021-12-26 20:37:28 +00:00
|
|
|
var article models.Article
|
|
|
|
err := mapstructure.Decode(entity, &article)
|
|
|
|
if err == nil {
|
|
|
|
articles = append(articles, article)
|
|
|
|
}
|
|
|
|
return true, err
|
2021-12-24 02:49:45 +00:00
|
|
|
}
|
|
|
|
return false, nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return articles, err
|
|
|
|
}
|
|
|
|
|
2021-12-26 20:37:28 +00:00
|
|
|
sort.SliceStable(articles, func(i, j int) bool {
|
|
|
|
return articles[i].Date > articles[j].Date
|
|
|
|
})
|
2021-12-24 02:49:45 +00:00
|
|
|
|
|
|
|
return articles, nil
|
|
|
|
}
|
|
|
|
|