2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-08 01:10:29 +00:00
loop/swap_server_client.go
Oliver Gugger 27fd6b3304
multi: use aperture instead of local lsat library
Since the lsat code was copied over to aperture a while ago for other
projects to use as well, we should migrate over to that shared code
also. This will give us the recently added "remove token if payment
failed and try again" feature for free along with some other small
updates to the lsat package.
2021-04-29 10:01:15 +02:00

544 lines
15 KiB
Go

package loop
import (
"context"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/aperture/lsat"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/looprpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/tor"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var (
// errServerSubscriptionComplete is returned when our subscription to
// server updates exits because the server stream ended with an EOF,
// meaning the server has no more updates to provide us because its
// part in the swap is complete.
errServerSubscriptionComplete = errors.New("server finished serving " +
"updates")
// errSubscriptionFailed is returned when our subscription stream fails
// with a "transport is closing" error, indicating that the server has
// shut down. Since we do not have restart-recovery, we note that we
// will not resume our subscription once this error occurs.
errSubscriptionFailed = errors.New("failed, no further updates will " +
"be provided")
)
// swapServerClient is the set of swap server calls used by the loop client.
// grpcSwapServerClient is the gRPC-backed implementation.
type swapServerClient interface {
// GetLoopOutTerms fetches the server's terms for loop out swaps.
GetLoopOutTerms(ctx context.Context) (
*LoopOutTerms, error)
// GetLoopOutQuote requests a quote for a loop out swap of the given
// amount, expiry and swap publication deadline.
GetLoopOutQuote(ctx context.Context, amt btcutil.Amount, expiry int32,
swapPublicationDeadline time.Time) (
*LoopOutQuote, error)
// GetLoopInTerms fetches the server's terms for loop in swaps.
GetLoopInTerms(ctx context.Context) (
*LoopInTerms, error)
// GetLoopInQuote requests a quote for a loop in swap of the given
// amount.
GetLoopInQuote(ctx context.Context, amt btcutil.Amount) (
*LoopInQuote, error)
// NewLoopOutSwap asks the server to initiate a new loop out swap.
NewLoopOutSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
receiverKey [33]byte, swapPublicationDeadline time.Time,
initiator string) (*newLoopOutResponse, error)
// PushLoopOutPreimage pushes a loop out preimage to the server.
PushLoopOutPreimage(ctx context.Context,
preimage lntypes.Preimage) error
// NewLoopInSwap asks the server to initiate a new loop in swap. The
// lastHop argument is optional and may be nil.
NewLoopInSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount,
senderKey [33]byte, swapInvoice, probeInvoice string,
lastHop *route.Vertex, initiator string) (*newLoopInResponse,
error)
// SubscribeLoopOutUpdates subscribes to loop out server state. It
// returns a channel of updates and a channel of errors.
SubscribeLoopOutUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
// SubscribeLoopInUpdates subscribes to loop in server state. It
// returns a channel of updates and a channel of errors.
SubscribeLoopInUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
}
// grpcSwapServerClient talks to the swap server over a gRPC connection.
type grpcSwapServerClient struct {
// server is the generated gRPC client that all server calls go through.
server looprpc.SwapServerClient
// conn is the underlying gRPC connection; it is closed by stop.
conn *grpc.ClientConn
// wg tracks the update-forwarding goroutines started by
// makeServerUpdate so that stop can wait for them to exit.
wg sync.WaitGroup
}
// stop closes the connection to the swap server and then blocks until every
// goroutine tracked by the client's wait group has exited. A failure to close
// the connection is only logged.
func (s *grpcSwapServerClient) stop() {
	closeErr := s.conn.Close()
	if closeErr != nil {
		log.Warnf("could not close connection: %v", closeErr)
	}

	s.wg.Wait()
}
// Compile-time check that grpcSwapServerClient implements swapServerClient.
var _ swapServerClient = (*grpcSwapServerClient)(nil)
// newSwapServerClient builds a gRPC-backed swap server client from the given
// configuration. All calls on the resulting connection pass through an LSAT
// interceptor that is backed by lsatStore. An error is returned if the
// connection to the server cannot be set up.
func newSwapServerClient(cfg *ClientConfig, lsatStore lsat.Store) (
	*grpcSwapServerClient, error) {

	// The interceptor handles the LSAT protocol for us on every call made
	// over the server connection.
	interceptor := lsat.NewInterceptor(
		cfg.Lnd, lsatStore, serverRPCTimeout, cfg.MaxLsatCost,
		cfg.MaxLsatFee, false,
	)

	conn, err := getSwapServerConn(
		cfg.ServerAddress, cfg.ProxyAddress, cfg.SwapServerNoTLS,
		cfg.TLSPathServer, interceptor,
	)
	if err != nil {
		return nil, err
	}

	client := &grpcSwapServerClient{
		server: looprpc.NewSwapServerClient(conn),
		conn:   conn,
	}

	return client, nil
}
// GetLoopOutTerms queries the server for its loop out terms and converts the
// response into a LoopOutTerms value. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopOutTerms(ctx context.Context) (
	*LoopOutTerms, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutTermsRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
	}

	resp, err := s.server.LoopOutTerms(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	result := &LoopOutTerms{
		MinSwapAmount: btcutil.Amount(resp.MinSwapAmount),
		MaxSwapAmount: btcutil.Amount(resp.MaxSwapAmount),
		MinCltvDelta:  resp.MinCltvDelta,
		MaxCltvDelta:  resp.MaxCltvDelta,
	}

	return result, nil
}
// GetLoopOutQuote asks the server for a loop out quote for the given amount,
// expiry and swap publication deadline. The payment destination in the
// response is hex-decoded and must be exactly 33 bytes long, otherwise an
// error is returned. The call is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopOutQuote(ctx context.Context,
	amt btcutil.Amount, expiry int32, swapPublicationDeadline time.Time) (
	*LoopOutQuote, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutQuoteRequest{
		Amt:                     uint64(amt),
		SwapPublicationDeadline: swapPublicationDeadline.Unix(),
		ProtocolVersion:         loopdb.CurrentRPCProtocolVersion,
		Expiry:                  expiry,
	}

	resp, err := s.server.LoopOutQuote(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	// The destination arrives hex encoded; decode it and make sure it has
	// the expected size before copying it into a fixed-size array.
	rawDest, err := hex.DecodeString(resp.SwapPaymentDest)
	if err != nil {
		return nil, err
	}

	var paymentDest [33]byte
	if len(rawDest) != len(paymentDest) {
		return nil, errors.New("invalid payment dest")
	}
	copy(paymentDest[:], rawDest)

	quote := &LoopOutQuote{
		PrepayAmount:    btcutil.Amount(resp.PrepayAmt),
		SwapFee:         btcutil.Amount(resp.SwapFee),
		SwapPaymentDest: paymentDest,
	}

	return quote, nil
}
// GetLoopInTerms queries the server for its loop in terms and converts the
// response into a LoopInTerms value. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopInTerms(ctx context.Context) (
	*LoopInTerms, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopInTermsRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
	}

	resp, err := s.server.LoopInTerms(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	result := &LoopInTerms{
		MinSwapAmount: btcutil.Amount(resp.MinSwapAmount),
		MaxSwapAmount: btcutil.Amount(resp.MaxSwapAmount),
	}

	return result, nil
}
// GetLoopInQuote asks the server for a loop in quote for the given amount and
// returns the quoted swap fee and CLTV delta. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopInQuote(ctx context.Context,
	amt btcutil.Amount) (*LoopInQuote, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopInQuoteRequest{
		Amt:             uint64(amt),
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
	}

	resp, err := s.server.LoopInQuote(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	quote := &LoopInQuote{
		SwapFee:   btcutil.Amount(resp.SwapFee),
		CltvDelta: resp.CltvDelta,
	}

	return quote, nil
}
// NewLoopOutSwap asks the server to initiate a new loop out swap for the
// given hash, amount, expiry, receiver key and publication deadline. The
// initiator string is turned into the user agent sent with the request. The
// sender key returned by the server is checked to be a parseable public key
// before the response is handed back. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) NewLoopOutSwap(ctx context.Context,
	swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
	receiverKey [33]byte, swapPublicationDeadline time.Time,
	initiator string) (*newLoopOutResponse, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutRequest{
		SwapHash:                swapHash[:],
		Amt:                     uint64(amount),
		ReceiverKey:             receiverKey[:],
		SwapPublicationDeadline: swapPublicationDeadline.Unix(),
		ProtocolVersion:         loopdb.CurrentRPCProtocolVersion,
		Expiry:                  expiry,
		UserAgent:               UserAgent(initiator),
	}

	resp, err := s.server.NewLoopOutSwap(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	// Copy the server's key into a fixed-size array and make sure that it
	// parses as a public key.
	var serverKey [33]byte
	copy(serverKey[:], resp.SenderKey)

	if _, err := btcec.ParsePubKey(serverKey[:], btcec.S256()); err != nil {
		return nil, fmt.Errorf("invalid sender key: %v", err)
	}

	result := &newLoopOutResponse{
		swapInvoice:   resp.SwapInvoice,
		prepayInvoice: resp.PrepayInvoice,
		senderKey:     serverKey,
		serverMessage: resp.ServerMessage,
	}

	return result, nil
}
// PushLoopOutPreimage sends the given loop out preimage to the server and
// returns any error produced by the call. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) PushLoopOutPreimage(ctx context.Context,
	preimage lntypes.Preimage) error {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutPushPreimageRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		Preimage:        preimage[:],
	}

	// The response carries nothing we need, only the error matters.
	_, err := s.server.LoopOutPushPreimage(timeoutCtx, req)

	return err
}
// NewLoopInSwap asks the server to initiate a new loop in swap for the given
// hash, amount, sender key, swap invoice and probe invoice. The initiator
// string is turned into the user agent sent with the request. The lastHop
// argument is optional: it is only included in the request when non-nil.
//
// The receiver key returned by the server is checked to be a parseable public
// key; if it is not, an "invalid receiver key" error is returned. The call is
// bounded by globalCallTimeout.
func (s *grpcSwapServerClient) NewLoopInSwap(ctx context.Context,
	swapHash lntypes.Hash, amount btcutil.Amount, senderKey [33]byte,
	swapInvoice, probeInvoice string, lastHop *route.Vertex,
	initiator string) (*newLoopInResponse, error) {

	rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
	defer rpcCancel()

	req := &looprpc.ServerLoopInRequest{
		SwapHash:        swapHash[:],
		Amt:             uint64(amount),
		SenderKey:       senderKey[:],
		SwapInvoice:     swapInvoice,
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		ProbeInvoice:    probeInvoice,
		UserAgent:       UserAgent(initiator),
	}

	// The last hop restriction is optional, so only set it if the caller
	// supplied one.
	if lastHop != nil {
		req.LastHop = lastHop[:]
	}

	swapResp, err := s.server.NewLoopInSwap(rpcCtx, req)
	if err != nil {
		return nil, err
	}

	var receiverKey [33]byte
	copy(receiverKey[:], swapResp.ReceiverKey)

	// Validate receiver key. The error names the receiver key, since that
	// is the key the server sends us for a loop in swap.
	_, err = btcec.ParsePubKey(receiverKey[:], btcec.S256())
	if err != nil {
		return nil, fmt.Errorf("invalid receiver key: %v", err)
	}

	return &newLoopInResponse{
		receiverKey:   receiverKey,
		expiry:        swapResp.Expiry,
		serverMessage: swapResp.ServerMessage,
	}, nil
}
// ServerUpdate summarizes an update from the swap server. Values of this type
// are delivered on the channels returned by the subscription calls.
type ServerUpdate struct {
// State is the state that the server has sent us.
State looprpc.ServerSwapState
// Timestamp is the time of the server state update, converted from the
// nanosecond unix timestamp in the server's response.
Timestamp time.Time
}
// SubscribeLoopInUpdates opens a server stream of state updates for the loop
// in swap with the given hash. On success it returns a channel on which the
// updates are delivered and a channel on which the subscription's terminating
// error is delivered. If the stream cannot be opened, both channels are nil
// and the error is returned directly.
func (s *grpcSwapServerClient) SubscribeLoopInUpdates(ctx context.Context,
	hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {

	req := &looprpc.SubscribeUpdatesRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		SwapHash:        hash[:],
	}

	stream, err := s.server.SubscribeLoopInUpdates(ctx, req)
	if err != nil {
		return nil, nil, err
	}

	// nextUpdate reads a single message from the stream and converts it
	// into a ServerUpdate.
	nextUpdate := func() (*ServerUpdate, error) {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			return nil, recvErr
		}

		update := &ServerUpdate{
			State:     msg.State,
			Timestamp: time.Unix(0, msg.TimestampNs),
		}

		return update, nil
	}

	updates, errs := s.makeServerUpdate(ctx, nextUpdate)

	return updates, errs, nil
}
// SubscribeLoopOutUpdates opens a server stream of state updates for the loop
// out swap with the given hash. On success it returns a channel on which the
// updates are delivered and a channel on which the subscription's terminating
// error is delivered. If the stream cannot be opened, both channels are nil
// and the error is returned directly.
func (s *grpcSwapServerClient) SubscribeLoopOutUpdates(ctx context.Context,
	hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {

	req := &looprpc.SubscribeUpdatesRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		SwapHash:        hash[:],
	}

	stream, err := s.server.SubscribeLoopOutUpdates(ctx, req)
	if err != nil {
		return nil, nil, err
	}

	// nextUpdate reads a single message from the stream and converts it
	// into a ServerUpdate.
	nextUpdate := func() (*ServerUpdate, error) {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			return nil, recvErr
		}

		update := &ServerUpdate{
			State:     msg.State,
			Timestamp: time.Unix(0, msg.TimestampNs),
		}

		return update, nil
	}

	updates, errs := s.makeServerUpdate(ctx, nextUpdate)

	return updates, errs, nil
}
// makeServerUpdate starts a goroutine that repeatedly calls the given stream
// receive function and forwards every update it yields on the returned update
// channel. The goroutine exits, sending exactly one value on the returned
// error channel, when:
//   - receive returns io.EOF: errServerSubscriptionComplete is sent,
//   - receive fails with a "transport is closing" error:
//     errSubscriptionFailed is sent,
//   - receive fails with any other error: that error is sent,
//   - ctx is done while an update is waiting to be delivered: nil is sent.
//
// The goroutine is registered with the client's wait group.
func (s *grpcSwapServerClient) makeServerUpdate(ctx context.Context,
	receive func() (*ServerUpdate, error)) (<-chan *ServerUpdate,
	<-chan error) {

	// Exactly one error is ever sent, so a buffer of one lets the
	// goroutine exit without depending on the error being read.
	var (
		updates = make(chan *ServerUpdate)
		errs    = make(chan error, 1)
	)

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()

		for {
			// This blocks until the stream has an item for us or
			// fails with an error.
			update, recvErr := receive()

			// An EOF means the server is done sending updates, so
			// let the caller know that no more will follow.
			if recvErr == io.EOF {
				errs <- errServerSubscriptionComplete
				return
			}

			// Any other error ends the subscription. A closing
			// transport is mapped to our own error, since there
			// is no retry logic and no further updates will
			// arrive from the server.
			if recvErr != nil {
				if isErrConClosing(recvErr) {
					errs <- errSubscriptionFailed
					return
				}

				errs <- recvErr
				return
			}

			// Hand the update to the consumer, unless the client
			// cancels first, in which case we exit with no error.
			select {
			case updates <- update:

			case <-ctx.Done():
				errs <- nil
				return
			}
		}
	}()

	return updates, errs
}
// getSwapServerConn dials the swap server at the given address and returns
// the resulting connection. The interceptor's unary and stream interceptors
// are installed on the connection. Transport security is chosen as follows:
// no TLS if insecure is set, otherwise the certificate found at tlsPath if it
// is non-empty, otherwise a default TLS configuration. A non-empty
// proxyAddress makes the connection dial through the SOCKS proxy found at
// that address.
func getSwapServerConn(address, proxyAddress string, insecure bool,
	tlsPath string, interceptor *lsat.ClientInterceptor) (*grpc.ClientConn,
	error) {

	// Every connection goes through the interceptors.
	dialOpts := []grpc.DialOption{
		grpc.WithUnaryInterceptor(interceptor.UnaryInterceptor),
		grpc.WithStreamInterceptor(interceptor.StreamInterceptor),
	}

	// Select the transport security option: insecure, a certificate
	// loaded from file, or the default TLS configuration.
	var securityOpt grpc.DialOption
	switch {
	case insecure:
		securityOpt = grpc.WithInsecure()

	case tlsPath != "":
		fileCreds, err := credentials.NewClientTLSFromFile(tlsPath, "")
		if err != nil {
			return nil, err
		}
		securityOpt = grpc.WithTransportCredentials(fileCreds)

	default:
		securityOpt = grpc.WithTransportCredentials(
			credentials.NewTLS(&tls.Config{}),
		)
	}
	dialOpts = append(dialOpts, securityOpt)

	// Route the connection through the SOCKS proxy if one was specified.
	if proxyAddress != "" {
		log.Infof("Proxying connection to %v over Tor SOCKS proxy %v",
			address, proxyAddress)

		dialViaTor := func(_ context.Context, target string) (net.Conn,
			error) {

			return tor.Dial(
				target, proxyAddress, false,
				tor.DefaultConnTimeout,
			)
		}
		dialOpts = append(dialOpts, grpc.WithContextDialer(dialViaTor))
	}

	serverConn, err := grpc.Dial(address, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to RPC server: %v",
			err)
	}

	return serverConn, nil
}
// isErrConClosing reports whether err is a "transport is closing" error from
// a grpc stream, which indicates that the server has shut down. The error is
// matched on its message because ErrConnClosing lives in an internal grpc
// package and cannot be referenced directly. A nil error never matches.
func isErrConClosing(err error) bool {
	return err != nil &&
		strings.Contains(err.Error(), "transport is closing")
}
// newLoopOutResponse holds the fields of the server's response to a new loop
// out swap request that are passed back to the caller of NewLoopOutSwap.
type newLoopOutResponse struct {
// swapInvoice is the swap invoice returned by the server.
swapInvoice string
// prepayInvoice is the prepay invoice returned by the server.
prepayInvoice string
// senderKey is the server's sender key, which has been checked to parse
// as a public key.
senderKey [33]byte
// serverMessage is the message the server included in its response.
serverMessage string
}
// newLoopInResponse holds the fields of the server's response to a new loop
// in swap request that are passed back to the caller of NewLoopInSwap.
type newLoopInResponse struct {
// receiverKey is the server's receiver key, which has been checked to
// parse as a public key.
receiverKey [33]byte
// expiry is the expiry value returned by the server.
expiry int32
// serverMessage is the message the server included in its response.
serverMessage string
}