2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-11 13:11:12 +00:00
loop/swap_server_client.go
Andras Banki-Horvath b47f67a6de
loopin: push the htlc internal key if the invoice is swept
This commit adds key reveal to MuSig2 loopin swaps' success path. In
this case the client reveals their internal HTLC key to the server when
the swap invoice is settled. With this key the server can sweep the swap
HTLC without any more interaction from the client. We'll do this every
block (after the invoice has been settled).
2023-03-20 20:08:43 +01:00

913 lines
25 KiB
Go

package loop
import (
"context"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/aperture/lsat"
"github.com/lightninglabs/loop/loopdb"
looprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/tor"
"github.com/lightningnetwork/lnd/zpay32"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)
// Sentinel errors delivered on the error channel of a server update
// subscription.
var (
	// errServerSubscriptionComplete is delivered when the update stream
	// ends with io.EOF: the server has finished its part of the swap and
	// has nothing more to send us.
	errServerSubscriptionComplete = errors.New(
		"server finished serving updates",
	)

	// errSubscriptionFailed is delivered when the update stream breaks
	// with a "transport is closing" error, which we take to mean that the
	// server shut down. There is no resubscription logic, so no further
	// updates will arrive once this error has been seen.
	errSubscriptionFailed = errors.New(
		"failed, no further updates will be provided",
	)
)
// RoutingPluginType is the client side representation of a routing plugin.
type RoutingPluginType uint8

const (
	// RoutingPluginNone means that the client should not use any routing
	// plugin.
	RoutingPluginNone RoutingPluginType = iota

	// RoutingPluginLowHigh means that the client should use the low-high
	// routing method.
	RoutingPluginLowHigh
)

// String returns a human readable name for the routing plugin type. Every
// value other than RoutingPluginLowHigh is rendered as "None".
func (r RoutingPluginType) String() string {
	if r == RoutingPluginLowHigh {
		return "Low/High"
	}

	return "None"
}
// swapServerClient is the set of swap server calls used by the client. It is
// implemented by grpcSwapServerClient.
type swapServerClient interface {
// GetLoopOutTerms fetches the server's loop out terms.
GetLoopOutTerms(ctx context.Context) (
*LoopOutTerms, error)
// GetLoopOutQuote requests a loop out quote for the given amount, expiry
// and swap publication deadline.
GetLoopOutQuote(ctx context.Context, amt btcutil.Amount, expiry int32,
swapPublicationDeadline time.Time) (
*LoopOutQuote, error)
// GetLoopInTerms fetches the server's loop in terms.
GetLoopInTerms(ctx context.Context) (
*LoopInTerms, error)
// GetLoopInQuote requests a loop in quote for the given amount. The
// pubkey, optional last hop and route hints are forwarded to the server.
GetLoopInQuote(ctx context.Context, amt btcutil.Amount,
pubKey route.Vertex, lastHop *route.Vertex,
routeHints [][]zpay32.HopHint) (*LoopInQuote, error)
// Probe sends a probe request for the given amount and target to the
// server, optionally restricted to a last hop and with route hints.
Probe(ctx context.Context, amt btcutil.Amount, target route.Vertex,
lastHop *route.Vertex, routeHints [][]zpay32.HopHint) error
// NewLoopOutSwap requests a new loop out swap from the server.
NewLoopOutSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
receiverKey [33]byte, swapPublicationDeadline time.Time,
initiator string) (*newLoopOutResponse, error)
// PushLoopOutPreimage pushes a preimage to the server.
PushLoopOutPreimage(ctx context.Context,
preimage lntypes.Preimage) error
// NewLoopInSwap requests a new loop in swap from the server.
NewLoopInSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount, senderScriptKey,
senderInternalKey [33]byte, swapInvoice, probeInvoice string,
lastHop *route.Vertex, initiator string) (
*newLoopInResponse, error)
// SubscribeLoopOutUpdates subscribes to loop out server state.
SubscribeLoopOutUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
// SubscribeLoopInUpdates subscribes to loop in server state.
SubscribeLoopInUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
// CancelLoopOutSwap cancels a loop out swap.
CancelLoopOutSwap(ctx context.Context,
details *outCancelDetails) error
// RecommendRoutingPlugin asks the server for routing plugin
// recommendation for off-chain payment(s) of a swap.
RecommendRoutingPlugin(ctx context.Context, swapHash lntypes.Hash,
paymentAddr [32]byte) (RoutingPluginType, error)
// ReportRoutingResult reports a routing result corresponding to a swap.
ReportRoutingResult(ctx context.Context,
swapHash lntypes.Hash, paymentAddr [32]byte,
plugin RoutingPluginType, success bool, attempts int32,
totalTime int64) error
// MuSig2SignSweep calls the server to cooperatively sign the MuSig2
// htlc spend. Returns the server's nonce and partial signature.
MuSig2SignSweep(ctx context.Context,
protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte) (
[]byte, []byte, error)
// PushKey sends the client's HTLC internal key associated with the
// swap to the server.
PushKey(ctx context.Context,
protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
clientInternalPrivateKey [32]byte) error
}
// grpcSwapServerClient implements swapServerClient on top of a gRPC
// connection to the swap server.
type grpcSwapServerClient struct {
// server is the generated gRPC client used to issue swap server calls.
server looprpc.SwapServerClient
// conn is the underlying gRPC connection; it is closed by stop.
conn *grpc.ClientConn
// wg tracks the goroutines started by makeServerUpdate so that stop can
// wait for them to exit.
wg sync.WaitGroup
}
// stop closes the gRPC connection to the swap server and then blocks until
// every goroutine tracked by the client's wait group has exited. A failure to
// close the connection is only logged.
func (s *grpcSwapServerClient) stop() {
	closeErr := s.conn.Close()
	if closeErr != nil {
		log.Warnf("could not close connection: %v", closeErr)
	}

	s.wg.Wait()
}
// Compile-time check that grpcSwapServerClient implements swapServerClient.
var _ swapServerClient = (*grpcSwapServerClient)(nil)
// newSwapServerClient connects to the swap server described by cfg and wraps
// the connection in a grpcSwapServerClient. The connection is created with an
// LSAT client interceptor backed by lsatStore. An error is returned if the
// connection cannot be set up.
func newSwapServerClient(cfg *ClientConfig, lsatStore lsat.Store) (
	*grpcSwapServerClient, error) {

	// The interceptor takes care of the LSAT protocol for every call made
	// over the connection.
	interceptor := lsat.NewInterceptor(
		cfg.Lnd, lsatStore, serverRPCTimeout, cfg.MaxLsatCost,
		cfg.MaxLsatFee, false,
	)

	conn, err := getSwapServerConn(
		cfg.ServerAddress, cfg.ProxyAddress, cfg.SwapServerNoTLS,
		cfg.TLSPathServer, interceptor,
	)
	if err != nil {
		return nil, err
	}

	client := &grpcSwapServerClient{
		server: looprpc.NewSwapServerClient(conn),
		conn:   conn,
	}

	return client, nil
}
// GetLoopOutTerms queries the server for its loop out terms and converts the
// response into a LoopOutTerms value. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopOutTerms(ctx context.Context) (
	*LoopOutTerms, error) {

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutTermsRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
	}

	resp, err := s.server.LoopOutTerms(callCtx, req)
	if err != nil {
		return nil, err
	}

	result := &LoopOutTerms{
		MinSwapAmount: btcutil.Amount(resp.MinSwapAmount),
		MaxSwapAmount: btcutil.Amount(resp.MaxSwapAmount),
		MinCltvDelta:  resp.MinCltvDelta,
		MaxCltvDelta:  resp.MaxCltvDelta,
	}

	return result, nil
}
// GetLoopOutQuote asks the server for a loop out quote covering the given
// amount, expiry and swap publication deadline. The payment destination in
// the response arrives hex encoded and must decode to exactly 33 bytes,
// otherwise an error is returned. The call is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopOutQuote(ctx context.Context,
	amt btcutil.Amount, expiry int32, swapPublicationDeadline time.Time) (
	*LoopOutQuote, error) {

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutQuoteRequest{
		Amt:                     uint64(amt),
		SwapPublicationDeadline: swapPublicationDeadline.Unix(),
		ProtocolVersion:         loopdb.CurrentRPCProtocolVersion(),
		Expiry:                  expiry,
	}

	resp, err := s.server.LoopOutQuote(callCtx, req)
	if err != nil {
		return nil, err
	}

	// The destination is sent as a hex string; turn it into a fixed size
	// array, rejecting anything of the wrong length.
	rawDest, err := hex.DecodeString(resp.SwapPaymentDest)
	if err != nil {
		return nil, err
	}

	var dest [33]byte
	if len(rawDest) != len(dest) {
		return nil, errors.New("invalid payment dest")
	}
	copy(dest[:], rawDest)

	quote := &LoopOutQuote{
		PrepayAmount:    btcutil.Amount(resp.PrepayAmt),
		SwapFee:         btcutil.Amount(resp.SwapFee),
		SwapPaymentDest: dest,
	}

	return quote, nil
}
// GetLoopInTerms queries the server for its loop in terms and converts the
// response into a LoopInTerms value. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopInTerms(ctx context.Context) (
	*LoopInTerms, error) {

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopInTermsRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
	}

	resp, err := s.server.LoopInTerms(callCtx, req)
	if err != nil {
		return nil, err
	}

	result := &LoopInTerms{
		MinSwapAmount: btcutil.Amount(resp.MinSwapAmount),
		MaxSwapAmount: btcutil.Amount(resp.MaxSwapAmount),
	}

	return result, nil
}
// GetLoopInQuote requests a loop in quote for the given amount from the
// server. Before asking for the quote a probe request is sent with the same
// parameters; a probe failure never aborts the quote and is logged unless its
// status code is Unavailable. The quote call itself is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopInQuote(ctx context.Context,
	amt btcutil.Amount, pubKey route.Vertex, lastHop *route.Vertex,
	routeHints [][]zpay32.HopHint) (*LoopInQuote, error) {

	probeErr := s.Probe(ctx, amt, pubKey, lastHop, routeHints)
	if probeErr != nil && status.Code(probeErr) != codes.Unavailable {
		log.Warnf("Server probe error: %v", probeErr)
	}

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	quoteReq := &looprpc.ServerLoopInQuoteRequest{
		Amt:             uint64(amt),
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		Pubkey:          pubKey[:],
	}

	// The last hop and the route hints are optional and only set when the
	// caller supplied them.
	if lastHop != nil {
		quoteReq.LastHop = lastHop[:]
	}

	if routeHints != nil {
		hints, err := marshallRouteHints(routeHints)
		if err != nil {
			return nil, err
		}

		quoteReq.RouteHints = hints
	}

	resp, err := s.server.LoopInQuote(callCtx, quoteReq)
	if err != nil {
		return nil, err
	}

	quote := &LoopInQuote{
		SwapFee:   btcutil.Amount(resp.SwapFee),
		CltvDelta: resp.CltvDelta,
	}

	return quote, nil
}
// marshallRouteHints converts a list of route hints into their RPC
// representation, preserving order. The first hop hint that fails to convert
// aborts the conversion and its error is returned.
func marshallRouteHints(routeHints [][]zpay32.HopHint) (
	[]*looprpc.RouteHint, error) {

	converted := make([]*looprpc.RouteHint, len(routeHints))
	for i, hops := range routeHints {
		rpcHops := make([]*looprpc.HopHint, len(hops))
		for j, hop := range hops {
			rpcHop, err := marshallHopHint(hop)
			if err != nil {
				return nil, err
			}

			rpcHops[j] = rpcHop
		}

		converted[i] = &looprpc.RouteHint{
			HopHints: rpcHops,
		}
	}

	return converted, nil
}
// marshallHopHint converts a single hop hint into its RPC representation. The
// node ID is taken from the hint's compressed public key and sent in the
// string form of a route.Vertex.
func marshallHopHint(hint zpay32.HopHint) (*looprpc.HopHint, error) {
	compressedKey := hint.NodeID.SerializeCompressed()

	vertex, err := route.NewVertexFromBytes(compressedKey)
	if err != nil {
		return nil, err
	}

	rpcHint := &looprpc.HopHint{
		ChanId:                    hint.ChannelID,
		CltvExpiryDelta:           uint32(hint.CLTVExpiryDelta),
		FeeBaseMsat:               hint.FeeBaseMSat,
		FeeProportionalMillionths: hint.FeeProportionalMillionths,
		NodeId:                    vertex.String(),
	}

	return rpcHint, nil
}
// Probe sends a probe request for the given amount and target to the server.
// Route hints are always marshalled into the request, while the last hop is
// only set when provided. The call is bounded by probeTimeout.
func (s *grpcSwapServerClient) Probe(ctx context.Context, amt btcutil.Amount,
	target route.Vertex, lastHop *route.Vertex,
	routeHints [][]zpay32.HopHint) error {

	callCtx, cancel := context.WithTimeout(ctx, probeTimeout)
	defer cancel()

	hints, err := marshallRouteHints(routeHints)
	if err != nil {
		return err
	}

	probeReq := &looprpc.ServerProbeRequest{
		Amt:             uint64(amt),
		Target:          target[:],
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		RouteHints:      hints,
	}
	if lastHop != nil {
		probeReq.LastHop = lastHop[:]
	}

	// The response carries nothing we need, only the error matters.
	_, err = s.server.Probe(callCtx, probeReq)

	return err
}
// NewLoopOutSwap requests a new loop out swap from the server. The sender key
// in the response is copied into a 33 byte array and must parse as a public
// key, otherwise an error is returned. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) NewLoopOutSwap(ctx context.Context,
	swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
	receiverKey [33]byte, swapPublicationDeadline time.Time,
	initiator string) (*newLoopOutResponse, error) {

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutRequest{
		SwapHash:                swapHash[:],
		Amt:                     uint64(amount),
		ReceiverKey:             receiverKey[:],
		SwapPublicationDeadline: swapPublicationDeadline.Unix(),
		ProtocolVersion:         loopdb.CurrentRPCProtocolVersion(),
		Expiry:                  expiry,
		UserAgent:               UserAgent(initiator),
	}

	resp, err := s.server.NewLoopOutSwap(callCtx, req)
	if err != nil {
		return nil, err
	}

	// Make sure the server handed us a usable sender key before passing
	// it on.
	var serverKey [33]byte
	copy(serverKey[:], resp.SenderKey)

	if _, err := btcec.ParsePubKey(serverKey[:]); err != nil {
		return nil, fmt.Errorf("invalid sender key: %v", err)
	}

	result := &newLoopOutResponse{
		swapInvoice:   resp.SwapInvoice,
		prepayInvoice: resp.PrepayInvoice,
		senderKey:     serverKey,
		serverMessage: resp.ServerMessage,
	}

	return result, nil
}
// PushLoopOutPreimage sends the given preimage to the server. The call is
// bounded by globalCallTimeout and only the error of the call is returned.
func (s *grpcSwapServerClient) PushLoopOutPreimage(ctx context.Context,
	preimage lntypes.Preimage) error {

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutPushPreimageRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		Preimage:        preimage[:],
	}

	_, err := s.server.LoopOutPushPreimage(callCtx, req)

	return err
}
// NewLoopInSwap requests a new loop in swap from the server.
//
// The request carries the swap hash, amount, the client's script key, the
// swap and probe invoices and a user agent derived from initiator. The last
// hop is only set when provided, and the client's internal key is only sent
// when the current protocol version is at least ProtocolVersionMuSig2.
//
// The receiver key returned by the server must parse as a public key,
// otherwise an error is returned. The receiver internal key is copied from
// the response as is and not validated here. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) NewLoopInSwap(ctx context.Context,
	swapHash lntypes.Hash, amount btcutil.Amount, senderScriptKey,
	senderInternalKey [33]byte, swapInvoice, probeInvoice string,
	lastHop *route.Vertex, initiator string) (*newLoopInResponse, error) {

	rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
	defer rpcCancel()

	req := &looprpc.ServerLoopInRequest{
		SwapHash:        swapHash[:],
		Amt:             uint64(amount),
		SenderKey:       senderScriptKey[:],
		SwapInvoice:     swapInvoice,
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		ProbeInvoice:    probeInvoice,
		UserAgent:       UserAgent(initiator),
	}
	if lastHop != nil {
		req.LastHop = lastHop[:]
	}

	// Set the client's internal key if this is a MuSig2 swap.
	if loopdb.CurrentProtocolVersion() >= loopdb.ProtocolVersionMuSig2 {
		req.SenderInternalPubkey = senderInternalKey[:]
	}

	swapResp, err := s.server.NewLoopInSwap(rpcCtx, req)
	if err != nil {
		return nil, err
	}

	var receiverKey, receiverInternalKey [33]byte
	copy(receiverKey[:], swapResp.ReceiverKey)
	copy(receiverInternalKey[:], swapResp.ReceiverInternalPubkey)

	// Validate receiver key. The error names the receiver key so that a
	// bad server response is not mistaken for a problem with the sender
	// key.
	_, err = btcec.ParsePubKey(receiverKey[:])
	if err != nil {
		return nil, fmt.Errorf("invalid receiver key: %v", err)
	}

	return &newLoopInResponse{
		receiverKey:         receiverKey,
		receiverInternalKey: receiverInternalKey,
		expiry:              swapResp.Expiry,
		serverMessage:       swapResp.ServerMessage,
	}, nil
}
// ServerUpdate summarizes a single swap state update received from the swap
// server over an update subscription.
type ServerUpdate struct {
// State is the swap state reported by the server in this update.
State looprpc.ServerSwapState
// Timestamp is the time of the server state update, built from the
// nanosecond unix timestamp sent by the server.
Timestamp time.Time
}
// SubscribeLoopInUpdates opens a loop in update stream for the swap with the
// given hash. Updates read from the stream are delivered on the returned
// update channel, and the reason the subscription ended is delivered on the
// returned error channel. A failure to open the stream is returned directly.
func (s *grpcSwapServerClient) SubscribeLoopInUpdates(ctx context.Context,
	hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {

	subReq := &looprpc.SubscribeUpdatesRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		SwapHash:        hash[:],
	}

	stream, err := s.server.SubscribeLoopInUpdates(ctx, subReq)
	if err != nil {
		return nil, nil, err
	}

	// next blocks for the following stream message and converts it into a
	// ServerUpdate.
	next := func() (*ServerUpdate, error) {
		msg, err := stream.Recv()
		if err != nil {
			return nil, err
		}

		update := &ServerUpdate{
			State:     msg.State,
			Timestamp: time.Unix(0, msg.TimestampNs),
		}

		return update, nil
	}

	updates, errs := s.makeServerUpdate(ctx, next)

	return updates, errs, nil
}
// SubscribeLoopOutUpdates opens a loop out update stream for the swap with the
// given hash. Updates read from the stream are delivered on the returned
// update channel, and the reason the subscription ended is delivered on the
// returned error channel. A failure to open the stream is returned directly.
func (s *grpcSwapServerClient) SubscribeLoopOutUpdates(ctx context.Context,
	hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {

	subReq := &looprpc.SubscribeUpdatesRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		SwapHash:        hash[:],
	}

	stream, err := s.server.SubscribeLoopOutUpdates(ctx, subReq)
	if err != nil {
		return nil, nil, err
	}

	// next blocks for the following stream message and converts it into a
	// ServerUpdate.
	next := func() (*ServerUpdate, error) {
		msg, err := stream.Recv()
		if err != nil {
			return nil, err
		}

		update := &ServerUpdate{
			State:     msg.State,
			Timestamp: time.Unix(0, msg.TimestampNs),
		}

		return update, nil
	}

	updates, errs := s.makeServerUpdate(ctx, next)

	return updates, errs, nil
}
// makeServerUpdate starts a goroutine that repeatedly calls receive and
// forwards every update it yields on the returned update channel. The
// goroutine is tracked by the client's wait group.
//
// The goroutine sends exactly one value on the returned error channel before
// it exits:
//   - errServerSubscriptionComplete if receive returned io.EOF,
//   - errSubscriptionFailed if receive failed with a "transport is closing"
//     error,
//   - the error itself for any other receive failure,
//   - nil if ctx was cancelled while an update was waiting to be delivered.
func (s *grpcSwapServerClient) makeServerUpdate(ctx context.Context,
	receive func() (*ServerUpdate, error)) (<-chan *ServerUpdate,
	<-chan error) {

	// The update channel is unbuffered, so delivery blocks until the
	// caller reads. The error channel has room for the single value we
	// send, so exiting never depends on the caller reading it.
	updates := make(chan *ServerUpdate)
	errs := make(chan error, 1)

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()

		for {
			// Block until the stream yields an update or fails.
			update, err := receive()

			// A plain EOF means the server is done sending updates.
			if err == io.EOF {
				errs <- errServerSubscriptionComplete
				return
			}

			// Any other failure ends the subscription too. A closing
			// transport is reported with our own sentinel error, since
			// there is no retry logic and no more updates will follow.
			if err != nil {
				if isErrConClosing(err) {
					err = errSubscriptionFailed
				}

				errs <- err
				return
			}

			// Hand the update to the caller, unless they cancel first,
			// in which case we exit without an error.
			select {
			case updates <- update:

			case <-ctx.Done():
				errs <- nil
				return
			}
		}
	}()

	return updates, errs
}
// paymentType enumerates the kinds of off-chain payments that a swap makes.
type paymentType uint8

const (
	// paymentTypePrepay refers to the prepay; it is used to report that
	// the prepay could not be routed.
	paymentTypePrepay paymentType = 0

	// paymentTypeInvoice refers to the swap invoice; it is used to report
	// that the swap invoice could not be routed.
	paymentTypeInvoice paymentType = 1
)
// routeCancelMetadata contains cancelation information for swaps that are
// canceled because the client could not route off-chain to the server.
type routeCancelMetadata struct {
// paymentType is the type of payment that failed.
paymentType paymentType
// attempts is the set of htlc attempts made by the client, reporting
// the distance from the invoice's destination node that a failure
// occurred. Each entry is sent to the server as the remaining hops of
// one htlc attempt.
attempts []uint32
// failureReason is the reason that the payment failed. It is cast
// directly to the corresponding loop RPC failure reason when the cancel
// request is built.
failureReason lnrpc.PaymentFailureReason
}
// outCancelDetails contains the information required to cancel a loop out
// swap.
type outCancelDetails struct {
// hash is the swap's hash.
hash lntypes.Hash
// paymentAddr is the payment address for the swap's invoice.
paymentAddr [32]byte
// metadata contains additional information about the swap; it describes
// the routing failure that led to the cancelation.
metadata routeCancelMetadata
}
// CancelLoopOutSwap instructs the server to cancel the loop out swap
// described by details. The routing failure information in details is
// attached to the request; if it cannot be converted the error is returned
// and no request is sent. The call uses ctx as is, without an additional
// timeout.
func (s *grpcSwapServerClient) CancelLoopOutSwap(ctx context.Context,
	details *outCancelDetails) error {

	cancelReq := &looprpc.CancelLoopOutSwapRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		SwapHash:        details.hash[:],
		PaymentAddress:  details.paymentAddr[:],
	}

	cancelInfo, err := rpcRouteCancel(details)
	if err != nil {
		return err
	}
	cancelReq.CancelInfo = cancelInfo

	_, err = s.server.CancelLoopOutSwap(ctx, cancelReq)

	return err
}
// RecommendRoutingPlugin asks the server for routing plugin recommendation for
// off-chain payment(s) of a swap.
//
// The swap is identified by swapHash and paymentAddr. The server's answer is
// mapped to a RoutingPluginType; a plugin value that this client does not
// know is logged and treated as RoutingPluginNone. If the call fails,
// RoutingPluginNone is returned together with the error. The call is bounded
// by globalCallTimeout.
func (s *grpcSwapServerClient) RecommendRoutingPlugin(ctx context.Context,
	swapHash lntypes.Hash, paymentAddr [32]byte) (RoutingPluginType, error) {

	req := &looprpc.RecommendRoutingPluginReq{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		SwapHash:        swapHash[:],
		PaymentAddress:  paymentAddr[:],
	}

	rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
	defer rpcCancel()

	res, err := s.server.RecommendRoutingPlugin(rpcCtx, req)
	if err != nil {
		return RoutingPluginNone, err
	}

	switch res.Plugin {
	case looprpc.RoutingPlugin_NONE:
		return RoutingPluginNone, nil

	case looprpc.RoutingPlugin_LOW_HIGH:
		return RoutingPluginLowHigh, nil

	default:
		// Log the value the server actually sent. Logging our own
		// not yet assigned result would always print "None" and hide
		// which plugin was recommended.
		log.Warnf("Recommended routing plugin is unknown: %v",
			res.Plugin)

		return RoutingPluginNone, nil
	}
}
// ReportRoutingResult tells the server how routing went for the swap
// identified by swapHash and paymentAddr: which plugin was used, whether
// routing succeeded, how many attempts were made and the total time taken.
// Any plugin other than RoutingPluginLowHigh is reported as the RPC "none"
// plugin. The call is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) ReportRoutingResult(ctx context.Context,
	swapHash lntypes.Hash, paymentAddr [32]byte, plugin RoutingPluginType,
	success bool, attempts int32, totalTime int64) error {

	rpcPlugin := looprpc.RoutingPlugin_NONE
	if plugin == RoutingPluginLowHigh {
		rpcPlugin = looprpc.RoutingPlugin_LOW_HIGH
	}

	report := &looprpc.ReportRoutingResultReq{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion(),
		SwapHash:        swapHash[:],
		PaymentAddress:  paymentAddr[:],
		Plugin:          rpcPlugin,
		Success:         success,
		Attempts:        attempts,
		TotalTime:       totalTime,
	}

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	_, err := s.server.ReportRoutingResult(callCtx, report)

	return err
}
// MuSig2SignSweep asks the server to take part in signing the MuSig2 htlc
// spend. The client's nonce and the sweep transaction PSBT are sent along
// with the swap hash and payment address. On success the nonce and partial
// signature from the server's response are returned, in that order. The call
// is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) MuSig2SignSweep(ctx context.Context,
	protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
	paymentAddr [32]byte, nonce []byte, sweepTxPsbt []byte) (
	[]byte, []byte, error) {

	signReq := &looprpc.MuSig2SignSweepReq{
		ProtocolVersion: looprpc.ProtocolVersion(protocolVersion),
		SwapHash:        swapHash[:],
		PaymentAddress:  paymentAddr[:],
		Nonce:           nonce,
		SweepTxPsbt:     sweepTxPsbt,
	}

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	resp, err := s.server.MuSig2SignSweep(callCtx, signReq)
	if err != nil {
		return nil, nil, err
	}

	serverNonce, partialSig := resp.Nonce, resp.PartialSignature

	return serverNonce, partialSig, nil
}
// PushKey hands the client's internal HTLC private key for the swap with the
// given hash over to the server. The call is bounded by globalCallTimeout and
// only the error of the call is returned.
func (s *grpcSwapServerClient) PushKey(ctx context.Context,
	protocolVersion loopdb.ProtocolVersion, swapHash lntypes.Hash,
	clientInternalPrivateKey [32]byte) error {

	pushReq := &looprpc.ServerPushKeyReq{
		ProtocolVersion: looprpc.ProtocolVersion(protocolVersion),
		SwapHash:        swapHash[:],
		InternalPrivkey: clientInternalPrivateKey[:],
	}

	callCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	_, err := s.server.PushKey(callCtx, pushReq)

	return err
}
// rpcRouteCancel builds the route cancel portion of a loop out cancel request
// from the metadata in details. Every recorded attempt becomes an htlc
// attempt carrying its remaining hops, and the payment type selects the route
// type. An unknown payment type results in an error.
func rpcRouteCancel(details *outCancelDetails) (
	*looprpc.CancelLoopOutSwapRequest_RouteCancel, error) {

	meta := details.metadata

	htlcAttempts := make([]*looprpc.HtlcAttempt, 0, len(meta.attempts))
	for _, hopsLeft := range meta.attempts {
		htlcAttempts = append(htlcAttempts, &looprpc.HtlcAttempt{
			RemainingHops: hopsLeft,
		})
	}

	routeCancel := &looprpc.RouteCancel{
		Attempts: htlcAttempts,

		// The lnd failure reason can be cast straight to the loop
		// failure reason since the values are copied 1:1 from lnd.
		Failure: looprpc.PaymentFailureReason(meta.failureReason),
	}

	switch meta.paymentType {
	case paymentTypePrepay:
		routeCancel.RouteType = looprpc.RoutePaymentType_PREPAY_ROUTE

	case paymentTypeInvoice:
		routeCancel.RouteType = looprpc.RoutePaymentType_INVOICE_ROUTE

	default:
		return nil, fmt.Errorf("unknown payment type: %v",
			details.metadata.paymentType)
	}

	cancel := &looprpc.CancelLoopOutSwapRequest_RouteCancel{
		RouteCancel: routeCancel,
	}

	return cancel, nil
}
// getSwapServerConn dials the swap server at address and returns the
// resulting gRPC connection. The unary and stream interceptors of interceptor
// are installed on the connection.
//
// Transport security is chosen as follows: if insecure is set, no transport
// security is used; otherwise, if tlsPath is non-empty, the certificate at
// that path is loaded; otherwise a default TLS configuration is used. A
// non-empty proxyAddress makes the connection dial through the Tor SOCKS
// proxy at that address.
func getSwapServerConn(address, proxyAddress string, insecure bool,
	tlsPath string, interceptor *lsat.ClientInterceptor) (*grpc.ClientConn,
	error) {

	dialOpts := []grpc.DialOption{
		grpc.WithUnaryInterceptor(interceptor.UnaryInterceptor),
		grpc.WithStreamInterceptor(interceptor.StreamInterceptor),
	}

	// Pick exactly one transport option: insecure, a certificate loaded
	// from disk (e.g. self-signed) or one signed by a public CA.
	var transportOpt grpc.DialOption
	if insecure {
		transportOpt = grpc.WithInsecure()
	} else if tlsPath != "" {
		fileCreds, err := credentials.NewClientTLSFromFile(tlsPath, "")
		if err != nil {
			return nil, err
		}

		transportOpt = grpc.WithTransportCredentials(fileCreds)
	} else {
		defaultCreds := credentials.NewTLS(&tls.Config{})
		transportOpt = grpc.WithTransportCredentials(defaultCreds)
	}
	dialOpts = append(dialOpts, transportOpt)

	// Route the connection through the SOCKS proxy if one was given.
	if proxyAddress != "" {
		log.Infof("Proxying connection to %v over Tor SOCKS proxy %v",
			address, proxyAddress)

		dialViaTor := func(_ context.Context, target string) (net.Conn,
			error) {

			return tor.Dial(
				target, proxyAddress, false, false,
				tor.DefaultConnTimeout,
			)
		}
		dialOpts = append(dialOpts, grpc.WithContextDialer(dialViaTor))
	}

	clientConn, err := grpc.Dial(address, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to RPC server: %v",
			err)
	}

	return clientConn, nil
}
// isErrConClosing reports whether err is a "transport is closing" error as
// produced by a grpc stream when the server has shut down. The check is a
// substring match on the error text, because grpc's ErrConnClosing lives in an
// internal package and cannot be referenced directly. A nil error never
// matches.
func isErrConClosing(err error) bool {
	return err != nil &&
		strings.Contains(err.Error(), "transport is closing")
}
// newLoopOutResponse holds the fields of the server's response to a new loop
// out swap request that are passed on to the caller.
type newLoopOutResponse struct {
// swapInvoice is the swap invoice returned by the server.
swapInvoice string
// prepayInvoice is the prepay invoice returned by the server.
prepayInvoice string
// senderKey is the sender key returned by the server, checked to parse
// as a public key.
senderKey [33]byte
// serverMessage is the message returned by the server.
serverMessage string
}
// newLoopInResponse holds the fields of the server's response to a new loop
// in swap request that are passed on to the caller.
type newLoopInResponse struct {
// receiverKey is the receiver key returned by the server, checked to
// parse as a public key.
receiverKey [33]byte
// receiverInternalKey is the receiver's internal public key as returned
// by the server. It is copied from the response without validation.
receiverInternalKey [33]byte
// expiry is the expiry returned by the server.
expiry int32
// serverMessage is the message returned by the server.
serverMessage string
}