From 9af6718089e6e1b8b02a1d9f130dd1e9638f4403 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Fri, 9 Aug 2024 17:19:38 -0300 Subject: [PATCH] sweepbatcher: test InitialDelay and PublishDelay Tested scenarios: 1. For a regular sweep newly added it waits for initialDelay + publishDelay before publishing a transaction. 2. For a sweep recovered from DB it does not wait before publishing tx. 3. If a sweep is about to expire, initialDelay is skipped. --- sweepbatcher/sweep_batcher_test.go | 494 +++++++++++++++++++++++++++++ 1 file changed, 494 insertions(+) diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 96b77fc..18fe747 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -19,9 +19,11 @@ import ( "github.com/lightninglabs/loop/test" "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -548,6 +550,493 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, }, test.Timeout, eventuallyCheckFrequency) } +// wrappedLogger implements btclog.Logger, recording last debug message format. +// It is needed to watch for messages in tests. +type wrappedLogger struct { + btclog.Logger + + debugMessages []string + infoMessages []string +} + +// Debugf logs debug message. +func (l *wrappedLogger) Debugf(format string, params ...interface{}) { + l.debugMessages = append(l.debugMessages, format) + l.Logger.Debugf(format, params...) +} + +// Infof logs info message. +func (l *wrappedLogger) Infof(format string, params ...interface{}) { + l.infoMessages = append(l.infoMessages, format) + l.Logger.Infof(format, params...) +} + +// testDelays tests that WithInitialDelay and WithPublishDelay work. +func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { + // Set initial delay and publish delay. + const ( + initialDelay = 4 * time.Second + publishDelay = 3 * time.Second + ) + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + startTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC) + tickSignal := make(chan time.Duration) + testClock := clock.NewTestClockWithTickSignal(startTime, tickSignal) + + batcher := NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithInitialDelay(initialDelay), + WithPublishDelay(publishDelay), WithClock(testClock), + ) + + var wg sync.WaitGroup + wg.Add(1) + + var runErr error + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Create a sweep request. + sweepReq := SweepRequest{ + SwapHash: lntypes.Hash{1, 1, 1}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + + swap := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 1000, + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, sweepReq.SwapHash, swap) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq)) + + // Expect two timers to be set: initialDelay and publishDelay, + // and RegisterSpend to be called. The order is not determined, + // so catch these actions from two separate goroutines. + var wg2 sync.WaitGroup + + wg2.Add(1) + go func() { + defer wg2.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg2.Add(1) + var delays []time.Duration + go func() { + defer wg2.Done() + + // Expect two timers: initialDelay and publishDelay. + delays = append(delays, <-tickSignal) + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timer registrations. + wg2.Wait() + + // Expect timer for initialDelay and publishDelay to be registered. + wantDelays := []time.Duration{initialDelay, publishDelay} + require.Equal(t, wantDelays, delays) + + // Eventually the batch is launched. + require.Eventually(t, func() bool { + return len(batcher.batches) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Replace the logger in the batch with wrappedLogger to watch messages. + var batch1 *batch + for _, batch := range batcher.batches { + batch1 = batch + } + require.NotNil(t, batch1) + testLogger := &wrappedLogger{Logger: batch1.log} + batch1.log = testLogger + + // Advance the clock to publishDelay. It will trigger the publishDelay + // timer, but won't result in publishing, because of initialDelay. + now := startTime.Add(publishDelay) + testClock.SetTime(now) + + // Wait for batch publishing to be skipped, because initialDelay has not + // ended. + require.EventuallyWithT(t, func(c *assert.CollectT) { + require.Contains(t, testLogger.debugMessages, stillWaitingMsg) + }, test.Timeout, eventuallyCheckFrequency) + + // Advance the clock to the end of initialDelay. + now = startTime.Add(initialDelay) + testClock.SetTime(now) + + // Expect timer for publishDelay to be registered. + require.Equal(t, publishDelay, <-tickSignal) + + // Advance the clock. + now = now.Add(publishDelay) + testClock.SetTime(now) + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Once batcher receives sweep request it will eventually spin up a + // batch. + require.Eventually(t, func() bool { + // Make sure that the sweep was stored + if !batcherStore.AssertSweepStored(sweepReq.SwapHash) { + return false + } + + // Make sure there is exactly one active batch. + if len(batcher.batches) != 1 { + return false + } + + // Get the batch. + batch := getOnlyBatch(batcher) + + // Make sure the batch has one sweep. + return len(batch.sweeps) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Make sure we have stored the batch. + batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx) + require.NoError(t, err) + require.Len(t, batches, 1) + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) + + // Advance the clock by 1 second. + now = now.Add(time.Second) + testClock.SetTime(now) + + // Now launch it again. + batcher = NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithInitialDelay(initialDelay), + WithPublishDelay(publishDelay), WithClock(testClock), + ) + ctx, cancel = context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Wait for batch to load. + require.Eventually(t, func() bool { + // Make sure that the sweep was stored + if !batcherStore.AssertSweepStored(sweepReq.SwapHash) { + return false + } + + // Make sure there is exactly one active batch. + if len(batcher.batches) != 1 { + return false + } + + // Get the batch. + batch := getOnlyBatch(batcher) + + // Make sure the batch has one sweep. + return len(batch.sweeps) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Expect a timer to be set: 0 (instead of publishDelay), and + // RegisterSpend to be called. The order is not determined, so catch + // these actions from two separate goroutines. + var wg3 sync.WaitGroup + + wg3.Add(1) + go func() { + defer wg3.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg3.Add(1) + delays = nil + go func() { + defer wg3.Done() + + // Expect one timer: publishDelay (0). + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timer registration. + wg3.Wait() + + // Expect one timer: publishDelay (0). + wantDelays = []time.Duration{0} + require.Equal(t, wantDelays, delays) + + // Advance the clock. + now = now.Add(time.Millisecond) + testClock.SetTime(now) + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Tick tock next block. + err = lnd.NotifyHeight(601) + require.NoError(t, err) + + // Expect timer for publishDelay (0) to be registered. Make sure + // sweepbatcher does not wait for recovered batches after new block + // arrives as well. + require.Equal(t, time.Duration(0), <-tickSignal) + + // Advance the clock. + now = now.Add(time.Millisecond) + testClock.SetTime(now) + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) + + // Advance the clock by 1 second. + now = now.Add(time.Second) + testClock.SetTime(now) + + // Now test for large initialDelay and make sure it is cancelled + // for an urgent sweep. + const largeInitialDelay = 6 * time.Hour + + batcher = NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithInitialDelay(largeInitialDelay), + WithPublishDelay(publishDelay), WithClock(testClock), + ) + ctx, cancel = context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Expect spend notification and publication for the first batch. + // Expect a timer to be set: 0 (instead of publishDelay), and + // RegisterSpend to be called. The order is not determined, so catch + // these actions from two separate goroutines. + var wg4 sync.WaitGroup + + wg4.Add(1) + go func() { + defer wg4.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg4.Add(1) + delays = nil + go func() { + defer wg4.Done() + + // Expect one timer: publishDelay (0). + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timer registration. + wg4.Wait() + + // Expect one timer: publishDelay (0). + wantDelays = []time.Duration{0} + require.Equal(t, wantDelays, delays) + + // Get spend notification and tx publication for the first batch. + <-lnd.TxPublishChannel + + // Create a sweep request which is not urgent, but close to. + sweepReq2 := SweepRequest{ + SwapHash: lntypes.Hash{2, 2, 2}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + + const blocksInDelay = int32(largeInitialDelay / (10 * time.Minute)) + swap2 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + // CltvExpiry is not urgent, but close. + CltvExpiry: 600 + blocksInDelay*2 + 5, + + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: lntypes.Preimage{2}, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, sweepReq2.SwapHash, swap2) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq2)) + + // Expect the sweep to be added to new batch. Expect two timers: + // largeInitialDelay and publishDelay. RegisterSpend is called in + // parallel, so catch these actions from two separate goroutines. + var wg5 sync.WaitGroup + + wg5.Add(1) + go func() { + defer wg5.Done() + + // Since a batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + }() + + wg5.Add(1) + delays = nil + go func() { + defer wg5.Done() + + // Expect two timer: largeInitialDelay, publishDelay. + delays = append(delays, <-tickSignal) + delays = append(delays, <-tickSignal) + }() + + // Wait for RegisterSpend and for timers' registrations. + wg5.Wait() + + // Expect two timers: largeInitialDelay, publishDelay. + wantDelays = []time.Duration{largeInitialDelay, publishDelay} + require.Equal(t, wantDelays, delays) + + // Replace the logger in the batch with wrappedLogger to watch messages. + var batch2 *batch + for _, batch := range batcher.batches { + if batch.id != batch1.id { + batch2 = batch + } + } + require.NotNil(t, batch2) + testLogger2 := &wrappedLogger{Logger: batch2.log} + batch2.log = testLogger2 + + // Add another sweep which is urgent. It will go to the same batch + // to make sure minimum timeout is calculated properly. + sweepReq3 := SweepRequest{ + SwapHash: lntypes.Hash{3, 3, 3}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + swap3 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + // CltvExpiry is urgent. + CltvExpiry: 600 + blocksInDelay*2 - 5, + + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: lntypes.Preimage{3}, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, sweepReq3.SwapHash, swap3) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq3)) + + // Wait for sweep to be added to the batch. + require.EventuallyWithT(t, func(c *assert.CollectT) { + require.Contains(t, testLogger2.infoMessages, "adding sweep %x") + }, test.Timeout, eventuallyCheckFrequency) + + // Advance the clock by publishDelay. Don't wait largeInitialDelay. + now = now.Add(publishDelay) + testClock.SetTime(now) + + // Wait for tx to be published. + tx := <-lnd.TxPublishChannel + require.Equal(t, 2, len(tx.TxIn)) + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) +} + // testSweepBatcherSweepReentry tests that when an old version of the batch tx // gets confirmed the sweep leftovers are sent back to the batcher. func testSweepBatcherSweepReentry(t *testing.T, store testStore, @@ -2284,6 +2773,11 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) { runTests(t, testSweepBatcherSimpleLifecycle) } +// TestDelays tests that WithInitialDelay and WithPublishDelay work. +func TestDelays(t *testing.T) { + runTests(t, testDelays) +} + // TestSweepBatcherSweepReentry tests that when an old version of the batch tx // gets confirmed the sweep leftovers are sent back to the batcher. func TestSweepBatcherSweepReentry(t *testing.T) {