mirror of https://github.com/lightninglabs/loop
staticaddr: deposit manager and fsm
parent
55148db070
commit
e31fc04774
@ -0,0 +1,140 @@
|
||||
package deposit
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightninglabs/lndclient"
|
||||
"github.com/lightninglabs/loop/fsm"
|
||||
"github.com/lightninglabs/loop/staticaddr/script"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultConfTarget = 3
|
||||
)
|
||||
|
||||
// PublishDepositExpirySweepAction creates and publishes the timeout transaction
|
||||
// that spends the deposit from the static address timeout leaf to the
|
||||
// predefined timeout sweep pkscript.
|
||||
func (f *FSM) PublishDepositExpirySweepAction(_ fsm.EventContext) fsm.EventType {
|
||||
msgTx := wire.NewMsgTx(2)
|
||||
|
||||
params, err := f.cfg.AddressManager.GetStaticAddressParameters(f.ctx)
|
||||
if err != nil {
|
||||
return fsm.OnError
|
||||
}
|
||||
|
||||
// Add the deposit outpoint as input to the transaction.
|
||||
msgTx.AddTxIn(&wire.TxIn{
|
||||
PreviousOutPoint: f.deposit.OutPoint,
|
||||
Sequence: params.Expiry,
|
||||
SignatureScript: nil,
|
||||
})
|
||||
|
||||
// Estimate the fee rate of an expiry spend transaction.
|
||||
feeRateEstimator, err := f.cfg.WalletKit.EstimateFeeRate(
|
||||
f.ctx, defaultConfTarget,
|
||||
)
|
||||
if err != nil {
|
||||
return f.HandleError(fmt.Errorf("timeout sweep fee "+
|
||||
"estimation failed: %v", err))
|
||||
}
|
||||
|
||||
weight := script.ExpirySpendWeight()
|
||||
|
||||
fee := feeRateEstimator.FeeForWeight(weight)
|
||||
|
||||
// We cap the fee at 20% of the deposit value.
|
||||
if fee > f.deposit.Value/5 {
|
||||
return f.HandleError(errors.New("fee is greater than 20% of " +
|
||||
"the deposit value"))
|
||||
}
|
||||
|
||||
output := &wire.TxOut{
|
||||
Value: int64(f.deposit.Value - fee),
|
||||
PkScript: f.deposit.TimeOutSweepPkScript,
|
||||
}
|
||||
msgTx.AddTxOut(output)
|
||||
|
||||
txOut := &wire.TxOut{
|
||||
Value: int64(f.deposit.Value),
|
||||
PkScript: params.PkScript,
|
||||
}
|
||||
|
||||
prevOut := []*wire.TxOut{txOut}
|
||||
|
||||
signDesc, err := f.SignDescriptor()
|
||||
if err != nil {
|
||||
return f.HandleError(err)
|
||||
}
|
||||
|
||||
rawSigs, err := f.cfg.Signer.SignOutputRaw(
|
||||
f.ctx, msgTx, []*lndclient.SignDescriptor{signDesc}, prevOut,
|
||||
)
|
||||
if err != nil {
|
||||
return f.HandleError(err)
|
||||
}
|
||||
|
||||
address, err := f.cfg.AddressManager.GetStaticAddress(f.ctx)
|
||||
if err != nil {
|
||||
return f.HandleError(err)
|
||||
}
|
||||
|
||||
sig := rawSigs[0]
|
||||
msgTx.TxIn[0].Witness, err = address.GenTimeoutWitness(sig)
|
||||
if err != nil {
|
||||
return f.HandleError(err)
|
||||
}
|
||||
|
||||
err = f.cfg.WalletKit.PublishTransaction(
|
||||
f.ctx, msgTx, f.deposit.OutPoint.Hash.String()+"-close-sweep",
|
||||
)
|
||||
if err != nil {
|
||||
return f.HandleError(err)
|
||||
}
|
||||
|
||||
f.Debugf("published timeout sweep with txid: %v", msgTx.TxHash())
|
||||
|
||||
return OnExpiryPublished
|
||||
}
|
||||
|
||||
// WaitForExpirySweepAction waits for a sufficient number of confirmations
|
||||
// before a timeout sweep is considered successful.
|
||||
func (f *FSM) WaitForExpirySweepAction(_ fsm.EventContext) fsm.EventType {
|
||||
spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll
|
||||
f.ctx, nil, f.deposit.TimeOutSweepPkScript, defaultConfTarget,
|
||||
int32(f.deposit.ConfirmationHeight),
|
||||
)
|
||||
if err != nil {
|
||||
return f.HandleError(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errSpendChan:
|
||||
log.Debugf("error while sweeping expired deposit: %v", err)
|
||||
return fsm.OnError
|
||||
|
||||
case <-spendChan:
|
||||
return OnExpirySwept
|
||||
|
||||
case <-f.ctx.Done():
|
||||
return fsm.OnError
|
||||
}
|
||||
}
|
||||
|
||||
// SweptExpiredDepositAction is the final action of the FSM. It signals to the
|
||||
// manager that the deposit has been swept and the FSM can be removed. It also
|
||||
// ends the state machine main loop by cancelling its context.
|
||||
func (f *FSM) SweptExpiredDepositAction(_ fsm.EventContext) fsm.EventType {
|
||||
select {
|
||||
case <-f.ctx.Done():
|
||||
return fsm.OnError
|
||||
|
||||
default:
|
||||
f.finalizedDepositChan <- f.deposit.OutPoint
|
||||
f.ctx.Done()
|
||||
}
|
||||
|
||||
return fsm.NoOp
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
package deposit
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/btcutil"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightninglabs/loop/fsm"
|
||||
)
|
||||
|
||||
// ID is a unique identifier for a deposit.
|
||||
type ID [IdLength]byte
|
||||
|
||||
// FromByteSlice creates a deposit id from a byte slice.
|
||||
func (r *ID) FromByteSlice(b []byte) error {
|
||||
if len(b) != IdLength {
|
||||
return fmt.Errorf("deposit id must be 32 bytes, got %d, %x",
|
||||
len(b), b)
|
||||
}
|
||||
|
||||
copy(r[:], b)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deposit bundles an utxo at a static address together with manager-relevant
|
||||
// data.
|
||||
type Deposit struct {
|
||||
// ID is the unique identifier of the deposit.
|
||||
ID ID
|
||||
|
||||
// State is the current state of the deposit.
|
||||
State fsm.StateType
|
||||
|
||||
// The outpoint of the deposit.
|
||||
wire.OutPoint
|
||||
|
||||
// Value is the amount of the deposit.
|
||||
Value btcutil.Amount
|
||||
|
||||
// ConfirmationHeight is the absolute height at which the deposit was
|
||||
// first confirmed.
|
||||
ConfirmationHeight int64
|
||||
|
||||
// TimeOutSweepPkScript is the pk script that is used to sweep the
|
||||
// deposit to after it is expired.
|
||||
TimeOutSweepPkScript []byte
|
||||
}
|
||||
|
||||
// IsPending returns true if the deposit is pending.
|
||||
func (d *Deposit) IsPending() bool {
|
||||
return !d.IsFinal()
|
||||
}
|
||||
|
||||
// IsFinal returns true if the deposit is final.
|
||||
func (d *Deposit) IsFinal() bool {
|
||||
return d.State == Expired || d.State == Failed
|
||||
}
|
||||
|
||||
// GetRandomDepositID generates a random deposit ID.
|
||||
func GetRandomDepositID() (ID, error) {
|
||||
var id ID
|
||||
_, err := rand.Read(id[:])
|
||||
return id, err
|
||||
}
|
@ -0,0 +1,297 @@
|
||||
package deposit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/txscript"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightninglabs/lndclient"
|
||||
"github.com/lightninglabs/loop/fsm"
|
||||
"github.com/lightninglabs/loop/staticaddr"
|
||||
"github.com/lightninglabs/loop/staticaddr/address"
|
||||
"github.com/lightninglabs/loop/staticaddr/script"
|
||||
"github.com/lightningnetwork/lnd/input"
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultObserverSize = 20
|
||||
)
|
||||
|
||||
var (
|
||||
ErrProtocolVersionNotSupported = errors.New("protocol version not " +
|
||||
"supported")
|
||||
)
|
||||
|
||||
// States.
|
||||
var (
|
||||
Deposited = fsm.StateType("Deposited")
|
||||
|
||||
PublishExpiredDeposit = fsm.StateType("PublishExpiredDeposit")
|
||||
|
||||
WaitForExpirySweep = fsm.StateType("WaitForExpirySweep")
|
||||
|
||||
Expired = fsm.StateType("Expired")
|
||||
|
||||
Failed = fsm.StateType("DepositFailed")
|
||||
)
|
||||
|
||||
// Events.
|
||||
var (
|
||||
OnStart = fsm.EventType("OnStart")
|
||||
OnExpiry = fsm.EventType("OnExpiry")
|
||||
OnExpiryPublished = fsm.EventType("OnExpiryPublished")
|
||||
OnExpirySwept = fsm.EventType("OnExpirySwept")
|
||||
OnRecover = fsm.EventType("OnRecover")
|
||||
)
|
||||
|
||||
// FSM is the state machine that handles the instant out.
|
||||
type FSM struct {
|
||||
*fsm.StateMachine
|
||||
|
||||
cfg *ManagerConfig
|
||||
|
||||
deposit *Deposit
|
||||
|
||||
params *address.Parameters
|
||||
|
||||
address *script.StaticAddress
|
||||
|
||||
ctx context.Context
|
||||
|
||||
blockNtfnChan chan uint32
|
||||
|
||||
finalizedDepositChan chan wire.OutPoint
|
||||
}
|
||||
|
||||
// NewFSM creates a new state machine that can action on all static address
|
||||
// feature requests.
|
||||
func NewFSM(ctx context.Context, deposit *Deposit, cfg *ManagerConfig,
|
||||
finalizedDepositChan chan wire.OutPoint,
|
||||
recoverStateMachine bool) (*FSM, error) {
|
||||
|
||||
params, err := cfg.AddressManager.GetStaticAddressParameters(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get static address "+
|
||||
"parameters: %v", err)
|
||||
}
|
||||
|
||||
address, err := cfg.AddressManager.GetStaticAddress(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get static address: %v", err)
|
||||
}
|
||||
|
||||
depoFsm := &FSM{
|
||||
cfg: cfg,
|
||||
deposit: deposit,
|
||||
params: params,
|
||||
address: address,
|
||||
ctx: ctx,
|
||||
blockNtfnChan: make(chan uint32),
|
||||
finalizedDepositChan: finalizedDepositChan,
|
||||
}
|
||||
|
||||
depositStates := depoFsm.DepositStatesV0()
|
||||
switch params.ProtocolVersion {
|
||||
case staticaddr.ProtocolVersion_V0:
|
||||
|
||||
default:
|
||||
return nil, ErrProtocolVersionNotSupported
|
||||
}
|
||||
|
||||
if recoverStateMachine {
|
||||
depoFsm.StateMachine = fsm.NewStateMachineWithState(
|
||||
depositStates, deposit.State,
|
||||
DefaultObserverSize,
|
||||
)
|
||||
} else {
|
||||
depoFsm.StateMachine = fsm.NewStateMachine(
|
||||
depositStates, DefaultObserverSize,
|
||||
)
|
||||
}
|
||||
|
||||
depoFsm.ActionEntryFunc = depoFsm.updateDeposit
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case currentHeight := <-depoFsm.blockNtfnChan:
|
||||
err := depoFsm.handleBlockNotification(
|
||||
currentHeight,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("error handling block "+
|
||||
"notification: %v", err)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return depoFsm, nil
|
||||
}
|
||||
|
||||
func (f *FSM) handleBlockNotification(currentHeight uint32) error {
|
||||
params, err := f.cfg.AddressManager.GetStaticAddressParameters(f.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
isExpired := func() bool {
|
||||
return currentHeight >= uint32(f.deposit.ConfirmationHeight)+
|
||||
params.Expiry
|
||||
}
|
||||
|
||||
if isExpired() && f.deposit.State != WaitForExpirySweep &&
|
||||
!f.deposit.IsFinal() {
|
||||
|
||||
go func() {
|
||||
err := f.SendEvent(OnExpiry, nil)
|
||||
if err != nil {
|
||||
log.Debugf("error sending OnExpiry event: %v",
|
||||
err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DepositStatesV0 returns the states a deposit can be in.
|
||||
func (f *FSM) DepositStatesV0() fsm.States {
|
||||
return fsm.States{
|
||||
fsm.EmptyState: fsm.State{
|
||||
Transitions: fsm.Transitions{
|
||||
OnStart: Deposited,
|
||||
},
|
||||
Action: fsm.NoOpAction,
|
||||
},
|
||||
Deposited: fsm.State{
|
||||
Transitions: fsm.Transitions{
|
||||
OnExpiry: PublishExpiredDeposit,
|
||||
OnRecover: Deposited,
|
||||
},
|
||||
Action: fsm.NoOpAction,
|
||||
},
|
||||
PublishExpiredDeposit: fsm.State{
|
||||
Transitions: fsm.Transitions{
|
||||
OnRecover: PublishExpiredDeposit,
|
||||
OnExpiryPublished: WaitForExpirySweep,
|
||||
// If the timeout sweep failed we go back to
|
||||
// Deposited, hoping that another timeout sweep
|
||||
// attempt will be successful. Alternatively,
|
||||
// the client can try to coop-spend the deposit.
|
||||
fsm.OnError: Deposited,
|
||||
},
|
||||
Action: f.PublishDepositExpirySweepAction,
|
||||
},
|
||||
WaitForExpirySweep: fsm.State{
|
||||
Transitions: fsm.Transitions{
|
||||
OnExpirySwept: Expired,
|
||||
OnRecover: PublishExpiredDeposit,
|
||||
// If the timeout sweep failed we go back to
|
||||
// Deposited, hoping that another timeout sweep
|
||||
// attempt will be successful. Alternatively,
|
||||
// the client can try to coop-spend the deposit.
|
||||
fsm.OnError: Deposited,
|
||||
},
|
||||
Action: f.WaitForExpirySweepAction,
|
||||
},
|
||||
Expired: fsm.State{
|
||||
Transitions: fsm.Transitions{
|
||||
OnExpiry: Expired,
|
||||
},
|
||||
Action: f.SweptExpiredDepositAction,
|
||||
},
|
||||
Failed: fsm.State{
|
||||
Action: fsm.NoOpAction,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// DepositEntryFunction is called after every action and updates the deposit in
|
||||
// the db.
|
||||
func (f *FSM) updateDeposit(notification fsm.Notification) {
|
||||
if f.deposit == nil {
|
||||
return
|
||||
}
|
||||
|
||||
f.Debugf("NextState: %v, PreviousState: %v, Event: %v",
|
||||
notification.NextState, notification.PreviousState,
|
||||
notification.Event,
|
||||
)
|
||||
|
||||
f.deposit.State = notification.NextState
|
||||
|
||||
// Don't update the deposit if we are in an initial state or if we
|
||||
// are transitioning from an initial state to a failed state.
|
||||
state := f.deposit.State
|
||||
if state == fsm.EmptyState || state == Deposited ||
|
||||
(notification.PreviousState == Deposited && state == Failed) {
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
err := f.cfg.Store.UpdateDeposit(f.ctx, f.deposit)
|
||||
if err != nil {
|
||||
f.Errorf("unable to update deposit: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Infof logs an info message with the deposit outpoint.
|
||||
func (f *FSM) Infof(format string, args ...interface{}) {
|
||||
log.Infof(
|
||||
"Deposit %v: "+format,
|
||||
append(
|
||||
[]interface{}{f.deposit.OutPoint},
|
||||
args...,
|
||||
)...,
|
||||
)
|
||||
}
|
||||
|
||||
// Debugf logs a debug message with the deposit outpoint.
|
||||
func (f *FSM) Debugf(format string, args ...interface{}) {
|
||||
log.Debugf(
|
||||
"Deposit %v: "+format,
|
||||
append(
|
||||
[]interface{}{f.deposit.OutPoint},
|
||||
args...,
|
||||
)...,
|
||||
)
|
||||
}
|
||||
|
||||
// Errorf logs an error message with the deposit outpoint.
|
||||
func (f *FSM) Errorf(format string, args ...interface{}) {
|
||||
log.Errorf(
|
||||
"Deposit %v: "+format,
|
||||
append(
|
||||
[]interface{}{f.deposit.OutPoint},
|
||||
args...,
|
||||
)...,
|
||||
)
|
||||
}
|
||||
|
||||
// SignDescriptor returns the sign descriptor for the static address output.
|
||||
func (f *FSM) SignDescriptor() (*lndclient.SignDescriptor, error) {
|
||||
address, err := f.cfg.AddressManager.GetStaticAddress(f.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &lndclient.SignDescriptor{
|
||||
WitnessScript: address.TimeoutLeaf.Script,
|
||||
KeyDesc: keychain.KeyDescriptor{
|
||||
PubKey: f.params.ClientPubkey,
|
||||
},
|
||||
Output: wire.NewTxOut(
|
||||
int64(f.deposit.Value), f.params.PkScript,
|
||||
),
|
||||
HashType: txscript.SigHashDefault,
|
||||
InputIndex: 0,
|
||||
SignMethod: input.TaprootScriptSpendSignMethod,
|
||||
}, nil
|
||||
}
|
@ -0,0 +1,420 @@
|
||||
package deposit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btcd/txscript"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightninglabs/lndclient"
|
||||
"github.com/lightninglabs/loop"
|
||||
staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc"
|
||||
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
)
|
||||
|
||||
const (
|
||||
// PollInterval is the interval in which we poll for new deposits to our
|
||||
// static address.
|
||||
PollInterval = 10 * time.Second
|
||||
|
||||
// MinConfs is the minimum number of confirmations we require for a
|
||||
// deposit to be considered available for loop-ins, coop-spends and
|
||||
// timeouts.
|
||||
MinConfs = 3
|
||||
|
||||
// MaxConfs is unset since we don't require a max number of
|
||||
// confirmations for deposits.
|
||||
MaxConfs = 0
|
||||
)
|
||||
|
||||
// ManagerConfig holds the configuration for the address manager.
|
||||
type ManagerConfig struct {
|
||||
// AddressClient is the client that communicates with the loop server
|
||||
// to manage static addresses.
|
||||
AddressClient staticaddressrpc.StaticAddressServerClient
|
||||
|
||||
AddressManager AddressManager
|
||||
|
||||
// SwapClient provides loop rpc functionality.
|
||||
SwapClient *loop.Client
|
||||
|
||||
// Store is the database store that is used to store static address
|
||||
// related records.
|
||||
Store Store
|
||||
|
||||
// WalletKit is the wallet client that is used to derive new keys from
|
||||
// lnd's wallet.
|
||||
WalletKit lndclient.WalletKitClient
|
||||
|
||||
// ChainParams is the chain configuration(mainnet, testnet...) this
|
||||
// manager uses.
|
||||
ChainParams *chaincfg.Params
|
||||
|
||||
// ChainNotifier is the chain notifier that is used to listen for new
|
||||
// blocks.
|
||||
ChainNotifier lndclient.ChainNotifierClient
|
||||
|
||||
// Signer is the signer client that is used to sign transactions.
|
||||
Signer lndclient.SignerClient
|
||||
}
|
||||
|
||||
// Manager manages the address state machines.
|
||||
type Manager struct {
|
||||
cfg *ManagerConfig
|
||||
|
||||
runCtx context.Context
|
||||
|
||||
sync.Mutex
|
||||
|
||||
// initChan signals the daemon that the address manager has completed
|
||||
// its initialization.
|
||||
initChan chan struct{}
|
||||
|
||||
// activeDeposits contains all the active static address outputs.
|
||||
activeDeposits map[wire.OutPoint]*FSM
|
||||
|
||||
// initiationHeight stores the currently best known block height.
|
||||
initiationHeight uint32
|
||||
|
||||
// currentHeight stores the currently best known block height.
|
||||
currentHeight uint32
|
||||
|
||||
// deposits contains all the deposits that have ever been made to the
|
||||
// static address. This field is used to store and recover deposits. It
|
||||
// also serves as basis for reconciliation of newly detected deposits by
|
||||
// matching them against deposits in this map that were already seen.
|
||||
deposits map[wire.OutPoint]*Deposit
|
||||
|
||||
// finalizedDepositChan is a channel that receives deposits that have
|
||||
// been finalized. The manager will adjust its internal state and flush
|
||||
// finalized deposits from its memory.
|
||||
finalizedDepositChan chan wire.OutPoint
|
||||
}
|
||||
|
||||
// NewManager creates a new deposit manager.
|
||||
func NewManager(cfg *ManagerConfig) *Manager {
|
||||
return &Manager{
|
||||
cfg: cfg,
|
||||
initChan: make(chan struct{}),
|
||||
activeDeposits: make(map[wire.OutPoint]*FSM),
|
||||
deposits: make(map[wire.OutPoint]*Deposit),
|
||||
finalizedDepositChan: make(chan wire.OutPoint),
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the address manager.
|
||||
func (m *Manager) Run(ctx context.Context, currentHeight uint32) error {
|
||||
m.runCtx = ctx
|
||||
|
||||
m.Lock()
|
||||
m.currentHeight, m.initiationHeight = currentHeight, currentHeight
|
||||
m.Unlock()
|
||||
|
||||
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(m.runCtx) //nolint:lll
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Recover previous deposits and static address parameters from the DB.
|
||||
err = m.recover(m.runCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start the deposit notifier.
|
||||
m.pollDeposits(ctx)
|
||||
|
||||
// Communicate to the caller that the address manager has completed its
|
||||
// initialization.
|
||||
close(m.initChan)
|
||||
|
||||
for {
|
||||
select {
|
||||
case height := <-newBlockChan:
|
||||
m.Lock()
|
||||
m.currentHeight = uint32(height)
|
||||
m.Unlock()
|
||||
|
||||
// Inform all active deposits about a new block arrival.
|
||||
for _, fsm := range m.activeDeposits {
|
||||
select {
|
||||
case fsm.blockNtfnChan <- uint32(height):
|
||||
|
||||
case <-m.runCtx.Done():
|
||||
return m.runCtx.Err()
|
||||
}
|
||||
}
|
||||
case outpoint := <-m.finalizedDepositChan:
|
||||
// If deposits notify us about their finalization, we
|
||||
// update the manager's internal state and flush the
|
||||
// finalized deposit from memory.
|
||||
m.finalizeDeposit(outpoint)
|
||||
|
||||
case err := <-newBlockErrChan:
|
||||
return err
|
||||
|
||||
case <-m.runCtx.Done():
|
||||
return m.runCtx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// recover recovers static address parameters, previous deposits and state
|
||||
// machines from the database and starts the deposit notifier.
|
||||
func (m *Manager) recover(ctx context.Context) error {
|
||||
log.Infof("Recovering static address parameters and deposits...")
|
||||
|
||||
// Recover deposits.
|
||||
deposits, err := m.cfg.Store.AllDeposits(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i, d := range deposits {
|
||||
m.deposits[d.OutPoint] = deposits[i]
|
||||
|
||||
// If the current deposit is final it wasn't active when we
|
||||
// shut down the client last. So we don't need to start a fsm
|
||||
// for it.
|
||||
if d.IsFinal() {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("Recovering deposit %x", d.ID)
|
||||
|
||||
// Create a state machine for a given deposit.
|
||||
fsm, err := NewFSM(
|
||||
m.runCtx, d, m.cfg,
|
||||
m.finalizedDepositChan, true,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the OnRecover event to the state machine.
|
||||
go func() {
|
||||
err = fsm.SendEvent(OnRecover, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending OnStart event: %v",
|
||||
err)
|
||||
}
|
||||
}()
|
||||
|
||||
m.activeDeposits[d.OutPoint] = fsm
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitInitComplete waits until the address manager has completed its setup.
|
||||
func (m *Manager) WaitInitComplete() {
|
||||
defer log.Debugf("Static address deposit manager initiation complete.")
|
||||
<-m.initChan
|
||||
}
|
||||
|
||||
// pollDeposits polls new deposits to our static address and notifies the
|
||||
// manager's event loop about them.
|
||||
func (m *Manager) pollDeposits(ctx context.Context) {
|
||||
log.Debugf("waiting for new static address deposits...")
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(PollInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err := m.reconcileDeposits(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("unable to reconcile "+
|
||||
"deposits: %v", err)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// reconcileDeposits fetches all spends to our static address from our lnd
|
||||
// wallet and matches it against the deposits in our memory that we've seen so
|
||||
// far. It picks the newly identified deposits and starts a state machine per
|
||||
// deposit to track its progress.
|
||||
func (m *Manager) reconcileDeposits(ctx context.Context) error {
|
||||
log.Tracef("Reconciling new deposits...")
|
||||
|
||||
utxos, err := m.cfg.AddressManager.ListUnspent(
|
||||
ctx, MinConfs, MaxConfs,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to list new deposits: %v", err)
|
||||
}
|
||||
|
||||
newDeposits := m.filterNewDeposits(utxos)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to filter new deposits: %v", err)
|
||||
}
|
||||
|
||||
if len(newDeposits) == 0 {
|
||||
log.Tracef("No new deposits...")
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, utxo := range newDeposits {
|
||||
deposit, err := m.createNewDeposit(ctx, utxo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to retain new deposit: %v",
|
||||
err)
|
||||
}
|
||||
|
||||
log.Debugf("Received deposit: %v", deposit)
|
||||
err = m.startDepositFsm(deposit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to start new deposit FSM: %v",
|
||||
err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createNewDeposit transforms the wallet utxo into a deposit struct and stores
|
||||
// it in our database and manager memory.
|
||||
func (m *Manager) createNewDeposit(ctx context.Context,
|
||||
utxo *lnwallet.Utxo) (*Deposit, error) {
|
||||
|
||||
blockHeight, err := m.getBlockHeight(ctx, utxo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get the sweep pk script.
|
||||
addr, err := m.cfg.WalletKit.NextAddr(
|
||||
ctx, lnwallet.DefaultAccountName,
|
||||
walletrpc.AddressType_TAPROOT_PUBKEY, false,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
timeoutSweepPkScript, err := txscript.PayToAddrScript(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id, err := GetRandomDepositID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deposit := &Deposit{
|
||||
ID: id,
|
||||
State: Deposited,
|
||||
OutPoint: utxo.OutPoint,
|
||||
Value: utxo.Value,
|
||||
ConfirmationHeight: int64(blockHeight),
|
||||
TimeOutSweepPkScript: timeoutSweepPkScript,
|
||||
}
|
||||
|
||||
err = m.cfg.Store.CreateDeposit(ctx, deposit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.deposits[deposit.OutPoint] = deposit
|
||||
m.Unlock()
|
||||
|
||||
return deposit, nil
|
||||
}
|
||||
|
||||
// getBlockHeight retrieves the block height of a given utxo.
|
||||
func (m *Manager) getBlockHeight(ctx context.Context,
|
||||
utxo *lnwallet.Utxo) (uint32, error) {
|
||||
|
||||
addressParams, err := m.cfg.AddressManager.GetStaticAddressParameters(
|
||||
ctx,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("couldn't get confirmation height for "+
|
||||
"deposit, %v", err)
|
||||
}
|
||||
|
||||
notifChan, errChan, err := m.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll
|
||||
ctx, &utxo.OutPoint.Hash, addressParams.PkScript, MinConfs,
|
||||
int32(m.initiationHeight),
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
select {
|
||||
case tx := <-notifChan:
|
||||
return tx.BlockHeight, nil
|
||||
|
||||
case err := <-errChan:
|
||||
return 0, err
|
||||
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// filterNewDeposits filters the given utxos for new deposits that we haven't
|
||||
// seen before.
|
||||
func (m *Manager) filterNewDeposits(utxos []*lnwallet.Utxo) []*lnwallet.Utxo {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
var newDeposits []*lnwallet.Utxo
|
||||
for _, utxo := range utxos {
|
||||
_, ok := m.deposits[utxo.OutPoint]
|
||||
if !ok {
|
||||
newDeposits = append(newDeposits, utxo)
|
||||
}
|
||||
}
|
||||
|
||||
return newDeposits
|
||||
}
|
||||
|
||||
// startDepositFsm creates a new state machine flow from the latest deposit to
|
||||
// our static address.
|
||||
func (m *Manager) startDepositFsm(deposit *Deposit) error {
|
||||
// Create a state machine for a given deposit.
|
||||
fsm, err := NewFSM(
|
||||
m.runCtx, deposit, m.cfg, m.finalizedDepositChan, false,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the start event to the state machine.
|
||||
go func() {
|
||||
err = fsm.SendEvent(OnStart, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending OnStart event: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err = fsm.DefaultObserver.WaitForState(m.runCtx, time.Minute, Deposited)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add the FSM to the active FSMs map.
|
||||
m.Lock()
|
||||
m.activeDeposits[deposit.OutPoint] = fsm
|
||||
m.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) finalizeDeposit(outpoint wire.OutPoint) {
|
||||
m.Lock()
|
||||
delete(m.activeDeposits, outpoint)
|
||||
delete(m.deposits, outpoint)
|
||||
m.Unlock()
|
||||
}
|
Loading…
Reference in New Issue