mirror of
synced 2024-11-08 01:10:29 +00:00
In an effort to surface more information about why autoloop is not executing, we add an error when suggest swaps is called with no rules. In other cases we can surface a reason enum with each rule that is set, but in the case where we have no rules, there are no results to accompany with reasons.
804 lines
22 KiB
804 lines
22 KiB
package loopd
import (
const (
completedSwapsCount = 5
// minConfTarget is the minimum confirmation target we'll allow clients
// to specify. This is driven by the minimum confirmation target allowed
// by the backing fee estimator.
minConfTarget = 2
// swapClientServer implements the grpc service exposed by loopd.
type swapClientServer struct {
network lndclient.Network
impl *loop.Client
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
mainCtx context.Context
// LoopOut initiates an loop out swap with the given parameters. The call
// returns after the swap has been set up with the swap server. From that point
// onwards, progress can be tracked via the LoopOutStatus stream that is
// returned from Monitor().
func (s *swapClientServer) LoopOut(ctx context.Context,
in *looprpc.LoopOutRequest) (
*looprpc.SwapResponse, error) {
log.Infof("Loop out request received")
sweepConfTarget, err := validateConfTarget(
in.SweepConfTarget, loop.DefaultSweepConfTarget,
if err != nil {
return nil, err
var sweepAddr btcutil.Address
if in.Dest == "" {
// Generate sweep address if none specified.
var err error
sweepAddr, err = s.lnd.WalletKit.NextAddr(context.Background())
if err != nil {
return nil, fmt.Errorf("NextAddr error: %v", err)
} else {
var err error
sweepAddr, err = btcutil.DecodeAddress(
in.Dest, s.lnd.ChainParams,
if err != nil {
return nil, fmt.Errorf("decode address: %v", err)
// Check that the label is valid.
if err := labels.Validate(in.Label); err != nil {
return nil, err
req := &loop.OutRequest{
Amount: btcutil.Amount(in.Amt),
DestAddr: sweepAddr,
MaxMinerFee: btcutil.Amount(in.MaxMinerFee),
MaxPrepayAmount: btcutil.Amount(in.MaxPrepayAmt),
MaxPrepayRoutingFee: btcutil.Amount(in.MaxPrepayRoutingFee),
MaxSwapRoutingFee: btcutil.Amount(in.MaxSwapRoutingFee),
MaxSwapFee: btcutil.Amount(in.MaxSwapFee),
SweepConfTarget: sweepConfTarget,
HtlcConfirmations: in.HtlcConfirmations,
SwapPublicationDeadline: time.Unix(
int64(in.SwapPublicationDeadline), 0,
Label: in.Label,
Initiator: in.Initiator,
switch {
case in.LoopOutChannel != 0 && len(in.OutgoingChanSet) > 0:
return nil, errors.New("loop_out_channel and outgoing_" +
"chan_ids are mutually exclusive")
case in.LoopOutChannel != 0:
req.OutgoingChanSet = loopdb.ChannelSet{in.LoopOutChannel}
req.OutgoingChanSet = in.OutgoingChanSet
info, err := s.impl.LoopOut(ctx, req)
if err != nil {
log.Errorf("LoopOut: %v", err)
return nil, err
return &looprpc.SwapResponse{
Id: info.SwapHash.String(),
IdBytes: info.SwapHash[:],
HtlcAddress: info.HtlcAddressP2WSH.String(),
HtlcAddressP2Wsh: info.HtlcAddressP2WSH.String(),
ServerMessage: info.ServerMessage,
}, nil
func (s *swapClientServer) marshallSwap(loopSwap *loop.SwapInfo) (
*looprpc.SwapStatus, error) {
var (
state looprpc.SwapState
failureReason = looprpc.FailureReason_FAILURE_REASON_NONE
// Set our state var for non-failure states. If we get a failure, we
// will update our failure reason. To remain backwards compatible with
// previous versions where we squashed all failure reasons to a single
// failure state, we set a failure reason for all our different failure
// states, and set our failed state for all of them.
switch loopSwap.State {
case loopdb.StateInitiated:
state = looprpc.SwapState_INITIATED
case loopdb.StatePreimageRevealed:
state = looprpc.SwapState_PREIMAGE_REVEALED
case loopdb.StateHtlcPublished:
state = looprpc.SwapState_HTLC_PUBLISHED
case loopdb.StateInvoiceSettled:
state = looprpc.SwapState_INVOICE_SETTLED
case loopdb.StateSuccess:
state = looprpc.SwapState_SUCCESS
case loopdb.StateFailOffchainPayments:
failureReason = looprpc.FailureReason_FAILURE_REASON_OFFCHAIN
case loopdb.StateFailTimeout:
failureReason = looprpc.FailureReason_FAILURE_REASON_TIMEOUT
case loopdb.StateFailSweepTimeout:
failureReason = looprpc.FailureReason_FAILURE_REASON_SWEEP_TIMEOUT
case loopdb.StateFailInsufficientValue:
failureReason = looprpc.FailureReason_FAILURE_REASON_INSUFFICIENT_VALUE
case loopdb.StateFailTemporary:
failureReason = looprpc.FailureReason_FAILURE_REASON_TEMPORARY
case loopdb.StateFailIncorrectHtlcAmt:
failureReason = looprpc.FailureReason_FAILURE_REASON_INCORRECT_AMOUNT
return nil, fmt.Errorf("unknown swap state: %v", loopSwap.State)
// If we have a failure reason, we have a failure state, so should use
// our catchall failed state.
if failureReason != looprpc.FailureReason_FAILURE_REASON_NONE {
state = looprpc.SwapState_FAILED
var swapType looprpc.SwapType
var htlcAddress, htlcAddressP2WSH, htlcAddressNP2WSH string
switch loopSwap.SwapType {
case swap.TypeIn:
swapType = looprpc.SwapType_LOOP_IN
htlcAddressP2WSH = loopSwap.HtlcAddressP2WSH.EncodeAddress()
if loopSwap.ExternalHtlc {
htlcAddressNP2WSH = loopSwap.HtlcAddressNP2WSH.EncodeAddress()
htlcAddress = htlcAddressNP2WSH
} else {
htlcAddress = htlcAddressP2WSH
case swap.TypeOut:
swapType = looprpc.SwapType_LOOP_OUT
htlcAddressP2WSH = loopSwap.HtlcAddressP2WSH.EncodeAddress()
htlcAddress = htlcAddressP2WSH
return nil, errors.New("unknown swap type")
return &looprpc.SwapStatus{
Amt: int64(loopSwap.AmountRequested),
Id: loopSwap.SwapHash.String(),
IdBytes: loopSwap.SwapHash[:],
State: state,
FailureReason: failureReason,
InitiationTime: loopSwap.InitiationTime.UnixNano(),
LastUpdateTime: loopSwap.LastUpdate.UnixNano(),
HtlcAddress: htlcAddress,
HtlcAddressP2Wsh: htlcAddressP2WSH,
HtlcAddressNp2Wsh: htlcAddressNP2WSH,
Type: swapType,
CostServer: int64(loopSwap.Cost.Server),
CostOnchain: int64(loopSwap.Cost.Onchain),
CostOffchain: int64(loopSwap.Cost.Offchain),
Label: loopSwap.Label,
}, nil
// Monitor will return a stream of swap updates for currently active swaps.
func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest,
server looprpc.SwapClient_MonitorServer) error {
log.Infof("Monitor request received")
send := func(info loop.SwapInfo) error {
rpcSwap, err := s.marshallSwap(&info)
if err != nil {
return err
return server.Send(rpcSwap)
// Start a notification queue for this subscriber.
queue := queue.NewConcurrentQueue(20)
// Add this subscriber to the global subscriber list. Also create a
// snapshot of all pending and completed swaps within the lock, to
// prevent subscribers from receiving duplicate updates.
id := s.nextSubscriberID
s.subscribers[id] = queue.ChanIn()
var pendingSwaps, completedSwaps []loop.SwapInfo
for _, swap := range s.swaps {
if swap.State.Type() == loopdb.StateTypePending {
pendingSwaps = append(pendingSwaps, swap)
} else {
completedSwaps = append(completedSwaps, swap)
defer func() {
delete(s.subscribers, id)
// Sort completed swaps new to old.
sort.Slice(completedSwaps, func(i, j int) bool {
return completedSwaps[i].LastUpdate.After(
// Discard all but top x latest.
if len(completedSwaps) > completedSwapsCount {
completedSwaps = completedSwaps[:completedSwapsCount]
// Concatenate both sets.
filteredSwaps := append(pendingSwaps, completedSwaps...)
// Sort again, but this time old to new.
sort.Slice(filteredSwaps, func(i, j int) bool {
return filteredSwaps[i].LastUpdate.Before(
// Return swaps to caller.
for _, swap := range filteredSwaps {
if err := send(swap); err != nil {
return err
// As long as the client is connected, keep passing through swap
// updates.
for {
select {
case queueItem, ok := <-queue.ChanOut():
if !ok {
return nil
swap := queueItem.(loop.SwapInfo)
if err := send(swap); err != nil {
return err
// The client cancels the subscription.
case <-server.Context().Done():
return nil
// The server is shutting down.
case <-s.mainCtx.Done():
return fmt.Errorf("server is shutting down")
// ListSwaps returns a list of all currently known swaps and their current
// status.
func (s *swapClientServer) ListSwaps(_ context.Context,
_ *looprpc.ListSwapsRequest) (*looprpc.ListSwapsResponse, error) {
var (
rpcSwaps = make([]*looprpc.SwapStatus, len(s.swaps))
idx = 0
err error
defer s.swapsLock.Unlock()
// We can just use the server's in-memory cache as that contains the
// most up-to-date state including temporary failures which aren't
// persisted to disk. The swaps field is a map, that's why we need an
// additional index.
for _, swp := range s.swaps {
swp := swp
rpcSwaps[idx], err = s.marshallSwap(&swp)
if err != nil {
return nil, err
return &looprpc.ListSwapsResponse{Swaps: rpcSwaps}, nil
// SwapInfo returns all known details about a single swap.
func (s *swapClientServer) SwapInfo(_ context.Context,
req *looprpc.SwapInfoRequest) (*looprpc.SwapStatus, error) {
swapHash, err := lntypes.MakeHash(req.Id)
if err != nil {
return nil, fmt.Errorf("error parsing swap hash: %v", err)
// Just return the server's in-memory cache here too as we also want to
// return temporary failures to the client.
swp, ok := s.swaps[swapHash]
if !ok {
return nil, fmt.Errorf("swap with hash %s not found", req.Id)
return s.marshallSwap(&swp)
// LoopOutTerms returns the terms that the server enforces for loop out swaps.
func (s *swapClientServer) LoopOutTerms(ctx context.Context,
req *looprpc.TermsRequest) (*looprpc.OutTermsResponse, error) {
log.Infof("Loop out terms request received")
terms, err := s.impl.LoopOutTerms(ctx)
if err != nil {
log.Errorf("Terms request: %v", err)
return nil, err
return &looprpc.OutTermsResponse{
MinSwapAmount: int64(terms.MinSwapAmount),
MaxSwapAmount: int64(terms.MaxSwapAmount),
MinCltvDelta: terms.MinCltvDelta,
MaxCltvDelta: terms.MaxCltvDelta,
}, nil
// LoopOutQuote returns a quote for a loop out swap with the provided
// parameters.
func (s *swapClientServer) LoopOutQuote(ctx context.Context,
req *looprpc.QuoteRequest) (*looprpc.OutQuoteResponse, error) {
confTarget, err := validateConfTarget(
req.ConfTarget, loop.DefaultSweepConfTarget,
if err != nil {
return nil, err
quote, err := s.impl.LoopOutQuote(ctx, &loop.LoopOutQuoteRequest{
Amount: btcutil.Amount(req.Amt),
SweepConfTarget: confTarget,
SwapPublicationDeadline: time.Unix(
int64(req.SwapPublicationDeadline), 0,
if err != nil {
return nil, err
return &looprpc.OutQuoteResponse{
HtlcSweepFeeSat: int64(quote.MinerFee),
PrepayAmtSat: int64(quote.PrepayAmount),
SwapFeeSat: int64(quote.SwapFee),
SwapPaymentDest: quote.SwapPaymentDest[:],
}, nil
// GetTerms returns the terms that the server enforces for swaps.
func (s *swapClientServer) GetLoopInTerms(ctx context.Context,
req *looprpc.TermsRequest) (*looprpc.InTermsResponse, error) {
log.Infof("Loop in terms request received")
terms, err := s.impl.LoopInTerms(ctx)
if err != nil {
log.Errorf("Terms request: %v", err)
return nil, err
return &looprpc.InTermsResponse{
MinSwapAmount: int64(terms.MinSwapAmount),
MaxSwapAmount: int64(terms.MaxSwapAmount),
}, nil
// GetQuote returns a quote for a swap with the provided parameters.
func (s *swapClientServer) GetLoopInQuote(ctx context.Context,
req *looprpc.QuoteRequest) (*looprpc.InQuoteResponse, error) {
log.Infof("Loop in quote request received")
htlcConfTarget, err := validateLoopInRequest(
req.ConfTarget, req.ExternalHtlc,
if err != nil {
return nil, err
quote, err := s.impl.LoopInQuote(ctx, &loop.LoopInQuoteRequest{
Amount: btcutil.Amount(req.Amt),
HtlcConfTarget: htlcConfTarget,
ExternalHtlc: req.ExternalHtlc,
if err != nil {
return nil, err
return &looprpc.InQuoteResponse{
HtlcPublishFeeSat: int64(quote.MinerFee),
SwapFeeSat: int64(quote.SwapFee),
}, nil
func (s *swapClientServer) LoopIn(ctx context.Context,
in *looprpc.LoopInRequest) (
*looprpc.SwapResponse, error) {
log.Infof("Loop in request received")
htlcConfTarget, err := validateLoopInRequest(
in.HtlcConfTarget, in.ExternalHtlc,
if err != nil {
return nil, err
// Check that the label is valid.
if err := labels.Validate(in.Label); err != nil {
return nil, err
req := &loop.LoopInRequest{
Amount: btcutil.Amount(in.Amt),
MaxMinerFee: btcutil.Amount(in.MaxMinerFee),
MaxSwapFee: btcutil.Amount(in.MaxSwapFee),
HtlcConfTarget: htlcConfTarget,
ExternalHtlc: in.ExternalHtlc,
Label: in.Label,
Initiator: in.Initiator,
if in.LastHop != nil {
lastHop, err := route.NewVertexFromBytes(in.LastHop)
if err != nil {
return nil, err
req.LastHop = &lastHop
swapInfo, err := s.impl.LoopIn(ctx, req)
if err != nil {
log.Errorf("Loop in: %v", err)
return nil, err
response := &looprpc.SwapResponse{
Id: swapInfo.SwapHash.String(),
IdBytes: swapInfo.SwapHash[:],
HtlcAddressP2Wsh: swapInfo.HtlcAddressP2WSH.String(),
ServerMessage: swapInfo.ServerMessage,
if req.ExternalHtlc {
response.HtlcAddressNp2Wsh = swapInfo.HtlcAddressNP2WSH.String()
response.HtlcAddress = response.HtlcAddressNp2Wsh
} else {
response.HtlcAddress = response.HtlcAddressP2Wsh
return response, nil
// GetLsatTokens returns all tokens that are contained in the LSAT token store.
func (s *swapClientServer) GetLsatTokens(ctx context.Context,
_ *looprpc.TokensRequest) (*looprpc.TokensResponse, error) {
log.Infof("Get LSAT tokens request received")
tokens, err := s.impl.LsatStore.AllTokens()
if err != nil {
return nil, err
rpcTokens := make([]*looprpc.LsatToken, len(tokens))
idx := 0
for key, token := range tokens {
macBytes, err := token.BaseMacaroon().MarshalBinary()
if err != nil {
return nil, err
rpcTokens[idx] = &looprpc.LsatToken{
BaseMacaroon: macBytes,
PaymentHash: token.PaymentHash[:],
PaymentPreimage: token.Preimage[:],
AmountPaidMsat: int64(token.AmountPaid),
RoutingFeePaidMsat: int64(token.RoutingFeePaid),
TimeCreated: token.TimeCreated.Unix(),
Expired: !token.IsValid(),
StorageName: key,
return &looprpc.TokensResponse{Tokens: rpcTokens}, nil
// GetLiquidityParams gets our current liquidity manager's parameters.
func (s *swapClientServer) GetLiquidityParams(_ context.Context,
_ *looprpc.GetLiquidityParamsRequest) (*looprpc.LiquidityParameters,
error) {
cfg := s.liquidityMgr.GetParameters()
satPerByte := cfg.SweepFeeRateLimit.FeePerKVByte() / 1000
rpcCfg := &looprpc.LiquidityParameters{
MaxMinerFeeSat: uint64(cfg.MaximumMinerFee),
MaxSwapFeePpm: uint64(cfg.MaximumSwapFeePPM),
MaxRoutingFeePpm: uint64(cfg.MaximumRoutingFeePPM),
MaxPrepayRoutingFeePpm: uint64(cfg.MaximumPrepayRoutingFeePPM),
MaxPrepaySat: uint64(cfg.MaximumPrepay),
SweepFeeRateSatPerVbyte: uint64(satPerByte),
SweepConfTarget: cfg.SweepConfTarget,
FailureBackoffSec: uint64(cfg.FailureBackOff.Seconds()),
Autoloop: cfg.Autoloop,
AutoloopBudgetSat: uint64(cfg.AutoFeeBudget),
AutoMaxInFlight: uint64(cfg.MaxAutoInFlight),
Rules: make(
[]*looprpc.LiquidityRule, 0, len(cfg.ChannelRules),
MinSwapAmount: uint64(cfg.ClientRestrictions.Minimum),
MaxSwapAmount: uint64(cfg.ClientRestrictions.Maximum),
// Zero golang time is different to a zero unix time, so we only set
// our start date if it is non-zero.
if !cfg.AutoFeeStartDate.IsZero() {
rpcCfg.AutoloopBudgetStartSec = uint64(
for channel, rule := range cfg.ChannelRules {
rpcRule := &looprpc.LiquidityRule{
ChannelId: channel.ToUint64(),
Type: looprpc.LiquidityRuleType_THRESHOLD,
IncomingThreshold: uint32(rule.MinimumIncoming),
OutgoingThreshold: uint32(rule.MinimumOutgoing),
rpcCfg.Rules = append(rpcCfg.Rules, rpcRule)
return rpcCfg, nil
// SetLiquidityParams attempts to set our current liquidity manager's
// parameters.
func (s *swapClientServer) SetLiquidityParams(ctx context.Context,
in *looprpc.SetLiquidityParamsRequest) (*looprpc.SetLiquidityParamsResponse,
error) {
satPerVbyte := chainfee.SatPerKVByte(
in.Parameters.SweepFeeRateSatPerVbyte * 1000,
params := liquidity.Parameters{
MaximumMinerFee: btcutil.Amount(in.Parameters.MaxMinerFeeSat),
MaximumSwapFeePPM: int(in.Parameters.MaxSwapFeePpm),
MaximumRoutingFeePPM: int(in.Parameters.MaxRoutingFeePpm),
MaximumPrepayRoutingFeePPM: int(in.Parameters.MaxPrepayRoutingFeePpm),
MaximumPrepay: btcutil.Amount(in.Parameters.MaxPrepaySat),
SweepFeeRateLimit: satPerVbyte.FeePerKWeight(),
SweepConfTarget: in.Parameters.SweepConfTarget,
FailureBackOff: time.Duration(in.Parameters.FailureBackoffSec) *
Autoloop: in.Parameters.Autoloop,
AutoFeeBudget: btcutil.Amount(in.Parameters.AutoloopBudgetSat),
MaxAutoInFlight: int(in.Parameters.AutoMaxInFlight),
ChannelRules: make(
ClientRestrictions: liquidity.Restrictions{
Minimum: btcutil.Amount(in.Parameters.MinSwapAmount),
Maximum: btcutil.Amount(in.Parameters.MaxSwapAmount),
// Zero unix time is different to zero golang time.
if in.Parameters.AutoloopBudgetStartSec != 0 {
params.AutoFeeStartDate = time.Unix(
int64(in.Parameters.AutoloopBudgetStartSec), 0,
for _, rule := range in.Parameters.Rules {
var (
shortID = lnwire.NewShortChanIDFromInt(rule.ChannelId)
err error
// Make sure that there are not multiple rules set for a single
// channel.
if _, ok := params.ChannelRules[shortID]; ok {
return nil, fmt.Errorf("multiple rules set for "+
"channel: %v", shortID)
params.ChannelRules[shortID], err = rpcToRule(rule)
if err != nil {
return nil, err
if err := s.liquidityMgr.SetParameters(ctx, params); err != nil {
return nil, err
return &looprpc.SetLiquidityParamsResponse{}, nil
// rpcToRule switches on rpc rule type to convert to our rule interface.
func rpcToRule(rule *looprpc.LiquidityRule) (*liquidity.ThresholdRule, error) {
switch rule.Type {
case looprpc.LiquidityRuleType_UNKNOWN:
return nil, fmt.Errorf("rule type field must be set")
case looprpc.LiquidityRuleType_THRESHOLD:
return liquidity.NewThresholdRule(
), nil
return nil, fmt.Errorf("unknown rule: %T", rule)
// SuggestSwaps provides a list of suggested swaps based on lnd's current
// channel balances and rules set by the liquidity manager.
func (s *swapClientServer) SuggestSwaps(ctx context.Context,
_ *looprpc.SuggestSwapsRequest) (*looprpc.SuggestSwapsResponse, error) {
swaps, err := s.liquidityMgr.SuggestSwaps(ctx, false)
switch err {
case liquidity.ErrNoRules:
return nil, status.Error(codes.FailedPrecondition, err.Error())
case nil:
return nil, err
var loopOut []*looprpc.LoopOutRequest
for _, swap := range swaps {
loopOut = append(loopOut, &looprpc.LoopOutRequest{
Amt: int64(swap.Amount),
OutgoingChanSet: swap.OutgoingChanSet,
MaxSwapFee: int64(swap.MaxSwapFee),
MaxMinerFee: int64(swap.MaxMinerFee),
MaxPrepayAmt: int64(swap.MaxPrepayAmount),
MaxSwapRoutingFee: int64(swap.MaxSwapRoutingFee),
MaxPrepayRoutingFee: int64(swap.MaxPrepayRoutingFee),
SweepConfTarget: swap.SweepConfTarget,
return &looprpc.SuggestSwapsResponse{
LoopOut: loopOut,
}, nil
// processStatusUpdates reads updates on the status channel and processes them.
// NOTE: This must run inside a goroutine as it blocks until the main context
// shuts down.
func (s *swapClientServer) processStatusUpdates(mainCtx context.Context) {
for {
select {
// On updates, refresh the server's in-memory state and inform
// subscribers about the changes.
case swp := <-s.statusChan:
s.swaps[swp.SwapHash] = swp
for _, subscriber := range s.subscribers {
select {
case subscriber <- swp:
case <-mainCtx.Done():
// Server is shutting down.
case <-mainCtx.Done():
// validateConfTarget ensures the given confirmation target is valid. If one
// isn't specified (0 value), then the default target is used.
func validateConfTarget(target, defaultTarget int32) (int32, error) {
switch {
case target == 0:
return defaultTarget, nil
// Ensure the target respects our minimum threshold.
case target < minConfTarget:
return 0, fmt.Errorf("a confirmation target of at least %v "+
"must be provided", minConfTarget)
return target, nil
// validateLoopInRequest fails if the mutually exclusive conf target and
// external parameters are both set.
func validateLoopInRequest(htlcConfTarget int32, external bool) (int32, error) {
// If the htlc is going to be externally set, the htlcConfTarget should
// not be set, because it has no relevance when the htlc is external.
if external && htlcConfTarget != 0 {
return 0, errors.New("external and htlc conf target cannot " +
"both be set")
// If the htlc is being externally published, we do not need to set a
// confirmation target.
if external {
return 0, nil
return validateConfTarget(htlcConfTarget, loop.DefaultHtlcConfTarget)