2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-11 13:11:12 +00:00
loop/swap_server_client.go
2021-01-07 17:33:21 +01:00

543 lines
15 KiB
Go

package loop
import (
"context"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/lsat"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/tor"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var (
	// errServerSubscriptionComplete is delivered on a subscription's error
	// channel when the update stream ends with io.EOF, meaning the server
	// has no further updates to send for the swap.
	errServerSubscriptionComplete = errors.New(
		"server finished serving updates",
	)

	// errSubscriptionFailed is delivered on a subscription's error channel
	// when the update stream fails with a "transport is closing" error,
	// for example because the server restarted or the network connection
	// dropped. Subscriptions are not resumed after this error, so no
	// further updates will arrive.
	errSubscriptionFailed = errors.New(
		"failed, no further updates will be provided",
	)
)
// swapServerClient abstracts the calls the loop client makes to the swap
// server, so that the gRPC-backed implementation can be swapped out.
type swapServerClient interface {
// GetLoopOutTerms fetches the server's current terms for loop out swaps.
GetLoopOutTerms(ctx context.Context) (
*LoopOutTerms, error)
// GetLoopOutQuote requests a loop out quote for the given amount, expiry
// and swap publication deadline.
GetLoopOutQuote(ctx context.Context, amt btcutil.Amount, expiry int32,
swapPublicationDeadline time.Time) (
*LoopOutQuote, error)
// GetLoopInTerms fetches the server's current terms for loop in swaps.
GetLoopInTerms(ctx context.Context) (
*LoopInTerms, error)
// GetLoopInQuote requests a loop in quote for the given amount.
GetLoopInQuote(ctx context.Context, amt btcutil.Amount) (
*LoopInQuote, error)
// NewLoopOutSwap asks the server to initiate a loop out swap for the
// given hash, amount, expiry and receiver key.
NewLoopOutSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
receiverKey [33]byte, swapPublicationDeadline time.Time,
initiator string) (*newLoopOutResponse, error)
// PushLoopOutPreimage pushes a loop out preimage to the server.
PushLoopOutPreimage(ctx context.Context,
preimage lntypes.Preimage) error
// NewLoopInSwap asks the server to initiate a loop in swap for the given
// hash, amount, sender key and invoices. lastHop is optional and may be
// nil.
NewLoopInSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount,
senderKey [33]byte, swapInvoice, probeInvoice string,
lastHop *route.Vertex, initiator string) (*newLoopInResponse,
error)
// SubscribeLoopOutUpdates subscribes to loop out server state. It returns
// a channel of updates, a channel of errors, and an error if the
// subscription could not be created.
SubscribeLoopOutUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
// SubscribeLoopInUpdates subscribes to loop in server state. It returns
// a channel of updates, a channel of errors, and an error if the
// subscription could not be created.
SubscribeLoopInUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
}
// grpcSwapServerClient is the gRPC-backed implementation of swapServerClient.
type grpcSwapServerClient struct {
// server is the generated gRPC client used to issue swap server calls.
server looprpc.SwapServerClient
// conn is the underlying gRPC connection; it is closed by stop.
conn *grpc.ClientConn
// wg tracks the goroutines started for update subscriptions so that stop
// can wait for them to exit.
wg sync.WaitGroup
}
// stop closes the gRPC connection to the swap server and then blocks until
// every goroutine tracked by the client's wait group has exited. A failure to
// close the connection is logged as a warning and otherwise ignored.
func (s *grpcSwapServerClient) stop() {
	err := s.conn.Close()
	if err != nil {
		log.Warnf("could not close connection: %v", err)
	}

	s.wg.Wait()
}
// Compile-time check that grpcSwapServerClient implements swapServerClient.
var _ swapServerClient = (*grpcSwapServerClient)(nil)
// newSwapServerClient builds a gRPC swap server client from the given config.
// The connection is created with an LSAT interceptor (backed by lsatStore) so
// that the LSAT protocol is handled for every call. An error is returned if
// the server connection cannot be set up.
func newSwapServerClient(cfg *ClientConfig, lsatStore lsat.Store) (
	*grpcSwapServerClient, error) {

	// The interceptor handles the LSAT protocol for us on both unary and
	// streaming calls.
	interceptor := lsat.NewInterceptor(
		cfg.Lnd, lsatStore, serverRPCTimeout, cfg.MaxLsatCost,
		cfg.MaxLsatFee,
	)

	conn, err := getSwapServerConn(
		cfg.ServerAddress, cfg.ProxyAddress, cfg.SwapServerNoTLS,
		cfg.TLSPathServer, interceptor,
	)
	if err != nil {
		return nil, err
	}

	client := &grpcSwapServerClient{
		server: looprpc.NewSwapServerClient(conn),
		conn:   conn,
	}

	return client, nil
}
// GetLoopOutTerms queries the server for its loop out terms and converts the
// response into a LoopOutTerms value. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopOutTerms(ctx context.Context) (
	*LoopOutTerms, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutTermsRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
	}

	resp, err := s.server.LoopOutTerms(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	result := &LoopOutTerms{
		MinSwapAmount: btcutil.Amount(resp.MinSwapAmount),
		MaxSwapAmount: btcutil.Amount(resp.MaxSwapAmount),
		MinCltvDelta:  resp.MinCltvDelta,
		MaxCltvDelta:  resp.MaxCltvDelta,
	}

	return result, nil
}
// GetLoopOutQuote requests a loop out quote from the server for the given
// amount, expiry and publication deadline. The hex-encoded payment
// destination in the response must decode to exactly 33 bytes, otherwise an
// error is returned. The call is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopOutQuote(ctx context.Context,
	amt btcutil.Amount, expiry int32, swapPublicationDeadline time.Time) (
	*LoopOutQuote, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutQuoteRequest{
		Amt:                     uint64(amt),
		SwapPublicationDeadline: swapPublicationDeadline.Unix(),
		ProtocolVersion:         loopdb.CurrentRPCProtocolVersion,
		Expiry:                  expiry,
	}

	resp, err := s.server.LoopOutQuote(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	// The destination arrives hex encoded and must be a 33 byte key.
	rawDest, err := hex.DecodeString(resp.SwapPaymentDest)
	switch {
	case err != nil:
		return nil, err

	case len(rawDest) != 33:
		return nil, errors.New("invalid payment dest")
	}

	var paymentDest [33]byte
	copy(paymentDest[:], rawDest)

	quote := &LoopOutQuote{
		PrepayAmount:    btcutil.Amount(resp.PrepayAmt),
		SwapFee:         btcutil.Amount(resp.SwapFee),
		SwapPaymentDest: paymentDest,
	}

	return quote, nil
}
// GetLoopInTerms queries the server for its loop in terms and converts the
// response into a LoopInTerms value. The call is bounded by
// globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopInTerms(ctx context.Context) (
	*LoopInTerms, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopInTermsRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
	}

	resp, err := s.server.LoopInTerms(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	result := &LoopInTerms{
		MinSwapAmount: btcutil.Amount(resp.MinSwapAmount),
		MaxSwapAmount: btcutil.Amount(resp.MaxSwapAmount),
	}

	return result, nil
}
// GetLoopInQuote requests a loop in quote from the server for the given
// amount and returns the quoted swap fee and CLTV delta. The call is bounded
// by globalCallTimeout.
func (s *grpcSwapServerClient) GetLoopInQuote(ctx context.Context,
	amt btcutil.Amount) (*LoopInQuote, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopInQuoteRequest{
		Amt:             uint64(amt),
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
	}

	resp, err := s.server.LoopInQuote(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	quote := &LoopInQuote{
		SwapFee:   btcutil.Amount(resp.SwapFee),
		CltvDelta: resp.CltvDelta,
	}

	return quote, nil
}
// NewLoopOutSwap asks the server to create a new loop out swap. The sender
// key returned by the server is copied into a 33 byte array and must parse as
// a valid secp256k1 public key, otherwise an error is returned. On success
// the swap invoice, prepay invoice, sender key and server message are
// returned. The call is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) NewLoopOutSwap(ctx context.Context,
	swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
	receiverKey [33]byte, swapPublicationDeadline time.Time,
	initiator string) (*newLoopOutResponse, error) {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutRequest{
		SwapHash:                swapHash[:],
		Amt:                     uint64(amount),
		ReceiverKey:             receiverKey[:],
		SwapPublicationDeadline: swapPublicationDeadline.Unix(),
		ProtocolVersion:         loopdb.CurrentRPCProtocolVersion,
		Expiry:                  expiry,
		UserAgent:               UserAgent(initiator),
	}

	resp, err := s.server.NewLoopOutSwap(timeoutCtx, req)
	if err != nil {
		return nil, err
	}

	// Make sure the server handed us a usable public key before we
	// return it to the caller.
	var serverKey [33]byte
	copy(serverKey[:], resp.SenderKey)

	if _, err := btcec.ParsePubKey(serverKey[:], btcec.S256()); err != nil {
		return nil, fmt.Errorf("invalid sender key: %v", err)
	}

	result := &newLoopOutResponse{
		swapInvoice:   resp.SwapInvoice,
		prepayInvoice: resp.PrepayInvoice,
		senderKey:     serverKey,
		serverMessage: resp.ServerMessage,
	}

	return result, nil
}
// PushLoopOutPreimage sends the given loop out preimage to the server and
// returns any error from the call. The call is bounded by globalCallTimeout.
func (s *grpcSwapServerClient) PushLoopOutPreimage(ctx context.Context,
	preimage lntypes.Preimage) error {

	timeoutCtx, cancel := context.WithTimeout(ctx, globalCallTimeout)
	defer cancel()

	req := &looprpc.ServerLoopOutPushPreimageRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		Preimage:        preimage[:],
	}

	// The response carries nothing we need; only the error matters.
	_, err := s.server.LoopOutPushPreimage(timeoutCtx, req)

	return err
}
// NewLoopInSwap asks the server to create a new loop in swap. lastHop is
// optional and is only included in the request when non-nil. The receiver key
// returned by the server is copied into a 33 byte array and must parse as a
// valid secp256k1 public key, otherwise an error is returned. On success the
// receiver key, expiry and server message are returned. The call is bounded
// by globalCallTimeout.
func (s *grpcSwapServerClient) NewLoopInSwap(ctx context.Context,
	swapHash lntypes.Hash, amount btcutil.Amount, senderKey [33]byte,
	swapInvoice, probeInvoice string, lastHop *route.Vertex,
	initiator string) (*newLoopInResponse, error) {

	rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
	defer rpcCancel()

	req := &looprpc.ServerLoopInRequest{
		SwapHash:        swapHash[:],
		Amt:             uint64(amount),
		SenderKey:       senderKey[:],
		SwapInvoice:     swapInvoice,
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		ProbeInvoice:    probeInvoice,
		UserAgent:       UserAgent(initiator),
	}
	if lastHop != nil {
		req.LastHop = lastHop[:]
	}

	swapResp, err := s.server.NewLoopInSwap(rpcCtx, req)
	if err != nil {
		return nil, err
	}

	var receiverKey [33]byte
	copy(receiverKey[:], swapResp.ReceiverKey)

	// Validate receiver key. The error names the receiver key so that a
	// failure here is not confused with the sender key we supplied.
	_, err = btcec.ParsePubKey(receiverKey[:], btcec.S256())
	if err != nil {
		return nil, fmt.Errorf("invalid receiver key: %v", err)
	}

	return &newLoopInResponse{
		receiverKey:   receiverKey,
		expiry:        swapResp.Expiry,
		serverMessage: swapResp.ServerMessage,
	}, nil
}
// ServerUpdate summarizes an update from the swap server.
type ServerUpdate struct {
// State is the state that the server has sent us.
State looprpc.ServerSwapState
// Timestamp is the time of the server state update. The subscription
// methods in this file derive it from the nanosecond timestamp in the
// server's response.
Timestamp time.Time
}
// SubscribeLoopInUpdates opens a server stream of loop in state updates for
// the given swap hash. It returns a channel on which updates are delivered
// and a channel on which the subscription's terminating error is delivered.
// If the stream cannot be opened, both channels are nil and the error is
// returned directly.
func (s *grpcSwapServerClient) SubscribeLoopInUpdates(ctx context.Context,
	hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {

	req := &looprpc.SubscribeUpdatesRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		SwapHash:        hash[:],
	}

	stream, err := s.server.SubscribeLoopInUpdates(ctx, req)
	if err != nil {
		return nil, nil, err
	}

	// nextUpdate blocks on the stream and converts each rpc message into
	// a ServerUpdate.
	nextUpdate := func() (*ServerUpdate, error) {
		msg, err := stream.Recv()
		if err != nil {
			return nil, err
		}

		update := &ServerUpdate{
			State:     msg.State,
			Timestamp: time.Unix(0, msg.TimestampNs),
		}

		return update, nil
	}

	updates, errs := s.makeServerUpdate(ctx, nextUpdate)

	return updates, errs, nil
}
// SubscribeLoopOutUpdates opens a server stream of loop out state updates for
// the given swap hash. It returns a channel on which updates are delivered
// and a channel on which the subscription's terminating error is delivered.
// If the stream cannot be opened, both channels are nil and the error is
// returned directly.
func (s *grpcSwapServerClient) SubscribeLoopOutUpdates(ctx context.Context,
	hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {

	req := &looprpc.SubscribeUpdatesRequest{
		ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
		SwapHash:        hash[:],
	}

	stream, err := s.server.SubscribeLoopOutUpdates(ctx, req)
	if err != nil {
		return nil, nil, err
	}

	// nextUpdate blocks on the stream and converts each rpc message into
	// a ServerUpdate.
	nextUpdate := func() (*ServerUpdate, error) {
		msg, err := stream.Recv()
		if err != nil {
			return nil, err
		}

		update := &ServerUpdate{
			State:     msg.State,
			Timestamp: time.Unix(0, msg.TimestampNs),
		}

		return update, nil
	}

	updates, errs := s.makeServerUpdate(ctx, nextUpdate)

	return updates, errs, nil
}
// makeServerUpdate starts a goroutine that repeatedly calls the given stream
// receive function and forwards each update on the returned update channel.
// The goroutine is tracked by the client's wait group. It exits, after
// sending exactly one value on the returned error channel, when:
//   - receive returns io.EOF (errServerSubscriptionComplete is sent),
//   - receive returns a "transport is closing" error (errSubscriptionFailed
//     is sent),
//   - receive returns any other error (that error is sent), or
//   - ctx is done while an update is waiting to be delivered (nil is sent).
func (s *grpcSwapServerClient) makeServerUpdate(ctx context.Context,
	receive func() (*ServerUpdate, error)) (<-chan *ServerUpdate,
	<-chan error) {

	updates := make(chan *ServerUpdate)

	// Exactly one value is ever written to the error channel, so a buffer
	// of one lets the goroutine exit without waiting for a reader.
	errs := make(chan error, 1)

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()

		for {
			// Block until the stream yields an update or fails.
			update, err := receive()

			// EOF means the server has nothing more to send us.
			if err == io.EOF {
				errs <- errServerSubscriptionComplete
				return
			}

			// Any other error ends the subscription. A closing
			// transport is mapped to our failure sentinel so the
			// caller knows that no more updates will follow
			// (there is no retry logic yet).
			if err != nil {
				if isErrConClosing(err) {
					err = errSubscriptionFailed
				}

				errs <- err
				return
			}

			// Hand the update to the consumer, unless the caller
			// cancels first, in which case we exit with a nil
			// error.
			select {
			case updates <- update:

			case <-ctx.Done():
				errs <- nil
				return
			}
		}
	}()

	return updates, errs
}
// getSwapServerConn dials the swap server at address and returns the gRPC
// connection. The given LSAT interceptor is installed for both unary and
// streaming calls. Transport security is chosen as follows: no TLS when
// insecure is set, the certificate at tlsPath when it is non-empty, and
// otherwise TLS with a default tls.Config. A non-empty proxyAddress causes the
// connection to be dialed through the SOCKS proxy found at that address.
func getSwapServerConn(address, proxyAddress string, insecure bool,
	tlsPath string, interceptor *lsat.Interceptor) (*grpc.ClientConn, error) {

	dialOpts := []grpc.DialOption{
		grpc.WithUnaryInterceptor(interceptor.UnaryInterceptor),
		grpc.WithStreamInterceptor(interceptor.StreamInterceptor),
	}

	// Pick the transport security option: plaintext, a certificate loaded
	// from disk, or TLS with the default configuration.
	var transportOpt grpc.DialOption
	switch {
	case insecure:
		transportOpt = grpc.WithInsecure()

	case tlsPath != "":
		fileCreds, err := credentials.NewClientTLSFromFile(tlsPath, "")
		if err != nil {
			return nil, err
		}
		transportOpt = grpc.WithTransportCredentials(fileCreds)

	default:
		defaultCreds := credentials.NewTLS(&tls.Config{})
		transportOpt = grpc.WithTransportCredentials(defaultCreds)
	}
	dialOpts = append(dialOpts, transportOpt)

	// Route the connection through the SOCKS proxy when one is configured.
	if proxyAddress != "" {
		log.Infof("Proxying connection to %v over Tor SOCKS proxy %v",
			address, proxyAddress)

		dialViaProxy := func(_ context.Context, target string) (net.Conn,
			error) {

			return tor.Dial(
				target, proxyAddress, false,
				tor.DefaultConnTimeout,
			)
		}
		dialOpts = append(dialOpts, grpc.WithContextDialer(dialViaProxy))
	}

	clientConn, err := grpc.Dial(address, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to RPC server: %v",
			err)
	}

	return clientConn, nil
}
// isErrConClosing reports whether err is a grpc "transport is closing" error,
// which indicates that the server has shut down. The check is a substring
// match on the error text because grpc's ErrConnClosing lives in an internal
// package and cannot be referenced directly. A nil error never matches.
func isErrConClosing(err error) bool {
	return err != nil &&
		strings.Contains(err.Error(), "transport is closing")
}
// newLoopOutResponse holds the values returned by the server when a new loop
// out swap is created.
type newLoopOutResponse struct {
// swapInvoice is the swap invoice returned by the server.
swapInvoice string
// prepayInvoice is the prepay invoice returned by the server.
prepayInvoice string
// senderKey is the server's sender key, validated as a public key before
// this response is returned.
senderKey [33]byte
// serverMessage is the message the server attached to its response.
serverMessage string
}
// newLoopInResponse holds the values returned by the server when a new loop
// in swap is created.
type newLoopInResponse struct {
// receiverKey is the server's receiver key, validated as a public key
// before this response is returned.
receiverKey [33]byte
// expiry is the expiry value returned by the server for the swap.
expiry int32
// serverMessage is the message the server attached to its response.
serverMessage string
}