From f21a21ee4128bd4b0f1eb96d888ed85c4f8aef12 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sat, 24 Aug 2024 00:20:45 -0300 Subject: [PATCH 1/2] sweepbatcher: set max batch size to 1000 sweeps This is needed to avoid non-standard batch transactions (larger than 400k wu). A non-cooperative input is 393 wu, so 1000 inputs are still under 400k wu. --- sweepbatcher/greedy_batch_selection.go | 10 +- sweepbatcher/sweep_batch.go | 12 ++ sweepbatcher/sweep_batcher_test.go | 161 +++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/greedy_batch_selection.go b/sweepbatcher/greedy_batch_selection.go index 78a2a5d..6b7b253 100644 --- a/sweepbatcher/greedy_batch_selection.go +++ b/sweepbatcher/greedy_batch_selection.go @@ -17,7 +17,8 @@ import ( // greedyAddSweep selects a batch for the sweep using the greedy algorithm, // which minimizes costs, and adds the sweep to the batch. To accomplish this, // it first collects fee details about the sweep being added, about a potential -// new batch composed of this sweep only, and about all existing batches. Then +// new batch composed of this sweep only, and about all existing batches. It +// skips batches with at least MaxSweepsPerBatch swaps to keep tx standard. Then // it passes the data to selectBatches() function, which emulates adding the // sweep to each batch and creating new batch for the sweep, and calculates the // costs of each alternative. Based on the estimates of selectBatches(), this @@ -40,6 +41,13 @@ func (b *Batcher) greedyAddSweep(ctx context.Context, sweep *sweep) error { // Collect weight and fee rate info about existing batches. batches := make([]feeDetails, 0, len(b.batches)) for _, existingBatch := range b.batches { + // Enforce MaxSweepsPerBatch. If there are already too many + // sweeps in the batch, do not add another sweep to prevent the + // tx from becoming non-standard. + if len(existingBatch.sweeps) >= MaxSweepsPerBatch { + continue + } + batchFeeDetails, err := estimateBatchWeight(existingBatch) if err != nil { return fmt.Errorf("failed to estimate tx weight for "+ diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 549d8f8..9da8d8c 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -45,6 +45,11 @@ const ( // maxFeeToSwapAmtRatio is the maximum fee to swap amount ratio that // we allow for a batch transaction. maxFeeToSwapAmtRatio = 0.2 + + // MaxSweepsPerBatch is the maximum number of sweeps in a single batch. + // It is needed to prevent sweep tx from becoming non-standard. Max + // standard transaction is 400k wu, a non-cooperative input is 393 wu. + MaxSweepsPerBatch = 1000 ) var ( @@ -462,6 +467,13 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { return true, nil } + // Enforce MaxSweepsPerBatch. If there are already too many sweeps in + // the batch, do not add another sweep to prevent the tx from becoming + // non-standard. + if len(b.sweeps) >= MaxSweepsPerBatch { + return false, nil + } + // Since all the actions of the batch happen sequentially, we could // arrive here after the batch got closed because of a spend. In this // case we cannot add the sweep to this batch. diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index e48100e..ae3c98e 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -19,6 +19,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/test" "github.com/lightninglabs/loop/utils" + "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/input" @@ -1176,6 +1177,161 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { checkBatcherError(t, runErr) } +// testMaxSweepsPerBatch tests the limit on max number of sweeps per batch. +func testMaxSweepsPerBatch(t *testing.T, store testStore, + batcherStore testBatcherStore) { + + // Disable logging, because this test is very noisy. + oldLogger := log + UseLogger(build.NewSubLogger("SWEEP", nil)) + defer UseLogger(oldLogger) + + defer test.Guard(t, test.WithGuardTimeout(5*time.Minute))() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + startTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC) + testClock := clock.NewTestClock(startTime) + + // Create muSig2SignSweep failing all sweeps to force non-cooperative + // scenario (it increases transaction size). + muSig2SignSweep := func(ctx context.Context, + protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash, + paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte, + prevoutMap map[wire.OutPoint]*wire.TxOut) ( + []byte, []byte, error) { + + return nil, nil, fmt.Errorf("test error") + } + + // Set publish delay. + const publishDelay = 3 * time.Second + + batcher := NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + muSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, WithPublishDelay(publishDelay), + WithClock(testClock), + ) + + var wg sync.WaitGroup + wg.Add(1) + + var runErr error + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + const swapsNum = MaxSweepsPerBatch + 1 + + // Expect 2 batches to be registered. + expectedBatches := (swapsNum + MaxSweepsPerBatch - 1) / + MaxSweepsPerBatch + + for i := 0; i < swapsNum; i++ { + preimage := lntypes.Preimage{2, byte(i % 256), byte(i / 256)} + swapHash := preimage.Hash() + + // Create a sweep request. + sweepReq := SweepRequest{ + SwapHash: swapHash, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + + swap := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 1000, + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + + // Make preimage unique to pass SQL constraints. + Preimage: preimage, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 123, + } + + err = store.CreateLoopOut(ctx, swapHash, swap) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(&sweepReq)) + + // If this is new batch, expect a spend registration. + if i%MaxSweepsPerBatch == 0 { + <-lnd.RegisterSpendChannel + } + } + + // Eventually the batches are launched and all the sweeps are added. + require.Eventually(t, func() bool { + // Make sure all the batches have started. + if len(batcher.batches) != expectedBatches { + return false + } + + // Make sure all the sweeps were added. + sweepsNum := 0 + for _, batch := range batcher.batches { + sweepsNum += len(batch.sweeps) + } + return sweepsNum == swapsNum + }, test.Timeout, eventuallyCheckFrequency) + + // Advance the clock to publishDelay, so batches are published. + now := startTime.Add(publishDelay) + testClock.SetTime(now) + + // Expect mockSigner.SignOutputRaw calls to sign non-cooperative + // sweeps. + for i := 0; i < expectedBatches; i++ { + <-lnd.SignOutputRawChannel + } + + // Wait for txs to be published. + inputsNum := 0 + const maxWeight = lntypes.WeightUnit(400_000) + for i := 0; i < expectedBatches; i++ { + tx := <-lnd.TxPublishChannel + inputsNum += len(tx.TxIn) + + // Make sure the transaction size is standard. + weight := lntypes.WeightUnit( + blockchain.GetTransactionWeight(btcutil.NewTx(tx)), + ) + require.Less(t, weight, maxWeight) + t.Logf("tx weight: %v", weight) + } + + // Make sure the number of inputs in batch transactions is equal + // to the number of swaps. + require.Equal(t, swapsNum, inputsNum) + + // Now make the batcher quit by canceling the context. + cancel() + wg.Wait() + + // Make sure the batcher exited without an error. + checkBatcherError(t, runErr) +} + // testSweepBatcherSweepReentry tests that when an old version of the batch tx // gets confirmed the sweep leftovers are sent back to the batcher. func testSweepBatcherSweepReentry(t *testing.T, store testStore, @@ -3468,6 +3624,11 @@ func TestDelays(t *testing.T) { runTests(t, testDelays) } +// TestMaxSweepsPerBatch tests the limit on max number of sweeps per batch. +func TestMaxSweepsPerBatch(t *testing.T) { + runTests(t, testMaxSweepsPerBatch) +} + // TestSweepBatcherSweepReentry tests that when an old version of the batch tx // gets confirmed the sweep leftovers are sent back to the batcher. func TestSweepBatcherSweepReentry(t *testing.T) { From 7780aca492446122dbe1e2b73dc84e3367feca4e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 26 Aug 2024 20:53:29 -0300 Subject: [PATCH 2/2] sweepbatcher: log the reason of acceptance failure Add Info log message in cases when addSweep returns accept=false and err=nil. --- sweepbatcher/sweep_batch.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 9da8d8c..d2b2c0e 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -438,6 +438,8 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { // If the provided sweep is nil, we can't proceed with any checks, so // we just return early. if sweep == nil { + b.log.Infof("the sweep is nil") + return false, nil } @@ -471,6 +473,9 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { // the batch, do not add another sweep to prevent the tx from becoming // non-standard. if len(b.sweeps) >= MaxSweepsPerBatch { + b.log.Infof("the batch has already too many sweeps (%d >= %d)", + len(b.sweeps), MaxSweepsPerBatch) + return false, nil } @@ -478,6 +483,8 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { // arrive here after the batch got closed because of a spend. In this // case we cannot add the sweep to this batch. if b.state != Open { + b.log.Infof("the batch state (%v) is not open", b.state) + return false, nil } @@ -485,7 +492,17 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { // address, or the incoming sweep is spending to non-wallet address, // we cannot add this sweep to the batch. for _, s := range b.sweeps { - if s.isExternalAddr || sweep.isExternalAddr { + if s.isExternalAddr { + b.log.Infof("the batch already has a sweep (%x) with "+ + "an external address", s.swapHash[:6]) + + return false, nil + } + + if sweep.isExternalAddr { + b.log.Infof("the batch is not empty and new sweep (%x)"+ + " has an external address", sweep.swapHash[:6]) + return false, nil } } @@ -498,6 +515,11 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { int32(math.Abs(float64(sweep.timeout - s.timeout))) if timeoutDistance > b.cfg.maxTimeoutDistance { + b.log.Infof("too long timeout distance between the "+ + "batch and sweep %x: %d > %d", + sweep.swapHash[:6], timeoutDistance, + b.cfg.maxTimeoutDistance) + return false, nil } }