2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-09 19:10:47 +00:00

sweepbatcher: add option WithPublishErrorHandler

WithPublishErrorHandler sets the callback used to handle publish errors.
It can be used to filter out noisy messages.
This commit is contained in:
Boris Nagaev 2024-08-27 11:44:28 -03:00
parent 6b852c1c0b
commit 7c93dff830
No known key found for this signature in database
3 changed files with 281 additions and 97 deletions

View File

@ -282,6 +282,11 @@ type batch struct {
// verifySchnorrSig is a function that verifies a schnorr signature.
verifySchnorrSig VerifySchnorrSig
// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler
// purger is a function that can take a sweep which is being purged and
// hand it over to the batcher for further processing.
purger Purger
@ -308,23 +313,24 @@ type Purger func(sweepReq *SweepRequest) error
// struct is only used as a wrapper for the arguments that are required to
// create a new batch.
type batchKit struct {
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
publishErrorHandler PublishErrorHandler
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
}
// scheduleNextCall schedules the next call to the batch handler's main event
@ -353,28 +359,29 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
return &batch{
// We set the ID to a negative value to flag that this batch has
// never been persisted, so it needs to be assigned a new ID.
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
}
}
@ -396,32 +403,33 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
}
return &batch{
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
purger: bk.purger,
store: bk.store,
log: bk.log,
cfg: &cfg,
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
log: bk.log,
cfg: &cfg,
}, nil
}
@ -795,23 +803,31 @@ func (b *batch) publish(ctx context.Context) error {
return err
}
// logPublishError is a function which logs publish errors.
logPublishError := func(errMsg string, err error) {
b.publishErrorHandler(err, errMsg, b.log)
}
if b.cfg.mixedBatch {
fee, err, signSuccess = b.publishMixedBatch(ctx)
if err != nil {
b.log.Warnf("Mixed batch publish error: %v", err)
logPublishError("mixed batch publish error", err)
}
} else {
fee, err, signSuccess = b.publishBatchCoop(ctx)
if err != nil {
b.log.Warnf("co-op publish error: %v", err)
logPublishError("co-op publish error", err)
}
}
if !signSuccess {
fee, err = b.publishBatch(ctx)
if err != nil {
logPublishError("non-coop publish error", err)
}
}
if err != nil {
b.log.Warnf("publish error: %v", err)
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
@ -12,6 +13,8 @@ import (
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcwallet/chain"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/loopdb"
@ -159,6 +162,25 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
type FeeRateProvider func(ctx context.Context,
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)
// PublishErrorHandler is a function that handles transaction publishing error.
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)
// defaultPublishErrorLogger is an instance of PublishErrorHandler which logs
// all errors as warnings, but "insufficient fee" as info (since they are
// expected, if RBF fails).
func defaultPublishErrorLogger(err error, errMsg string, log btclog.Logger) {
// Check if the error is "insufficient fee" error.
if strings.Contains(err.Error(), chain.ErrInsufficientFee.Error()) {
// Log "insufficient fee" with level Info.
log.Infof("%s: %v", errMsg, err)
return
}
// Log any other error as a warning.
log.Warnf("%s: %v", errMsg, err)
}
// SweepRequest is a request to sweep a specific outpoint.
type SweepRequest struct {
// SwapHash is the hash of the swap that is being swept.
@ -296,6 +318,11 @@ type Batcher struct {
// expensive) way. If the whole procedure fails for whatever reason, the
// batch is signed non-cooperatively (the fallback).
mixedBatch bool
// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler
}
// BatcherConfig holds batcher configuration.
@ -341,6 +368,11 @@ type BatcherConfig struct {
// expensive) way. If the whole procedure fails for whatever reason, the
// batch is signed non-cooperatively (the fallback).
mixedBatch bool
// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler
}
// BatcherOption configures batcher behaviour.
@ -420,6 +452,14 @@ func WithMixedBatch() BatcherOption {
}
}
// WithPublishErrorHandler sets the callback used to handle publish errors.
// It can be used to filter out noisy messages.
func WithPublishErrorHandler(handler PublishErrorHandler) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.publishErrorHandler = handler
}
}
// NewBatcher creates a new Batcher instance.
func NewBatcher(wallet lndclient.WalletKitClient,
chainNotifier lndclient.ChainNotifierClient,
@ -432,6 +472,11 @@ func NewBatcher(wallet lndclient.WalletKitClient,
// By default, loop/labels.LoopOutBatchSweepSuccess is used
// to label sweep transactions.
txLabeler: labels.LoopOutBatchSweepSuccess,
// publishErrorHandler is a function that handles transaction
// publishing error. By default, it logs all errors as warnings,
// but "insufficient fee" as Info.
publishErrorHandler: defaultPublishErrorLogger,
}
for _, opt := range opts {
opt(&cfg)
@ -448,26 +493,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
}
return &Batcher{
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
publishErrorHandler: cfg.publishErrorHandler,
}
}
@ -1092,14 +1138,15 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
// newBatchKit creates new batch kit.
func (b *Batcher) newBatchKit() batchKit {
return batchKit{
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
publishErrorHandler: b.publishErrorHandler,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
}
}

View File

@ -541,6 +541,122 @@ func testTxLabeler(t *testing.T, store testStore,
checkBatcherError(t, runErr)
}
// testTransactionPublisher wraps a wallet kit and returns publish error on
// the first publish attempt. Further attempts succeed.
type testTransactionPublisher struct {
lndclient.WalletKitClient
attempts int
}
var testPublishError = errors.New("test publish error")
// PublishTransaction publishes the transaction or fails it's the first attempt.
func (p *testTransactionPublisher) PublishTransaction(ctx context.Context,
tx *wire.MsgTx, label string) error {
p.attempts++
if p.attempts == 1 {
return testPublishError
}
return p.WalletKitClient.PublishTransaction(ctx, tx, label)
}
// testPublishErrorHandler tests that publish error handler installed with
// WithPublishErrorHandler, works as expected.
func testPublishErrorHandler(t *testing.T, store testStore,
batcherStore testBatcherStore) {
defer test.Guard(t)()
lnd := test.NewMockLnd()
ctx, cancel := context.WithCancel(context.Background())
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
require.NoError(t, err)
walletKit := &testTransactionPublisher{WalletKitClient: lnd.WalletKit}
// Catch all publish errors and send them to a channel.
publishErrorChan := make(chan error)
errorHandler := func(err error, errMsg string, log btclog.Logger) {
log.Warnf("%s: %v", errMsg, err)
publishErrorChan <- err
}
batcher := NewBatcher(walletKit, lnd.ChainNotifier, lnd.Signer,
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
batcherStore, sweepStore, WithPublishErrorHandler(errorHandler))
var (
runErr error
wg sync.WaitGroup
)
wg.Add(1)
go func() {
defer wg.Done()
runErr = batcher.Run(ctx)
}()
// Create a sweep request.
sweepReq1 := SweepRequest{
SwapHash: lntypes.Hash{1, 1, 1},
Value: 111,
Outpoint: wire.OutPoint{
Hash: chainhash.Hash{1, 1},
Index: 1,
},
Notifier: &dummyNotifier,
}
swap1 := &loopdb.LoopOutContract{
SwapContract: loopdb.SwapContract{
CltvExpiry: 111,
AmountRequested: 111,
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
HtlcKeys: htlcKeys,
},
DestAddr: destAddr,
SwapInvoice: swapInvoice,
SweepConfTarget: 111,
}
err = store.CreateLoopOut(ctx, sweepReq1.SwapHash, swap1)
require.NoError(t, err)
store.AssertLoopOutStored()
// Deliver sweep request to batcher.
require.NoError(t, batcher.AddSweep(&sweepReq1))
// Eventually request will be consumed and a new batch will spin up.
require.Eventually(t, func() bool {
return len(batcher.batches) == 1
}, test.Timeout, eventuallyCheckFrequency)
// When batch is successfully created it will execute it's first step,
// which leads to a spend monitor of the primary sweep.
<-lnd.RegisterSpendChannel
// The first attempt to publish the batch tx is expected to fail.
require.ErrorIs(t, <-publishErrorChan, testPublishError)
// Mine a block to trigger another publishing attempt.
err = lnd.NotifyHeight(601)
require.NoError(t, err)
// Now publishing should succeed for tx to be published.
<-lnd.TxPublishChannel
// Now make the batcher quit by canceling the context.
cancel()
wg.Wait()
checkBatcherError(t, runErr)
}
// testSweepBatcherSimpleLifecycle tests the simple lifecycle of the batches
// that are created and run by the batcher.
func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
@ -3613,6 +3729,11 @@ func TestTxLabeler(t *testing.T) {
runTests(t, testTxLabeler)
}
// TestPublishErrorHandler tests transaction labels.
func TestPublishErrorHandler(t *testing.T) {
runTests(t, testPublishErrorHandler)
}
// TestSweepBatcherSimpleLifecycle tests the simple lifecycle of the batches
// that are created and run by the batcher.
func TestSweepBatcherSimpleLifecycle(t *testing.T) {