diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go
index 0c308c4..1f444d5 100644
--- a/sweepbatcher/sweep_batch.go
+++ b/sweepbatcher/sweep_batch.go
@@ -138,8 +138,14 @@ type batchConfig struct {
 	// clock provides methods to work with time and timers.
 	clock clock.Clock
 
-	// batchPublishDelay is the delay between receiving a new block and
-	// publishing the batch transaction.
+	// initialDelay is the delay of first batch publishing after creation.
+	// It only affects newly created batches, not batches loaded from DB,
+	// so publishing does happen in case of a daemon restart (especially
+	// important in case of a crashloop).
+	initialDelay time.Duration
+
+	// batchPublishDelay is the delay between receiving a new block or
+	// initial delay completion and publishing the batch transaction.
 	batchPublishDelay time.Duration
 
 	// noBumping instructs sweepbatcher not to fee bump itself and rely on
@@ -511,6 +517,11 @@ func (b *batch) Wait() {
 	<-b.finished
 }
 
+// stillWaitingMsg is the format of the message printed if the batch is about
+// to publish, but initial delay has not ended yet.
+const stillWaitingMsg = "Skipping publishing, initial delay will end at " +
+	"%v, now is %v."
+
 // Run is the batch's main event loop.
 func (b *batch) Run(ctx context.Context) error {
 	runCtx, cancel := context.WithCancel(ctx)
@@ -550,10 +561,25 @@ func (b *batch) Run(ctx context.Context) error {
 		}
 	}
 
+	// skipBefore is the time before which we skip batch publishing.
+	// This is needed to facilitate better grouping of sweeps.
+	// For batches loaded from DB initialDelay should be 0.
+	skipBefore := clock.Now().Add(b.cfg.initialDelay)
+
+	// initialDelayChan is a timer which fires upon initial delay end.
+	// If initialDelay is 0, it does not fire to prevent race with
+	// blockChan which also fires immediately with current tip. Such a race
+	// may result in double publishing if batchPublishDelay is also 0.
+	var initialDelayChan <-chan time.Time
+	if b.cfg.initialDelay > 0 {
+		initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
+	}
+
 	// We use a timer in order to not publish new transactions at the same
 	// time as the block epoch notification. This is done to prevent
 	// unnecessary transaction publishments when a spend is detected on that
-	// block.
+	// block. This timer starts after new block arrives or initialDelay
+	// completes.
 	var timerChan <-chan time.Time
 
 	b.log.Infof("started, primary %x, total sweeps %v",
@@ -564,6 +590,7 @@ func (b *batch) Run(ctx context.Context) error {
 		case <-b.callEnter:
 			<-b.callLeave
 
+		// blockChan provides immediately the current tip.
 		case height := <-blockChan:
 			b.log.Debugf("received block %v", height)
 
@@ -572,12 +599,39 @@ func (b *batch) Run(ctx context.Context) error {
 			timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
 			b.currentHeight = height
 
+		case <-initialDelayChan:
+			b.log.Debugf("initial delay of duration %v has ended",
+				b.cfg.initialDelay)
+
+			// Set the timer to publish the batch transaction after
+			// the configured delay.
+			timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
+
 		case <-timerChan:
-			if b.state == Open {
-				err := b.publish(ctx)
-				if err != nil {
-					return err
-				}
+			// Check that batch is still open.
+			if b.state != Open {
+				b.log.Debugf("Skipping publishing, because the"+
+					" batch is not open (%v).", b.state)
+				continue
+			}
+
+			// If the batch became urgent, skipBefore is set to now.
+			if b.isUrgent(skipBefore) {
+				skipBefore = clock.Now()
+			}
+
+			// Check that the initial delay has ended. We have also
+			// batchPublishDelay on top of initialDelay, so if
+			// initialDelayChan has just fired, this check passes.
+			now := clock.Now()
+			if skipBefore.After(now) {
+				b.log.Debugf(stillWaitingMsg, skipBefore, now)
+				continue
+			}
+
+			err := b.publish(ctx)
+			if err != nil {
+				return err
 			}
 
 		case spend := <-b.spendChan:
@@ -611,6 +665,57 @@ func (b *batch) Run(ctx context.Context) error {
 		}
 	}
 }
+// timeout returns minimum timeout as block height among sweeps of the batch.
+// If the batch is empty, return -1.
+func (b *batch) timeout() int32 {
+	// Find minimum among sweeps' timeouts.
+	minTimeout := int32(-1)
+	for _, sweep := range b.sweeps {
+		if minTimeout == -1 || minTimeout > sweep.timeout {
+			minTimeout = sweep.timeout
+		}
+	}
+
+	return minTimeout
+}
+
+// isUrgent checks if the batch became urgent. This is determined by comparing
+// the remaining number of blocks until timeout to the initial delay remained,
+// given one block is 10 minutes.
+func (b *batch) isUrgent(skipBefore time.Time) bool {
+	timeout := b.timeout()
+	if timeout <= 0 {
+		b.log.Warnf("Method timeout() returned %v. Number of"+
+			" sweeps: %d. It may be an empty batch.",
+			timeout, len(b.sweeps))
+		return false
+	}
+
+	if b.currentHeight == 0 {
+		// currentHeight is not initiated yet.
+		return false
+	}
+
+	blocksToTimeout := timeout - b.currentHeight
+	const blockTime = 10 * time.Minute
+	timeBank := time.Duration(blocksToTimeout) * blockTime
+
+	// We want to have at least 2x as much time to be safe.
+	const safetyFactor = 2
+	remainingWaiting := skipBefore.Sub(b.cfg.clock.Now())
+
+	if timeBank >= safetyFactor*remainingWaiting {
+		// There is enough time, keep waiting.
+		return false
+	}
+
+	b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
+		"remainingWaiting is %v)", timeBank, remainingWaiting)
+
+	// Signal to the caller to cancel initialDelay.
+	return true
+}
+
 // publish creates and publishes the latest batch transaction to the network.
 func (b *batch) publish(ctx context.Context) error {
 	var (
diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go
index a2ee99f..5f1a113 100644
--- a/sweepbatcher/sweep_batcher.go
+++ b/sweepbatcher/sweep_batcher.go
@@ -257,6 +257,14 @@ type Batcher struct {
 	// clock provides methods to work with time and timers.
 	clock clock.Clock
 
+	// initialDelay is the delay of first batch publishing after creation.
+	// It only affects newly created batches, not batches loaded from DB,
+	// so publishing does happen in case of a daemon restart (especially
+	// important in case of a crashloop). If a sweep is about to expire
+	// (time until timeout is less than 2x initialDelay), then waiting is
+	// skipped.
+	initialDelay time.Duration
+
 	// customFeeRate provides custom min fee rate per swap. The batch uses
 	// max of the fee rates of its swaps. In this mode confTarget is
 	// ignored and fee bumping by sweepbatcher is disabled.
@@ -274,6 +282,14 @@ type BatcherConfig struct {
 	// clock provides methods to work with time and timers.
 	clock clock.Clock
 
+	// initialDelay is the delay of first batch publishing after creation.
+	// It only affects newly created batches, not batches loaded from DB,
+	// so publishing does happen in case of a daemon restart (especially
+	// important in case of a crashloop). If a sweep is about to expire
+	// (time until timeout is less than 2x initialDelay), then waiting is
+	// skipped.
+	initialDelay time.Duration
+
 	// customFeeRate provides custom min fee rate per swap. The batch uses
 	// max of the fee rates of its swaps. In this mode confTarget is
 	// ignored and fee bumping by sweepbatcher is disabled.
@@ -297,6 +313,17 @@ func WithClock(clock clock.Clock) BatcherOption {
 	}
 }
 
+// WithInitialDelay instructs sweepbatcher to wait for the duration provided
+// after new batch creation before it is first published. This facilitates
+// better grouping. Defaults to 0s (no initial delay). If a sweep is about
+// to expire (time until timeout is less than 2x initialDelay), then waiting
+// is skipped.
+func WithInitialDelay(initialDelay time.Duration) BatcherOption {
+	return func(cfg *BatcherConfig) {
+		cfg.initialDelay = initialDelay
+	}
+}
+
 // WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
 // external source of fee rates (FeeRateProvider). To apply a fee rate change,
 // the caller should re-add the sweep by calling AddSweep.
@@ -355,6 +382,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
 		store:              store,
 		sweepStore:         sweepStore,
 		clock:              cfg.clock,
+		initialDelay:       cfg.initialDelay,
 		customFeeRate:      cfg.customFeeRate,
 		customMuSig2Signer: cfg.customMuSig2Signer,
 	}
@@ -560,6 +588,12 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
 		cfg.batchPublishDelay = defaultTestnetPublishDelay
 	}
 
+	if b.initialDelay < 0 {
+		return nil, fmt.Errorf("negative initialDelay: %v",
+			b.initialDelay)
+	}
+	cfg.initialDelay = b.initialDelay
+
 	batchKit := b.newBatchKit()
 	batch := NewBatch(cfg, batchKit)
 
@@ -647,6 +681,9 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
 
 	cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance)
 
+	// Note that initialDelay and batchPublishDelay are 0 for batches
+	// recovered from DB so publishing happens in case of a daemon restart
+	// (especially important in case of a crashloop).
 	newBatch, err := NewBatchFromDB(cfg, batchKit)
 	if err != nil {
 		return fmt.Errorf("failed in NewBatchFromDB: %w", err)