2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-09 19:10:47 +00:00
loop/sweepbatcher/sweep_batcher.go

954 lines
27 KiB
Go
Raw Normal View History

2023-08-23 14:44:14 +00:00
package sweepbatcher
import (
"context"
"errors"
2023-08-23 14:44:14 +00:00
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
2023-08-23 14:44:14 +00:00
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
2023-08-23 14:44:14 +00:00
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/input"
2023-08-23 14:44:14 +00:00
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
const (
// defaultMaxTimeoutDistance is the default maximum timeout distance
// of sweeps that can appear in the same batch.
defaultMaxTimeoutDistance = 288
// batchOpen is the string representation of the state of a batch that
// is open.
batchOpen = "open"
// batchClosed is the string representation of the state of a batch
// that is closed.
batchClosed = "closed"
// batchConfirmed is the string representation of the state of a batch
// that is confirmed.
batchConfirmed = "confirmed"
// defaultMainnetPublishDelay is the default publish delay that is used
// for mainnet.
defaultMainnetPublishDelay = 5 * time.Second
// defaultTestnetPublishDelay is the default publish delay that is used
// for testnet.
defaultPublishDelay = 500 * time.Millisecond
)
type BatcherStore interface {
// FetchUnconfirmedSweepBatches fetches all the batches from the
// database that are not in a confirmed state.
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, error)
2023-08-23 14:44:14 +00:00
// InsertSweepBatch inserts a batch into the database, returning the id
// of the inserted batch.
InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error)
// DropBatch drops a batch from the database. This should only be used
// when a batch is empty.
DropBatch(ctx context.Context, id int32) error
2023-08-23 14:44:14 +00:00
// UpdateSweepBatch updates a batch in the database.
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error
2023-08-23 14:44:14 +00:00
// ConfirmBatch confirms a batch by setting its state to confirmed.
ConfirmBatch(ctx context.Context, id int32) error
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)
2023-08-23 14:44:14 +00:00
// UpsertSweep inserts a sweep into the database, or updates an existing
// sweep if it already exists.
UpsertSweep(ctx context.Context, sweep *dbSweep) error
// GetSweepStatus returns the completed status of the sweep.
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (bool, error)
// GetParentBatch returns the parent batch of a (completed) sweep.
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (*dbBatch,
error)
// TotalSweptAmount returns the total amount swept by a (confirmed)
// batch.
TotalSweptAmount(ctx context.Context, id int32) (btcutil.Amount, error)
2023-08-23 14:44:14 +00:00
}
// SweepInfo stores any data related to sweeping a specific outpoint.
type SweepInfo struct {
// ConfTarget is the confirmation target of the sweep.
ConfTarget int32
// Timeout is the timeout of the swap that the sweep belongs to.
Timeout int32
// InitiationHeight is the height at which the swap was initiated.
InitiationHeight int32
// HTLC is the HTLC that is being swept.
HTLC swap.Htlc
// Preimage is the preimage of the HTLC that is being swept.
Preimage lntypes.Preimage
// SwapInvoicePaymentAddr is the payment address of the swap invoice.
SwapInvoicePaymentAddr [32]byte
// HTLCKeys is the set of keys used to sign the HTLC.
HTLCKeys loopdb.HtlcKeys
// HTLCSuccessEstimator is a function that estimates the weight of the
// HTLC success script.
HTLCSuccessEstimator func(*input.TxWeightEstimator) error
// ProtocolVersion is the protocol version of the swap that the sweep
// belongs to.
ProtocolVersion loopdb.ProtocolVersion
// IsExternalAddr is true if the sweep spends to a non-wallet address.
IsExternalAddr bool
// DestAddr is the destination address of the sweep.
DestAddr btcutil.Address
// NonCoopHint is set, if the sweep can not be spent cooperatively and
// has to be spent using preimage. This is only used in fee estimations
// when selecting a batch for the sweep to minimize fees.
NonCoopHint bool
}
// SweepFetcher is used to get details of a sweep.
type SweepFetcher interface {
// FetchSweep returns details of the sweep with the given hash.
FetchSweep(ctx context.Context, hash lntypes.Hash) (*SweepInfo, error)
}
2023-08-23 14:44:14 +00:00
// MuSig2SignSweep is a function that can be used to sign a sweep transaction
// cooperatively with the swap server.
type MuSig2SignSweep func(ctx context.Context,
protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte,
prevoutMap map[wire.OutPoint]*wire.TxOut) (
[]byte, []byte, error)
// SignMuSig2 is a function that can be used to sign a sweep transaction in a
// custom way.
type SignMuSig2 func(ctx context.Context, muSig2Version input.MuSig2Version,
swapHash lntypes.Hash, rootHash chainhash.Hash, sigHash [32]byte,
) ([]byte, error)
2023-08-23 14:44:14 +00:00
// VerifySchnorrSig is a function that can be used to verify a schnorr
// signature.
type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
// FeeRateProvider is a function that returns min fee rate of a batch sweeping
// the UTXO of the swap.
type FeeRateProvider func(ctx context.Context,
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)
2023-08-23 14:44:14 +00:00
// SweepRequest is a request to sweep a specific outpoint.
type SweepRequest struct {
// SwapHash is the hash of the swap that is being swept.
SwapHash lntypes.Hash
// Outpoint is the outpoint that is being swept.
Outpoint wire.OutPoint
// Value is the value of the outpoint that is being swept.
Value btcutil.Amount
// Notifier is a notifier that is used to notify the requester of this
// sweep that the sweep was successful.
Notifier *SpendNotifier
}
type SpendDetail struct {
// Tx is the transaction that spent the outpoint.
Tx *wire.MsgTx
// OnChainFeePortion is the fee portion that was paid to get this sweep
// confirmed on chain. This is the difference between the value of the
// outpoint and the value of all sweeps that were included in the batch
// divided by the number of sweeps.
OnChainFeePortion btcutil.Amount
}
2023-08-23 14:44:14 +00:00
// SpendNotifier is a notifier that is used to notify the requester of a sweep
// that the sweep was successful.
type SpendNotifier struct {
// SpendChan is a channel where the spend details are received.
SpendChan chan *SpendDetail
2023-08-23 14:44:14 +00:00
// SpendErrChan is a channel where spend errors are received.
SpendErrChan chan error
// QuitChan is a channel that can be closed to stop the notifier.
QuitChan chan bool
}
var (
ErrBatcherShuttingDown = errors.New("batcher shutting down")
2023-08-23 14:44:14 +00:00
)
// Batcher is a system that is responsible for accepting sweep requests and
// placing them in appropriate batches. It will spin up new batches as needed.
type Batcher struct {
// batches is a map of batch IDs to the currently active batches.
batches map[int32]*batch
// sweepReqs is a channel where sweep requests are received.
sweepReqs chan SweepRequest
// errChan is a channel where errors are received.
errChan chan error
// quit signals that the batch must stop.
quit chan struct{}
// initDone is a channel that is closed when the batcher has been
// initialized.
initDone chan struct{}
2023-08-23 14:44:14 +00:00
// wallet is the wallet kit client that is used by batches.
wallet lndclient.WalletKitClient
// chainNotifier is the chain notifier client that is used by batches.
chainNotifier lndclient.ChainNotifierClient
// signerClient is the signer client that is used by batches.
signerClient lndclient.SignerClient
// musig2ServerKit includes all the required functionality to collect
// and verify signatures by the swap server in order to cooperatively
// sweep funds.
musig2ServerSign MuSig2SignSweep
// verifySchnorrSig is a function that can be used to verify a schnorr
// signature.
VerifySchnorrSig VerifySchnorrSig
// chainParams are the chain parameters of the chain that is used by
// batches.
chainParams *chaincfg.Params
// store includes all the database interactions that are needed by the
// batcher and the batches.
store BatcherStore
// sweepStore is used to load sweeps from the database.
sweepStore SweepFetcher
2023-08-23 14:44:14 +00:00
// wg is a waitgroup that is used to wait for all the goroutines to
// exit.
wg sync.WaitGroup
// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
customFeeRate FeeRateProvider
// customMuSig2Signer is a custom signer. If it is set, it is used to
// create musig2 signatures instead of musig2SignSweep and signerClient.
// Note that musig2SignSweep must be nil in this case, however signer
// client must still be provided, as it is used for non-coop spendings.
customMuSig2Signer SignMuSig2
}
// BatcherConfig holds batcher configuration.
type BatcherConfig struct {
// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
customFeeRate FeeRateProvider
// customMuSig2Signer is a custom signer. If it is set, it is used to
// create musig2 signatures instead of musig2SignSweep and signerClient.
// Note that musig2SignSweep must be nil in this case, however signer
// client must still be provided, as it is used for non-coop spendings.
customMuSig2Signer SignMuSig2
}
// BatcherOption configures batcher behaviour.
type BatcherOption func(*BatcherConfig)
// WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
// external source of fee rates (FeeRateProvider). To apply a fee rate change,
// the caller should re-add the sweep by calling AddSweep.
func WithCustomFeeRate(customFeeRate FeeRateProvider) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.customFeeRate = customFeeRate
}
2023-08-23 14:44:14 +00:00
}
// WithCustomSignMuSig2 instructs sweepbatcher to use a custom function to
// produce MuSig2 signatures. If it is set, it is used to create
// musig2 signatures instead of musig2SignSweep and signerClient. Note
// that musig2SignSweep must be nil in this case, however signerClient
// must still be provided, as it is used for non-coop spendings.
func WithCustomSignMuSig2(customMuSig2Signer SignMuSig2) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.customMuSig2Signer = customMuSig2Signer
}
}
2023-08-23 14:44:14 +00:00
// NewBatcher creates a new Batcher instance.
func NewBatcher(wallet lndclient.WalletKitClient,
chainNotifier lndclient.ChainNotifierClient,
signerClient lndclient.SignerClient, musig2ServerSigner MuSig2SignSweep,
verifySchnorrSig VerifySchnorrSig, chainparams *chaincfg.Params,
store BatcherStore, sweepStore SweepFetcher,
opts ...BatcherOption) *Batcher {
var cfg BatcherConfig
for _, opt := range opts {
opt(&cfg)
}
2023-08-23 14:44:14 +00:00
if cfg.customMuSig2Signer != nil && musig2ServerSigner != nil {
panic("customMuSig2Signer must not be used with " +
"musig2ServerSigner")
}
2023-08-23 14:44:14 +00:00
return &Batcher{
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
customFeeRate: cfg.customFeeRate,
customMuSig2Signer: cfg.customMuSig2Signer,
2023-08-23 14:44:14 +00:00
}
}
// Run starts the batcher and processes incoming sweep requests.
func (b *Batcher) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.quit)
2023-08-23 14:44:14 +00:00
for _, batch := range b.batches {
batch.Wait()
}
b.wg.Wait()
}()
// First we fetch all the batches that are not in a confirmed state from
// the database. We will then resume the execution of these batches.
batches, err := b.FetchUnconfirmedBatches(runCtx)
if err != nil {
return err
}
for _, batch := range batches {
err := b.spinUpBatchFromDB(runCtx, batch)
if err != nil {
return err
}
}
// Signal that the batcher has been initialized.
close(b.initDone)
2023-08-23 14:44:14 +00:00
for {
select {
case sweepReq := <-b.sweepReqs:
sweep, err := b.fetchSweep(runCtx, sweepReq)
if err != nil {
log.Warnf("fetchSweep failed: %v.", err)
2023-08-23 14:44:14 +00:00
return err
}
err = b.handleSweep(runCtx, sweep, sweepReq.Notifier)
if err != nil {
log.Warnf("handleSweep failed: %v.", err)
2023-08-23 14:44:14 +00:00
return err
}
case err := <-b.errChan:
log.Warnf("Batcher received an error: %v.", err)
2023-08-23 14:44:14 +00:00
return err
case <-runCtx.Done():
log.Infof("Stopping Batcher: run context cancelled.")
2023-08-23 14:44:14 +00:00
return runCtx.Err()
}
}
}
// AddSweep adds a sweep request to the batcher for handling. This will either
// place the sweep in an existing batch or create a new one.
func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
select {
case b.sweepReqs <- *sweepReq:
return nil
case <-b.quit:
return ErrBatcherShuttingDown
}
}
// handleSweep handles a sweep request by either placing it in an existing
// batch, or by spinning up a new batch for it.
func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
notifier *SpendNotifier) error {
completed, err := b.store.GetSweepStatus(ctx, sweep.swapHash)
if err != nil {
return err
}
log.Infof("Batcher handling sweep %x, completed=%v", sweep.swapHash[:6],
completed)
// If the sweep has already been completed in a confirmed batch then we
// can't attach its notifier to the batch as that is no longer running.
// Instead we directly detect and return the spend here.
if completed && *notifier != (SpendNotifier{}) {
// Verify that the parent batch is confirmed. Note that a batch
// is only considered confirmed after it has received three
// on-chain confirmations to prevent issues caused by reorgs.
parentBatch, err := b.store.GetParentBatch(ctx, sweep.swapHash)
if err != nil {
log.Errorf("unable to get parent batch for sweep %x: "+
"%v", sweep.swapHash[:6], err)
return err
}
// The parent batch is indeed confirmed, meaning it is complete
// and we won't be able to attach this sweep to it.
if parentBatch.State == batchConfirmed {
return b.monitorSpendAndNotify(
ctx, sweep, parentBatch.ID, notifier,
)
}
2023-08-23 14:44:14 +00:00
}
sweep.notifier = notifier
// Check if the sweep is already in a batch. If that is the case, we
// provide the sweep to that batch and return.
for _, batch := range b.batches {
// This is a check to see if a batch is completed. In that case
// we just lazily delete it and continue our scan.
if batch.isComplete() {
delete(b.batches, batch.id)
continue
}
if batch.sweepExists(sweep.swapHash) {
accepted, err := batch.addSweep(ctx, sweep)
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
2023-08-23 14:44:14 +00:00
return err
}
if !accepted {
return fmt.Errorf("existing sweep %x was not "+
2024-05-29 01:35:08 +00:00
"accepted by batch %d",
sweep.swapHash[:6], batch.id)
2023-08-23 14:44:14 +00:00
}
// The sweep was updated in the batch, our job is done.
return nil
2023-08-23 14:44:14 +00:00
}
}
// Try to run the greedy algorithm of batch selection to minimize costs.
err = b.greedyAddSweep(ctx, sweep)
if err == nil {
// The greedy algorithm succeeded.
return nil
}
log.Warnf("Greedy batch selection algorithm failed for sweep %x: %v. "+
"Falling back to old approach.", sweep.swapHash[:6], err)
2023-08-23 14:44:14 +00:00
// If one of the batches accepts the sweep, we provide it to that batch.
for _, batch := range b.batches {
accepted, err := batch.addSweep(ctx, sweep)
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
2023-08-23 14:44:14 +00:00
return err
}
// If the sweep was accepted by this batch, we return, our job
// is done.
if accepted {
return nil
}
}
// If no batch is capable of accepting the sweep, we spin up a fresh
// batch and hand the sweep over to it.
return b.spinUpNewBatch(ctx, sweep)
}
// spinUpNewBatch creates new batch, starts it and adds the sweep to it.
func (b *Batcher) spinUpNewBatch(ctx context.Context, sweep *sweep) error {
// Spin up a fresh batch.
newBatch, err := b.spinUpBatch(ctx)
2023-08-23 14:44:14 +00:00
if err != nil {
return err
}
// Add the sweep to the fresh batch.
accepted, err := newBatch.addSweep(ctx, sweep)
2023-08-23 14:44:14 +00:00
if err != nil {
return err
}
// If the sweep wasn't accepted by the fresh batch something is wrong,
// we should return the error.
if !accepted {
return fmt.Errorf("sweep %x was not accepted by new batch %d",
sweep.swapHash[:6], newBatch.id)
2023-08-23 14:44:14 +00:00
}
return nil
}
// spinUpBatch spins up a new batch and returns it.
func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
cfg := b.newBatchConfig(defaultMaxTimeoutDistance)
2023-08-23 14:44:14 +00:00
switch b.chainParams {
case &chaincfg.MainNetParams:
cfg.batchPublishDelay = defaultMainnetPublishDelay
default:
cfg.batchPublishDelay = defaultPublishDelay
}
batchKit := b.newBatchKit()
2023-08-23 14:44:14 +00:00
batch := NewBatch(cfg, batchKit)
id, err := batch.insertAndAcquireID(ctx)
if err != nil {
return nil, err
}
// We add the batch to our map of batches and start it.
b.batches[id] = batch
b.wg.Add(1)
go func() {
defer b.wg.Done()
err := batch.Run(ctx)
if err != nil {
_ = b.writeToErrChan(ctx, err)
}
}()
return batch, nil
}
// spinUpBatchDB spins up a batch that already existed in storage, then
// returns it.
func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
if err != nil {
return err
}
if len(dbSweeps) == 0 {
log.Infof("skipping restored batch %d as it has no sweeps",
batch.id)
// It is safe to drop this empty batch as it has no sweeps.
err := b.store.DropBatch(ctx, batch.id)
if err != nil {
log.Warnf("unable to drop empty batch %d: %v",
batch.id, err)
}
return nil
2023-08-23 14:44:14 +00:00
}
primarySweep := dbSweeps[0]
sweeps := make(map[lntypes.Hash]sweep)
// Collect feeRate from sweeps and stored batch.
feeRate := batch.rbfCache.FeeRate
2023-08-23 14:44:14 +00:00
for _, dbSweep := range dbSweeps {
sweep, err := b.convertSweep(ctx, dbSweep)
2023-08-23 14:44:14 +00:00
if err != nil {
return err
}
sweeps[sweep.swapHash] = *sweep
// Set minFeeRate to max(sweep.minFeeRate) for all sweeps.
if feeRate < sweep.minFeeRate {
feeRate = sweep.minFeeRate
}
2023-08-23 14:44:14 +00:00
}
rbfCache := rbfCache{
LastHeight: batch.rbfCache.LastHeight,
FeeRate: feeRate,
}
2024-05-29 01:35:08 +00:00
logger := batchPrefixLogger(fmt.Sprintf("%d", batch.id))
batchKit := b.newBatchKit()
batchKit.id = batch.id
batchKit.batchTxid = batch.batchTxid
batchKit.batchPkScript = batch.batchPkScript
batchKit.state = batch.state
batchKit.primaryID = primarySweep.SwapHash
batchKit.sweeps = sweeps
batchKit.rbfCache = rbfCache
batchKit.log = logger
2023-08-23 14:44:14 +00:00
cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance)
newBatch, err := NewBatchFromDB(cfg, batchKit)
if err != nil {
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
}
2023-08-23 14:44:14 +00:00
// We add the batch to our map of batches and start it.
b.batches[batch.id] = newBatch
b.wg.Add(1)
go func() {
defer b.wg.Done()
err := newBatch.Run(ctx)
if err != nil {
_ = b.writeToErrChan(ctx, err)
}
}()
return nil
}
// FetchUnconfirmedBatches fetches all the batches from the database that are
// not in a confirmed state.
func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
error) {
dbBatches, err := b.store.FetchUnconfirmedSweepBatches(ctx)
if err != nil {
return nil, err
}
batches := make([]*batch, 0, len(dbBatches))
for _, bch := range dbBatches {
bch := bch
batch := batch{}
batch.id = bch.ID
switch bch.State {
case batchOpen:
batch.state = Open
case batchClosed:
batch.state = Closed
case batchConfirmed:
batch.state = Confirmed
}
batch.batchTxid = &bch.BatchTxid
batch.batchPkScript = bch.BatchPkScript
rbfCache := rbfCache{
LastHeight: bch.LastRbfHeight,
FeeRate: chainfee.SatPerKWeight(bch.LastRbfSatPerKw),
}
batch.rbfCache = rbfCache
bchCfg := b.newBatchConfig(bch.MaxTimeoutDistance)
2023-08-23 14:44:14 +00:00
batch.cfg = &bchCfg
batches = append(batches, &batch)
}
return batches, nil
}
// monitorSpendAndNotify monitors the spend of a specific outpoint and writes
// the response back to the response channel.
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
parentBatchID int32, notifier *SpendNotifier) error {
2023-08-23 14:44:14 +00:00
spendCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Then we get the total amount that was swept by the batch.
totalSwept, err := b.store.TotalSweptAmount(ctx, parentBatchID)
if err != nil {
return err
}
2023-08-23 14:44:14 +00:00
spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn(
spendCtx, &sweep.outpoint, sweep.htlc.PkScript,
sweep.initiationHeight,
)
if err != nil {
return err
2023-08-23 14:44:14 +00:00
}
b.wg.Add(1)
go func() {
defer b.wg.Done()
log.Infof("Batcher monitoring spend for swap %x",
sweep.swapHash[:6])
2023-08-23 14:44:14 +00:00
for {
2023-08-23 14:44:14 +00:00
select {
case spend := <-spendChan:
spendTx := spend.SpendingTx
// Calculate the fee portion that each sweep
// should pay for the batch.
feePortionPerSweep, roundingDifference :=
getFeePortionForSweep(
spendTx, len(spendTx.TxIn),
totalSwept,
)
2024-05-29 01:35:08 +00:00
onChainFeePortion := getFeePortionPaidBySweep(
spendTx, feePortionPerSweep,
roundingDifference, sweep,
)
// Notify the requester of the spend
// with the spend details, including the fee
// portion for this particular sweep.
spendDetail := &SpendDetail{
2024-05-29 01:35:08 +00:00
Tx: spendTx,
OnChainFeePortion: onChainFeePortion,
}
select {
case notifier.SpendChan <- spendDetail:
case <-ctx.Done():
}
2023-08-23 14:44:14 +00:00
return
2023-08-23 14:44:14 +00:00
case err := <-spendErr:
select {
case notifier.SpendErrChan <- err:
case <-ctx.Done():
}
2023-08-23 14:44:14 +00:00
_ = b.writeToErrChan(ctx, err)
return
2023-08-23 14:44:14 +00:00
case <-notifier.QuitChan:
return
2023-08-23 14:44:14 +00:00
case <-ctx.Done():
return
}
2023-08-23 14:44:14 +00:00
}
}()
return nil
2023-08-23 14:44:14 +00:00
}
func (b *Batcher) writeToErrChan(ctx context.Context, err error) error {
select {
case b.errChan <- err:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// convertSweep converts a fetched sweep from the database to a sweep that is
// ready to be processed by the batcher. It loads swap from loopdb by calling
// method FetchLoopOutSwap.
func (b *Batcher) convertSweep(ctx context.Context, dbSweep *dbSweep) (
*sweep, error) {
return b.loadSweep(ctx, dbSweep.SwapHash, dbSweep.Outpoint,
dbSweep.Amount)
2023-08-23 14:44:14 +00:00
}
// LoopOutFetcher is used to load LoopOut swaps from the database.
// It is implemented by loopdb.SwapStore.
type LoopOutFetcher interface {
// FetchLoopOutSwap returns the loop out swap with the given hash.
FetchLoopOutSwap(ctx context.Context,
hash lntypes.Hash) (*loopdb.LoopOut, error)
}
2023-08-23 14:44:14 +00:00
// SwapStoreWrapper is LoopOutFetcher wrapper providing SweepFetcher interface.
type SwapStoreWrapper struct {
// swapStore is used to load LoopOut swaps from the database.
swapStore LoopOutFetcher
2023-08-23 14:44:14 +00:00
// chainParams are the chain parameters of the chain that is used by
// batches.
chainParams *chaincfg.Params
}
// FetchSweep returns details of the sweep with the given hash.
// Implements SweepFetcher interface.
func (f *SwapStoreWrapper) FetchSweep(ctx context.Context,
swapHash lntypes.Hash) (*SweepInfo, error) {
swap, err := f.swapStore.FetchLoopOutSwap(ctx, swapHash)
2023-08-23 14:44:14 +00:00
if err != nil {
return nil, fmt.Errorf("failed to fetch loop out for %x: %w",
2023-08-23 14:44:14 +00:00
swapHash[:6], err)
}
htlc, err := utils.GetHtlc(
swapHash, &swap.Contract.SwapContract, f.chainParams,
2023-08-23 14:44:14 +00:00
)
if err != nil {
return nil, fmt.Errorf("failed to get htlc: %w", err)
2023-08-23 14:44:14 +00:00
}
swapPaymentAddr, err := utils.ObtainSwapPaymentAddr(
swap.Contract.SwapInvoice, f.chainParams,
2023-08-23 14:44:14 +00:00
)
if err != nil {
return nil, fmt.Errorf("failed to get payment addr: %w", err)
2023-08-23 14:44:14 +00:00
}
return &SweepInfo{
ConfTarget: swap.Contract.SweepConfTarget,
Timeout: swap.Contract.CltvExpiry,
InitiationHeight: swap.Contract.InitiationHeight,
HTLC: *htlc,
Preimage: swap.Contract.Preimage,
SwapInvoicePaymentAddr: *swapPaymentAddr,
HTLCKeys: swap.Contract.HtlcKeys,
HTLCSuccessEstimator: htlc.AddSuccessToEstimator,
ProtocolVersion: swap.Contract.ProtocolVersion,
IsExternalAddr: swap.Contract.IsExternalAddr,
DestAddr: swap.Contract.DestAddr,
}, nil
}
// NewSweepFetcherFromSwapStore accepts swapStore (e.g. loopdb) and returns
// a wrapper implementing SweepFetcher interface (suitable for NewBatcher).
func NewSweepFetcherFromSwapStore(swapStore LoopOutFetcher,
chainParams *chaincfg.Params) (*SwapStoreWrapper, error) {
return &SwapStoreWrapper{
swapStore: swapStore,
chainParams: chainParams,
}, nil
}
// fetchSweep fetches the sweep related information from the database.
func (b *Batcher) fetchSweep(ctx context.Context,
sweepReq SweepRequest) (*sweep, error) {
return b.loadSweep(ctx, sweepReq.SwapHash, sweepReq.Outpoint,
sweepReq.Value)
}
// loadSweep loads inputs of sweep from the database and from FeeRateProvider
// if needed and returns an assembled sweep object.
func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash,
outpoint wire.OutPoint, value btcutil.Amount) (*sweep, error) {
s, err := b.sweepStore.FetchSweep(ctx, swapHash)
if err != nil {
return nil, fmt.Errorf("failed to fetch sweep data for %x: %w",
swapHash[:6], err)
}
// Find minimum fee rate for the sweep. Use customFeeRate if it is
// provided, otherwise use wallet's EstimateFeeRate.
var minFeeRate chainfee.SatPerKWeight
if b.customFeeRate != nil {
minFeeRate, err = b.customFeeRate(ctx, swapHash)
if err != nil {
return nil, fmt.Errorf("failed to fetch min fee rate "+
"for %x: %w", swapHash[:6], err)
}
if minFeeRate < chainfee.AbsoluteFeePerKwFloor {
return nil, fmt.Errorf("min fee rate too low (%v) for "+
"%x", minFeeRate, swapHash[:6])
}
} else {
if s.ConfTarget == 0 {
log.Warnf("Fee estimation was requested for zero "+
"confTarget for sweep %x.", swapHash[:6])
}
minFeeRate, err = b.wallet.EstimateFeeRate(ctx, s.ConfTarget)
if err != nil {
return nil, fmt.Errorf("failed to estimate fee rate "+
"for %x, confTarget=%d: %w", swapHash[:6],
s.ConfTarget, err)
}
}
2023-08-23 14:44:14 +00:00
return &sweep{
swapHash: swapHash,
outpoint: outpoint,
value: value,
confTarget: s.ConfTarget,
timeout: s.Timeout,
initiationHeight: s.InitiationHeight,
htlc: s.HTLC,
preimage: s.Preimage,
swapInvoicePaymentAddr: s.SwapInvoicePaymentAddr,
htlcKeys: s.HTLCKeys,
htlcSuccessEstimator: s.HTLCSuccessEstimator,
protocolVersion: s.ProtocolVersion,
isExternalAddr: s.IsExternalAddr,
destAddr: s.DestAddr,
minFeeRate: minFeeRate,
nonCoopHint: s.NonCoopHint,
2023-08-23 14:44:14 +00:00
}, nil
}
// newBatchConfig creates new batch config.
func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
return batchConfig{
maxTimeoutDistance: maxTimeoutDistance,
noBumping: b.customFeeRate != nil,
customMuSig2Signer: b.customMuSig2Signer,
}
}
// newBatchKit creates new batch kit.
func (b *Batcher) newBatchKit() batchKit {
return batchKit{
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
}
}