mirror of
https://github.com/lightninglabs/loop
synced 2024-11-16 00:12:52 +00:00
Merge pull request #781 from starius/sweepbatcher-exectx
loopdb: parameterize ExecTx with subset of methods used by a package
This commit is contained in:
commit
a4e01027ea
@ -112,7 +112,7 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
|
||||
|
||||
dbFixture := loopdb.NewTestDB(t)
|
||||
|
||||
store := NewSQLStore(dbFixture)
|
||||
store := NewSQLStore(loopdb.NewTypedStore[Querier](dbFixture))
|
||||
|
||||
mockReservationClient := new(mockReservationClient)
|
||||
|
||||
|
@ -16,9 +16,9 @@ import (
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
)
|
||||
|
||||
// BaseDB is the interface that contains all the queries generated
|
||||
// Querier is the interface that contains all the queries generated
|
||||
// by sqlc for the reservation table.
|
||||
type BaseDB interface {
|
||||
type Querier interface {
|
||||
// CreateReservation stores the reservation in the database.
|
||||
CreateReservation(ctx context.Context,
|
||||
arg sqlc.CreateReservationParams) error
|
||||
@ -35,14 +35,24 @@ type BaseDB interface {
|
||||
// made.
|
||||
GetReservations(ctx context.Context) ([]sqlc.Reservation, error)
|
||||
|
||||
// UpdateReservation inserts a new reservation update.
|
||||
// InsertReservationUpdate inserts a new reservation update.
|
||||
InsertReservationUpdate(ctx context.Context,
|
||||
arg sqlc.InsertReservationUpdateParams) error
|
||||
|
||||
// UpdateReservation updates a reservation.
|
||||
UpdateReservation(ctx context.Context,
|
||||
arg sqlc.UpdateReservationParams) error
|
||||
}
|
||||
|
||||
// BaseDB is the interface that contains all the queries generated
|
||||
// by sqlc for the reservation table and transaction functionality.
|
||||
type BaseDB interface {
|
||||
Querier
|
||||
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
txBody func(Querier) error) error
|
||||
}
|
||||
|
||||
// SQLStore manages the reservations in the database.
|
||||
@ -82,7 +92,7 @@ func (r *SQLStore) CreateReservation(ctx context.Context,
|
||||
}
|
||||
|
||||
return r.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.CreateReservation(ctx, args)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -122,7 +132,7 @@ func (r *SQLStore) UpdateReservation(ctx context.Context,
|
||||
}
|
||||
|
||||
return r.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.UpdateReservation(ctx, updateArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -138,7 +148,7 @@ func (r *SQLStore) GetReservation(ctx context.Context,
|
||||
|
||||
var reservation *Reservation
|
||||
err := r.baseDb.ExecTx(ctx, loopdb.NewSqlReadOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
var err error
|
||||
reservationRow, err := q.GetReservation(
|
||||
ctx, reservationId[:],
|
||||
@ -182,7 +192,7 @@ func (r *SQLStore) ListReservations(ctx context.Context) ([]*Reservation,
|
||||
var result []*Reservation
|
||||
|
||||
err := r.baseDb.ExecTx(ctx, loopdb.NewSqlReadOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
var err error
|
||||
|
||||
reservations, err := q.GetReservations(ctx)
|
||||
|
@ -19,7 +19,7 @@ func TestSqlStore(t *testing.T) {
|
||||
testDb := loopdb.NewTestDB(t)
|
||||
defer testDb.Close()
|
||||
|
||||
store := NewSQLStore(testDb)
|
||||
store := NewSQLStore(loopdb.NewTypedStore[Querier](testDb))
|
||||
|
||||
// Create a reservation and store it.
|
||||
reservation := &Reservation{
|
||||
|
@ -21,9 +21,9 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||
)
|
||||
|
||||
// InstantOutBaseDB is the interface that contains all the queries generated
|
||||
// Querier is the interface that contains all the queries generated
|
||||
// by sqlc for the instantout table.
|
||||
type InstantOutBaseDB interface {
|
||||
type Querier interface {
|
||||
// InsertSwap inserts a new base swap.
|
||||
InsertSwap(ctx context.Context, arg sqlc.InsertSwapParams) error
|
||||
|
||||
@ -53,11 +53,17 @@ type InstantOutBaseDB interface {
|
||||
// GetInstantOutSwaps retrieves all instant out swaps.
|
||||
GetInstantOutSwaps(ctx context.Context) ([]sqlc.GetInstantOutSwapsRow,
|
||||
error)
|
||||
}
|
||||
|
||||
// InstantOutBaseDB is the interface that contains all the queries generated
|
||||
// by sqlc for the instantout table and transaction functionality.
|
||||
type InstantOutBaseDB interface {
|
||||
Querier
|
||||
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
txBody func(Querier) error) error
|
||||
}
|
||||
|
||||
// ReservationStore is the interface that is required to load the reservations
|
||||
@ -133,7 +139,7 @@ func (s *SQLStore) CreateInstantLoopOut(ctx context.Context,
|
||||
}
|
||||
|
||||
return s.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.InsertSwap(ctx, swapArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -205,7 +211,7 @@ func (s *SQLStore) UpdateInstantLoopOut(ctx context.Context,
|
||||
}
|
||||
|
||||
return s.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.UpdateInstantOut(ctx, updateParams)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -417,7 +417,10 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sweeperDb := sweepbatcher.NewSQLStore(baseDb, chainParams)
|
||||
sweeperDb := sweepbatcher.NewSQLStore(
|
||||
loopdb.NewTypedStore[sweepbatcher.Querier](baseDb),
|
||||
chainParams,
|
||||
)
|
||||
|
||||
// Create an instance of the loop client library.
|
||||
swapClient, clientCleanup, err := getClient(
|
||||
@ -501,7 +504,9 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
|
||||
)
|
||||
// Create the reservation and instantout managers.
|
||||
if d.cfg.EnableExperimental {
|
||||
reservationStore := reservation.NewSQLStore(baseDb)
|
||||
reservationStore := reservation.NewSQLStore(
|
||||
loopdb.NewTypedStore[reservation.Querier](baseDb),
|
||||
)
|
||||
reservationConfig := &reservation.Config{
|
||||
Store: reservationStore,
|
||||
Wallet: d.lnd.WalletKit,
|
||||
@ -516,7 +521,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
|
||||
|
||||
// Create the instantout services.
|
||||
instantOutStore := instantout.NewSQLStore(
|
||||
baseDb, clock.NewDefaultClock(), reservationStore,
|
||||
loopdb.NewTypedStore[instantout.Querier](baseDb),
|
||||
clock.NewDefaultClock(), reservationStore,
|
||||
d.lnd.ChainParams,
|
||||
)
|
||||
instantOutConfig := &instantout.Config{
|
||||
|
@ -32,7 +32,10 @@ func view(config *Config, lisCfg *ListenerCfg) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sweeperDb := sweepbatcher.NewSQLStore(baseDb, chainParams)
|
||||
sweeperDb := sweepbatcher.NewSQLStore(
|
||||
loopdb.NewTypedStore[sweepbatcher.Querier](baseDb),
|
||||
chainParams,
|
||||
)
|
||||
|
||||
swapClient, cleanup, err := getClient(
|
||||
config, swapDb, sweeperDb, &lnd.LndServices,
|
||||
|
@ -18,13 +18,13 @@ import (
|
||||
)
|
||||
|
||||
// FetchLoopOutSwaps returns all swaps currently in the store.
|
||||
func (s *BaseDB) FetchLoopOutSwaps(ctx context.Context) ([]*LoopOut,
|
||||
func (db *BaseDB) FetchLoopOutSwaps(ctx context.Context) ([]*LoopOut,
|
||||
error) {
|
||||
|
||||
var loopOuts []*LoopOut
|
||||
|
||||
err := s.ExecTx(ctx, NewSqlReadOpts(), func(*sqlc.Queries) error {
|
||||
swaps, err := s.Queries.GetLoopOutSwaps(ctx)
|
||||
err := db.ExecTx(ctx, NewSqlReadOpts(), func(tx *sqlc.Queries) error {
|
||||
swaps, err := tx.GetLoopOutSwaps(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -32,7 +32,7 @@ func (s *BaseDB) FetchLoopOutSwaps(ctx context.Context) ([]*LoopOut,
|
||||
loopOuts = make([]*LoopOut, len(swaps))
|
||||
|
||||
for i, swap := range swaps {
|
||||
updates, err := s.Queries.GetSwapUpdates(
|
||||
updates, err := tx.GetSwapUpdates(
|
||||
ctx, swap.SwapHash,
|
||||
)
|
||||
if err != nil {
|
||||
@ -40,7 +40,7 @@ func (s *BaseDB) FetchLoopOutSwaps(ctx context.Context) ([]*LoopOut,
|
||||
}
|
||||
|
||||
loopOut, err := ConvertLoopOutRow(
|
||||
s.network, sqlc.GetLoopOutSwapRow(swap),
|
||||
db.network, sqlc.GetLoopOutSwapRow(swap),
|
||||
updates,
|
||||
)
|
||||
if err != nil {
|
||||
@ -60,24 +60,24 @@ func (s *BaseDB) FetchLoopOutSwaps(ctx context.Context) ([]*LoopOut,
|
||||
}
|
||||
|
||||
// FetchLoopOutSwap returns the loop out swap with the given hash.
|
||||
func (s *BaseDB) FetchLoopOutSwap(ctx context.Context,
|
||||
func (db *BaseDB) FetchLoopOutSwap(ctx context.Context,
|
||||
hash lntypes.Hash) (*LoopOut, error) {
|
||||
|
||||
var loopOut *LoopOut
|
||||
|
||||
err := s.ExecTx(ctx, NewSqlReadOpts(), func(*sqlc.Queries) error {
|
||||
swap, err := s.Queries.GetLoopOutSwap(ctx, hash[:])
|
||||
err := db.ExecTx(ctx, NewSqlReadOpts(), func(tx *sqlc.Queries) error {
|
||||
swap, err := tx.GetLoopOutSwap(ctx, hash[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
updates, err := s.Queries.GetSwapUpdates(ctx, swap.SwapHash)
|
||||
updates, err := tx.GetSwapUpdates(ctx, swap.SwapHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
loopOut, err = ConvertLoopOutRow(
|
||||
s.network, swap, updates,
|
||||
db.network, swap, updates,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -93,11 +93,11 @@ func (s *BaseDB) FetchLoopOutSwap(ctx context.Context,
|
||||
}
|
||||
|
||||
// CreateLoopOut adds an initiated swap to the store.
|
||||
func (s *BaseDB) CreateLoopOut(ctx context.Context, hash lntypes.Hash,
|
||||
func (db *BaseDB) CreateLoopOut(ctx context.Context, hash lntypes.Hash,
|
||||
swap *LoopOutContract) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return s.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
insertArgs := loopToInsertArgs(
|
||||
hash, &swap.SwapContract,
|
||||
)
|
||||
@ -131,11 +131,11 @@ func (s *BaseDB) CreateLoopOut(ctx context.Context, hash lntypes.Hash,
|
||||
}
|
||||
|
||||
// BatchCreateLoopOut adds multiple initiated swaps to the store.
|
||||
func (s *BaseDB) BatchCreateLoopOut(ctx context.Context,
|
||||
func (db *BaseDB) BatchCreateLoopOut(ctx context.Context,
|
||||
swaps map[lntypes.Hash]*LoopOutContract) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return s.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
for swapHash, swap := range swaps {
|
||||
swap := swap
|
||||
|
||||
@ -174,20 +174,20 @@ func (s *BaseDB) BatchCreateLoopOut(ctx context.Context,
|
||||
// UpdateLoopOut stores a new event for a target loop out swap. This
|
||||
// appends to the event log for a particular swap as it goes through
|
||||
// the various stages in its lifetime.
|
||||
func (s *BaseDB) UpdateLoopOut(ctx context.Context, hash lntypes.Hash,
|
||||
func (db *BaseDB) UpdateLoopOut(ctx context.Context, hash lntypes.Hash,
|
||||
time time.Time, state SwapStateData) error {
|
||||
|
||||
return s.updateLoop(ctx, hash, time, state)
|
||||
return db.updateLoop(ctx, hash, time, state)
|
||||
}
|
||||
|
||||
// FetchLoopInSwaps returns all swaps currently in the store.
|
||||
func (s *BaseDB) FetchLoopInSwaps(ctx context.Context) (
|
||||
func (db *BaseDB) FetchLoopInSwaps(ctx context.Context) (
|
||||
[]*LoopIn, error) {
|
||||
|
||||
var loopIns []*LoopIn
|
||||
|
||||
err := s.ExecTx(ctx, NewSqlReadOpts(), func(*sqlc.Queries) error {
|
||||
swaps, err := s.Queries.GetLoopInSwaps(ctx)
|
||||
err := db.ExecTx(ctx, NewSqlReadOpts(), func(tx *sqlc.Queries) error {
|
||||
swaps, err := tx.GetLoopInSwaps(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -195,12 +195,12 @@ func (s *BaseDB) FetchLoopInSwaps(ctx context.Context) (
|
||||
loopIns = make([]*LoopIn, len(swaps))
|
||||
|
||||
for i, swap := range swaps {
|
||||
updates, err := s.Queries.GetSwapUpdates(ctx, swap.SwapHash)
|
||||
updates, err := tx.GetSwapUpdates(ctx, swap.SwapHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
loopIn, err := s.convertLoopInRow(
|
||||
loopIn, err := db.convertLoopInRow(
|
||||
swap, updates,
|
||||
)
|
||||
if err != nil {
|
||||
@ -220,11 +220,11 @@ func (s *BaseDB) FetchLoopInSwaps(ctx context.Context) (
|
||||
}
|
||||
|
||||
// CreateLoopIn adds an initiated swap to the store.
|
||||
func (s *BaseDB) CreateLoopIn(ctx context.Context, hash lntypes.Hash,
|
||||
func (db *BaseDB) CreateLoopIn(ctx context.Context, hash lntypes.Hash,
|
||||
swap *LoopInContract) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return s.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
insertArgs := loopToInsertArgs(
|
||||
hash, &swap.SwapContract,
|
||||
)
|
||||
@ -257,11 +257,11 @@ func (s *BaseDB) CreateLoopIn(ctx context.Context, hash lntypes.Hash,
|
||||
}
|
||||
|
||||
// BatchCreateLoopIn adds multiple initiated swaps to the store.
|
||||
func (s *BaseDB) BatchCreateLoopIn(ctx context.Context,
|
||||
func (db *BaseDB) BatchCreateLoopIn(ctx context.Context,
|
||||
swaps map[lntypes.Hash]*LoopInContract) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return s.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
for swapHash, swap := range swaps {
|
||||
swap := swap
|
||||
|
||||
@ -301,10 +301,10 @@ func (s *BaseDB) BatchCreateLoopIn(ctx context.Context,
|
||||
// UpdateLoopIn stores a new event for a target loop in swap. This
|
||||
// appends to the event log for a particular swap as it goes through
|
||||
// the various stages in its lifetime.
|
||||
func (s *BaseDB) UpdateLoopIn(ctx context.Context, hash lntypes.Hash,
|
||||
func (db *BaseDB) UpdateLoopIn(ctx context.Context, hash lntypes.Hash,
|
||||
time time.Time, state SwapStateData) error {
|
||||
|
||||
return s.updateLoop(ctx, hash, time, state)
|
||||
return db.updateLoop(ctx, hash, time, state)
|
||||
}
|
||||
|
||||
// PutLiquidityParams writes the serialized `manager.Parameters` bytes
|
||||
@ -312,10 +312,10 @@ func (s *BaseDB) UpdateLoopIn(ctx context.Context, hash lntypes.Hash,
|
||||
//
|
||||
// NOTE: it's the caller's responsibility to encode the param. Atm,
|
||||
// it's encoding using the proto package's `Marshal` method.
|
||||
func (s *BaseDB) PutLiquidityParams(ctx context.Context,
|
||||
func (db *BaseDB) PutLiquidityParams(ctx context.Context,
|
||||
params []byte) error {
|
||||
|
||||
err := s.Queries.UpsertLiquidityParams(ctx, params)
|
||||
err := db.Queries.UpsertLiquidityParams(ctx, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -328,11 +328,11 @@ func (s *BaseDB) PutLiquidityParams(ctx context.Context,
|
||||
//
|
||||
// NOTE: it's the caller's responsibility to decode the param. Atm,
|
||||
// it's decoding using the proto package's `Unmarshal` method.
|
||||
func (s *BaseDB) FetchLiquidityParams(ctx context.Context) ([]byte,
|
||||
func (db *BaseDB) FetchLiquidityParams(ctx context.Context) ([]byte,
|
||||
error) {
|
||||
|
||||
var params []byte
|
||||
params, err := s.Queries.FetchLiquidityParams(ctx)
|
||||
params, err := db.Queries.FetchLiquidityParams(ctx)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return params, nil
|
||||
} else if err != nil {
|
||||
@ -348,11 +348,11 @@ var _ SwapStore = (*BaseDB)(nil)
|
||||
|
||||
// updateLoop updates the swap with the given hash by inserting a new update
|
||||
// in the swap_updates table.
|
||||
func (s *BaseDB) updateLoop(ctx context.Context, hash lntypes.Hash,
|
||||
func (db *BaseDB) updateLoop(ctx context.Context, hash lntypes.Hash,
|
||||
time time.Time, state SwapStateData) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return s.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
updateParams := sqlc.InsertSwapUpdateParams{
|
||||
SwapHash: hash[:],
|
||||
UpdateTimestamp: time.UTC(),
|
||||
@ -376,11 +376,11 @@ func (s *BaseDB) updateLoop(ctx context.Context, hash lntypes.Hash,
|
||||
}
|
||||
|
||||
// BatchInsertUpdate inserts multiple swap updates to the store.
|
||||
func (s *BaseDB) BatchInsertUpdate(ctx context.Context,
|
||||
func (db *BaseDB) BatchInsertUpdate(ctx context.Context,
|
||||
updateData map[lntypes.Hash][]BatchInsertUpdateData) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return s.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
for swapHash, updates := range updateData {
|
||||
for _, update := range updates {
|
||||
updateParams := sqlc.InsertSwapUpdateParams{
|
||||
@ -409,11 +409,11 @@ func (s *BaseDB) BatchInsertUpdate(ctx context.Context,
|
||||
|
||||
// BatchUpdateLoopOutSwapCosts updates the swap costs for a batch of loop out
|
||||
// swaps.
|
||||
func (b *BaseDB) BatchUpdateLoopOutSwapCosts(ctx context.Context,
|
||||
func (db *BaseDB) BatchUpdateLoopOutSwapCosts(ctx context.Context,
|
||||
costs map[lntypes.Hash]SwapCost) error {
|
||||
|
||||
writeOpts := NewSqlWriteOpts()
|
||||
return b.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
return db.ExecTx(ctx, writeOpts, func(tx *sqlc.Queries) error {
|
||||
for swapHash, cost := range costs {
|
||||
lastUpdateID, err := tx.GetLastUpdateID(
|
||||
ctx, swapHash[:],
|
||||
@ -440,10 +440,10 @@ func (b *BaseDB) BatchUpdateLoopOutSwapCosts(ctx context.Context,
|
||||
}
|
||||
|
||||
// HasMigration returns true if the migration with the given ID has been done.
|
||||
func (b *BaseDB) HasMigration(ctx context.Context, migrationID string) (
|
||||
func (db *BaseDB) HasMigration(ctx context.Context, migrationID string) (
|
||||
bool, error) {
|
||||
|
||||
migration, err := b.GetMigration(ctx, migrationID)
|
||||
migration, err := db.GetMigration(ctx, migrationID)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return false, err
|
||||
}
|
||||
@ -452,8 +452,8 @@ func (b *BaseDB) HasMigration(ctx context.Context, migrationID string) (
|
||||
}
|
||||
|
||||
// SetMigration marks the migration with the given ID as done.
|
||||
func (b *BaseDB) SetMigration(ctx context.Context, migrationID string) error {
|
||||
return b.InsertMigration(ctx, sqlc.InsertMigrationParams{
|
||||
func (db *BaseDB) SetMigration(ctx context.Context, migrationID string) error {
|
||||
return db.InsertMigration(ctx, sqlc.InsertMigrationParams{
|
||||
MigrationID: migrationID,
|
||||
MigrationTs: sql.NullTime{
|
||||
Time: time.Now().UTC(),
|
||||
@ -627,7 +627,7 @@ func ConvertLoopOutRow(network *chaincfg.Params, row sqlc.GetLoopOutSwapRow,
|
||||
|
||||
// convertLoopInRow converts a database row containing a loop in swap to a
|
||||
// LoopIn struct.
|
||||
func (s *BaseDB) convertLoopInRow(row sqlc.GetLoopInSwapsRow,
|
||||
func (db *BaseDB) convertLoopInRow(row sqlc.GetLoopInSwapsRow,
|
||||
updates []sqlc.SwapUpdate) (*LoopIn, error) {
|
||||
|
||||
htlcKeys, err := fetchHtlcKeys(
|
||||
|
@ -194,7 +194,7 @@ func (db *BaseDB) BeginTx(ctx context.Context,
|
||||
}
|
||||
|
||||
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
|
||||
// transaction. The db transaction is embedded in a `*postgres.Queries` that
|
||||
// transaction. The db transaction is embedded in a `*sqlc.Queries` that
|
||||
// txBody needs to use when executing each one of the queries that need to be
|
||||
// applied atomically.
|
||||
func (db *BaseDB) ExecTx(ctx context.Context, txOptions TxOptions,
|
||||
@ -224,9 +224,9 @@ func (db *BaseDB) ExecTx(ctx context.Context, txOptions TxOptions,
|
||||
|
||||
// FixFaultyTimestamps fixes faulty timestamps in the database, caused
|
||||
// by using milliseconds instead of seconds as the publication deadline.
|
||||
func (b *BaseDB) FixFaultyTimestamps(ctx context.Context) error {
|
||||
func (db *BaseDB) FixFaultyTimestamps(ctx context.Context) error {
|
||||
// Manually fetch all the loop out swaps.
|
||||
rows, err := b.DB.QueryContext(
|
||||
rows, err := db.DB.QueryContext(
|
||||
ctx, "SELECT swap_hash, swap_invoice, publication_deadline FROM loopout_swaps",
|
||||
)
|
||||
if err != nil {
|
||||
@ -262,7 +262,7 @@ func (b *BaseDB) FixFaultyTimestamps(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err := b.BeginTx(ctx, &SqliteTxOptions{})
|
||||
tx, err := db.BeginTx(ctx, &SqliteTxOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -283,7 +283,7 @@ func (b *BaseDB) FixFaultyTimestamps(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
payReq, err := zpay32.Decode(swap.SwapInvoice, b.network)
|
||||
payReq, err := zpay32.Decode(swap.SwapInvoice, db.network)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
49
loopdb/typed_store.go
Normal file
49
loopdb/typed_store.go
Normal file
@ -0,0 +1,49 @@
|
||||
package loopdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/lightninglabs/loop/loopdb/sqlc"
|
||||
)
|
||||
|
||||
// BatchedQuerier implements all DB queries and ExecTx on *sqlc.Queries.
|
||||
// It is implemented by BaseDB, SqliteSwapStore, etc.
|
||||
type BatchedQuerier interface {
|
||||
sqlc.Querier
|
||||
|
||||
// ExecTx is a wrapper for txBody to abstract the creation and commit of
|
||||
// a db transaction. The db transaction is embedded in a `*sqlc.Queries`
|
||||
// that txBody needs to use when executing each one of the queries that
|
||||
// need to be applied atomically.
|
||||
ExecTx(ctx context.Context, txOptions TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
}
|
||||
|
||||
// TypedStore is similar to BaseDB but provides parameterized ExecTx.
|
||||
// It is used in other packages expecting ExecTx operating on subset of methods.
|
||||
type TypedStore[Q any] struct {
|
||||
BatchedQuerier
|
||||
}
|
||||
|
||||
// NewTypedStore wraps a db, replacing generic ExecTx method with the typed one.
|
||||
func NewTypedStore[Q any](db BatchedQuerier) *TypedStore[Q] {
|
||||
// Make sure *sqlc.Queries can be casted to Q.
|
||||
_ = any((*sqlc.Queries)(nil)).(Q)
|
||||
|
||||
return &TypedStore[Q]{
|
||||
BatchedQuerier: db,
|
||||
}
|
||||
}
|
||||
|
||||
// ExecTx will execute the passed txBody, operating upon generic parameter Q
|
||||
// (usually a storage interface) in a single transaction. The set of TxOptions
|
||||
// are passed in to allow the caller to specify if a transaction is read-only.
|
||||
func (s *TypedStore[Q]) ExecTx(ctx context.Context,
|
||||
txOptions TxOptions, txBody func(Q) error) error {
|
||||
|
||||
return s.BatchedQuerier.ExecTx(ctx, txOptions,
|
||||
func(q *sqlc.Queries) error {
|
||||
return txBody(any(q).(Q))
|
||||
},
|
||||
)
|
||||
}
|
@ -15,7 +15,9 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
)
|
||||
|
||||
type BaseDB interface {
|
||||
// Querier is the interface that contains all the queries generated
|
||||
// by sqlc for sweep batcher.
|
||||
type Querier interface {
|
||||
// ConfirmBatch confirms a batch by setting the state to confirmed.
|
||||
ConfirmBatch(ctx context.Context, id int32) error
|
||||
|
||||
@ -52,11 +54,17 @@ type BaseDB interface {
|
||||
// UpsertSweep inserts a sweep into the database, or updates an existing
|
||||
// sweep if it already exists.
|
||||
UpsertSweep(ctx context.Context, arg sqlc.UpsertSweepParams) error
|
||||
}
|
||||
|
||||
// BaseDB is the interface that contains all the queries generated
|
||||
// by sqlc for sweep batcher and transaction functionality.
|
||||
type BaseDB interface {
|
||||
Querier
|
||||
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
txBody func(Querier) error) error
|
||||
}
|
||||
|
||||
// SQLStore manages the reservations in the database.
|
||||
@ -112,7 +120,7 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
|
||||
// for batches that have no sweeps and so we'd not be able to resume.
|
||||
func (s *SQLStore) DropBatch(ctx context.Context, id int32) error {
|
||||
readOpts := loopdb.NewSqlWriteOpts()
|
||||
return s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
|
||||
return s.baseDb.ExecTx(ctx, readOpts, func(tx Querier) error {
|
||||
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -143,7 +151,7 @@ func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) (
|
||||
readOpts := loopdb.NewSqlReadOpts()
|
||||
var sweeps []*dbSweep
|
||||
|
||||
err := s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
|
||||
err := s.baseDb.ExecTx(ctx, readOpts, func(tx Querier) error {
|
||||
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user