mirror of
https://github.com/lightninglabs/loop
synced 2024-11-17 21:25:56 +00:00
multi: typed callback in ExecTx
Provide a wrapped store type, exposing ExecTx method with a subset interface in the callback argument. BaseDB interfaces in instantout, reservation and sweepbatcher use ExecTx with their subset Querier instead of whole sqlc.Querier (*sqlc.Queries). This is needed to make the packages more reusable, so they don't depend on methods of *sqlc.Queries they don't use.
This commit is contained in:
parent
d341448568
commit
1947b90842
@ -112,7 +112,7 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
|
||||
|
||||
dbFixture := loopdb.NewTestDB(t)
|
||||
|
||||
store := NewSQLStore(dbFixture)
|
||||
store := NewSQLStore(loopdb.NewTypedStore[Querier](dbFixture))
|
||||
|
||||
mockReservationClient := new(mockReservationClient)
|
||||
|
||||
|
@ -52,7 +52,7 @@ type BaseDB interface {
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
txBody func(Querier) error) error
|
||||
}
|
||||
|
||||
// SQLStore manages the reservations in the database.
|
||||
@ -92,7 +92,7 @@ func (r *SQLStore) CreateReservation(ctx context.Context,
|
||||
}
|
||||
|
||||
return r.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.CreateReservation(ctx, args)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -132,7 +132,7 @@ func (r *SQLStore) UpdateReservation(ctx context.Context,
|
||||
}
|
||||
|
||||
return r.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.UpdateReservation(ctx, updateArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -148,7 +148,7 @@ func (r *SQLStore) GetReservation(ctx context.Context,
|
||||
|
||||
var reservation *Reservation
|
||||
err := r.baseDb.ExecTx(ctx, loopdb.NewSqlReadOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
var err error
|
||||
reservationRow, err := q.GetReservation(
|
||||
ctx, reservationId[:],
|
||||
@ -192,7 +192,7 @@ func (r *SQLStore) ListReservations(ctx context.Context) ([]*Reservation,
|
||||
var result []*Reservation
|
||||
|
||||
err := r.baseDb.ExecTx(ctx, loopdb.NewSqlReadOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
var err error
|
||||
|
||||
reservations, err := q.GetReservations(ctx)
|
||||
|
@ -19,7 +19,7 @@ func TestSqlStore(t *testing.T) {
|
||||
testDb := loopdb.NewTestDB(t)
|
||||
defer testDb.Close()
|
||||
|
||||
store := NewSQLStore(testDb)
|
||||
store := NewSQLStore(loopdb.NewTypedStore[Querier](testDb))
|
||||
|
||||
// Create a reservation and store it.
|
||||
reservation := &Reservation{
|
||||
|
@ -63,7 +63,7 @@ type InstantOutBaseDB interface {
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
txBody func(Querier) error) error
|
||||
}
|
||||
|
||||
// ReservationStore is the interface that is required to load the reservations
|
||||
@ -139,7 +139,7 @@ func (s *SQLStore) CreateInstantLoopOut(ctx context.Context,
|
||||
}
|
||||
|
||||
return s.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.InsertSwap(ctx, swapArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -211,7 +211,7 @@ func (s *SQLStore) UpdateInstantLoopOut(ctx context.Context,
|
||||
}
|
||||
|
||||
return s.baseDb.ExecTx(ctx, loopdb.NewSqlWriteOpts(),
|
||||
func(q *sqlc.Queries) error {
|
||||
func(q Querier) error {
|
||||
err := q.UpdateInstantOut(ctx, updateParams)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -417,7 +417,10 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sweeperDb := sweepbatcher.NewSQLStore(baseDb, chainParams)
|
||||
sweeperDb := sweepbatcher.NewSQLStore(
|
||||
loopdb.NewTypedStore[sweepbatcher.Querier](baseDb),
|
||||
chainParams,
|
||||
)
|
||||
|
||||
// Create an instance of the loop client library.
|
||||
swapClient, clientCleanup, err := getClient(
|
||||
@ -501,7 +504,9 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
|
||||
)
|
||||
// Create the reservation and instantout managers.
|
||||
if d.cfg.EnableExperimental {
|
||||
reservationStore := reservation.NewSQLStore(baseDb)
|
||||
reservationStore := reservation.NewSQLStore(
|
||||
loopdb.NewTypedStore[reservation.Querier](baseDb),
|
||||
)
|
||||
reservationConfig := &reservation.Config{
|
||||
Store: reservationStore,
|
||||
Wallet: d.lnd.WalletKit,
|
||||
@ -516,7 +521,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
|
||||
|
||||
// Create the instantout services.
|
||||
instantOutStore := instantout.NewSQLStore(
|
||||
baseDb, clock.NewDefaultClock(), reservationStore,
|
||||
loopdb.NewTypedStore[instantout.Querier](baseDb),
|
||||
clock.NewDefaultClock(), reservationStore,
|
||||
d.lnd.ChainParams,
|
||||
)
|
||||
instantOutConfig := &instantout.Config{
|
||||
|
@ -32,7 +32,10 @@ func view(config *Config, lisCfg *ListenerCfg) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sweeperDb := sweepbatcher.NewSQLStore(baseDb, chainParams)
|
||||
sweeperDb := sweepbatcher.NewSQLStore(
|
||||
loopdb.NewTypedStore[sweepbatcher.Querier](baseDb),
|
||||
chainParams,
|
||||
)
|
||||
|
||||
swapClient, cleanup, err := getClient(
|
||||
config, swapDb, sweeperDb, &lnd.LndServices,
|
||||
|
49
loopdb/typed_store.go
Normal file
49
loopdb/typed_store.go
Normal file
@ -0,0 +1,49 @@
|
||||
package loopdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/lightninglabs/loop/loopdb/sqlc"
|
||||
)
|
||||
|
||||
// BatchedQuerier implements all DB queries and ExecTx on *sqlc.Queries.
|
||||
// It is implemented by BaseDB, SqliteSwapStore, etc.
|
||||
type BatchedQuerier interface {
|
||||
sqlc.Querier
|
||||
|
||||
// ExecTx is a wrapper for txBody to abstract the creation and commit of
|
||||
// a db transaction. The db transaction is embedded in a `*sqlc.Queries`
|
||||
// that txBody needs to use when executing each one of the queries that
|
||||
// need to be applied atomically.
|
||||
ExecTx(ctx context.Context, txOptions TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
}
|
||||
|
||||
// TypedStore is similar to BaseDB but provides parameterized ExecTx.
|
||||
// It is used in other packages expecting ExecTx operating on subset of methods.
|
||||
type TypedStore[Q any] struct {
|
||||
BatchedQuerier
|
||||
}
|
||||
|
||||
// NewTypedStore wraps a db, replacing generic ExecTx method with the typed one.
|
||||
func NewTypedStore[Q any](db BatchedQuerier) *TypedStore[Q] {
|
||||
// Make sure *sqlc.Queries can be casted to Q.
|
||||
_ = any((*sqlc.Queries)(nil)).(Q)
|
||||
|
||||
return &TypedStore[Q]{
|
||||
BatchedQuerier: db,
|
||||
}
|
||||
}
|
||||
|
||||
// ExecTx will execute the passed txBody, operating upon generic parameter Q
|
||||
// (usually a storage interface) in a single transaction. The set of TxOptions
|
||||
// are passed in to allow the caller to specify if a transaction is read-only.
|
||||
func (s *TypedStore[Q]) ExecTx(ctx context.Context,
|
||||
txOptions TxOptions, txBody func(Q) error) error {
|
||||
|
||||
return s.BatchedQuerier.ExecTx(ctx, txOptions,
|
||||
func(q *sqlc.Queries) error {
|
||||
return txBody(any(q).(Q))
|
||||
},
|
||||
)
|
||||
}
|
@ -64,7 +64,7 @@ type BaseDB interface {
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
txBody func(Querier) error) error
|
||||
}
|
||||
|
||||
// SQLStore manages the reservations in the database.
|
||||
@ -120,7 +120,7 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
|
||||
// for batches that have no sweeps and so we'd not be able to resume.
|
||||
func (s *SQLStore) DropBatch(ctx context.Context, id int32) error {
|
||||
readOpts := loopdb.NewSqlWriteOpts()
|
||||
return s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
|
||||
return s.baseDb.ExecTx(ctx, readOpts, func(tx Querier) error {
|
||||
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -151,7 +151,7 @@ func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) (
|
||||
readOpts := loopdb.NewSqlReadOpts()
|
||||
var sweeps []*dbSweep
|
||||
|
||||
err := s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
|
||||
err := s.baseDb.ExecTx(ctx, readOpts, func(tx Querier) error {
|
||||
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user