Working redis with encoding

use-redis
Chakib Benziane 5 years ago
parent 8dce666783
commit aa892eca4c

@ -3,92 +3,50 @@ package db
import (
"fmt"
"log"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"git.sp4ke.com/sp4ke/bit4sat/utils"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"github.com/mediocregopher/radix/v3"
"github.com/mediocregopher/radix/v3/resp/resp2"
)
const (
DBName = "bit4sat.sqlite"
DBPath = "db-storage"
DBPathEnv = "BIT4SAT_DB_PATH"
DBPragma = ` PRAGMA foreign_keys = ON; `
RedisHost = "redis"
RedisHostEnv = "REDIS_HOST"
RedisPort = 6379
)
var (
DBOptions = map[string]string{
"_journal_mode": "WAL",
}
DB *Database
RuntimeRedisHost string
DB *radix.Pool
)
type Database struct {
Sql *sqlx.DB
Redis *radix.Pool
}
func (d *Database) Open() error {
dsnOptions := &url.Values{}
for k, v := range DBOptions {
dsnOptions.Set(k, v)
}
// Get db base path
path, set := os.LookupEnv(DBPathEnv)
if !set {
path = DBPath
}
// Create path if not exists
err := utils.Mkdir(path)
if err != nil {
log.Fatal(err)
}
path = filepath.Join(path, DBName)
//path = fmt.Sprintf("%s/%s", path, DBName)
dsn := fmt.Sprintf("file:%s?%s", path, dsnOptions.Encode())
func RedisMarshal(v interface{}) string {
log.Printf("Opening sqlite db %s\n", dsn)
d.Sql, err = sqlx.Open("sqlite3", dsn)
if err != nil {
log.Fatal(err)
data := resp2.Any{
I: v,
MarshalBulkString: true,
MarshalNoArrayHeaders: true,
}
marshalled := &strings.Builder{}
data.MarshalRESP(marshalled)
// Execute Pragmas
d.Sql.MustExec(DBPragma)
return nil
return marshalled.String()
}
func RollbackTx(tx *sqlx.Tx, callback func()) {
log.Println("Rolling back transactions !")
err := tx.Rollback()
if err != nil {
log.Fatal(err)
}
func init() {
var err error
if callback != nil {
callback()
RuntimeRedisHost, set := os.LookupEnv(RedisHostEnv)
if !set {
RuntimeRedisHost = RedisHost
}
}
func init() {
DB = &Database{}
DB.Open()
var err error
DB.Redis, err = radix.NewPool("tcp", "redis:6379", 10)
redisLoc := fmt.Sprintf("%s:%s", RuntimeRedisHost, strconv.Itoa(RedisPort))
DB, err = radix.NewPool("tcp", redisLoc, 10)
if err != nil {
log.Fatal(err)
}
}

@ -13,6 +13,7 @@ services:
environment:
- GO111MODULE=on
- BIT4SAT_DB_PATH=/sqlite
- BIT4SAT_STORAGE_PATH=/storage
- GOPATH=/go
#deploy:
@ -25,6 +26,7 @@ services:
- $PWD:/src
- gocache:/go
- ./db-storage:/sqlite
- file-storage:/storage
working_dir: /src

@ -0,0 +1,23 @@
package encoding
import (
"bytes"
"encoding/gob"
)
func ToBytes(v interface{}) ([]byte, error) {
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(v); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func FromBytes(v []byte, dst interface{}) error {
buffer := bytes.NewBuffer(v)
dec := gob.NewDecoder(buffer)
return dec.Decode(dst)
}

@ -3,8 +3,10 @@ module git.sp4ke.com/sp4ke/bit4sat
go 1.12
require (
github.com/fatih/structs v1.1.0
github.com/gin-contrib/cors v0.0.0-20190301062745-f9e10995c85a
github.com/gin-gonic/gin v1.3.0
github.com/gomodule/redigo v2.0.0+incompatible
github.com/jmoiron/sqlx v1.2.0
github.com/mattn/go-sqlite3 v1.10.0
github.com/mediocregopher/radix/v3 v3.2.3

@ -1,4 +1,6 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/gin-contrib/cors v0.0.0-20190301062745-f9e10995c85a h1:zBycVvXa03SIX+jdMv8wGu9TMDMWdN8EhaR1FoeKHNo=
github.com/gin-contrib/cors v0.0.0-20190301062745-f9e10995c85a/go.mod h1:pL2kNE+DgDU+eQ+dary5bX0Z6LPP8nR6Mqs1iejILw4=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 h1:AzN37oI0cOS+cougNAV9szl6CVoj2RYwzS3DpUQNtlY=
@ -8,6 +10,8 @@ github.com/gin-gonic/gin v1.3.0/go.mod h1:7cKuhb5qV2ggCFctp2fJQ+ErvciLZrIeoOSOm6
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=

@ -3,9 +3,8 @@ package main
import "git.sp4ke.com/sp4ke/bit4sat/db"
func main() {
defer db.DB.Sql.Close()
defer db.DB.Close()
api := NewAPI()
api.Run()
}

@ -25,7 +25,7 @@ func GetStoreDestination(filename string) string {
return filepath.Join(RuntimeStoragePath, filename)
}
func StoreFormFile(src multipart.File, filename string) error {
func storeFormFile(src multipart.File, filename string) error {
filePath := GetStoreDestination(filename)
// If file exists just return
exists, err := CheckFileExists(filePath)
@ -67,6 +67,15 @@ func CheckFileExists(file string) (bool, error) {
return false, err
}
func removeFiles(paths []string) {
for _, path := range paths {
err := os.Remove(path)
if err != nil {
log.Fatal(err)
}
}
}
func init() {
path, set := os.LookupEnv(StoragePathEnv)
if !set {

@ -7,9 +7,7 @@ import (
"io"
"log"
"net/http"
"os"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/utils"
"github.com/gin-gonic/gin"
"github.com/segmentio/ksuid"
@ -28,34 +26,21 @@ func (ctrl UploadCtrl) New(c *gin.Context) {
// Create unique id
id := ksuid.New()
tx, err := db.DB.Sql.Beginx()
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
}
up := NewUpload()
up.ID = id.String()
up.Status = UpNew
for _, file := range uploadForm.Files {
up := &Upload{}
up.ID = id.String()
upfile := &UpFile{}
log.Println(file.Name)
up.FileName, up.FileExt = utils.CleanFileName(file.Name)
upfile.FileName, upfile.FileExt = utils.CleanFileName(file.Name)
upfile.FileSize = file.Size
upfile.FileType = file.Type
up.FileSize = file.Size
up.FileType = file.Type
up.SHA256 = file.SHA256
up.Status = UpNew
err := up.TxWrite(tx)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
db.RollbackTx(tx, nil)
return
}
up.Files[SHA256(file.SHA256)] = upfile
}
err = tx.Commit()
err := up.Write()
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
return
@ -93,18 +78,28 @@ func (ctrl UploadCtrl) Upload(c *gin.Context) {
return
}
files := form.File["upload[]"]
tx, err := db.DB.Sql.Beginx()
// Get the file's metadata from upload collection
up, err := GetByID(id)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
utils.JSONErrPriv(c, http.StatusNotFound, err)
return
}
log.Printf("Found matching upload %s", up.ID)
if up.Status == UpStored {
utils.JSONErr(c, http.StatusConflict, "already uploaded")
return
}
files := form.File["upload[]"]
var storedFiles []string
for _, file := range files {
log.Printf("Handling %s", file.Filename)
// Get the file's sha256
// Get the form's file sha256
hasher := sha256.New()
formFd, err := file.Open()
@ -115,69 +110,65 @@ func (ctrl UploadCtrl) Upload(c *gin.Context) {
c, http.StatusInternalServerError,
fmt.Sprintf("Could not read file %s", file.Filename),
)
db.RollbackTx(tx, nil)
return
}
_, err = io.Copy(hasher, formFd)
if err != nil {
utils.JSONErr(c, http.StatusInternalServerError,
"")
db.RollbackTx(tx, nil)
utils.JSONErr(c, http.StatusInternalServerError, "")
return
}
sum256 := hex.EncodeToString(hasher.Sum(nil))
// Get the file's metadata from upload table
up, err := GetByHashID(sum256, id)
if err != nil {
utils.JSONErrPriv(c, http.StatusNotFound, err)
db.RollbackTx(tx, nil)
// Check sha256 same as present metadata
fileMeta, same := up.Files[SHA256(sum256)]
if !same {
utils.JSONErr(c, http.StatusConflict,
fmt.Sprintf("wrong hash for %s", file.Filename),
)
return
}
log.Printf("Found received file's %s metadata in local database\n", up.FileName)
//log.Println(up)
// Store file
if up.Status == UpStored {
utils.JSONErr(c, http.StatusConflict,
fmt.Sprintf("%s already uploaded", up.FileName))
return
}
//
//
log.Println("Storing file")
err = StoreFormFile(formFd, up.SHA256+up.FileExt)
log.Printf("Storing file %s", file.Filename)
err = storeFormFile(formFd, sum256+fileMeta.FileExt)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
db.RollbackTx(tx, nil)
removeFiles(storedFiles)
return
}
log.Printf("%s stored at %s", up.FileName,
GetStoreDestination(up.SHA256+up.FileExt))
storedFiles = append(storedFiles, GetStoreDestination(sum256+fileMeta.FileExt))
log.Printf("%s stored at %s", fileMeta.FileName,
GetStoreDestination(sum256+fileMeta.FileExt))
// Setting status to stored
log.Println("Updating upload status to stored")
up.Status = UpStored
err = up.TxSetState(tx, UpStored)
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
db.RollbackTx(tx, func() {
err := os.Remove(GetStoreDestination(up.SHA256 + up.FileExt))
if err != nil {
log.Println(err)
}
})
return
}
//log.Println("Updating upload status to stored")
//up.Status = UpStored
//err = up.TxSetState(tx, UpStored)
//if err != nil {
//utils.JSONErrPriv(c, http.StatusInternalServerError, err)
//db.RollbackTx(tx, func() {
//err := os.Remove(GetStoreDestination(up.SHA256 + up.FileExt))
//if err != nil {
//log.Println(err)
//}
//})
//return
//}
}
err = tx.Commit()
up.Status = UpStored
// Commit
err = up.Write()
if err != nil {
utils.JSONErrPriv(c, http.StatusInternalServerError, err)
removeFiles(storedFiles)
return
}

@ -1,58 +1,15 @@
package storage
import (
"database/sql"
"errors"
"log"
"fmt"
"git.sp4ke.com/sp4ke/bit4sat/db"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
"git.sp4ke.com/sp4ke/bit4sat/encoding"
"github.com/mediocregopher/radix/v3"
)
var DB = db.DB
const (
DBUploadSchema = `
CREATE TABLE IF NOT EXISTS "upload" (
upload_id TEXT NOT NULL,
sha256 TEXT NOT NULL,
file_name TEXT NOT NULL,
file_type TEXT DEFAULT '',
file_size INTEGER NOT NULL,
file_ext TEXT DEFAULT '',
status INTEGER DEFAULT 0,
FOREIGN KEY (status) REFERENCES upload_status(type),
UNIQUE (upload_id, sha256)
);
`
DBUploadView = `
CREATE VIEW IF NOT EXISTS "upload_with_status" AS SELECT * FROM upload JOIN upload_status ON
upload.status = upload_status.type
`
DBUploadStatusSchema = `CREATE TABLE IF NOT EXISTS upload_status
(
type INTEGER PRIMARY KEY,
status TEXT NOT NULL UNIQUE
)
`
QNewUpload = `INSERT INTO upload
(upload_id, sha256, file_name, file_type, file_size, file_ext, status)
VALUES
(:upload_id, :sha256, :file_name, :file_type, :file_size, :file_ext, :status)`
QSetStatus = `UPDATE upload SET status = :status WHERE upload_id = :upload_id `
QGetByHashID = `SELECT * FROM upload
WHERE
sha256 = ?
AND
upload_id = ?`
)
const (
UpNew = iota
UpStored
@ -61,129 +18,71 @@ const (
)
var UploadStatus = map[int]string{
UpNew: "new upload",
UpNew: "new",
UpStored: "stored",
UpWaitingPayment: "waiting payment",
UpWaitingPayment: "waiting-payment",
UpReady: "ready",
}
var (
ErrDoesNotExist = errors.New("does not exist")
ErrAlreadyExists = errors.New("already exists")
)
type SHA256 string
type Upload struct {
ID string `db:"upload_id"`
SHA256 string `db:"sha256"`
FileName string `db:"file_name"`
FileType string `db:"file_type"`
FileSize int64 `db:"file_size"`
FileExt string `db:"file_ext"`
Status int `db:"status"`
func (sha SHA256) MarshalText() ([]byte, error) {
return []byte(sha), nil
}
// Returns true if id exists in DB
func IdExists(id string) (exists bool, err error) {
qUploadExists := `
SELECT EXISTS (SELECT upload_id FROM upload where upload_id = ?)
`
err = DB.Sql.Get(&exists, qUploadExists, id)
// No result found is also no result
if err == sql.ErrNoRows {
err = nil
}
return
type Upload struct {
ID string
Status uint8
Files map[SHA256]*UpFile
}
// Get a file by upload id and hash
func GetByHashID(sha256 string, id string) (*Upload, error) {
var up Upload
err := DB.Sql.Get(&up, QGetByHashID, sha256, id)
if err == sql.ErrNoRows {
return nil, ErrDoesNotExist
}
if err != nil {
return nil, err
}
return &up, nil
type UpFile struct {
FileName string
FileType string
FileSize int64
FileExt string
}
func (u *Upload) TxSetState(tx *sqlx.Tx, status int) error {
_, err := tx.NamedExec(QSetStatus, u)
if err != nil {
return err
func NewUpload() *Upload {
files := make(map[SHA256]*UpFile)
return &Upload{
Files: files,
}
return nil
}
func (u *Upload) TxWrite(tx *sqlx.Tx) error {
// Returns true if id exists in DB
func IdExists(id string) (exists bool, err error) {
key := fmt.Sprintf("upload:%s", id)
_, err := tx.NamedExec(QNewUpload, u)
err = DB.Do(radix.Cmd(&exists, "EXISTS", key))
sqlErr, isSqlErr := err.(sqlite3.Error)
return
}
if isSqlErr && sqlErr.Code == sqlite3.ErrConstraint {
return ErrAlreadyExists
}
// Get a file by upload id and hash
func GetByID(id string) (*Upload, error) {
var buf []byte
key := fmt.Sprintf("upload:%s", id)
err := DB.Do(radix.Cmd(&buf, "GET", key))
if err != nil {
return err
return nil, err
}
return nil
var up Upload
err = encoding.FromBytes(buf, &up)
return &up, err
}
func (u *Upload) Write() error {
_, err := DB.Sql.NamedExec(QNewUpload, u)
sqlErr, isSqlErr := err.(sqlite3.Error)
if isSqlErr && sqlErr.Code == sqlite3.ErrConstraint {
return ErrAlreadyExists
}
// Create a new upload hash upload:id --> [Files]
key := fmt.Sprintf("upload:%s", u.ID)
raw, err := encoding.ToBytes(u)
if err != nil {
return err
}
return nil
}
func init() {
_, err := DB.Sql.Exec(DBUploadSchema)
if err != nil {
log.Fatal(err)
}
_, err = DB.Sql.Exec(DBUploadView)
if err != nil {
log.Fatal(err)
}
_, err = DB.Sql.Exec(DBUploadStatusSchema)
if err != nil {
log.Fatal(err)
}
// Populate status types
query := `INSERT INTO upload_status (type, status) VALUES(?,?)`
for k, v := range UploadStatus {
_, err := DB.Sql.Exec(query, k, v)
if err != nil {
sqlErr, ok := err.(sqlite3.Error)
if ok && sqlErr.ExtendedCode == sqlite3.ErrConstraintUnique {
log.Panic(err)
}
if !ok {
log.Panic(err)
}
}
}
return DB.Do(radix.FlatCmd(nil, "SET", key, raw))
}

@ -59,7 +59,7 @@ async function newUpload(files){
postMessage({msg: 'upload-id', id: id})
// Send the files
//upload.send()
upload.send()
})
}

Loading…
Cancel
Save