mirror of
https://github.com/lightninglabs/loop
synced 2024-11-08 01:10:29 +00:00
Merge pull request #760 from starius/sweepbatcher-rm-defaultBatchConfTarget
sweepbatcher: load from DB preserves confTarget
This commit is contained in:
commit
5c88cf86b9
@ -296,7 +296,7 @@ func testCustomSweepConfTarget(t *testing.T) {
|
||||
|
||||
errChan := make(chan error, 2)
|
||||
|
||||
batcherStore := sweepbatcher.NewStoreMock()
|
||||
batcherStore := sweepbatcher.NewStoreMock(cfg.store)
|
||||
|
||||
batcher := sweepbatcher.NewBatcher(
|
||||
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
@ -529,7 +529,7 @@ func testPreimagePush(t *testing.T) {
|
||||
|
||||
errChan := make(chan error, 2)
|
||||
|
||||
batcherStore := sweepbatcher.NewStoreMock()
|
||||
batcherStore := sweepbatcher.NewStoreMock(cfg.store)
|
||||
|
||||
batcher := sweepbatcher.NewBatcher(
|
||||
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
@ -950,7 +950,7 @@ func TestLoopOutMuSig2Sweep(t *testing.T) {
|
||||
|
||||
errChan := make(chan error, 2)
|
||||
|
||||
batcherStore := sweepbatcher.NewStoreMock()
|
||||
batcherStore := sweepbatcher.NewStoreMock(cfg.store)
|
||||
|
||||
batcher := sweepbatcher.NewBatcher(
|
||||
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
|
@ -3,6 +3,7 @@ package sweepbatcher
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/btcsuite/btcd/btcutil"
|
||||
@ -11,15 +12,17 @@ import (
|
||||
|
||||
// StoreMock implements a mock client swap store.
|
||||
type StoreMock struct {
|
||||
batches map[int32]dbBatch
|
||||
sweeps map[lntypes.Hash]dbSweep
|
||||
batches map[int32]dbBatch
|
||||
sweeps map[lntypes.Hash]dbSweep
|
||||
swapStore LoopOutFetcher
|
||||
}
|
||||
|
||||
// NewStoreMock instantiates a new mock store.
|
||||
func NewStoreMock() *StoreMock {
|
||||
func NewStoreMock(swapStore LoopOutFetcher) *StoreMock {
|
||||
return &StoreMock{
|
||||
batches: make(map[int32]dbBatch),
|
||||
sweeps: make(map[lntypes.Hash]dbSweep),
|
||||
batches: make(map[int32]dbBatch),
|
||||
sweeps: make(map[lntypes.Hash]dbSweep),
|
||||
swapStore: swapStore,
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,9 +93,21 @@ func (s *StoreMock) FetchBatchSweeps(ctx context.Context,
|
||||
result := []*dbSweep{}
|
||||
for _, sweep := range s.sweeps {
|
||||
sweep := sweep
|
||||
if sweep.BatchID == id {
|
||||
result = append(result, &sweep)
|
||||
if sweep.BatchID != id {
|
||||
continue
|
||||
}
|
||||
|
||||
// Load swap from loopdb.
|
||||
swap, err := s.swapStore.FetchLoopOutSwap(
|
||||
ctx, sweep.SwapHash,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch swap "+
|
||||
"for SwapHash=%v", sweep.SwapHash)
|
||||
}
|
||||
sweep.LoopOut = swap
|
||||
|
||||
result = append(result, &sweep)
|
||||
}
|
||||
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
|
@ -35,10 +35,6 @@ const (
|
||||
// fee rate is increased when an rbf is attempted.
|
||||
defaultFeeRateStep = chainfee.SatPerKWeight(100)
|
||||
|
||||
// defaultBatchConfTarget is the default confirmation target of the
|
||||
// batch transaction.
|
||||
defaultBatchConfTarget = 12
|
||||
|
||||
// batchConfHeight is the default confirmation height of the batch
|
||||
// transaction.
|
||||
batchConfHeight = 3
|
||||
@ -316,7 +312,22 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
|
||||
}
|
||||
|
||||
// NewBatchFromDB creates a new batch that already existed in storage.
|
||||
func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
|
||||
func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
|
||||
// Make sure the batch is not empty.
|
||||
if len(bk.sweeps) == 0 {
|
||||
// This should never happen, as this precondition is already
|
||||
// ensured in spinUpBatchFromDB.
|
||||
return nil, fmt.Errorf("empty batch is not allowed")
|
||||
}
|
||||
|
||||
// Assign batchConfTarget to primary sweep's confTarget.
|
||||
for _, sweep := range bk.sweeps {
|
||||
if sweep.swapHash == bk.primaryID {
|
||||
cfg.batchConfTarget = sweep.confTarget
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return &batch{
|
||||
id: bk.id,
|
||||
state: bk.state,
|
||||
@ -343,7 +354,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
|
||||
store: bk.store,
|
||||
log: bk.log,
|
||||
cfg: &cfg,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// addSweep tries to add a sweep to the batch. If this is the first sweep being
|
||||
@ -1044,6 +1055,10 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
|
||||
// If the feeRate is unset then we never published before, so we
|
||||
// retrieve the fee estimate from our wallet.
|
||||
if b.rbfCache.FeeRate == 0 {
|
||||
if b.cfg.batchConfTarget == 0 {
|
||||
b.log.Warnf("updateRbfRate called with zero " +
|
||||
"batchConfTarget")
|
||||
}
|
||||
b.log.Infof("initializing rbf fee rate for conf target=%v",
|
||||
b.cfg.batchConfTarget)
|
||||
rate, err := b.wallet.EstimateFeeRate(
|
||||
|
@ -378,7 +378,6 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
|
||||
func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
|
||||
cfg := batchConfig{
|
||||
maxTimeoutDistance: defaultMaxTimeoutDistance,
|
||||
batchConfTarget: defaultBatchConfTarget,
|
||||
}
|
||||
|
||||
switch b.chainParams {
|
||||
@ -488,10 +487,12 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
|
||||
|
||||
cfg := batchConfig{
|
||||
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
|
||||
batchConfTarget: defaultBatchConfTarget,
|
||||
}
|
||||
|
||||
newBatch := NewBatchFromDB(cfg, batchKit)
|
||||
newBatch, err := NewBatchFromDB(cfg, batchKit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
|
||||
}
|
||||
|
||||
// We add the batch to our map of batches and start it.
|
||||
b.batches[batch.id] = newBatch
|
||||
|
@ -64,7 +64,7 @@ func TestSweepBatcherBatchCreation(t *testing.T) {
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
@ -218,7 +218,7 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) {
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
@ -355,7 +355,7 @@ func TestSweepBatcherSweepReentry(t *testing.T) {
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
@ -562,7 +562,7 @@ func TestSweepBatcherNonWalletAddr(t *testing.T) {
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
@ -727,7 +727,7 @@ func TestSweepBatcherComposite(t *testing.T) {
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
@ -1044,7 +1044,7 @@ func TestRestoringEmptyBatch(t *testing.T) {
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
_, err := batcherStore.InsertSweepBatch(ctx, &dbBatch{})
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -1109,7 +1109,7 @@ func TestRestoringEmptyBatch(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, batches, 1)
|
||||
|
||||
// Now make it quit by canceling the context.
|
||||
// Now make the batcher quit by canceling the context.
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
@ -1158,7 +1158,7 @@ func TestHandleSweepTwice(t *testing.T) {
|
||||
|
||||
store := newLoopStoreMock()
|
||||
|
||||
batcherStore := NewStoreMock()
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
@ -1297,9 +1297,125 @@ func TestHandleSweepTwice(t *testing.T) {
|
||||
require.Equal(t, 1, len(batcher.batches[0].sweeps))
|
||||
require.Equal(t, 1, len(batcher.batches[1].sweeps))
|
||||
|
||||
// Now make it quit by canceling the context.
|
||||
// Now make the batcher quit by canceling the context.
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
checkBatcherError(t, runErr)
|
||||
}
|
||||
|
||||
// TestRestoringPreservesConfTarget tests that after the batch is written to DB
|
||||
// and loaded back, its batchConfTarget value is preserved.
|
||||
func TestRestoringPreservesConfTarget(t *testing.T) {
|
||||
defer test.Guard(t)()
|
||||
|
||||
lnd := test.NewMockLnd()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
store := loopdb.NewStoreMock(t)
|
||||
|
||||
batcherStore := NewStoreMock(store)
|
||||
|
||||
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
var runErr error
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
runErr = batcher.Run(ctx)
|
||||
}()
|
||||
|
||||
// Wait for the batcher to be initialized.
|
||||
<-batcher.initDone
|
||||
|
||||
// Create a sweep request.
|
||||
sweepReq := SweepRequest{
|
||||
SwapHash: lntypes.Hash{1, 1, 1},
|
||||
Value: 111,
|
||||
Outpoint: wire.OutPoint{
|
||||
Hash: chainhash.Hash{1, 1},
|
||||
Index: 1,
|
||||
},
|
||||
Notifier: &dummyNotifier,
|
||||
}
|
||||
|
||||
swap := &loopdb.LoopOutContract{
|
||||
SwapContract: loopdb.SwapContract{
|
||||
CltvExpiry: 111,
|
||||
AmountRequested: 111,
|
||||
},
|
||||
|
||||
SwapInvoice: swapInvoice,
|
||||
SweepConfTarget: 123,
|
||||
}
|
||||
|
||||
err := store.CreateLoopOut(ctx, sweepReq.SwapHash, swap)
|
||||
require.NoError(t, err)
|
||||
store.AssertLoopOutStored()
|
||||
|
||||
// Deliver sweep request to batcher.
|
||||
require.NoError(t, batcher.AddSweep(&sweepReq))
|
||||
|
||||
// Since a batch was created we check that it registered for its primary
|
||||
// sweep's spend.
|
||||
<-lnd.RegisterSpendChannel
|
||||
|
||||
// Once batcher receives sweep request it will eventually spin up a
|
||||
// batch.
|
||||
require.Eventually(t, func() bool {
|
||||
// Make sure that the sweep was stored and we have exactly one
|
||||
// active batch, with one sweep and proper batchConfTarget.
|
||||
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
|
||||
len(batcher.batches) == 1 &&
|
||||
len(batcher.batches[0].sweeps) == 1 &&
|
||||
batcher.batches[0].cfg.batchConfTarget == 123
|
||||
}, test.Timeout, eventuallyCheckFrequency)
|
||||
|
||||
// Make sure we have stored the batch.
|
||||
batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, batches, 1)
|
||||
|
||||
// Now make the batcher quit by canceling the context.
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
// Make sure the batcher exited without an error.
|
||||
checkBatcherError(t, runErr)
|
||||
|
||||
// Now launch it again.
|
||||
batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
|
||||
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
runErr = batcher.Run(ctx)
|
||||
}()
|
||||
|
||||
// Wait for the batcher to be initialized.
|
||||
<-batcher.initDone
|
||||
|
||||
// Wait for batch to load.
|
||||
require.Eventually(t, func() bool {
|
||||
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
|
||||
len(batcher.batches) == 1 &&
|
||||
len(batcher.batches[0].sweeps) == 1
|
||||
}, test.Timeout, eventuallyCheckFrequency)
|
||||
|
||||
// Make sure batchConfTarget was preserved.
|
||||
require.Equal(t, 123, int(batcher.batches[0].cfg.batchConfTarget))
|
||||
|
||||
// Expect registration for spend notification.
|
||||
<-lnd.RegisterSpendChannel
|
||||
|
||||
// Now make the batcher quit by canceling the context.
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
// Make sure the batcher exited without an error.
|
||||
checkBatcherError(t, runErr)
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func newSwapClient(config *clientConfig) *Client {
|
||||
|
||||
lndServices := config.LndServices
|
||||
|
||||
batcherStore := sweepbatcher.NewStoreMock()
|
||||
batcherStore := sweepbatcher.NewStoreMock(config.Store)
|
||||
|
||||
batcher := sweepbatcher.NewBatcher(
|
||||
config.LndServices.WalletKit, config.LndServices.ChainNotifier,
|
||||
|
Loading…
Reference in New Issue
Block a user