Cache failed routes, use context with timeouts

pull/1/head
rkfg 2 years ago
parent 0ead4ec4ee
commit a3ce2794d1

@ -9,8 +9,8 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
)
func (r *regolancer) getChannels() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
func (r *regolancer) getChannels(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
channels, err := r.lnClient.ListChannels(ctx, &lnrpc.ListChannelsRequest{ActiveOnly: true, PublicOnly: true})
if err != nil {
@ -49,11 +49,22 @@ func min(args ...int64) (result int64) {
return
}
func (r *regolancer) pickChannelPair(amount int64) (from uint64, to uint64, maxAmount int64) {
fromIdx := rand.Int31n(int32(len(r.fromChannels)))
toIdx := rand.Int31n(int32(len(r.toChannels)))
fromChan := r.fromChannels[fromIdx]
toChan := r.toChannels[toIdx]
func (r *regolancer) pickChannelPair(ctx context.Context, amount int64) (from uint64, to uint64, maxAmount int64, err error) {
var fromChan, toChan *lnrpc.Channel
for {
select {
case <-ctx.Done():
return 0, 0, 0, ctx.Err()
default:
}
fromIdx := rand.Int31n(int32(len(r.fromChannels)))
toIdx := rand.Int31n(int32(len(r.toChannels)))
fromChan = r.fromChannels[fromIdx]
toChan = r.toChannels[toIdx]
if !r.isFailedRoute(fromChan.ChanId, toChan.ChanId) {
break
}
}
maxFrom := fromChan.Capacity/2 - fromChan.RemoteBalance
maxTo := toChan.Capacity/2 - toChan.LocalBalance
if amount == 0 {
@ -61,5 +72,5 @@ func (r *regolancer) pickChannelPair(amount int64) (from uint64, to uint64, maxA
} else {
maxAmount = min(maxFrom, maxTo, amount)
}
return fromChan.ChanId, toChan.ChanId, maxAmount
return fromChan.ChanId, toChan.ChanId, maxAmount, nil
}

@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"os"
@ -14,6 +15,8 @@ import (
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
)
var ErrRepeat = fmt.Errorf("repeat")
var mainParams struct {
Config string `short:"f" long:"config" description:"config file path"`
}
@ -42,7 +45,7 @@ type regolancer struct {
toChannels []*lnrpc.Channel
nodeCache map[string]*lnrpc.NodeInfo
chanCache map[uint64]*lnrpc.ChannelEdge
ignoredPairs []*lnrpc.NodePair
failureCache map[string]*time.Time
}
func loadConfig() {
@ -62,6 +65,58 @@ func loadConfig() {
}
}
func tryRebalance(ctx context.Context, r *regolancer, invoice **lnrpc.AddInvoiceResponse, attempt *int) error {
attemptCtx, attemptCancel := context.WithTimeout(ctx, time.Minute*30)
defer attemptCancel()
from, to, amt, err := r.pickChannelPair(attemptCtx, params.Amount)
if err != nil {
log.Printf("Error during picking channel: %s", err)
return ErrRepeat
}
if params.Amount == 0 || *invoice == nil {
*invoice, err = r.createInvoice(attemptCtx, from, to, amt)
if err != nil {
log.Print("Error creating invoice: ", err)
return ErrRepeat
}
}
routes, fee, err := r.getRoutes(attemptCtx, from, to, amt*1000, params.EconRatio)
if err != nil {
r.addFailedRoute(from, to)
return ErrRepeat
}
for _, route := range routes {
log.Printf("Attempt %s, amount: %s (max fee: %s)", hiWhiteColorF("#%d", *attempt),
hiWhiteColor(amt), hiWhiteColor(fee/1000))
r.printRoute(attemptCtx, route)
err = r.pay(attemptCtx, *invoice, amt, route, params.ProbeSteps)
if err == nil {
return nil
}
if retryErr, ok := err.(ErrRetry); ok {
amt = retryErr.amount
log.Printf("Trying to rebalance again with %s", hiWhiteColor(amt))
probedInvoice, err := r.createInvoice(attemptCtx, from, to, amt)
if err != nil {
log.Print("Error creating invoice: ", err)
return ErrRepeat
}
if err != nil {
log.Printf("Error rebuilding the route for probed payment: %s", errColor(err))
} else {
err = r.pay(attemptCtx, probedInvoice, amt, retryErr.route, 0)
if err == nil {
return nil
} else {
log.Printf("Probed rebalance failed with error: %s", errColor(err))
}
}
}
*attempt++
}
return ErrRepeat
}
func main() {
rand.Seed(time.Now().UnixNano())
loadConfig()
@ -96,15 +151,22 @@ func main() {
if err != nil {
log.Fatal(err)
}
r := regolancer{nodeCache: map[string]*lnrpc.NodeInfo{}, chanCache: map[uint64]*lnrpc.ChannelEdge{}}
r := regolancer{
nodeCache: map[string]*lnrpc.NodeInfo{},
chanCache: map[uint64]*lnrpc.ChannelEdge{},
failureCache: map[string]*time.Time{},
}
r.lnClient = lnrpc.NewLightningClient(conn)
r.routerClient = routerrpc.NewRouterClient(conn)
info, err := r.lnClient.GetInfo(context.Background(), &lnrpc.GetInfoRequest{})
mainCtx, cancel := context.WithTimeout(context.Background(), time.Hour*6)
defer cancel()
infoCtx, infoCancel := context.WithTimeout(mainCtx, time.Second*30)
info, err := r.lnClient.GetInfo(infoCtx, &lnrpc.GetInfoRequest{})
if err != nil {
log.Fatal(err)
}
r.myPK = info.IdentityPubkey
err = r.getChannels()
err = r.getChannels(infoCtx)
if err != nil {
log.Fatal("Error listing own channels: ", err)
}
@ -118,47 +180,22 @@ func main() {
if len(r.toChannels) == 0 {
log.Fatal("No target channels selected")
}
infoCancel()
var invoice *lnrpc.AddInvoiceResponse
attempt := 1
for {
from, to, amt := r.pickChannelPair(params.Amount)
if params.Amount == 0 || invoice == nil {
invoice, err = r.createInvoice(from, to, amt)
if err != nil {
log.Fatal("Error creating invoice: ", err)
}
select {
case <-mainCtx.Done():
log.Println("Rebalancing timed out")
return
default:
}
routes, fee, err := r.getRoutes(from, to, amt*1000, params.EconRatio)
if err != nil {
continue
err = tryRebalance(mainCtx, &r, &invoice, &attempt)
if err == nil {
return
}
for _, route := range routes {
log.Printf("Attempt %s, amount: %s (max fee: %s)", hiWhiteColorF("#%d", attempt),
hiWhiteColor(amt), hiWhiteColor(fee/1000))
r.printRoute(route)
err = r.pay(invoice, amt, route, params.ProbeSteps)
if err == nil {
return
}
if retryErr, ok := err.(ErrRetry); ok {
amt = retryErr.amount
log.Printf("Trying to rebalance again with %s", hiWhiteColor(amt))
probedInvoice, err := r.createInvoice(from, to, amt)
if err != nil {
log.Fatal("Error creating invoice: ", err)
}
if err != nil {
log.Printf("Error rebuilding the route for probed payment: %s", errColor(err))
} else {
err = r.pay(probedInvoice, amt, retryErr.route, 0)
if err == nil {
return
} else {
log.Printf("Probed rebalance failed with error: %s", errColor(err))
}
}
}
attempt++
if err != ErrRepeat {
log.Println(err)
}
}
}

@ -21,19 +21,19 @@ func (e ErrRetry) Error() string {
var ErrProbeFailed = fmt.Errorf("probe failed")
func (r *regolancer) createInvoice(from, to uint64, amount int64) (*lnrpc.AddInvoiceResponse, error) {
return r.lnClient.AddInvoice(context.Background(), &lnrpc.Invoice{Value: amount,
func (r *regolancer) createInvoice(ctx context.Context, from, to uint64, amount int64) (*lnrpc.AddInvoiceResponse, error) {
return r.lnClient.AddInvoice(ctx, &lnrpc.Invoice{Value: amount,
Memo: fmt.Sprintf("Rebalance %d ⇒ %d", from, to),
Expiry: int64(time.Hour.Seconds() * 24)})
}
func (r *regolancer) pay(invoice *lnrpc.AddInvoiceResponse, amount int64, route *lnrpc.Route, probeSteps int) error {
func (r *regolancer) pay(ctx context.Context, invoice *lnrpc.AddInvoiceResponse, amount int64, route *lnrpc.Route, probeSteps int) error {
lastHop := route.Hops[len(route.Hops)-1]
lastHop.MppRecord = &lnrpc.MPPRecord{
PaymentAddr: invoice.PaymentAddr,
TotalAmtMsat: amount * 1000,
}
result, err := r.routerClient.SendToRouteV2(context.Background(),
result, err := r.routerClient.SendToRouteV2(ctx,
&routerrpc.SendToRouteRequest{
PaymentHash: invoice.RHash,
Route: route,
@ -42,7 +42,9 @@ func (r *regolancer) pay(invoice *lnrpc.AddInvoiceResponse, amount int64, route
return err
}
if result.Status == lnrpc.HTLCAttempt_FAILED {
node1, err := r.getNodeInfo(route.Hops[result.Failure.FailureSourceIndex-1].PubKey)
nodeCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
node1, err := r.getNodeInfo(nodeCtx, route.Hops[result.Failure.FailureSourceIndex-1].PubKey)
node1name := ""
node2name := ""
if err != nil {
@ -50,7 +52,7 @@ func (r *regolancer) pay(invoice *lnrpc.AddInvoiceResponse, amount int64, route
} else {
node1name = node1.Node.Alias
}
node2, err := r.getNodeInfo(route.Hops[result.Failure.FailureSourceIndex].PubKey)
node2, err := r.getNodeInfo(nodeCtx, route.Hops[result.Failure.FailureSourceIndex].PubKey)
if err != nil {
node2name = fmt.Sprintf("node%d", result.Failure.FailureSourceIndex)
} else {
@ -60,7 +62,7 @@ func (r *regolancer) pay(invoice *lnrpc.AddInvoiceResponse, amount int64, route
cyanColor(node1name), cyanColor(node2name))
if int(result.Failure.FailureSourceIndex) == len(route.Hops)-2 && probeSteps > 0 {
fmt.Println("Probing route...")
maxAmount, goodRoute, err := r.probeRoute(route, 0, amount, amount/2, probeSteps)
maxAmount, goodRoute, err := r.probeRoute(ctx, route, 0, amount, amount/2, probeSteps)
if err != nil {
return err
}

@ -28,32 +28,10 @@ func (r *regolancer) getChanInfo(ctx context.Context, chanId uint64) (*lnrpc.Cha
return c, nil
}
// currently unused
func (r *regolancer) buildIgnoredPairs(channels []uint64) error {
for _, chanId := range channels {
c, err := r.getChanInfo(context.Background(), chanId)
if err != nil {
return err
}
node1pk, err := hex.DecodeString(c.Node1Pub)
if err != nil {
return err
}
node2pk, err := hex.DecodeString(c.Node2Pub)
if err != nil {
return err
}
pair1 := lnrpc.NodePair{From: node1pk, To: node2pk}
pair2 := lnrpc.NodePair{From: node2pk, To: node1pk}
r.ignoredPairs = append(r.ignoredPairs, &pair1, &pair2)
}
return nil
}
func (r *regolancer) getRoutes(from, to uint64, amtMsat int64, ratio float64) ([]*lnrpc.Route, int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
func (r *regolancer) getRoutes(ctx context.Context, from, to uint64, amtMsat int64, ratio float64) ([]*lnrpc.Route, int64, error) {
routeCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
c, err := r.getChanInfo(ctx, to)
c, err := r.getChanInfo(routeCtx, to)
if err != nil {
return nil, 0, err
}
@ -68,7 +46,7 @@ func (r *regolancer) getRoutes(from, to uint64, amtMsat int64, ratio float64) ([
if err != nil {
return nil, 0, err
}
routes, err := r.lnClient.QueryRoutes(ctx, &lnrpc.QueryRoutesRequest{
routes, err := r.lnClient.QueryRoutes(routeCtx, &lnrpc.QueryRoutesRequest{
PubKey: r.myPK,
OutgoingChanId: from,
LastHopPubkey: lastPK,
@ -82,25 +60,25 @@ func (r *regolancer) getRoutes(from, to uint64, amtMsat int64, ratio float64) ([
return routes.Routes, feeMsat, nil
}
func (r *regolancer) getNodeInfo(pk string) (*lnrpc.NodeInfo, error) {
func (r *regolancer) getNodeInfo(ctx context.Context, pk string) (*lnrpc.NodeInfo, error) {
if nodeInfo, ok := r.nodeCache[pk]; ok {
return nodeInfo, nil
}
nodeInfo, err := r.lnClient.GetNodeInfo(context.Background(), &lnrpc.NodeInfoRequest{PubKey: pk})
nodeInfo, err := r.lnClient.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{PubKey: pk})
if err == nil {
r.nodeCache[pk] = nodeInfo
}
return nodeInfo, err
}
func (r *regolancer) printRoute(route *lnrpc.Route) {
func (r *regolancer) printRoute(ctx context.Context, route *lnrpc.Route) {
if len(route.Hops) == 0 {
return
}
errs := ""
fmt.Printf("%s %s\n", faintWhiteColor("Total fee:"), hiWhiteColor(route.TotalFeesMsat/1000))
for i, hop := range route.Hops {
nodeInfo, err := r.getNodeInfo(hop.PubKey)
nodeInfo, err := r.getNodeInfo(ctx, hop.PubKey)
if err != nil {
errs = errs + err.Error() + "\n"
continue
@ -116,13 +94,13 @@ func (r *regolancer) printRoute(route *lnrpc.Route) {
}
}
func (r *regolancer) rebuildRoute(route *lnrpc.Route, amount int64) (*lnrpc.Route, error) {
func (r *regolancer) rebuildRoute(ctx context.Context, route *lnrpc.Route, amount int64) (*lnrpc.Route, error) {
pks := [][]byte{}
for _, h := range route.Hops {
pk, _ := hex.DecodeString(h.PubKey)
pks = append(pks, pk)
}
resultRoute, err := r.routerClient.BuildRoute(context.Background(), &routerrpc.BuildRouteRequest{
resultRoute, err := r.routerClient.BuildRoute(ctx, &routerrpc.BuildRouteRequest{
AmtMsat: amount * 1000,
OutgoingChanId: route.Hops[0].ChanId,
HopPubkeys: pks,
@ -134,15 +112,15 @@ func (r *regolancer) rebuildRoute(route *lnrpc.Route, amount int64) (*lnrpc.Rout
return resultRoute.Route, err
}
func (r *regolancer) probeRoute(route *lnrpc.Route, goodAmount, badAmount, amount int64, steps int) (maxAmount int64,
func (r *regolancer) probeRoute(ctx context.Context, route *lnrpc.Route, goodAmount, badAmount, amount int64, steps int) (maxAmount int64,
goodRoute *lnrpc.Route, err error) {
goodRoute, err = r.rebuildRoute(route, amount)
goodRoute, err = r.rebuildRoute(ctx, route, amount)
if err != nil {
return 0, nil, err
}
fakeHash := make([]byte, 32)
rand.Read(fakeHash)
result, err := r.routerClient.SendToRouteV2(context.Background(),
result, err := r.routerClient.SendToRouteV2(ctx,
&routerrpc.SendToRouteRequest{
PaymentHash: fakeHash,
Route: goodRoute,
@ -162,7 +140,7 @@ func (r *regolancer) probeRoute(route *lnrpc.Route, goodAmount, badAmount, amoun
nextAmount := amount + (badAmount-amount)/2
log.Printf("%s is good enough, trying amount %s, %s steps left",
hiWhiteColor(amount), hiWhiteColor(nextAmount), hiWhiteColor(steps-1))
return r.probeRoute(route, amount, badAmount, nextAmount, steps-1)
return r.probeRoute(ctx, route, amount, badAmount, nextAmount, steps-1)
}
if result.Failure.Code == lnrpc.Failure_TEMPORARY_CHANNEL_FAILURE {
if steps == 1 {
@ -176,12 +154,27 @@ func (r *regolancer) probeRoute(route *lnrpc.Route, goodAmount, badAmount, amoun
nextAmount := amount + (goodAmount-amount)/2
log.Printf("%s is too much, lowering amount to %s, %s steps left",
hiWhiteColor(amount), hiWhiteColor(nextAmount), hiWhiteColor(steps-1))
return r.probeRoute(route, goodAmount, amount, nextAmount, steps-1)
return r.probeRoute(ctx, route, goodAmount, amount, nextAmount, steps-1)
}
if result.Failure.Code == lnrpc.Failure_FEE_INSUFFICIENT {
log.Printf("Fee insufficient, retrying...")
return r.probeRoute(route, goodAmount, badAmount, amount, steps)
return r.probeRoute(ctx, route, goodAmount, badAmount, amount, steps)
}
}
return 0, nil, fmt.Errorf("unknown error: %+v", result)
}
func (r *regolancer) addFailedRoute(from, to uint64) {
t := time.Now().Add(time.Hour)
r.failureCache[fmt.Sprintf("%d-%d", from, to)] = &t
for k, v := range r.failureCache {
if v.After(time.Now()) {
delete(r.failureCache, k)
}
}
}
func (r *regolancer) isFailedRoute(from, to uint64) bool {
_, ok := r.failureCache[fmt.Sprintf("%d-%d", from, to)]
return ok
}

Loading…
Cancel
Save