mirror of
https://github.com/lightninglabs/loop
synced 2024-11-04 06:00:21 +00:00
7578349220
This commit changes the way the reservation manager works, by not exiting when the fetchL402 call fails. This ensures that a client can still run loop, even if they don't have outbound capacity or the loop server is offline.
350 lines
8.2 KiB
Go
350 lines
8.2 KiB
Go
package reservation
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcec/v2"
|
|
"github.com/btcsuite/btcd/btcutil"
|
|
|
|
"github.com/lightninglabs/loop/fsm"
|
|
reservationrpc "github.com/lightninglabs/loop/swapserverrpc"
|
|
)
|
|
|
|
// Manager manages the reservation state machines.
|
|
type Manager struct {
|
|
// cfg contains all the services that the reservation manager needs to
|
|
// operate.
|
|
cfg *Config
|
|
|
|
// activeReservations contains all the active reservationsFSMs.
|
|
activeReservations map[ID]*FSM
|
|
|
|
// hasL402 is true if the client has a valid L402.
|
|
hasL402 bool
|
|
|
|
runCtx context.Context
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
// NewManager creates a new reservation manager.
|
|
func NewManager(cfg *Config) *Manager {
|
|
return &Manager{
|
|
cfg: cfg,
|
|
activeReservations: make(map[ID]*FSM),
|
|
}
|
|
}
|
|
|
|
// Run runs the reservation manager.
|
|
func (m *Manager) Run(ctx context.Context, height int32) error {
|
|
log.Debugf("Starting reservation manager")
|
|
|
|
runCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
m.runCtx = runCtx
|
|
currentHeight := height
|
|
|
|
err := m.RecoverReservations(runCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.
|
|
RegisterBlockEpochNtfn(runCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reservationResChan := make(
|
|
chan *reservationrpc.ServerReservationNotification,
|
|
)
|
|
|
|
err = m.RegisterReservationNotifications(reservationResChan)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case height := <-newBlockChan:
|
|
log.Debugf("Received block %v", height)
|
|
currentHeight = height
|
|
|
|
case reservationRes := <-reservationResChan:
|
|
log.Debugf("Received reservation %x",
|
|
reservationRes.ReservationId)
|
|
_, err := m.newReservation(
|
|
runCtx, uint32(currentHeight), reservationRes,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
case err := <-newBlockErrChan:
|
|
return err
|
|
|
|
case <-runCtx.Done():
|
|
log.Debugf("Stopping reservation manager")
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// newReservation creates a new reservation from the reservation request.
|
|
func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
|
|
req *reservationrpc.ServerReservationNotification) (*FSM, error) {
|
|
|
|
var reservationID ID
|
|
err := reservationID.FromByteSlice(
|
|
req.ReservationId,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
serverKey, err := btcec.ParsePubKey(req.ServerKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create the reservation state machine. We need to pass in the runCtx
|
|
// of the reservation manager so that the state machine will keep on
|
|
// running even if the grpc conte
|
|
reservationFSM := NewFSM(
|
|
ctx, m.cfg,
|
|
)
|
|
|
|
// Add the reservation to the active reservations map.
|
|
m.Lock()
|
|
m.activeReservations[reservationID] = reservationFSM
|
|
m.Unlock()
|
|
|
|
initContext := &InitReservationContext{
|
|
reservationID: reservationID,
|
|
serverPubkey: serverKey,
|
|
value: btcutil.Amount(req.Value),
|
|
expiry: req.Expiry,
|
|
heightHint: currentHeight,
|
|
}
|
|
|
|
// Send the init event to the state machine.
|
|
go func() {
|
|
err = reservationFSM.SendEvent(OnServerRequest, initContext)
|
|
if err != nil {
|
|
log.Errorf("Error sending init event: %v", err)
|
|
}
|
|
}()
|
|
|
|
// We'll now wait for the reservation to be in the state where it is
|
|
// waiting to be confirmed.
|
|
err = reservationFSM.DefaultObserver.WaitForState(
|
|
ctx, 5*time.Second, WaitForConfirmation,
|
|
fsm.WithWaitForStateOption(time.Second),
|
|
)
|
|
if err != nil {
|
|
if reservationFSM.LastActionError != nil {
|
|
return nil, fmt.Errorf("error waiting for "+
|
|
"state: %v, last action error: %v",
|
|
err, reservationFSM.LastActionError)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return reservationFSM, nil
|
|
}
|
|
|
|
// fetchL402 fetches the L402 from the server. This method will keep on
|
|
// retrying until it gets a valid response.
|
|
func (m *Manager) fetchL402(ctx context.Context) {
|
|
// Add a 0 timer so that we initially fetch the L402 immediately.
|
|
timer := time.NewTimer(0)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case <-timer.C:
|
|
err := m.cfg.FetchL402(ctx)
|
|
if err != nil {
|
|
log.Warnf("Error fetching L402: %v", err)
|
|
timer.Reset(time.Second * 10)
|
|
continue
|
|
}
|
|
m.hasL402 = true
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// RegisterReservationNotifications registers a new reservation notification
|
|
// stream.
|
|
func (m *Manager) RegisterReservationNotifications(
|
|
reservationChan chan *reservationrpc.ServerReservationNotification) error {
|
|
|
|
// In order to create a valid lsat we first are going to call
|
|
// the FetchL402 method. As a client might not have outbound capacity
|
|
// yet, we'll retry until we get a valid response.
|
|
if !m.hasL402 {
|
|
m.fetchL402(m.runCtx)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(m.runCtx)
|
|
|
|
// We'll now subscribe to the reservation notifications.
|
|
reservationStream, err := m.cfg.ReservationClient.
|
|
ReservationNotificationStream(
|
|
ctx, &reservationrpc.ReservationNotificationRequest{},
|
|
)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
|
|
log.Debugf("Successfully subscribed to reservation notifications")
|
|
|
|
// We'll now start a goroutine that will forward all the reservation
|
|
// notifications to the reservationChan.
|
|
go func() {
|
|
for {
|
|
reservationRes, err := reservationStream.Recv()
|
|
if err == nil && reservationRes != nil {
|
|
log.Debugf("Received reservation %x",
|
|
reservationRes.ReservationId)
|
|
reservationChan <- reservationRes
|
|
continue
|
|
}
|
|
log.Errorf("Error receiving "+
|
|
"reservation: %v", err)
|
|
|
|
cancel()
|
|
|
|
// If we encounter an error, we'll
|
|
// try to reconnect.
|
|
for {
|
|
select {
|
|
case <-m.runCtx.Done():
|
|
return
|
|
|
|
case <-time.After(time.Second * 10):
|
|
log.Debugf("Reconnecting to " +
|
|
"reservation notifications")
|
|
err = m.RegisterReservationNotifications(
|
|
reservationChan,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("Error "+
|
|
"reconnecting: %v", err)
|
|
continue
|
|
}
|
|
|
|
// If we were able to reconnect, we'll
|
|
// return.
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// RecoverReservations tries to recover all reservations that are still active
|
|
// from the database.
|
|
func (m *Manager) RecoverReservations(ctx context.Context) error {
|
|
reservations, err := m.cfg.Store.ListReservations(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, reservation := range reservations {
|
|
if isFinalState(reservation.State) {
|
|
continue
|
|
}
|
|
|
|
log.Debugf("Recovering reservation %x", reservation.ID)
|
|
|
|
fsmCtx := context.WithValue(ctx, reservation.ID, nil)
|
|
|
|
reservationFSM := NewFSMFromReservation(
|
|
fsmCtx, m.cfg, reservation,
|
|
)
|
|
|
|
m.activeReservations[reservation.ID] = reservationFSM
|
|
|
|
// As SendEvent can block, we'll start a goroutine to process
|
|
// the event.
|
|
go func() {
|
|
err := reservationFSM.SendEvent(OnRecover, nil)
|
|
if err != nil {
|
|
log.Errorf("FSM %v Error sending recover "+
|
|
"event %v, state: %v",
|
|
reservationFSM.reservation.ID, err,
|
|
reservationFSM.reservation.State)
|
|
}
|
|
}()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetReservations retrieves all reservations from the database.
|
|
func (m *Manager) GetReservations(ctx context.Context) ([]*Reservation, error) {
|
|
return m.cfg.Store.ListReservations(ctx)
|
|
}
|
|
|
|
// GetReservation returns the reservation for the given id.
|
|
func (m *Manager) GetReservation(ctx context.Context, id ID) (*Reservation,
|
|
error) {
|
|
|
|
return m.cfg.Store.GetReservation(ctx, id)
|
|
}
|
|
|
|
// LockReservation locks the reservation with the given ID.
|
|
func (m *Manager) LockReservation(ctx context.Context, id ID) error {
|
|
// Try getting the reservation from the active reservations map.
|
|
m.Lock()
|
|
reservation, ok := m.activeReservations[id]
|
|
m.Unlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("reservation not found")
|
|
}
|
|
|
|
// Try to send the lock event to the reservation.
|
|
err := reservation.SendEvent(OnLocked, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UnlockReservation unlocks the reservation with the given ID.
|
|
func (m *Manager) UnlockReservation(ctx context.Context, id ID) error {
|
|
// Try getting the reservation from the active reservations map.
|
|
m.Lock()
|
|
reservation, ok := m.activeReservations[id]
|
|
m.Unlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("reservation not found")
|
|
}
|
|
|
|
// Try to send the unlock event to the reservation.
|
|
err := reservation.SendEvent(OnUnlocked, nil)
|
|
if err != nil && strings.Contains(err.Error(), "config error") {
|
|
// If the error is a config error, we can ignore it, as the
|
|
// reservation is already unlocked.
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|