You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bit4sat/storage/upload_model.go

566 lines
12 KiB
Go

package storage
import (
"database/sql"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"log"
"math/bits"
"git.sp4ke.com/sp4ke/bit4sat/db"
"git.sp4ke.com/sp4ke/bit4sat/ln"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/mediocregopher/radix/v3"
)
// DB is a package-level alias for db.DB. The functions in this file reach
// redis through DB.Redis and the SQL database through DB.Sql.
var DB = db.DB
const (
// DBFileUploadsSchema creates the file_uploads table: one row per file
// belonging to an upload, unique per (upload_id, sha256). The stored
// column records whether that file was stored.
//
// TODO: sync upload status from redis.
// TODO: status is currently handled in the cache, not here.
//
// NOTE(review): file_size is a 32-bit integer column while
// FileUpload.FileSize is an int64 - confirm large files are not expected.
DBFileUploadsSchema = `
CREATE TABLE IF NOT EXISTS file_uploads (
file_upload_id serial PRIMARY KEY,
upload_id text NOT NULL references uploads(upload_id),
sha256 varchar(64) NOT NULL,
file_name varchar(255) NOT NULL,
file_type varchar(255) DEFAULT '',
file_size integer NOT NULL,
file_ext varchar(255) DEFAULT '',
stored boolean DEFAULT 'false',
UNIQUE (upload_id, sha256)
);
`
// DBUploadsSchema creates the uploads table: a unique entry per upload,
// keyed by upload_id and optionally linked to an invoice through
// invoice_rhash.
//
// NOTE(review): status is a signed 32-bit integer column, while the
// UpStatus flags start at bit 31 (UpPayExpired = 1 << 31); a value with
// that bit set does not fit the column - confirm against the database.
DBUploadsSchema = `
CREATE TABLE IF NOT EXISTS uploads (
upload_id text PRIMARY KEY,
download_id text DEFAULT '',
created timestamp,
ask_fee boolean NOT NULL DEFAULT 'false',
ask_currency varchar(3) DEFAULT '',
ask_amount real DEFAULT 0,
status integer NOT NULL DEFAULT 0,
settled boolean DEFAULT 'false',
admin_token text DEFAULT '',
invoice_rhash varchar(64) references invoices(rhash)
);
`
// DBInvoicesSchema creates the invoices table, keyed by rhash, with the
// invoice payload kept as text.
DBInvoicesSchema = `
CREATE TABLE IF NOT EXISTS invoices (
rhash varchar(64) PRIMARY KEY,
payload text DEFAULT ''
);
`
// QSetFileStored is a named query updating the stored flag of the file
// identified by :upload_id and :sha256.
QSetFileStored = `UPDATE file_uploads
SET stored = :stored
WHERE upload_id = :upload_id AND sha256 = :sha256`
// QGetByHashID selects a single file row; $1 is the sha256 and $2 the
// upload id.
QGetByHashID = `SELECT upload_id,
sha256,
file_name,
file_type,
file_size,
file_ext,
stored
FROM file_uploads WHERE sha256 = $1 AND upload_id = $2`
)
// UpStatus is a set of bit flags describing the payment and storage state of
// an upload. The zero value (UpNew) means that no flag has been raised.
type UpStatus uint32

// MarshalJSON renders the status as a JSON object holding the human readable
// "pay_status" and "store_status" labels.
func (st UpStatus) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]string{
		"pay_status":   st.PrintPayStatus(),
		"store_status": st.PrintStoreStatus(),
	})
}

// UnmarshalBinary decodes the raw bytes of a status bitmap into st.
//
// The first four bytes are read as a big-endian word and then bit-reversed,
// so that bitmap offset N (offset 0 being the most significant bit of the
// first byte) ends up as bit N of the UpStatus. This mirrors the offsets
// SetUploadStatus writes with SETBIT.
//
// An empty or single-byte payload decodes to the zero status. A payload of
// two or three bytes is rejected with an error instead of panicking inside
// binary.BigEndian.Uint32.
func (st *UpStatus) UnmarshalBinary(b []byte) error {
	switch n := len(b); {
	case n <= 1:
		// Single byte will be for 0 value; an empty payload is treated alike.
		*st = UpNew
		return nil
	case n < 4:
		return fmt.Errorf("upload status: expected at least 4 bytes, got %d", n)
	}

	*st = UpStatus(bits.Reverse32(binary.BigEndian.Uint32(b)))
	return nil
}

// GetFlagPositions returns the positions of every bit set in st, from bit 31
// down to bit 0. It returns nil when no bit is set.
func (st UpStatus) GetFlagPositions() []int {
	var setBits []int
	// A signed counter is required: with an unsigned one "pos >= 0" never
	// becomes false, which is why bit 0 used to be skipped.
	for pos := 31; pos >= 0; pos-- {
		if st&(1<<uint(pos)) != 0 {
			setBits = append(setBits, pos)
		}
	}
	return setBits
}

// PrintStoreStatus returns the storage label of st: "stored" once UpStored
// is set, "waiting storage" otherwise.
func (st UpStatus) PrintStoreStatus() string {
	if st.Stored() {
		return UploadStatus[UpStored]
	}
	// UploadStatus has no label for UpStoreFail, so a failed store is
	// reported exactly like a pending one.
	return UploadStatus[WaitStore]
}

// PrintPayStatus returns the payment label of st. Paid takes precedence over
// expired; with neither flag set the payment is reported as waiting.
func (st UpStatus) PrintPayStatus() string {
	switch {
	case st.Paid():
		return UploadStatus[UpPaid]
	case st.Expired():
		return UploadStatus[UpPayExpired]
	default:
		return UploadStatus[WaitPay]
	}
}

// IsNew reports whether st carries no flag at all. UpNew is zero, so it has
// to be compared for equality: masking with it can never be non-zero.
func (st UpStatus) IsNew() bool {
	return st == UpNew
}

// WaitPay reports whether the payment is neither paid nor expired.
func (st UpStatus) WaitPay() bool {
	return !(st.Paid() || st.Expired())
}

// Stored reports whether the UpStored flag is set.
func (st UpStatus) Stored() bool {
	return st&UpStored != 0
}

// StoreFail reports whether the UpStoreFail flag is set.
func (st UpStatus) StoreFail() bool {
	return st&UpStoreFail != 0
}

// Paid reports whether the UpPaid flag is set.
func (st UpStatus) Paid() bool {
	return st&UpPaid != 0
}

// Expired reports whether the UpPayExpired flag is set.
func (st UpStatus) Expired() bool {
	return st&UpPayExpired != 0
}

// GetStoreStatus returns only the bits of st selected by StoreMask.
func (st UpStatus) GetStoreStatus() UpStatus {
	return st & StoreMask
}

// GetPayStatus returns only the bits of st selected by PayMask.
func (st UpStatus) GetPayStatus() UpStatus {
	return st & PayMask
}

// Status flags, assigned from bit 31 downwards.
//
// First 4 bits are reserved for easier parsing from redis
const (
	UpPayExpired UpStatus = 1 << (32 - 1 - iota) // bit 31
	UpPaid                                       // bit 30
	UpStored                                     // bit 29: all files for this upload were stored
	UpStoreFail                                  // bit 28

	// Only used for printing
	WaitStore // bit 27
	WaitPay   // bit 26

	// UpNew is the empty status.
	UpNew = UpStatus(0)

	// PayMask selects the payment flags, StoreMask the storage flag.
	PayMask   = UpPaid | UpPayExpired
	StoreMask = UpStored
)

// UploadStatus maps a status flag to the label used when printing it.
var UploadStatus = map[UpStatus]string{
	UpNew: "new upload",

	// Payment
	UpPayExpired: "expired",
	UpPaid:       "paid",
	WaitPay:      "waiting",

	// Storage
	WaitStore: "waiting storage",
	UpStored:  "stored",
}
// ErrDoesNotExist is returned when a requested row cannot be found.
var ErrDoesNotExist = errors.New("does not exist")

// ErrAlreadyExists is returned when an insert hits a unique constraint.
var ErrAlreadyExists = errors.New("already exists")
// FileUpload mirrors one row of the file_uploads table; the db tags name the
// column each field is bound to.
type FileUpload struct {
// ID is bound to the serial primary key column.
ID string `db:"file_upload_id"`
// UploadId is the id of the upload this file belongs to.
UploadId string `db:"upload_id"`
// SHA256 together with UploadId is unique in the table.
SHA256 string `db:"sha256"`
FileName string `db:"file_name"`
FileType string `db:"file_type"`
FileSize int64 `db:"file_size"`
FileExt string `db:"file_ext"`
// Stored records whether the file was stored; set by TxSetFileStored.
Stored bool `db:"stored"`
}
// Upload mirrors one row of the uploads table; the db tags name the column
// each field is bound to.
type Upload struct {
UploadId string `db:"upload_id"`
DownloadID string `db:"download_id"`
AskFee bool `db:"ask_fee"`
AskCurrency string `db:"ask_currency"`
AskAmount float64 `db:"ask_amount"`
InvoiceRhash string `db:"invoice_rhash"` // rhash of the linked invoice, used as its id
Settled bool `db:"settled"`
UploadStatus uint32 `db:"status"` // upload status flags (see UpStatus)
// AdminToekn is misspelled ("Token"); the exported name is kept as is so
// existing references keep compiling.
AdminToekn string `db:"admin_token"`
}
// TODO: sync from redis to db
//func SyncUploadStatusToDB(){
//}
// GetDownloadId returns the download id paired with uploadId.
//
// The redis key "download_for_upload_<uploadId>" is consulted first. When it
// does not exist the id is read from the uploads table and written back to
// redis; an error from that write-back is returned together with the id.
func GetDownloadId(uploadId string) (string, error) {
	var (
		downloadId string
		cached     bool
	)
	cacheKey := fmt.Sprintf("download_for_upload_%s", uploadId)

	if err := DB.Redis.Do(radix.FlatCmd(&cached, "EXISTS", cacheKey)); err != nil {
		return "", err
	}

	// Cache hit: serve straight from redis.
	if cached {
		err := DB.Redis.Do(radix.FlatCmd(&downloadId, "GET", cacheKey))
		return downloadId, err
	}

	// Cache miss: fall back to the SQL database.
	log.Printf("redis: dlId %s not found for, trying form db", uploadId)

	const query = `SELECT download_id from uploads WHERE upload_id = $1`
	if err := DB.Sql.Get(&downloadId, query, uploadId); err != nil {
		return "", err
	}
	if downloadId == "" {
		return "", fmt.Errorf("download id missing in %s", uploadId)
	}

	// Repopulate the cache for the next lookup.
	err := DB.Redis.Do(radix.FlatCmd(nil, "SET", cacheKey, downloadId))
	return downloadId, err
}
// GetUploadIdForDlId returns the upload id paired with the download id dlId.
//
// The redis key "upload_for_download_<dlId>" is consulted first. When it does
// not exist the id is read from the uploads table and written back to redis;
// an error from that write-back is returned together with the id.
func GetUploadIdForDlId(dlId string) (string, error) {
	var (
		uploadID string
		cached   bool
	)
	cacheKey := fmt.Sprintf("upload_for_download_%s", dlId)

	if err := DB.Redis.Do(radix.FlatCmd(&cached, "EXISTS", cacheKey)); err != nil {
		return "", err
	}

	// Cache hit: serve straight from redis.
	if cached {
		err := DB.Redis.Do(radix.FlatCmd(&uploadID, "GET", cacheKey))
		return uploadID, err
	}

	// Cache miss: fall back to the SQL database.
	log.Printf("redis: upId %s not found , trying form db", dlId)

	const query = `SELECT upload_id from uploads WHERE download_id = $1`
	if err := DB.Sql.Get(&uploadID, query, dlId); err != nil {
		return "", err
	}
	if uploadID == "" {
		return "", fmt.Errorf("upload id missing in %s", dlId)
	}

	// Repopulate the cache for the next lookup.
	err := DB.Redis.Do(radix.FlatCmd(nil, "SET", cacheKey, uploadID))
	return uploadID, err
}
// GetUploadInvoice reads the invoice kept in redis under
// "upload_<uploadId>_invoice". It returns a nil invoice whenever the redis
// command reports an error.
func GetUploadInvoice(uploadId string) (*ln.Invoice, error) {
	invoice := new(ln.Invoice)
	key := fmt.Sprintf("upload_%s_invoice", uploadId)

	if err := DB.Redis.Do(radix.FlatCmd(invoice, "GET", key)); err != nil {
		return nil, err
	}
	return invoice, nil
}
// GetUploadIdInvoiceId returns the RHash of the invoice attached to uploadId.
//
// GetUploadInvoice hands back a nil invoice together with its error, so the
// error has to be checked before the invoice is dereferenced; on failure the
// empty string and the error are returned.
func GetUploadIdInvoiceId(uploadId string) (string, error) {
	invoice, err := GetUploadInvoice(uploadId)
	if err != nil {
		return "", err
	}
	return invoice.RHash, nil
}
// SetUploadStatus records status for the upload id, in redis (bitmap under
// "upload_status_<id>") and in the status column of the uploads table.
//
// For UpNew only bit 31 of the redis bitmap is written as 0 and the SQL row
// is left untouched. For any other status each flag position is raised with
// SETBIT - bits are only ever set here, never cleared - and the SQL column is
// then overwritten with the given status.
//
// NOTE(review): redis accumulates flags across calls while SQL keeps only the
// last status passed in - confirm this divergence is intended.
// NOTE(review): the status column is a signed 32-bit integer; a status with
// bit 31 (UpPayExpired) set does not fit it - confirm.
func SetUploadStatus(id string, status UpStatus) error {
	redisKey := fmt.Sprintf("upload_status_%s", id)

	if status == UpNew {
		return DB.Redis.Do(radix.FlatCmd(nil, "SETBIT", redisKey, 31, 0))
	}

	// Raise one bit per flag carried by status.
	for _, bit := range status.GetFlagPositions() {
		setBit := radix.FlatCmd(nil, "SETBIT", redisKey, bit, 1)
		if err := DB.Redis.Do(setBit); err != nil {
			return err
		}
	}

	// Mirror the status into the SQL database.
	const update = `UPDATE uploads SET status = $1 WHERE upload_id = $2`
	_, err := DB.Sql.Exec(update, uint32(status), id)
	return err
}
// GetUploadStatus reads the status bitmap kept in redis under
// "upload_status_<id>" and decodes it into an UpStatus.
func GetUploadStatus(id string) (status UpStatus, err error) {
	redisKey := fmt.Sprintf("upload_status_%s", id)
	err = DB.Redis.Do(radix.FlatCmd(&status, "GET", redisKey))
	return status, err
}
// SetUploadAdminToken stores token for uploadId, first in redis under
// "upload_admin_<uploadId>" and then in the admin_token column of the uploads
// table. A redis failure aborts before the SQL update is attempted.
func SetUploadAdminToken(uploadId, token string) error {
	redisKey := fmt.Sprintf("upload_admin_%s", uploadId)
	if err := DB.Redis.Do(radix.FlatCmd(nil, "SET", redisKey, token)); err != nil {
		return err
	}

	const update = `UPDATE uploads SET admin_token = $1 WHERE upload_id = $2`
	_, err := DB.Sql.Exec(update, token, uploadId)
	return err
}
// GetUploadAdminToken reads the admin token kept in redis under
// "upload_admin_<uploadId>". Only redis is consulted.
func GetUploadAdminToken(uploadId string) (string, error) {
	var adminToken string
	redisKey := fmt.Sprintf("upload_admin_%s", uploadId)

	err := DB.Redis.Do(radix.FlatCmd(&adminToken, "GET", redisKey))
	return adminToken, err
}
// SetUploadInvoice attaches invoice to the upload uploadId.
//
// The JSON-encoded invoice is stored in redis under
// "upload_<uploadId>_invoice" and the inverse relation under
// "invoice_<rhash>_upload". Then, inside one SQL transaction, the invoice is
// inserted into the invoices table (an existing row is left alone) and the
// upload row is pointed at it.
//
// A nil invoice is rejected with an error before anything is written. The
// transaction is rolled back on every path that does not reach a successful
// Commit, so a failed statement no longer leaves it open.
func SetUploadInvoice(uploadId string, invoice *ln.Invoice) error {
	if invoice == nil {
		return errors.New("nil invoice")
	}

	uploadInvoiceKey := fmt.Sprintf("upload_%s_invoice", uploadId)
	invoiceJson, err := json.Marshal(invoice)
	if err != nil {
		return err
	}

	err = DB.Redis.Do(radix.FlatCmd(nil, "SET", uploadInvoiceKey, invoiceJson))
	if err != nil {
		return err
	}

	// Set inverse relation
	invoiceUploadKey := fmt.Sprintf("invoice_%s_upload", invoice.RHash)
	err = DB.Redis.Do(radix.FlatCmd(nil, "SET", invoiceUploadKey, uploadId))
	if err != nil {
		return err
	}

	// SQL insert
	tx, err := DB.Sql.Beginx()
	if err != nil {
		return err
	}
	// After a successful Commit the rollback only reports that the
	// transaction is already done, so its result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	// Add invoice to invoices table
	query := `INSERT INTO invoices(rhash, payload)
VALUES($1,$2)
ON CONFLICT DO NOTHING
`
	if _, err = tx.Exec(query, invoice.RHash, invoiceJson); err != nil {
		return err
	}

	// Set invoice to upload
	query = `UPDATE uploads SET invoice_rhash = $1 WHERE upload_id = $2`
	if _, err = tx.Exec(query, invoice.RHash, uploadId); err != nil {
		return err
	}

	return tx.Commit()
}
func GetUploadIdForInvoice(invoiceId string) (string, error) {
key := fmt.Sprintf("invoice_%s_upload", invoiceId)
err := DB.Redis.Do(radix.FlatCmd(&invoiceId, "GET", key))
return invoiceId, err
}
// IdExists reports whether the redis key "upload_status_<id>" exists.
func IdExists(id string) (exists bool, err error) {
	statusKey := fmt.Sprintf("upload_status_%s", id)
	err = DB.Redis.Do(radix.Cmd(&exists, "EXISTS", statusKey))
	return exists, err
}
// GetByHashID fetches the file row matching the given sha256 and upload id.
// It returns ErrDoesNotExist when no row matches, and any other database
// error unchanged.
func GetByHashID(sha256 string, id string) (*FileUpload, error) {
	file := new(FileUpload)

	switch err := DB.Sql.Get(file, QGetByHashID, sha256, id); err {
	case nil:
		return file, nil
	case sql.ErrNoRows:
		return nil, ErrDoesNotExist
	default:
		return nil, err
	}
}
// TxSetFileStored flags the file as stored: it sets u.Stored to true and runs
// QSetFileStored within tx. u.Stored is changed before the query runs and is
// not reverted if the query fails.
func (u *FileUpload) TxSetFileStored(tx *sqlx.Tx) error {
	u.Stored = true

	_, err := tx.NamedExec(QSetFileStored, u)
	return err
}
// TxWrite inserts the file row within tx. A unique-constraint violation
// (PostgreSQL code 23505) is reported as ErrAlreadyExists; any other error is
// returned unchanged.
func (u *FileUpload) TxWrite(tx *sqlx.Tx) error {
	const insert = `INSERT INTO file_uploads
(upload_id, sha256, file_name, file_type, file_size, file_ext)
VALUES
(:upload_id, :sha256, :file_name, :file_type, :file_size, :file_ext)
`
	_, err := tx.NamedExec(insert, u)
	if err == nil {
		return nil
	}

	// unique constraint
	if pqErr, isPq := err.(*pq.Error); isPq && pqErr.Code == "23505" {
		return ErrAlreadyExists
	}
	return err
}
// TxWrite inserts the upload row within tx. A unique-constraint violation
// (PostgreSQL code 23505) is reported as ErrAlreadyExists; any other error is
// returned unchanged.
func (u *Upload) TxWrite(tx *sqlx.Tx) error {
	const insert = `INSERT INTO uploads
(upload_id, download_id, ask_fee, ask_currency, settled)
VALUES
(:upload_id, :download_id, :ask_fee, :ask_currency, :settled)`

	_, err := tx.NamedExec(insert, u)
	if err == nil {
		return nil
	}

	// unique constraint
	if pqErr, isPq := err.(*pq.Error); isPq && pqErr.Code == "23505" {
		return ErrAlreadyExists
	}
	return err
}
// Write inserts the upload row directly through DB.Sql, outside of any
// transaction. A unique-constraint violation (PostgreSQL code 23505) is
// reported as ErrAlreadyExists; any other error is returned unchanged.
func (u *Upload) Write() error {
	const insert = `INSERT INTO uploads
(upload_id, download_id, ask_fee, ask_currency, settled)
VALUES
(:upload_id, :download_id, :ask_fee, :ask_currency, :settled)`

	_, err := DB.Sql.NamedExec(insert, u)
	if err == nil {
		return nil
	}

	// unique constraint
	if pqErr, isPq := err.(*pq.Error); isPq && pqErr.Code == "23505" {
		return ErrAlreadyExists
	}
	return err
}
func init() {
_, err := DB.Sql.Exec(DBInvoicesSchema)
if err != nil {
log.Fatal(err)
}
_, err = DB.Sql.Exec(DBUploadsSchema)
if err != nil {
log.Fatal(err)
}
_, err = DB.Sql.Exec(DBFileUploadsSchema)
if err != nil {
log.Fatal(err)
}
}