2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-09 19:10:47 +00:00
loop/executor.go

263 lines
5.7 KiB
Go
Raw Normal View History

package loop
2019-03-06 20:13:50 +00:00
import (
"context"
2023-11-15 09:20:11 +00:00
"errors"
2019-03-06 20:13:50 +00:00
"fmt"
"strings"
2019-03-06 20:13:50 +00:00
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweep"
"github.com/lightninglabs/loop/sweepbatcher"
2023-11-13 13:50:10 +00:00
"github.com/lightningnetwork/lnd/lntypes"
2019-03-06 20:13:50 +00:00
"github.com/lightningnetwork/lnd/queue"
)
// executorConfig contains executor configuration data.
type executorConfig struct {
lnd *lndclient.LndServices
sweeper *sweep.Sweeper
batcher *sweepbatcher.Batcher
store loopdb.SwapStore
2019-03-06 20:13:50 +00:00
createExpiryTimer func(expiry time.Duration) <-chan time.Time
loopOutMaxParts uint32
totalPaymentTimeout time.Duration
maxPaymentRetries int
cancelSwap func(ctx context.Context, details *outCancelDetails) error
verifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
2019-03-06 20:13:50 +00:00
}
// executor is responsible for executing swaps.
//
// TODO(roasbeef): rename to SubSwapper.
2019-03-06 20:13:50 +00:00
type executor struct {
wg sync.WaitGroup
newSwaps chan genericSwap
currentHeight uint32
ready chan struct{}
2023-11-13 13:50:10 +00:00
sync.Mutex
2019-03-06 20:13:50 +00:00
executorConfig
}
// newExecutor returns a new swap executor instance.
func newExecutor(cfg *executorConfig) *executor {
return &executor{
executorConfig: *cfg,
newSwaps: make(chan genericSwap),
ready: make(chan struct{}),
}
}
// run starts the executor event loop. It accepts and executes new swaps,
// providing them with required config data.
func (s *executor) run(mainCtx context.Context,
2023-11-13 13:50:10 +00:00
statusChan chan<- SwapInfo,
abandonChans map[lntypes.Hash]chan struct{}) error {
2019-03-06 20:13:50 +00:00
var (
err error
blockEpochChan <-chan int32
blockErrorChan <-chan error
batcherErrChan chan error
)
for {
blockEpochChan, blockErrorChan, err =
s.lnd.ChainNotifier.RegisterBlockEpochNtfn(mainCtx)
2023-08-10 12:51:38 +00:00
if err == nil {
break
}
if strings.Contains(err.Error(),
"in the process of starting") {
log.Warnf("LND chain notifier server not ready yet, " +
"retrying with delay")
// Give chain notifier some time to start and try to
// re-attempt block epoch subscription.
select {
case <-time.After(500 * time.Millisecond):
continue
case <-mainCtx.Done():
return err
}
}
2023-08-10 12:51:38 +00:00
return err
2019-03-06 20:13:50 +00:00
}
2023-08-10 12:51:38 +00:00
// Before starting, make sure we have an up-to-date block height.
// Otherwise, we might reveal a preimage for a swap that is already
2019-03-06 20:13:50 +00:00
// expired.
2023-08-10 12:51:38 +00:00
log.Infof("Wait for first block notification")
2019-03-06 20:13:50 +00:00
var height int32
setHeight := func(h int32) {
height = h
atomic.StoreUint32(&s.currentHeight, uint32(h))
}
select {
case h := <-blockEpochChan:
2019-10-07 15:29:24 +00:00
setHeight(h)
2019-03-06 20:13:50 +00:00
case err := <-blockErrorChan:
return err
case <-mainCtx.Done():
return mainCtx.Err()
}
batcherErrChan = make(chan error, 1)
s.wg.Add(1)
go func() {
defer s.wg.Done()
err := s.batcher.Run(mainCtx)
if err != nil {
select {
case batcherErrChan <- err:
case <-mainCtx.Done():
}
}
}()
2019-03-06 20:13:50 +00:00
// Start main event loop.
2019-10-28 16:06:07 +00:00
log.Infof("Starting event loop at height %v", height)
2019-03-06 20:13:50 +00:00
2023-08-10 12:51:38 +00:00
// Signal that executor being ready with an up-to-date block height.
2019-03-06 20:13:50 +00:00
close(s.ready)
// Use a map to administer the individual notification queues for the
// swaps.
blockEpochQueues := make(map[int]*queue.ConcurrentQueue)
// On exit, stop all queue goroutines.
defer func() {
for _, queue := range blockEpochQueues {
queue.Stop()
}
}()
swapDoneChan := make(chan int)
nextSwapID := 0
2019-03-06 20:13:50 +00:00
for {
select {
case newSwap := <-s.newSwaps:
queue := queue.NewConcurrentQueue(10)
queue.Start()
swapID := nextSwapID
blockEpochQueues[swapID] = queue
s.wg.Add(1)
go func() {
defer s.wg.Done()
2021-05-24 06:40:15 +00:00
err := newSwap.execute(mainCtx, &executeConfig{
2023-12-03 05:15:51 +00:00
statusChan: statusChan,
sweeper: s.sweeper,
batcher: s.batcher,
2023-12-03 05:15:51 +00:00
blockEpochChan: queue.ChanOut(),
timerFactory: s.executorConfig.createExpiryTimer,
loopOutMaxParts: s.executorConfig.loopOutMaxParts,
totalPaymentTimeout: s.executorConfig.totalPaymentTimeout,
maxPaymentRetries: s.executorConfig.maxPaymentRetries,
cancelSwap: s.executorConfig.cancelSwap,
verifySchnorrSig: s.executorConfig.verifySchnorrSig,
2019-03-06 20:13:50 +00:00
}, height)
2023-11-15 09:20:11 +00:00
if err != nil && !errors.Is(
err, context.Canceled,
) {
2021-05-24 06:40:15 +00:00
log.Errorf("Execute error: %v", err)
}
2019-03-06 20:13:50 +00:00
2023-11-13 13:50:10 +00:00
// If a loop-in ended we have to remove its
// abandon channel from our abandonChans map
// since the swap finalized.
if swap, ok := newSwap.(*loopInSwap); ok {
s.Lock()
delete(abandonChans, swap.hash)
s.Unlock()
}
2019-03-06 20:13:50 +00:00
select {
case swapDoneChan <- swapID:
case <-mainCtx.Done():
}
}()
nextSwapID++
2019-03-06 20:13:50 +00:00
case doneID := <-swapDoneChan:
queue, ok := blockEpochQueues[doneID]
if !ok {
return fmt.Errorf(
"swap id %v not found in queues",
doneID)
}
queue.Stop()
delete(blockEpochQueues, doneID)
case h := <-blockEpochChan:
2019-10-07 15:29:24 +00:00
setHeight(h)
2019-03-06 20:13:50 +00:00
for _, queue := range blockEpochQueues {
select {
2019-10-07 15:29:24 +00:00
case queue.ChanIn() <- h:
2019-03-06 20:13:50 +00:00
case <-mainCtx.Done():
return mainCtx.Err()
}
}
case err := <-blockErrorChan:
return fmt.Errorf("block error: %v", err)
case err := <-batcherErrChan:
return fmt.Errorf("batcher error: %v", err)
2019-03-06 20:13:50 +00:00
case <-mainCtx.Done():
return mainCtx.Err()
}
}
}
// initiateSwap delivers a new swap to the executor main loop.
func (s *executor) initiateSwap(ctx context.Context,
swap genericSwap) {
select {
case s.newSwaps <- swap:
case <-ctx.Done():
return
}
}
// height returns the current height known to the swap server.
func (s *executor) height() int32 {
return int32(atomic.LoadUint32(&s.currentHeight))
}
// waitFinished waits for all swap goroutines to finish.
func (s *executor) waitFinished() {
s.wg.Wait()
}