mirror of
https://github.com/lightninglabs/loop
synced 2024-11-09 19:10:47 +00:00
026cf0d47a
Now that sweep.minFeeRate is always set, greedy batch selection has all needed inputs to work even without customFeeRate provider.
954 lines
27 KiB
Go
954 lines
27 KiB
Go
package sweepbatcher
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcec/v2"
|
|
"github.com/btcsuite/btcd/btcutil"
|
|
"github.com/btcsuite/btcd/chaincfg"
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/lightninglabs/lndclient"
|
|
"github.com/lightninglabs/loop/loopdb"
|
|
"github.com/lightninglabs/loop/swap"
|
|
"github.com/lightninglabs/loop/utils"
|
|
"github.com/lightningnetwork/lnd/input"
|
|
"github.com/lightningnetwork/lnd/lntypes"
|
|
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
|
)
|
|
|
|
const (
|
|
// defaultMaxTimeoutDistance is the default maximum timeout distance
|
|
// of sweeps that can appear in the same batch.
|
|
defaultMaxTimeoutDistance = 288
|
|
|
|
// batchOpen is the string representation of the state of a batch that
|
|
// is open.
|
|
batchOpen = "open"
|
|
|
|
// batchClosed is the string representation of the state of a batch
|
|
// that is closed.
|
|
batchClosed = "closed"
|
|
|
|
// batchConfirmed is the string representation of the state of a batch
|
|
// that is confirmed.
|
|
batchConfirmed = "confirmed"
|
|
|
|
// defaultMainnetPublishDelay is the default publish delay that is used
|
|
// for mainnet.
|
|
defaultMainnetPublishDelay = 5 * time.Second
|
|
|
|
// defaultTestnetPublishDelay is the default publish delay that is used
|
|
// for testnet.
|
|
defaultPublishDelay = 500 * time.Millisecond
|
|
)
|
|
|
|
type BatcherStore interface {
|
|
// FetchUnconfirmedSweepBatches fetches all the batches from the
|
|
// database that are not in a confirmed state.
|
|
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, error)
|
|
|
|
// InsertSweepBatch inserts a batch into the database, returning the id
|
|
// of the inserted batch.
|
|
InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error)
|
|
|
|
// DropBatch drops a batch from the database. This should only be used
|
|
// when a batch is empty.
|
|
DropBatch(ctx context.Context, id int32) error
|
|
|
|
// UpdateSweepBatch updates a batch in the database.
|
|
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error
|
|
|
|
// ConfirmBatch confirms a batch by setting its state to confirmed.
|
|
ConfirmBatch(ctx context.Context, id int32) error
|
|
|
|
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
|
|
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)
|
|
|
|
// UpsertSweep inserts a sweep into the database, or updates an existing
|
|
// sweep if it already exists.
|
|
UpsertSweep(ctx context.Context, sweep *dbSweep) error
|
|
|
|
// GetSweepStatus returns the completed status of the sweep.
|
|
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (bool, error)
|
|
|
|
// GetParentBatch returns the parent batch of a (completed) sweep.
|
|
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (*dbBatch,
|
|
error)
|
|
|
|
// TotalSweptAmount returns the total amount swept by a (confirmed)
|
|
// batch.
|
|
TotalSweptAmount(ctx context.Context, id int32) (btcutil.Amount, error)
|
|
}
|
|
|
|
// SweepInfo stores any data related to sweeping a specific outpoint.
|
|
type SweepInfo struct {
|
|
// ConfTarget is the confirmation target of the sweep.
|
|
ConfTarget int32
|
|
|
|
// Timeout is the timeout of the swap that the sweep belongs to.
|
|
Timeout int32
|
|
|
|
// InitiationHeight is the height at which the swap was initiated.
|
|
InitiationHeight int32
|
|
|
|
// HTLC is the HTLC that is being swept.
|
|
HTLC swap.Htlc
|
|
|
|
// Preimage is the preimage of the HTLC that is being swept.
|
|
Preimage lntypes.Preimage
|
|
|
|
// SwapInvoicePaymentAddr is the payment address of the swap invoice.
|
|
SwapInvoicePaymentAddr [32]byte
|
|
|
|
// HTLCKeys is the set of keys used to sign the HTLC.
|
|
HTLCKeys loopdb.HtlcKeys
|
|
|
|
// HTLCSuccessEstimator is a function that estimates the weight of the
|
|
// HTLC success script.
|
|
HTLCSuccessEstimator func(*input.TxWeightEstimator) error
|
|
|
|
// ProtocolVersion is the protocol version of the swap that the sweep
|
|
// belongs to.
|
|
ProtocolVersion loopdb.ProtocolVersion
|
|
|
|
// IsExternalAddr is true if the sweep spends to a non-wallet address.
|
|
IsExternalAddr bool
|
|
|
|
// DestAddr is the destination address of the sweep.
|
|
DestAddr btcutil.Address
|
|
|
|
// NonCoopHint is set, if the sweep can not be spent cooperatively and
|
|
// has to be spent using preimage. This is only used in fee estimations
|
|
// when selecting a batch for the sweep to minimize fees.
|
|
NonCoopHint bool
|
|
}
|
|
|
|
// SweepFetcher is used to get details of a sweep.
|
|
type SweepFetcher interface {
|
|
// FetchSweep returns details of the sweep with the given hash.
|
|
FetchSweep(ctx context.Context, hash lntypes.Hash) (*SweepInfo, error)
|
|
}
|
|
|
|
// MuSig2SignSweep is a function that can be used to sign a sweep transaction
|
|
// cooperatively with the swap server.
|
|
type MuSig2SignSweep func(ctx context.Context,
|
|
protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
|
|
paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte,
|
|
prevoutMap map[wire.OutPoint]*wire.TxOut) (
|
|
[]byte, []byte, error)
|
|
|
|
// SignMuSig2 is a function that can be used to sign a sweep transaction in a
|
|
// custom way.
|
|
type SignMuSig2 func(ctx context.Context, muSig2Version input.MuSig2Version,
|
|
swapHash lntypes.Hash, rootHash chainhash.Hash, sigHash [32]byte,
|
|
) ([]byte, error)
|
|
|
|
// VerifySchnorrSig is a function that can be used to verify a schnorr
|
|
// signature.
|
|
type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
|
|
|
|
// FeeRateProvider is a function that returns min fee rate of a batch sweeping
|
|
// the UTXO of the swap.
|
|
type FeeRateProvider func(ctx context.Context,
|
|
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)
|
|
|
|
// SweepRequest is a request to sweep a specific outpoint.
|
|
type SweepRequest struct {
|
|
// SwapHash is the hash of the swap that is being swept.
|
|
SwapHash lntypes.Hash
|
|
|
|
// Outpoint is the outpoint that is being swept.
|
|
Outpoint wire.OutPoint
|
|
|
|
// Value is the value of the outpoint that is being swept.
|
|
Value btcutil.Amount
|
|
|
|
// Notifier is a notifier that is used to notify the requester of this
|
|
// sweep that the sweep was successful.
|
|
Notifier *SpendNotifier
|
|
}
|
|
|
|
type SpendDetail struct {
|
|
// Tx is the transaction that spent the outpoint.
|
|
Tx *wire.MsgTx
|
|
|
|
// OnChainFeePortion is the fee portion that was paid to get this sweep
|
|
// confirmed on chain. This is the difference between the value of the
|
|
// outpoint and the value of all sweeps that were included in the batch
|
|
// divided by the number of sweeps.
|
|
OnChainFeePortion btcutil.Amount
|
|
}
|
|
|
|
// SpendNotifier is a notifier that is used to notify the requester of a sweep
|
|
// that the sweep was successful.
|
|
type SpendNotifier struct {
|
|
// SpendChan is a channel where the spend details are received.
|
|
SpendChan chan *SpendDetail
|
|
|
|
// SpendErrChan is a channel where spend errors are received.
|
|
SpendErrChan chan error
|
|
|
|
// QuitChan is a channel that can be closed to stop the notifier.
|
|
QuitChan chan bool
|
|
}
|
|
|
|
var (
|
|
ErrBatcherShuttingDown = errors.New("batcher shutting down")
|
|
)
|
|
|
|
// Batcher is a system that is responsible for accepting sweep requests and
|
|
// placing them in appropriate batches. It will spin up new batches as needed.
|
|
type Batcher struct {
|
|
// batches is a map of batch IDs to the currently active batches.
|
|
batches map[int32]*batch
|
|
|
|
// sweepReqs is a channel where sweep requests are received.
|
|
sweepReqs chan SweepRequest
|
|
|
|
// errChan is a channel where errors are received.
|
|
errChan chan error
|
|
|
|
// quit signals that the batch must stop.
|
|
quit chan struct{}
|
|
|
|
// initDone is a channel that is closed when the batcher has been
|
|
// initialized.
|
|
initDone chan struct{}
|
|
|
|
// wallet is the wallet kit client that is used by batches.
|
|
wallet lndclient.WalletKitClient
|
|
|
|
// chainNotifier is the chain notifier client that is used by batches.
|
|
chainNotifier lndclient.ChainNotifierClient
|
|
|
|
// signerClient is the signer client that is used by batches.
|
|
signerClient lndclient.SignerClient
|
|
|
|
// musig2ServerKit includes all the required functionality to collect
|
|
// and verify signatures by the swap server in order to cooperatively
|
|
// sweep funds.
|
|
musig2ServerSign MuSig2SignSweep
|
|
|
|
// verifySchnorrSig is a function that can be used to verify a schnorr
|
|
// signature.
|
|
VerifySchnorrSig VerifySchnorrSig
|
|
|
|
// chainParams are the chain parameters of the chain that is used by
|
|
// batches.
|
|
chainParams *chaincfg.Params
|
|
|
|
// store includes all the database interactions that are needed by the
|
|
// batcher and the batches.
|
|
store BatcherStore
|
|
|
|
// sweepStore is used to load sweeps from the database.
|
|
sweepStore SweepFetcher
|
|
|
|
// wg is a waitgroup that is used to wait for all the goroutines to
|
|
// exit.
|
|
wg sync.WaitGroup
|
|
|
|
// customFeeRate provides custom min fee rate per swap. The batch uses
|
|
// max of the fee rates of its swaps. In this mode confTarget is
|
|
// ignored and fee bumping by sweepbatcher is disabled.
|
|
customFeeRate FeeRateProvider
|
|
|
|
// customMuSig2Signer is a custom signer. If it is set, it is used to
|
|
// create musig2 signatures instead of musig2SignSweep and signerClient.
|
|
// Note that musig2SignSweep must be nil in this case, however signer
|
|
// client must still be provided, as it is used for non-coop spendings.
|
|
customMuSig2Signer SignMuSig2
|
|
}
|
|
|
|
// BatcherConfig holds batcher configuration.
|
|
type BatcherConfig struct {
|
|
// customFeeRate provides custom min fee rate per swap. The batch uses
|
|
// max of the fee rates of its swaps. In this mode confTarget is
|
|
// ignored and fee bumping by sweepbatcher is disabled.
|
|
customFeeRate FeeRateProvider
|
|
|
|
// customMuSig2Signer is a custom signer. If it is set, it is used to
|
|
// create musig2 signatures instead of musig2SignSweep and signerClient.
|
|
// Note that musig2SignSweep must be nil in this case, however signer
|
|
// client must still be provided, as it is used for non-coop spendings.
|
|
customMuSig2Signer SignMuSig2
|
|
}
|
|
|
|
// BatcherOption configures batcher behaviour.
|
|
type BatcherOption func(*BatcherConfig)
|
|
|
|
// WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
|
|
// external source of fee rates (FeeRateProvider). To apply a fee rate change,
|
|
// the caller should re-add the sweep by calling AddSweep.
|
|
func WithCustomFeeRate(customFeeRate FeeRateProvider) BatcherOption {
|
|
return func(cfg *BatcherConfig) {
|
|
cfg.customFeeRate = customFeeRate
|
|
}
|
|
}
|
|
|
|
// WithCustomSignMuSig2 instructs sweepbatcher to use a custom function to
|
|
// produce MuSig2 signatures. If it is set, it is used to create
|
|
// musig2 signatures instead of musig2SignSweep and signerClient. Note
|
|
// that musig2SignSweep must be nil in this case, however signerClient
|
|
// must still be provided, as it is used for non-coop spendings.
|
|
func WithCustomSignMuSig2(customMuSig2Signer SignMuSig2) BatcherOption {
|
|
return func(cfg *BatcherConfig) {
|
|
cfg.customMuSig2Signer = customMuSig2Signer
|
|
}
|
|
}
|
|
|
|
// NewBatcher creates a new Batcher instance.
|
|
func NewBatcher(wallet lndclient.WalletKitClient,
|
|
chainNotifier lndclient.ChainNotifierClient,
|
|
signerClient lndclient.SignerClient, musig2ServerSigner MuSig2SignSweep,
|
|
verifySchnorrSig VerifySchnorrSig, chainparams *chaincfg.Params,
|
|
store BatcherStore, sweepStore SweepFetcher,
|
|
opts ...BatcherOption) *Batcher {
|
|
|
|
var cfg BatcherConfig
|
|
for _, opt := range opts {
|
|
opt(&cfg)
|
|
}
|
|
|
|
if cfg.customMuSig2Signer != nil && musig2ServerSigner != nil {
|
|
panic("customMuSig2Signer must not be used with " +
|
|
"musig2ServerSigner")
|
|
}
|
|
|
|
return &Batcher{
|
|
batches: make(map[int32]*batch),
|
|
sweepReqs: make(chan SweepRequest),
|
|
errChan: make(chan error, 1),
|
|
quit: make(chan struct{}),
|
|
initDone: make(chan struct{}),
|
|
wallet: wallet,
|
|
chainNotifier: chainNotifier,
|
|
signerClient: signerClient,
|
|
musig2ServerSign: musig2ServerSigner,
|
|
VerifySchnorrSig: verifySchnorrSig,
|
|
chainParams: chainparams,
|
|
store: store,
|
|
sweepStore: sweepStore,
|
|
customFeeRate: cfg.customFeeRate,
|
|
customMuSig2Signer: cfg.customMuSig2Signer,
|
|
}
|
|
}
|
|
|
|
// Run starts the batcher and processes incoming sweep requests.
|
|
func (b *Batcher) Run(ctx context.Context) error {
|
|
runCtx, cancel := context.WithCancel(ctx)
|
|
defer func() {
|
|
cancel()
|
|
close(b.quit)
|
|
|
|
for _, batch := range b.batches {
|
|
batch.Wait()
|
|
}
|
|
|
|
b.wg.Wait()
|
|
}()
|
|
|
|
// First we fetch all the batches that are not in a confirmed state from
|
|
// the database. We will then resume the execution of these batches.
|
|
batches, err := b.FetchUnconfirmedBatches(runCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, batch := range batches {
|
|
err := b.spinUpBatchFromDB(runCtx, batch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Signal that the batcher has been initialized.
|
|
close(b.initDone)
|
|
|
|
for {
|
|
select {
|
|
case sweepReq := <-b.sweepReqs:
|
|
sweep, err := b.fetchSweep(runCtx, sweepReq)
|
|
if err != nil {
|
|
log.Warnf("fetchSweep failed: %v.", err)
|
|
return err
|
|
}
|
|
|
|
err = b.handleSweep(runCtx, sweep, sweepReq.Notifier)
|
|
if err != nil {
|
|
log.Warnf("handleSweep failed: %v.", err)
|
|
return err
|
|
}
|
|
|
|
case err := <-b.errChan:
|
|
log.Warnf("Batcher received an error: %v.", err)
|
|
return err
|
|
|
|
case <-runCtx.Done():
|
|
log.Infof("Stopping Batcher: run context cancelled.")
|
|
return runCtx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddSweep adds a sweep request to the batcher for handling. This will either
|
|
// place the sweep in an existing batch or create a new one.
|
|
func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
|
|
select {
|
|
case b.sweepReqs <- *sweepReq:
|
|
return nil
|
|
|
|
case <-b.quit:
|
|
return ErrBatcherShuttingDown
|
|
}
|
|
}
|
|
|
|
// handleSweep handles a sweep request by either placing it in an existing
|
|
// batch, or by spinning up a new batch for it.
|
|
func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
|
|
notifier *SpendNotifier) error {
|
|
|
|
completed, err := b.store.GetSweepStatus(ctx, sweep.swapHash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Batcher handling sweep %x, completed=%v", sweep.swapHash[:6],
|
|
completed)
|
|
|
|
// If the sweep has already been completed in a confirmed batch then we
|
|
// can't attach its notifier to the batch as that is no longer running.
|
|
// Instead we directly detect and return the spend here.
|
|
if completed && *notifier != (SpendNotifier{}) {
|
|
// Verify that the parent batch is confirmed. Note that a batch
|
|
// is only considered confirmed after it has received three
|
|
// on-chain confirmations to prevent issues caused by reorgs.
|
|
parentBatch, err := b.store.GetParentBatch(ctx, sweep.swapHash)
|
|
if err != nil {
|
|
log.Errorf("unable to get parent batch for sweep %x: "+
|
|
"%v", sweep.swapHash[:6], err)
|
|
|
|
return err
|
|
}
|
|
|
|
// The parent batch is indeed confirmed, meaning it is complete
|
|
// and we won't be able to attach this sweep to it.
|
|
if parentBatch.State == batchConfirmed {
|
|
return b.monitorSpendAndNotify(
|
|
ctx, sweep, parentBatch.ID, notifier,
|
|
)
|
|
}
|
|
}
|
|
|
|
sweep.notifier = notifier
|
|
|
|
// Check if the sweep is already in a batch. If that is the case, we
|
|
// provide the sweep to that batch and return.
|
|
for _, batch := range b.batches {
|
|
// This is a check to see if a batch is completed. In that case
|
|
// we just lazily delete it and continue our scan.
|
|
if batch.isComplete() {
|
|
delete(b.batches, batch.id)
|
|
continue
|
|
}
|
|
|
|
if batch.sweepExists(sweep.swapHash) {
|
|
accepted, err := batch.addSweep(ctx, sweep)
|
|
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
|
|
return err
|
|
}
|
|
|
|
if !accepted {
|
|
return fmt.Errorf("existing sweep %x was not "+
|
|
"accepted by batch %d",
|
|
sweep.swapHash[:6], batch.id)
|
|
}
|
|
|
|
// The sweep was updated in the batch, our job is done.
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Try to run the greedy algorithm of batch selection to minimize costs.
|
|
err = b.greedyAddSweep(ctx, sweep)
|
|
if err == nil {
|
|
// The greedy algorithm succeeded.
|
|
return nil
|
|
}
|
|
|
|
log.Warnf("Greedy batch selection algorithm failed for sweep %x: %v. "+
|
|
"Falling back to old approach.", sweep.swapHash[:6], err)
|
|
|
|
// If one of the batches accepts the sweep, we provide it to that batch.
|
|
for _, batch := range b.batches {
|
|
accepted, err := batch.addSweep(ctx, sweep)
|
|
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
|
|
return err
|
|
}
|
|
|
|
// If the sweep was accepted by this batch, we return, our job
|
|
// is done.
|
|
if accepted {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// If no batch is capable of accepting the sweep, we spin up a fresh
|
|
// batch and hand the sweep over to it.
|
|
return b.spinUpNewBatch(ctx, sweep)
|
|
}
|
|
|
|
// spinUpNewBatch creates new batch, starts it and adds the sweep to it.
|
|
func (b *Batcher) spinUpNewBatch(ctx context.Context, sweep *sweep) error {
|
|
// Spin up a fresh batch.
|
|
newBatch, err := b.spinUpBatch(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Add the sweep to the fresh batch.
|
|
accepted, err := newBatch.addSweep(ctx, sweep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If the sweep wasn't accepted by the fresh batch something is wrong,
|
|
// we should return the error.
|
|
if !accepted {
|
|
return fmt.Errorf("sweep %x was not accepted by new batch %d",
|
|
sweep.swapHash[:6], newBatch.id)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// spinUpBatch spins up a new batch and returns it.
|
|
func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
|
|
cfg := b.newBatchConfig(defaultMaxTimeoutDistance)
|
|
|
|
switch b.chainParams {
|
|
case &chaincfg.MainNetParams:
|
|
cfg.batchPublishDelay = defaultMainnetPublishDelay
|
|
|
|
default:
|
|
cfg.batchPublishDelay = defaultPublishDelay
|
|
}
|
|
|
|
batchKit := b.newBatchKit()
|
|
|
|
batch := NewBatch(cfg, batchKit)
|
|
|
|
id, err := batch.insertAndAcquireID(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We add the batch to our map of batches and start it.
|
|
b.batches[id] = batch
|
|
|
|
b.wg.Add(1)
|
|
go func() {
|
|
defer b.wg.Done()
|
|
|
|
err := batch.Run(ctx)
|
|
if err != nil {
|
|
_ = b.writeToErrChan(ctx, err)
|
|
}
|
|
}()
|
|
|
|
return batch, nil
|
|
}
|
|
|
|
// spinUpBatchDB spins up a batch that already existed in storage, then
|
|
// returns it.
|
|
func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
|
|
dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(dbSweeps) == 0 {
|
|
log.Infof("skipping restored batch %d as it has no sweeps",
|
|
batch.id)
|
|
|
|
// It is safe to drop this empty batch as it has no sweeps.
|
|
err := b.store.DropBatch(ctx, batch.id)
|
|
if err != nil {
|
|
log.Warnf("unable to drop empty batch %d: %v",
|
|
batch.id, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
primarySweep := dbSweeps[0]
|
|
|
|
sweeps := make(map[lntypes.Hash]sweep)
|
|
|
|
// Collect feeRate from sweeps and stored batch.
|
|
feeRate := batch.rbfCache.FeeRate
|
|
|
|
for _, dbSweep := range dbSweeps {
|
|
sweep, err := b.convertSweep(ctx, dbSweep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sweeps[sweep.swapHash] = *sweep
|
|
|
|
// Set minFeeRate to max(sweep.minFeeRate) for all sweeps.
|
|
if feeRate < sweep.minFeeRate {
|
|
feeRate = sweep.minFeeRate
|
|
}
|
|
}
|
|
|
|
rbfCache := rbfCache{
|
|
LastHeight: batch.rbfCache.LastHeight,
|
|
FeeRate: feeRate,
|
|
}
|
|
|
|
logger := batchPrefixLogger(fmt.Sprintf("%d", batch.id))
|
|
|
|
batchKit := b.newBatchKit()
|
|
batchKit.id = batch.id
|
|
batchKit.batchTxid = batch.batchTxid
|
|
batchKit.batchPkScript = batch.batchPkScript
|
|
batchKit.state = batch.state
|
|
batchKit.primaryID = primarySweep.SwapHash
|
|
batchKit.sweeps = sweeps
|
|
batchKit.rbfCache = rbfCache
|
|
batchKit.log = logger
|
|
|
|
cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance)
|
|
|
|
newBatch, err := NewBatchFromDB(cfg, batchKit)
|
|
if err != nil {
|
|
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
|
|
}
|
|
|
|
// We add the batch to our map of batches and start it.
|
|
b.batches[batch.id] = newBatch
|
|
|
|
b.wg.Add(1)
|
|
go func() {
|
|
defer b.wg.Done()
|
|
|
|
err := newBatch.Run(ctx)
|
|
if err != nil {
|
|
_ = b.writeToErrChan(ctx, err)
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// FetchUnconfirmedBatches fetches all the batches from the database that are
|
|
// not in a confirmed state.
|
|
func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
|
|
error) {
|
|
|
|
dbBatches, err := b.store.FetchUnconfirmedSweepBatches(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
batches := make([]*batch, 0, len(dbBatches))
|
|
for _, bch := range dbBatches {
|
|
bch := bch
|
|
|
|
batch := batch{}
|
|
batch.id = bch.ID
|
|
|
|
switch bch.State {
|
|
case batchOpen:
|
|
batch.state = Open
|
|
|
|
case batchClosed:
|
|
batch.state = Closed
|
|
|
|
case batchConfirmed:
|
|
batch.state = Confirmed
|
|
}
|
|
|
|
batch.batchTxid = &bch.BatchTxid
|
|
batch.batchPkScript = bch.BatchPkScript
|
|
|
|
rbfCache := rbfCache{
|
|
LastHeight: bch.LastRbfHeight,
|
|
FeeRate: chainfee.SatPerKWeight(bch.LastRbfSatPerKw),
|
|
}
|
|
batch.rbfCache = rbfCache
|
|
|
|
bchCfg := b.newBatchConfig(bch.MaxTimeoutDistance)
|
|
batch.cfg = &bchCfg
|
|
|
|
batches = append(batches, &batch)
|
|
}
|
|
|
|
return batches, nil
|
|
}
|
|
|
|
// monitorSpendAndNotify monitors the spend of a specific outpoint and writes
|
|
// the response back to the response channel.
|
|
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
|
|
parentBatchID int32, notifier *SpendNotifier) error {
|
|
|
|
spendCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Then we get the total amount that was swept by the batch.
|
|
totalSwept, err := b.store.TotalSweptAmount(ctx, parentBatchID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn(
|
|
spendCtx, &sweep.outpoint, sweep.htlc.PkScript,
|
|
sweep.initiationHeight,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b.wg.Add(1)
|
|
go func() {
|
|
defer b.wg.Done()
|
|
log.Infof("Batcher monitoring spend for swap %x",
|
|
sweep.swapHash[:6])
|
|
|
|
for {
|
|
select {
|
|
case spend := <-spendChan:
|
|
spendTx := spend.SpendingTx
|
|
// Calculate the fee portion that each sweep
|
|
// should pay for the batch.
|
|
feePortionPerSweep, roundingDifference :=
|
|
getFeePortionForSweep(
|
|
spendTx, len(spendTx.TxIn),
|
|
totalSwept,
|
|
)
|
|
|
|
onChainFeePortion := getFeePortionPaidBySweep(
|
|
spendTx, feePortionPerSweep,
|
|
roundingDifference, sweep,
|
|
)
|
|
|
|
// Notify the requester of the spend
|
|
// with the spend details, including the fee
|
|
// portion for this particular sweep.
|
|
spendDetail := &SpendDetail{
|
|
Tx: spendTx,
|
|
OnChainFeePortion: onChainFeePortion,
|
|
}
|
|
|
|
select {
|
|
case notifier.SpendChan <- spendDetail:
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
return
|
|
|
|
case err := <-spendErr:
|
|
select {
|
|
case notifier.SpendErrChan <- err:
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
_ = b.writeToErrChan(ctx, err)
|
|
return
|
|
|
|
case <-notifier.QuitChan:
|
|
return
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Batcher) writeToErrChan(ctx context.Context, err error) error {
|
|
select {
|
|
case b.errChan <- err:
|
|
return nil
|
|
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// convertSweep converts a fetched sweep from the database to a sweep that is
|
|
// ready to be processed by the batcher. It loads swap from loopdb by calling
|
|
// method FetchLoopOutSwap.
|
|
func (b *Batcher) convertSweep(ctx context.Context, dbSweep *dbSweep) (
|
|
*sweep, error) {
|
|
|
|
return b.loadSweep(ctx, dbSweep.SwapHash, dbSweep.Outpoint,
|
|
dbSweep.Amount)
|
|
}
|
|
|
|
// LoopOutFetcher is used to load LoopOut swaps from the database.
|
|
// It is implemented by loopdb.SwapStore.
|
|
type LoopOutFetcher interface {
|
|
// FetchLoopOutSwap returns the loop out swap with the given hash.
|
|
FetchLoopOutSwap(ctx context.Context,
|
|
hash lntypes.Hash) (*loopdb.LoopOut, error)
|
|
}
|
|
|
|
// SwapStoreWrapper is LoopOutFetcher wrapper providing SweepFetcher interface.
|
|
type SwapStoreWrapper struct {
|
|
// swapStore is used to load LoopOut swaps from the database.
|
|
swapStore LoopOutFetcher
|
|
|
|
// chainParams are the chain parameters of the chain that is used by
|
|
// batches.
|
|
chainParams *chaincfg.Params
|
|
}
|
|
|
|
// FetchSweep returns details of the sweep with the given hash.
|
|
// Implements SweepFetcher interface.
|
|
func (f *SwapStoreWrapper) FetchSweep(ctx context.Context,
|
|
swapHash lntypes.Hash) (*SweepInfo, error) {
|
|
|
|
swap, err := f.swapStore.FetchLoopOutSwap(ctx, swapHash)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch loop out for %x: %w",
|
|
swapHash[:6], err)
|
|
}
|
|
|
|
htlc, err := utils.GetHtlc(
|
|
swapHash, &swap.Contract.SwapContract, f.chainParams,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get htlc: %w", err)
|
|
}
|
|
|
|
swapPaymentAddr, err := utils.ObtainSwapPaymentAddr(
|
|
swap.Contract.SwapInvoice, f.chainParams,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get payment addr: %w", err)
|
|
}
|
|
|
|
return &SweepInfo{
|
|
ConfTarget: swap.Contract.SweepConfTarget,
|
|
Timeout: swap.Contract.CltvExpiry,
|
|
InitiationHeight: swap.Contract.InitiationHeight,
|
|
HTLC: *htlc,
|
|
Preimage: swap.Contract.Preimage,
|
|
SwapInvoicePaymentAddr: *swapPaymentAddr,
|
|
HTLCKeys: swap.Contract.HtlcKeys,
|
|
HTLCSuccessEstimator: htlc.AddSuccessToEstimator,
|
|
ProtocolVersion: swap.Contract.ProtocolVersion,
|
|
IsExternalAddr: swap.Contract.IsExternalAddr,
|
|
DestAddr: swap.Contract.DestAddr,
|
|
}, nil
|
|
}
|
|
|
|
// NewSweepFetcherFromSwapStore accepts swapStore (e.g. loopdb) and returns
|
|
// a wrapper implementing SweepFetcher interface (suitable for NewBatcher).
|
|
func NewSweepFetcherFromSwapStore(swapStore LoopOutFetcher,
|
|
chainParams *chaincfg.Params) (*SwapStoreWrapper, error) {
|
|
|
|
return &SwapStoreWrapper{
|
|
swapStore: swapStore,
|
|
chainParams: chainParams,
|
|
}, nil
|
|
}
|
|
|
|
// fetchSweep fetches the sweep related information from the database.
|
|
func (b *Batcher) fetchSweep(ctx context.Context,
|
|
sweepReq SweepRequest) (*sweep, error) {
|
|
|
|
return b.loadSweep(ctx, sweepReq.SwapHash, sweepReq.Outpoint,
|
|
sweepReq.Value)
|
|
}
|
|
|
|
// loadSweep loads inputs of sweep from the database and from FeeRateProvider
|
|
// if needed and returns an assembled sweep object.
|
|
func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash,
|
|
outpoint wire.OutPoint, value btcutil.Amount) (*sweep, error) {
|
|
|
|
s, err := b.sweepStore.FetchSweep(ctx, swapHash)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch sweep data for %x: %w",
|
|
swapHash[:6], err)
|
|
}
|
|
|
|
// Find minimum fee rate for the sweep. Use customFeeRate if it is
|
|
// provided, otherwise use wallet's EstimateFeeRate.
|
|
var minFeeRate chainfee.SatPerKWeight
|
|
if b.customFeeRate != nil {
|
|
minFeeRate, err = b.customFeeRate(ctx, swapHash)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch min fee rate "+
|
|
"for %x: %w", swapHash[:6], err)
|
|
}
|
|
if minFeeRate < chainfee.AbsoluteFeePerKwFloor {
|
|
return nil, fmt.Errorf("min fee rate too low (%v) for "+
|
|
"%x", minFeeRate, swapHash[:6])
|
|
}
|
|
} else {
|
|
if s.ConfTarget == 0 {
|
|
log.Warnf("Fee estimation was requested for zero "+
|
|
"confTarget for sweep %x.", swapHash[:6])
|
|
}
|
|
minFeeRate, err = b.wallet.EstimateFeeRate(ctx, s.ConfTarget)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to estimate fee rate "+
|
|
"for %x, confTarget=%d: %w", swapHash[:6],
|
|
s.ConfTarget, err)
|
|
}
|
|
}
|
|
|
|
return &sweep{
|
|
swapHash: swapHash,
|
|
outpoint: outpoint,
|
|
value: value,
|
|
confTarget: s.ConfTarget,
|
|
timeout: s.Timeout,
|
|
initiationHeight: s.InitiationHeight,
|
|
htlc: s.HTLC,
|
|
preimage: s.Preimage,
|
|
swapInvoicePaymentAddr: s.SwapInvoicePaymentAddr,
|
|
htlcKeys: s.HTLCKeys,
|
|
htlcSuccessEstimator: s.HTLCSuccessEstimator,
|
|
protocolVersion: s.ProtocolVersion,
|
|
isExternalAddr: s.IsExternalAddr,
|
|
destAddr: s.DestAddr,
|
|
minFeeRate: minFeeRate,
|
|
nonCoopHint: s.NonCoopHint,
|
|
}, nil
|
|
}
|
|
|
|
// newBatchConfig creates new batch config.
|
|
func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
|
|
return batchConfig{
|
|
maxTimeoutDistance: maxTimeoutDistance,
|
|
noBumping: b.customFeeRate != nil,
|
|
customMuSig2Signer: b.customMuSig2Signer,
|
|
}
|
|
}
|
|
|
|
// newBatchKit creates new batch kit.
|
|
func (b *Batcher) newBatchKit() batchKit {
|
|
return batchKit{
|
|
returnChan: b.sweepReqs,
|
|
wallet: b.wallet,
|
|
chainNotifier: b.chainNotifier,
|
|
signerClient: b.signerClient,
|
|
musig2SignSweep: b.musig2ServerSign,
|
|
verifySchnorrSig: b.VerifySchnorrSig,
|
|
purger: b.AddSweep,
|
|
store: b.store,
|
|
quit: b.quit,
|
|
}
|
|
}
|