2
0
mirror of https://github.com/lightninglabs/loop synced 2024-11-16 00:12:52 +00:00
loop/cmd/loopd/daemon.go

195 lines
4.2 KiB
Go
Raw Normal View History

2019-03-06 20:13:50 +00:00
package main
import (
"context"
"errors"
2019-03-06 20:13:50 +00:00
"fmt"
"net"
2019-03-12 22:35:53 +00:00
"net/http"
2019-03-06 20:13:50 +00:00
"os"
"os/signal"
"runtime/pprof"
"sync"
"time"
2019-03-12 22:35:53 +00:00
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/looprpc"
2019-03-06 20:13:50 +00:00
"google.golang.org/grpc"
)
2019-03-07 10:37:28 +00:00
// daemon runs loopd in daemon mode. It will listen for grpc connections,
2019-03-06 20:13:50 +00:00
// execute commands and pass back swap status information.
2019-03-07 10:37:28 +00:00
func daemon(config *config) error {
lnd, err := getLnd(config.Network, config.Lnd)
2019-03-06 20:13:50 +00:00
if err != nil {
return err
}
defer lnd.Close()
// If no swap server is specified, use the default addresses for mainnet
// and testnet.
if config.SwapServer == "" {
switch config.Network {
case "mainnet":
config.SwapServer = mainnetServer
case "testnet":
config.SwapServer = testnetServer
default:
return errors.New("no swap server address specified")
}
}
logger.Infof("Swap server address: %v", config.SwapServer)
// Create an instance of the loop client library.
2019-03-07 10:37:28 +00:00
swapClient, cleanup, err := getClient(
config.Network, config.SwapServer, config.Insecure,
&lnd.LndServices,
2019-03-07 10:37:28 +00:00
)
2019-03-06 20:13:50 +00:00
if err != nil {
return err
}
defer cleanup()
// Retrieve all currently existing swaps from the database.
swapsList, err := swapClient.FetchSwaps()
2019-03-06 20:13:50 +00:00
if err != nil {
return err
}
for _, s := range swapsList {
swaps[s.SwapHash] = *s
2019-03-12 15:10:37 +00:00
}
2019-03-07 10:37:28 +00:00
// Instantiate the loopd gRPC server.
2019-03-06 20:13:50 +00:00
server := swapClientServer{
impl: swapClient,
lnd: &lnd.LndServices,
}
serverOpts := []grpc.ServerOption{}
grpcServer := grpc.NewServer(serverOpts...)
looprpc.RegisterSwapClientServer(grpcServer, &server)
2019-03-06 20:13:50 +00:00
2019-03-12 22:35:53 +00:00
// Next, start the gRPC server listening for HTTP/2 connections.
logger.Infof("Starting gRPC listener")
grpcListener, err := net.Listen("tcp", config.RPCListen)
2019-03-06 20:13:50 +00:00
if err != nil {
return fmt.Errorf("RPC server unable to listen on %s",
2019-03-12 22:35:53 +00:00
config.RPCListen)
2019-03-06 20:13:50 +00:00
}
2019-03-12 22:35:53 +00:00
defer grpcListener.Close()
// We'll also create and start an accompanying proxy to serve clients
// through REST.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := proxy.NewServeMux()
proxyOpts := []grpc.DialOption{grpc.WithInsecure()}
err = looprpc.RegisterSwapClientHandlerFromEndpoint(
ctx, mux, config.RPCListen, proxyOpts,
)
if err != nil {
return err
}
logger.Infof("Starting REST proxy listener")
restListener, err := net.Listen("tcp", config.RESTListen)
if err != nil {
return fmt.Errorf("REST proxy unable to listen on %s",
config.RESTListen)
}
defer restListener.Close()
proxy := &http.Server{Handler: mux}
go proxy.Serve(restListener)
2019-03-06 20:13:50 +00:00
statusChan := make(chan loop.SwapInfo)
2019-03-06 20:13:50 +00:00
mainCtx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// Start the swap client itself.
wg.Add(1)
go func() {
defer wg.Done()
logger.Infof("Starting swap client")
err := swapClient.Run(mainCtx, statusChan)
if err != nil {
logger.Error(err)
}
logger.Infof("Swap client stopped")
logger.Infof("Stopping gRPC server")
grpcServer.Stop()
cancel()
}()
// Start a goroutine that broadcasts swap updates to clients.
wg.Add(1)
go func() {
defer wg.Done()
logger.Infof("Waiting for updates")
for {
select {
case swap := <-statusChan:
swapsLock.Lock()
swaps[swap.SwapHash] = swap
for _, subscriber := range subscribers {
select {
case subscriber <- swap:
case <-mainCtx.Done():
return
}
}
swapsLock.Unlock()
case <-mainCtx.Done():
return
}
}
}()
// Start the grpc server.
wg.Add(1)
go func() {
defer wg.Done()
2019-03-12 22:35:53 +00:00
logger.Infof("RPC server listening on %s", grpcListener.Addr())
logger.Infof("REST proxy listening on %s", restListener.Addr())
2019-03-06 20:13:50 +00:00
2019-03-12 22:35:53 +00:00
err = grpcServer.Serve(grpcListener)
2019-03-06 20:13:50 +00:00
if err != nil {
logger.Error(err)
}
}()
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, os.Interrupt)
2019-03-07 10:37:28 +00:00
// Run until the users terminates loopd or an error occurred.
2019-03-06 20:13:50 +00:00
select {
case <-interruptChannel:
logger.Infof("Received SIGINT (Ctrl+C).")
// TODO: Remove debug code.
// Debug code to dump goroutines on hanging exit.
go func() {
time.Sleep(5 * time.Second)
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
}()
cancel()
case <-mainCtx.Done():
}
wg.Wait()
return nil
}