2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-09 19:10:47 +00:00
loop/sweepbatcher/sweep_batcher_test.go
Boris Nagaev c34d04e2f7
sweepbatcher: fix race condition when stopping
The race was detected in CI and locally when running with -race.
It happened between the following calls:

WARNING: DATA RACE
Write at 0x00c0003e6638 by goroutine 1374:
  runtime.racewrite()
      <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Wait()
      sweepbatcher/sweep_batch.go:463 +0x6e
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).Run.func1()
      sweepbatcher/sweep_batcher.go:272 +0x10e

Previous read at 0x00c0003e6638 by goroutine 1388:
  runtime.raceread()
      <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).monitorConfirmations()
      sweepbatcher/sweep_batch.go:1144 +0x285
  github.com/lightninglabs/loop/sweepbatcher.(*batch).handleSpend()
      sweepbatcher/sweep_batch.go:1309 +0x10e4
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Run()
      sweepbatcher/sweep_batch.go:526 +0xb04
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).spinUpBatch.func1()
      sweepbatcher/sweep_batcher.go:455 +0xbd

The race was caused because wg.Add(1) and wg.Wait() were running from different
goroutines (one goroutine was running batch.Run() and another - batcher.Run()).

To avoid this scenario, wg.Wait() call was moved into batch.Run() call, so it
waits itself for its children goroutines, after which the channel b.finished
is closed, and it serves a signal for external waiters (the batcher, calling
batch.Wait()).

Also the channel batch.stopped was renamed to batch.stopping to better reflect
its nature.

Added TestSweepBatcherCloseDuringAdding to make sure adding a sweep during
shutting down does not cause a crash. The test did not catch the original
race condition.
2024-06-18 11:34:43 -03:00

1637 lines
42 KiB
Go

package sweepbatcher
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/test"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/stretchr/testify/require"
)
const (
swapInvoice = "lntb1230n1pjjszzgpp5j76f03wrkya4sm4gxv6az5nmz5aqsvmn4" +
"tpguu2sdvdyygedqjgqdq9xyerxcqzzsxqr23ssp5rwzmwtfjmsgranfk8sr" +
"4p4gcgmvyd42uug8pxteg2mkk23ndvkqs9qyyssq44ruk3ex59cmv4dm6k4v" +
"0kc6c0gcqjs0gkljfyd6c6uatqa2f67xlx3pcg5tnvcae5p3jju8ra77e87d" +
"vhhs0jrx53wnc0fq9rkrhmqqelyx7l"
eventuallyCheckFrequency = 100 * time.Millisecond
ntfnBufferSize = 1024
)
func testMuSig2SignSweep(ctx context.Context,
protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte,
prevoutMap map[wire.OutPoint]*wire.TxOut) (
[]byte, []byte, error) {
return nil, nil, nil
}
var dummyNotifier = SpendNotifier{
SpendChan: make(chan *SpendDetail, ntfnBufferSize),
SpendErrChan: make(chan error, ntfnBufferSize),
QuitChan: make(chan bool, ntfnBufferSize),
}
func checkBatcherError(t *testing.T, err error) {
if !errors.Is(err, context.Canceled) &&
!errors.Is(err, ErrBatcherShuttingDown) &&
!errors.Is(err, ErrBatchShuttingDown) {
require.NoError(t, err)
}
}
// TestSweepBatcherBatchCreation tests that sweep requests enter the expected
// batch based on their timeout distance.
func TestSweepBatcherBatchCreation(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
go func() {
err := batcher.Run(ctx)
checkBatcherError(t, err)
}()
// Create a sweep request.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap1 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Insert the same swap twice, this should be a noop.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Create a second sweep request that has a timeout distance less than
// our configured threshold.
sweepReq2 := SweepRequest{
SwapHash: lntypes.Hash{2, 2, 2},
Value: 222,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 2,
},
Notifier: &dummyNotifier,
}
swap2 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance - 1,
AmountRequested: 222,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2)
require.NoError(t, err)
store.AssertLoopOutStored()
require.NoError(t, batcher.AddSweep(&sweepReq2))
// Batcher should not create a second batch as timeout distance is small
// enough.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Create a third sweep request that has more timeout distance than
// the default.
sweepReq3 := SweepRequest{
SwapHash: lntypes.Hash{3, 3, 3},
Value: 333,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{3, 3},
Index: 3,
},
Notifier: &dummyNotifier,
}
swap3 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance + 1,
AmountRequested: 333,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3)
require.NoError(t, err)
store.AssertLoopOutStored()
require.NoError(t, batcher.AddSweep(&sweepReq3))
// Batcher should create a second batch as timeout distance is greater
// than the threshold
require.Eventually(t, func() bool {
return len(batcher.batches) == 2
}, test.Timeout, eventuallyCheckFrequency)
// Since the second batch got created we check that it registered its
// primary sweep's spend.
<-lnd.RegisterSpendChannel
require.Eventually(t, func() bool {
// Verify that each batch has the correct number of sweeps
// in it.
for _, batch := range batcher.batches {
switch batch.primarySweepID {
case sweepReq1.SwapHash:
if len(batch.sweeps) != 2 {
return false
}
case sweepReq3.SwapHash:
if len(batch.sweeps) != 1 {
return false
}
}
}
return true
}, test.Timeout, eventuallyCheckFrequency)
// Check that all sweeps were stored.
require.True(t, batcherStore.AssertSweepStored(sweepReq1.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq2.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq3.SwapHash))
}
// TestSweepBatcherSimpleLifecycle tests the simple lifecycle of the batches
// that are created and run by the batcher.
func TestSweepBatcherSimpleLifecycle(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
go func() {
err := batcher.Run(ctx)
checkBatcherError(t, err)
}()
// Create a sweep request.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap1 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
SweepConfTarget: 111,
}
err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Eventually request will be consumed and a new batch will spin up.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// When batch is successfully created it will execute it's first step,
// which leads to a spend monitor of the primary sweep.
<-lnd.RegisterSpendChannel
// Find the batch and assign it to a local variable for easier access.
batch := &batch{}
for _, btch := range batcher.batches {
if btch.primarySweepID == sweepReq1.SwapHash {
batch = btch
}
}
require.Eventually(t, func() bool {
// Batch should have the sweep stored.
return len(batch.sweeps) == 1
}, test.Timeout, eventuallyCheckFrequency)
// The primary sweep id should be that of the first inserted sweep.
require.Equal(t, batch.primarySweepID, sweepReq1.SwapHash)
err = lnd.NotifyHeight(601)
require.NoError(t, err)
// After receiving a height notification the batch will step again,
// leading to a new spend monitoring.
require.Eventually(t, func() bool {
return batch.currentHeight == 601
}, test.Timeout, eventuallyCheckFrequency)
// Create the spending tx that will trigger the spend monitor of the
// batch.
spendingTx := &wire.MsgTx{
Version: 1,
// Since the spend monitor is registered on the primary sweep's
// outpoint we insert that outpoint here.
TxIn: []*wire.TxIn{
{
PreviousOutPoint: sweepReq1.Outpoint,
},
},
TxOut: []*wire.TxOut{
{
PkScript: []byte{3, 2, 1},
},
},
}
spendingTxHash := spendingTx.TxHash()
// Mock the spend notification that spends the swap.
spendDetail := &chainntnfs.SpendDetail{
SpentOutPoint: &sweepReq1.Outpoint,
SpendingTx: spendingTx,
SpenderTxHash: &spendingTxHash,
SpenderInputIndex: 0,
SpendingHeight: 601,
}
// We notify the spend.
lnd.SpendChannel <- spendDetail
// After receiving the spend, the batch is now monitoring for confs.
<-lnd.RegisterConfChannel
// The batch should eventually read the spend notification and progress
// its state to closed.
require.Eventually(t, func() bool {
return batch.state == Closed
}, test.Timeout, eventuallyCheckFrequency)
err = lnd.NotifyHeight(604)
require.NoError(t, err)
// We mock the tx confirmation notification.
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
Tx: spendingTx,
}
// Eventually the batch receives the confirmation notification and
// confirms itself.
require.Eventually(t, func() bool {
return batch.isComplete()
}, test.Timeout, eventuallyCheckFrequency)
}
// TestSweepBatcherSweepReentry tests that when an old version of the batch tx
// gets confirmed the sweep leftovers are sent back to the batcher.
func TestSweepBatcherSweepReentry(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
go func() {
err := batcher.Run(ctx)
checkBatcherError(t, err)
}()
// Create some sweep requests with timeouts not too far away, in order
// to enter the same batch.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap1 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
SweepConfTarget: 111,
}
err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1)
require.NoError(t, err)
store.AssertLoopOutStored()
sweepReq2 := SweepRequest{
SwapHash: lntypes.Hash{2, 2, 2},
Value: 222,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 2,
},
Notifier: &dummyNotifier,
}
swap2 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 222,
},
SwapInvoice: swapInvoice,
SweepConfTarget: 111,
}
err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2)
require.NoError(t, err)
store.AssertLoopOutStored()
sweepReq3 := SweepRequest{
SwapHash: lntypes.Hash{3, 3, 3},
Value: 333,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{3, 3},
Index: 3,
},
Notifier: &dummyNotifier,
}
swap3 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 333,
},
SwapInvoice: swapInvoice,
SweepConfTarget: 111,
}
err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3)
require.NoError(t, err)
store.AssertLoopOutStored()
// Feed the sweeps to the batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// After inserting the primary (first) sweep, a spend monitor should be
// registered.
<-lnd.RegisterSpendChannel
require.NoError(t, batcher.AddSweep(&sweepReq2))
require.NoError(t, batcher.AddSweep(&sweepReq3))
// Batcher should create a batch for the sweeps.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Find the batch and store it in a local variable for easier access.
b := &batch{}
for _, btch := range batcher.batches {
if btch.primarySweepID == sweepReq1.SwapHash {
b = btch
}
}
// Batcher should contain all sweeps.
require.Eventually(t, func() bool {
return len(b.sweeps) == 3
}, test.Timeout, eventuallyCheckFrequency)
// Verify that the batch has a primary sweep id that matches the first
// inserted sweep, sweep1.
require.Equal(t, b.primarySweepID, sweepReq1.SwapHash)
// Create the spending tx. In order to simulate an older version of the
// batch transaction being confirmed, we only insert the primary sweep's
// outpoint as a TxIn. This means that the other two sweeps did not
// appear in the spending transaction. (This simulates a possible
// scenario caused by RBF replacements.)
spendingTx := &wire.MsgTx{
Version: 1,
TxIn: []*wire.TxIn{
{
PreviousOutPoint: sweepReq1.Outpoint,
},
},
TxOut: []*wire.TxOut{
{
Value: int64(sweepReq1.Value.ToUnit(
btcutil.AmountSatoshi,
)),
PkScript: []byte{3, 2, 1},
},
},
}
spendingTxHash := spendingTx.TxHash()
spendDetail := &chainntnfs.SpendDetail{
SpentOutPoint: &sweepReq1.Outpoint,
SpendingTx: spendingTx,
SpenderTxHash: &spendingTxHash,
SpenderInputIndex: 0,
SpendingHeight: 601,
}
// Send the spending notification to the mock channel.
lnd.SpendChannel <- spendDetail
// After receiving the spend notification the batch should progress to
// the next step, which is monitoring for confirmations.
<-lnd.RegisterConfChannel
// Eventually the batch reads the notification and proceeds to a closed
// state.
require.Eventually(t, func() bool {
return b.state == Closed
}, test.Timeout, eventuallyCheckFrequency)
// While handling the spend notification the batch should detect that
// some sweeps did not appear in the spending tx, therefore it redirects
// them back to the batcher and the batcher inserts them in a new batch.
require.Eventually(t, func() bool {
return len(batcher.batches) == 2
}, test.Timeout, eventuallyCheckFrequency)
// Since second batch was created we check that it registered for its
// primary sweep's spend.
<-lnd.RegisterSpendChannel
// We mock the confirmation notification.
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
Tx: spendingTx,
}
// Eventually the batch receives the confirmation notification,
// gracefully exits and the batcher deletes it.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Find the other batch, which includes the sweeps that did not appear
// in the spending tx.
b = &batch{}
for _, btch := range batcher.batches {
b = btch
}
// After all the sweeps enter, it should contain 2 sweeps.
require.Eventually(t, func() bool {
return len(b.sweeps) == 2
}, test.Timeout, eventuallyCheckFrequency)
// The batch should be in an open state.
require.Equal(t, b.state, Open)
}
// TestSweepBatcherNonWalletAddr tests that sweep requests that sweep to a non
// wallet address enter individual batches.
func TestSweepBatcherNonWalletAddr(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
go func() {
err := batcher.Run(ctx)
checkBatcherError(t, err)
}()
// Create a sweep request.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap1 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
IsExternalAddr: true,
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Insert the same swap twice, this should be a noop.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Create a second sweep request that has a timeout distance less than
// our configured threshold.
sweepReq2 := SweepRequest{
SwapHash: lntypes.Hash{2, 2, 2},
Value: 222,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 2,
},
Notifier: &dummyNotifier,
}
swap2 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance - 1,
AmountRequested: 222,
},
SwapInvoice: swapInvoice,
IsExternalAddr: true,
}
err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2)
require.NoError(t, err)
store.AssertLoopOutStored()
require.NoError(t, batcher.AddSweep(&sweepReq2))
// Batcher should create a second batch as first batch is a non wallet
// addr batch.
require.Eventually(t, func() bool {
return len(batcher.batches) == 2
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Create a third sweep request that has more timeout distance than
// the default.
sweepReq3 := SweepRequest{
SwapHash: lntypes.Hash{3, 3, 3},
Value: 333,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{3, 3},
Index: 3,
},
Notifier: &dummyNotifier,
}
swap3 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance + 1,
AmountRequested: 333,
},
SwapInvoice: swapInvoice,
IsExternalAddr: true,
}
err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3)
require.NoError(t, err)
store.AssertLoopOutStored()
require.NoError(t, batcher.AddSweep(&sweepReq3))
// Batcher should create a new batch as timeout distance is greater than
// the threshold
require.Eventually(t, func() bool {
return len(batcher.batches) == 3
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
require.Eventually(t, func() bool {
// Verify that each batch has the correct number of sweeps
// in it.
for _, batch := range batcher.batches {
switch batch.primarySweepID {
case sweepReq1.SwapHash:
if len(batch.sweeps) != 1 {
return false
}
case sweepReq2.SwapHash:
if len(batch.sweeps) != 1 {
return false
}
case sweepReq3.SwapHash:
if len(batch.sweeps) != 1 {
return false
}
}
}
return true
}, test.Timeout, eventuallyCheckFrequency)
// Check that all sweeps were stored.
require.True(t, batcherStore.AssertSweepStored(sweepReq1.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq2.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq3.SwapHash))
}
// TestSweepBatcherComposite tests that sweep requests that sweep to both wallet
// addresses and non-wallet addresses enter the correct batches.
func TestSweepBatcherComposite(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
go func() {
err := batcher.Run(ctx)
checkBatcherError(t, err)
}()
// Create a sweep request.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap1 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1)
require.NoError(t, err)
store.AssertLoopOutStored()
// Create a second sweep request that has a timeout distance less than
// our configured threshold.
sweepReq2 := SweepRequest{
SwapHash: lntypes.Hash{2, 2, 2},
Value: 222,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 2,
},
Notifier: &dummyNotifier,
}
swap2 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance - 1,
AmountRequested: 222,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2)
require.NoError(t, err)
store.AssertLoopOutStored()
// Create a third sweep request that has less timeout distance than the
// default max, but is not spending to a wallet address.
sweepReq3 := SweepRequest{
SwapHash: lntypes.Hash{3, 3, 3},
Value: 333,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{3, 3},
Index: 3,
},
Notifier: &dummyNotifier,
}
swap3 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance - 3,
AmountRequested: 333,
},
SwapInvoice: swapInvoice,
IsExternalAddr: true,
}
err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3)
require.NoError(t, err)
store.AssertLoopOutStored()
// Create a fourth sweep request that has a timeout which is not valid
// for the first batch, so it will cause it to create a new batch.
sweepReq4 := SweepRequest{
SwapHash: lntypes.Hash{4, 4, 4},
Value: 444,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{4, 4},
Index: 4,
},
Notifier: &dummyNotifier,
}
swap4 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance + 1,
AmountRequested: 444,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq4.SwapHash, swap4)
require.NoError(t, err)
store.AssertLoopOutStored()
// Create a fifth sweep request that has a timeout which is not valid
// for the first batch, but a valid timeout for the new batch.
sweepReq5 := SweepRequest{
SwapHash: lntypes.Hash{5, 5, 5},
Value: 555,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{5, 5},
Index: 5,
},
Notifier: &dummyNotifier,
}
swap5 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance + 5,
AmountRequested: 555,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq5.SwapHash, swap5)
require.NoError(t, err)
store.AssertLoopOutStored()
// Create a sixth sweep request that has a valid timeout for the new
// batch, but is paying to a non-wallet address.
sweepReq6 := SweepRequest{
SwapHash: lntypes.Hash{6, 6, 6},
Value: 666,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{6, 6},
Index: 6,
},
Notifier: &dummyNotifier,
}
swap6 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111 + defaultMaxTimeoutDistance + 6,
AmountRequested: 666,
},
SwapInvoice: swapInvoice,
IsExternalAddr: true,
}
err = store.CreateLoopOut(ctx, sweepReq6.SwapHash, swap6)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Insert the same swap twice, this should be a noop.
require.NoError(t, batcher.AddSweep(&sweepReq1))
require.NoError(t, batcher.AddSweep(&sweepReq2))
// Batcher should not create a second batch as timeout distance is small
// enough.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
require.NoError(t, batcher.AddSweep(&sweepReq3))
// Batcher should create a second batch as this sweep pays to a non
// wallet address.
require.Eventually(t, func() bool {
return len(batcher.batches) == 2
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
require.NoError(t, batcher.AddSweep(&sweepReq4))
// Batcher should create a third batch as timeout distance is greater
// than the threshold.
require.Eventually(t, func() bool {
return len(batcher.batches) == 3
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
require.NoError(t, batcher.AddSweep(&sweepReq5))
// Batcher should not create a fourth batch as timeout distance is small
// enough for it to join the last batch.
require.Eventually(t, func() bool {
return len(batcher.batches) == 3
}, test.Timeout, eventuallyCheckFrequency)
require.NoError(t, batcher.AddSweep(&sweepReq6))
// Batcher should create a fourth batch as this sweep pays to a non
// wallet address.
require.Eventually(t, func() bool {
return len(batcher.batches) == 4
}, test.Timeout, eventuallyCheckFrequency)
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
require.Eventually(t, func() bool {
// Verify that each batch has the correct number of sweeps in
// it.
for _, batch := range batcher.batches {
switch batch.primarySweepID {
case sweepReq1.SwapHash:
if len(batch.sweeps) != 2 {
return false
}
case sweepReq3.SwapHash:
if len(batch.sweeps) != 1 {
return false
}
case sweepReq4.SwapHash:
if len(batch.sweeps) != 2 {
return false
}
case sweepReq6.SwapHash:
if len(batch.sweeps) != 1 {
return false
}
}
}
return true
}, test.Timeout, eventuallyCheckFrequency)
// Check that all sweeps were stored.
require.True(t, batcherStore.AssertSweepStored(sweepReq1.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq2.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq3.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq4.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq5.SwapHash))
require.True(t, batcherStore.AssertSweepStored(sweepReq6.SwapHash))
}
// makeTestTx creates a test transaction with a single output of the given
// value.
func makeTestTx(value int64) *wire.MsgTx {
tx := wire.NewMsgTx(wire.TxVersion)
tx.AddTxOut(wire.NewTxOut(value, nil))
return tx
}
// TestGetFeePortionForSweep tests that the fee portion for a sweep is correctly
// calculated.
func TestGetFeePortionForSweep(t *testing.T) {
tests := []struct {
name string
spendTxValue int64
numSweeps int
totalSweptAmt btcutil.Amount
expectedFeePortion btcutil.Amount
expectedRoundingDiff btcutil.Amount
}{
{
"Even Split",
100, 5, 200, 20, 0,
},
{
"Single Sweep",
100, 1, 200, 100, 0,
},
{
"With Rounding Diff",
200, 4, 350, 37, 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
spendTx := makeTestTx(tt.spendTxValue)
feePortion, roundingDiff := getFeePortionForSweep(
spendTx, tt.numSweeps, tt.totalSweptAmt,
)
require.Equal(t, tt.expectedFeePortion, feePortion)
require.Equal(t, tt.expectedRoundingDiff, roundingDiff)
})
}
}
// TestRestoringEmptyBatch tests that the batcher can be restored with an empty
// batch.
func TestRestoringEmptyBatch(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
_, err = batcherStore.InsertSweepBatch(ctx, &dbBatch{})
require.NoError(t, err)
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
var wg sync.WaitGroup
wg.Add(1)
var runErr error
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()
// Wait for the batcher to be initialized.
<-batcher.initDone
// Create a sweep request.
sweepReq := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, sweepReq.SwapHash, swap)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq))
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
// Make sure that the sweep was stored and we have exactly one
// active batch.
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Make sure we have only one batch stored (as we dropped the dormant
// one).
batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
require.NoError(t, err)
require.Len(t, batches, 1)
// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()
checkBatcherError(t, runErr)
}
type loopStoreMock struct {
loops map[lntypes.Hash]*loopdb.LoopOut
mu sync.Mutex
}
func newLoopStoreMock() *loopStoreMock {
return &loopStoreMock{
loops: make(map[lntypes.Hash]*loopdb.LoopOut),
}
}
func (s *loopStoreMock) FetchLoopOutSwap(ctx context.Context,
hash lntypes.Hash) (*loopdb.LoopOut, error) {
s.mu.Lock()
defer s.mu.Unlock()
out, has := s.loops[hash]
if !has {
return nil, errors.New("loop not found")
}
return out, nil
}
func (s *loopStoreMock) putLoopOutSwap(hash lntypes.Hash, out *loopdb.LoopOut) {
s.mu.Lock()
defer s.mu.Unlock()
s.loops[hash] = out
}
// TestHandleSweepTwice tests that handing the same sweep twice must not
// add it to different batches.
func TestHandleSweepTwice(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
store := newLoopStoreMock()
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
var wg sync.WaitGroup
wg.Add(1)
var runErr error
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()
// Wait for the batcher to be initialized.
<-batcher.initDone
const shortCltv = 111
const longCltv = 111 + defaultMaxTimeoutDistance + 6
// Create two sweep requests with CltvExpiry distant from each other
// to go assigned to separate batches.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
loopOut1 := &loopdb.LoopOut{
Loop: loopdb.Loop{
Hash: lntypes.Hash{1, 1, 1},
},
Contract: &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: shortCltv,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
},
}
sweepReq2 := SweepRequest{
SwapHash: lntypes.Hash{2, 2, 2},
Value: 222,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 2,
},
Notifier: &dummyNotifier,
}
loopOut2 := &loopdb.LoopOut{
Loop: loopdb.Loop{
Hash: lntypes.Hash{2, 2, 2},
},
Contract: &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: longCltv,
AmountRequested: 222,
},
SwapInvoice: swapInvoice,
},
}
store.putLoopOutSwap(sweepReq1.SwapHash, loopOut1)
store.putLoopOutSwap(sweepReq2.SwapHash, loopOut2)
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Since two batches were created we check that it registered for its
// primary sweep's spend.
<-lnd.RegisterSpendChannel
// Deliver the second sweep. It will go to a separate batch,
// since CltvExpiry values are distant enough.
require.NoError(t, batcher.AddSweep(&sweepReq2))
<-lnd.RegisterSpendChannel
// Once batcher receives sweep request it will eventually spin up
// batches.
require.Eventually(t, func() bool {
// Make sure that the sweep was stored and we have exactly one
// active batch.
return batcherStore.AssertSweepStored(sweepReq1.SwapHash) &&
batcherStore.AssertSweepStored(sweepReq2.SwapHash) &&
len(batcher.batches) == 2
}, test.Timeout, eventuallyCheckFrequency)
// Change the second sweep so that it can be added to the first batch.
// Change CltvExpiry.
loopOut2 = &loopdb.LoopOut{
Loop: loopdb.Loop{
Hash: lntypes.Hash{2, 2, 2},
},
Contract: &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: shortCltv,
AmountRequested: 222,
},
SwapInvoice: swapInvoice,
},
}
store.putLoopOutSwap(sweepReq2.SwapHash, loopOut2)
// Re-add the second sweep. It is expected to stay in second batch,
// not added to both batches.
require.NoError(t, batcher.AddSweep(&sweepReq2))
require.Eventually(t, func() bool {
// Make sure there are two batches.
batches := batcher.batches
if len(batches) != 2 {
return false
}
// Make sure the second batch has the second sweep.
sweep2, has := batches[1].sweeps[sweepReq2.SwapHash]
if !has {
return false
}
// Make sure the second sweep's timeout has been updated.
if sweep2.timeout != shortCltv {
return false
}
return true
}, test.Timeout, eventuallyCheckFrequency)
// Make sure each batch has one sweep. If the second sweep was added to
// both batches, the following check won't pass.
require.Equal(t, 1, len(batcher.batches[0].sweeps))
require.Equal(t, 1, len(batcher.batches[1].sweeps))
// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()
checkBatcherError(t, runErr)
}
// TestRestoringPreservesConfTarget tests that after the batch is written to DB
// and loaded back, its batchConfTarget value is preserved.
func TestRestoringPreservesConfTarget(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
var wg sync.WaitGroup
wg.Add(1)
var runErr error
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()
// Wait for the batcher to be initialized.
<-batcher.initDone
// Create a sweep request.
sweepReq := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
SweepConfTarget: 123,
}
err = store.CreateLoopOut(ctx, sweepReq.SwapHash, swap)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq))
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
// Make sure that the sweep was stored and we have exactly one
// active batch, with one sweep and proper batchConfTarget.
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
len(batcher.batches) == 1 &&
len(batcher.batches[0].sweeps) == 1 &&
batcher.batches[0].cfg.batchConfTarget == 123
}, test.Timeout, eventuallyCheckFrequency)
// Make sure we have stored the batch.
batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
require.NoError(t, err)
require.Len(t, batches, 1)
// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()
// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
// Now launch it again.
batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
ctx, cancel = context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()
// Wait for the batcher to be initialized.
<-batcher.initDone
// Wait for batch to load.
require.Eventually(t, func() bool {
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
len(batcher.batches) == 1 &&
len(batcher.batches[0].sweeps) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Make sure batchConfTarget was preserved.
require.Equal(t, 123, int(batcher.batches[0].cfg.batchConfTarget))
// Expect registration for spend notification.
<-lnd.RegisterSpendChannel
// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()
// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
}
type sweepFetcherMock struct {
store map[lntypes.Hash]*SweepInfo
}
func (f *sweepFetcherMock) FetchSweep(ctx context.Context, hash lntypes.Hash) (
*SweepInfo, error) {
return f.store[hash], nil
}
// TestSweepFetcher tests providing custom sweep fetcher to Batcher.
func TestSweepFetcher(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
// Extract payment address from the invoice.
swapPaymentAddr, err := utils.ObtainSwapPaymentAddr(
swapInvoice, lnd.ChainParams,
)
require.NoError(t, err)
sweepFetcher := &sweepFetcherMock{
store: map[lntypes.Hash]*SweepInfo{
{1, 1, 1}: {
ConfTarget: 123,
Timeout: 111,
SwapInvoicePaymentAddr: *swapPaymentAddr,
},
},
}
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepFetcher)
var wg sync.WaitGroup
wg.Add(1)
var runErr error
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()
// Wait for the batcher to be initialized.
<-batcher.initDone
// Create a sweep request.
sweepReq := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq))
// Since a batch was created we check that it registered for its primary
// sweep's spend.
<-lnd.RegisterSpendChannel
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
// Make sure that the sweep was stored and we have exactly one
// active batch, with one sweep and proper batchConfTarget.
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
len(batcher.batches) == 1 &&
len(batcher.batches[0].sweeps) == 1 &&
batcher.batches[0].cfg.batchConfTarget == 123
}, test.Timeout, eventuallyCheckFrequency)
// Make sure we have stored the batch.
batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
require.NoError(t, err)
require.Len(t, batches, 1)
// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()
// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
}
// TestSweepBatcherCloseDuringAdding tests that sweep batcher works correctly
// if it is closed (stops running) during AddSweep call.
func TestSweepBatcherCloseDuringAdding(t *testing.T) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := loopdb.NewStoreMock(t)
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
batcherStore := NewStoreMock()
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
sweepStore)
go func() {
err := batcher.Run(ctx)
checkBatcherError(t, err)
}()
// Add many swaps.
for i := byte(1); i < 255; i++ {
swapHash := lntypes.Hash{i, i, i}
// Create a swap contract.
swap := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
},
SwapInvoice: swapInvoice,
}
err = store.CreateLoopOut(ctx, swapHash, swap)
require.NoError(t, err)
store.AssertLoopOutStored()
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// Add many sweeps.
for i := byte(1); i < 255; i++ {
// Create a sweep request.
sweepReq := SweepRequest{
SwapHash: lntypes.Hash{i, i, i},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{i, i},
Index: 1,
},
Notifier: &dummyNotifier,
}
// Deliver sweep request to batcher.
err := batcher.AddSweep(&sweepReq)
if err == ErrBatcherShuttingDown {
break
}
require.NoError(t, err)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
// Close sweepbatcher during addings.
time.Sleep(1 * time.Millisecond)
cancel()
}()
// We don't know how many spend notification registrations will be
// issued, so accept them while waiting for two goroutines to stop.
quit := make(chan struct{})
registrationChan := make(chan struct{})
go func() {
defer close(registrationChan)
for {
select {
case <-lnd.RegisterSpendChannel:
case <-quit:
return
}
}
}()
wg.Wait()
close(quit)
<-registrationChan
}