sweepbatcher: test InitialDelay and PublishDelay

Tested scenarios:

1. For a regular sweep newly added it waits for initialDelay + publishDelay
   before publishing a transaction.
2. For a sweep recovered from DB it does not wait before publishing tx.
3. If a sweep is about to expire, initialDelay is skipped.
Boris Nagaev 1 month ago
parent 3f56345492
commit 9af6718089
No known key found for this signature in database

@ -19,9 +19,11 @@ import (
@ -548,6 +550,493 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
}, test.Timeout, eventuallyCheckFrequency)
// wrappedLogger implements btclog.Logger, recording last debug message format.
// It is needed to watch for messages in tests.
type wrappedLogger struct {
debugMessages []string
infoMessages []string
// Debugf logs debug message.
func (l *wrappedLogger) Debugf(format string, params ...interface{}) {
l.debugMessages = append(l.debugMessages, format)
l.Logger.Debugf(format, params...)
// Infof logs info message.
func (l *wrappedLogger) Infof(format string, params ...interface{}) {
l.infoMessages = append(l.infoMessages, format)
l.Logger.Infof(format, params...)
// testDelays tests that WithInitialDelay and WithPublishDelay work.
func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) {
// Set initial delay and publish delay.
const (
initialDelay = 4 * time.Second
publishDelay = 3 * time.Second
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
startTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC)
tickSignal := make(chan time.Duration)
testClock := clock.NewTestClockWithTickSignal(startTime, tickSignal)
batcher := NewBatcher(
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
batcherStore, sweepStore, WithInitialDelay(initialDelay),
WithPublishDelay(publishDelay), WithClock(testClock),
var wg sync.WaitGroup
var runErr error
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
// Wait for the batcher to be initialized.
// Create a sweep request.
sweepReq := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
Notifier: &dummyNotifier,
swap := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 1000,
AmountRequested: 111,
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
HtlcKeys: htlcKeys,
DestAddr: destAddr,
SwapInvoice: swapInvoice,
SweepConfTarget: 123,
err = store.CreateLoopOut(ctx, sweepReq.SwapHash, swap)
require.NoError(t, err)
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq))
// Expect two timers to be set: initialDelay and publishDelay,
// and RegisterSpend to be called. The order is not determined,
// so catch these actions from two separate goroutines.
var wg2 sync.WaitGroup
go func() {
defer wg2.Done()
// Since a batch was created we check that it registered for its
// primary sweep's spend.
var delays []time.Duration
go func() {
defer wg2.Done()
// Expect two timers: initialDelay and publishDelay.
delays = append(delays, <-tickSignal)
delays = append(delays, <-tickSignal)
// Wait for RegisterSpend and for timer registrations.
// Expect timer for initialDelay and publishDelay to be registered.
wantDelays := []time.Duration{initialDelay, publishDelay}
require.Equal(t, wantDelays, delays)
// Eventually the batch is launched.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Replace the logger in the batch with wrappedLogger to watch messages.
var batch1 *batch
for _, batch := range batcher.batches {
batch1 = batch
require.NotNil(t, batch1)
testLogger := &wrappedLogger{Logger: batch1.log}
batch1.log = testLogger
// Advance the clock to publishDelay. It will trigger the publishDelay
// timer, but won't result in publishing, because of initialDelay.
now := startTime.Add(publishDelay)
// Wait for batch publishing to be skipped, because initialDelay has not
// ended.
require.EventuallyWithT(t, func(c *assert.CollectT) {
require.Contains(t, testLogger.debugMessages, stillWaitingMsg)
}, test.Timeout, eventuallyCheckFrequency)
// Advance the clock to the end of initialDelay.
now = startTime.Add(initialDelay)
// Expect timer for publishDelay to be registered.
require.Equal(t, publishDelay, <-tickSignal)
// Advance the clock.
now = now.Add(publishDelay)
// Wait for tx to be published.
// Once batcher receives sweep request it will eventually spin up a
// batch.
require.Eventually(t, func() bool {
// Make sure that the sweep was stored
if !batcherStore.AssertSweepStored(sweepReq.SwapHash) {
return false
// Make sure there is exactly one active batch.
if len(batcher.batches) != 1 {
return false
// Get the batch.
batch := getOnlyBatch(batcher)
// Make sure the batch has one sweep.
return len(batch.sweeps) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Make sure we have stored the batch.
batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
require.NoError(t, err)
require.Len(t, batches, 1)
// Now make the batcher quit by canceling the context.
// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
// Advance the clock by 1 second.
now = now.Add(time.Second)
// Now launch it again.
batcher = NewBatcher(
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
batcherStore, sweepStore, WithInitialDelay(initialDelay),
WithPublishDelay(publishDelay), WithClock(testClock),
ctx, cancel = context.WithCancel(context.Background())
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
// Wait for the batcher to be initialized.
// Wait for batch to load.
require.Eventually(t, func() bool {
// Make sure that the sweep was stored
if !batcherStore.AssertSweepStored(sweepReq.SwapHash) {
return false
// Make sure there is exactly one active batch.
if len(batcher.batches) != 1 {
return false
// Get the batch.
batch := getOnlyBatch(batcher)
// Make sure the batch has one sweep.
return len(batch.sweeps) == 1
}, test.Timeout, eventuallyCheckFrequency)
// Expect a timer to be set: 0 (instead of publishDelay), and
// RegisterSpend to be called. The order is not determined, so catch
// these actions from two separate goroutines.
var wg3 sync.WaitGroup
go func() {
defer wg3.Done()
// Since a batch was created we check that it registered for its
// primary sweep's spend.
delays = nil
go func() {
defer wg3.Done()
// Expect one timer: publishDelay (0).
delays = append(delays, <-tickSignal)
// Wait for RegisterSpend and for timer registration.
// Expect one timer: publishDelay (0).
wantDelays = []time.Duration{0}
require.Equal(t, wantDelays, delays)
// Advance the clock.
now = now.Add(time.Millisecond)
// Wait for tx to be published.
// Tick tock next block.
err = lnd.NotifyHeight(601)
require.NoError(t, err)
// Expect timer for publishDelay (0) to be registered. Make sure
// sweepbatcher does not wait for recovered batches after new block
// arrives as well.
require.Equal(t, time.Duration(0), <-tickSignal)
// Advance the clock.
now = now.Add(time.Millisecond)
// Wait for tx to be published.
// Now make the batcher quit by canceling the context.
// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
// Advance the clock by 1 second.
now = now.Add(time.Second)
// Now test for large initialDelay and make sure it is cancelled
// for an urgent sweep.
const largeInitialDelay = 6 * time.Hour
batcher = NewBatcher(
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
batcherStore, sweepStore, WithInitialDelay(largeInitialDelay),
WithPublishDelay(publishDelay), WithClock(testClock),
ctx, cancel = context.WithCancel(context.Background())
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
// Wait for the batcher to be initialized.
// Expect spend notification and publication for the first batch.
// Expect a timer to be set: 0 (instead of publishDelay), and
// RegisterSpend to be called. The order is not determined, so catch
// these actions from two separate goroutines.
var wg4 sync.WaitGroup
go func() {
defer wg4.Done()
// Since a batch was created we check that it registered for its
// primary sweep's spend.
delays = nil
go func() {
defer wg4.Done()
// Expect one timer: publishDelay (0).
delays = append(delays, <-tickSignal)
// Wait for RegisterSpend and for timer registration.
// Expect one timer: publishDelay (0).
wantDelays = []time.Duration{0}
require.Equal(t, wantDelays, delays)
// Get spend notification and tx publication for the first batch.
// Create a sweep request which is not urgent, but close to.
sweepReq2 := SweepRequest{
SwapHash: lntypes.Hash{2, 2, 2},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 1,
Notifier: &dummyNotifier,
const blocksInDelay = int32(largeInitialDelay / (10 * time.Minute))
swap2 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
// CltvExpiry is not urgent, but close.
CltvExpiry: 600 + blocksInDelay*2 + 5,
AmountRequested: 111,
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
HtlcKeys: htlcKeys,
// Make preimage unique to pass SQL constraints.
Preimage: lntypes.Preimage{2},
DestAddr: destAddr,
SwapInvoice: swapInvoice,
SweepConfTarget: 123,
err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2)
require.NoError(t, err)
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq2))
// Expect the sweep to be added to new batch. Expect two timers:
// largeInitialDelay and publishDelay. RegisterSpend is called in
// parallel, so catch these actions from two separate goroutines.
var wg5 sync.WaitGroup
go func() {
defer wg5.Done()
// Since a batch was created we check that it registered for its
// primary sweep's spend.
delays = nil
go func() {
defer wg5.Done()
// Expect two timer: largeInitialDelay, publishDelay.
delays = append(delays, <-tickSignal)
delays = append(delays, <-tickSignal)
// Wait for RegisterSpend and for timers' registrations.
// Expect two timers: largeInitialDelay, publishDelay.
wantDelays = []time.Duration{largeInitialDelay, publishDelay}
require.Equal(t, wantDelays, delays)
// Replace the logger in the batch with wrappedLogger to watch messages.
var batch2 *batch
for _, batch := range batcher.batches {
if batch.id != batch1.id {
batch2 = batch
require.NotNil(t, batch2)
testLogger2 := &wrappedLogger{Logger: batch2.log}
batch2.log = testLogger2
// Add another sweep which is urgent. It will go to the same batch
// to make sure minimum timeout is calculated properly.
sweepReq3 := SweepRequest{
SwapHash: lntypes.Hash{3, 3, 3},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{2, 2},
Index: 1,
Notifier: &dummyNotifier,
swap3 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
// CltvExpiry is urgent.
CltvExpiry: 600 + blocksInDelay*2 - 5,
AmountRequested: 111,
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
HtlcKeys: htlcKeys,
// Make preimage unique to pass SQL constraints.
Preimage: lntypes.Preimage{3},
DestAddr: destAddr,
SwapInvoice: swapInvoice,
SweepConfTarget: 123,
err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3)
require.NoError(t, err)
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq3))
// Wait for sweep to be added to the batch.
require.EventuallyWithT(t, func(c *assert.CollectT) {
require.Contains(t, testLogger2.infoMessages, "adding sweep %x")
}, test.Timeout, eventuallyCheckFrequency)
// Advance the clock by publishDelay. Don't wait largeInitialDelay.
now = now.Add(publishDelay)
// Wait for tx to be published.
tx := <-lnd.TxPublishChannel
require.Equal(t, 2, len(tx.TxIn))
// Now make the batcher quit by canceling the context.
// Make sure the batcher exited without an error.
checkBatcherError(t, runErr)
// testSweepBatcherSweepReentry tests that when an old version of the batch tx
// gets confirmed the sweep leftovers are sent back to the batcher.
func testSweepBatcherSweepReentry(t *testing.T, store testStore,
@ -2284,6 +2773,11 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) {
runTests(t, testSweepBatcherSimpleLifecycle)
// TestDelays tests that WithInitialDelay and WithPublishDelay work.
func TestDelays(t *testing.T) {
runTests(t, testDelays)
// TestSweepBatcherSweepReentry tests that when an old version of the batch tx
// gets confirmed the sweep leftovers are sent back to the batcher.
func TestSweepBatcherSweepReentry(t *testing.T) {
