
sweepbatcher: fix race condition when stopping

The race was detected in CI and locally when running with -race.
It happened between the following calls:

WARNING: DATA RACE
Write at 0x00c0003e6638 by goroutine 1374:
  runtime.racewrite()
      <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Wait()
      sweepbatcher/sweep_batch.go:463 +0x6e
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).Run.func1()
      sweepbatcher/sweep_batcher.go:272 +0x10e

Previous read at 0x00c0003e6638 by goroutine 1388:
  runtime.raceread()
      <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).monitorConfirmations()
      sweepbatcher/sweep_batch.go:1144 +0x285
  github.com/lightninglabs/loop/sweepbatcher.(*batch).handleSpend()
      sweepbatcher/sweep_batch.go:1309 +0x10e4
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Run()
      sweepbatcher/sweep_batch.go:526 +0xb04
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).spinUpBatch.func1()
      sweepbatcher/sweep_batcher.go:455 +0xbd

The race was caused by wg.Add(1) and wg.Wait() running in different goroutines:
one goroutine was running batch.Run() and the other batcher.Run(). sync.WaitGroup
does not allow Add and Wait to run concurrently once the counter may reach zero.

To avoid this scenario, the wg.Wait() call was moved into batch.Run(), so the
batch itself waits for its child goroutines. After that the channel b.finished
is closed, which serves as the signal for external waiters (the batcher calling
batch.Wait()).
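
For illustration, here is a minimal sketch of the resulting pattern (simplified,
hypothetical code, not the actual batch implementation): the goroutine that
calls wg.Add(1) is the only one that calls wg.Wait(), and external waiters block
on a channel instead of the WaitGroup.

    package main

    import "sync"

    // batch is a simplified stand-in for the real sweepbatcher batch type;
    // it models only the shutdown signaling described above.
    type batch struct {
            wg       sync.WaitGroup
            finished chan struct{}
    }

    func newBatch() *batch {
            return &batch{finished: make(chan struct{})}
    }

    // Run spawns child goroutines and, on exit, waits for them itself, so
    // wg.Add(1) and wg.Wait() always run on the same goroutine.
    func (b *batch) Run() {
            defer func() {
                    b.wg.Wait()
                    close(b.finished)
            }()

            b.wg.Add(1)
            go func() {
                    defer b.wg.Done()
                    // Child goroutine work would happen here.
            }()
    }

    // Wait blocks until Run and all child goroutines have finished. It
    // never touches the WaitGroup, so it cannot race with wg.Add(1).
    func (b *batch) Wait() {
            <-b.finished
    }

    func main() {
            b := newBatch()
            go b.Run()
            b.Wait()
    }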

Also, the channel batch.stopped was renamed to batch.stopping to better reflect
what it signals: the batch is shutting down, not fully stopped.

Added TestSweepBatcherCloseDuringAdding to make sure that adding a sweep during
shutdown does not cause a crash. Note that this test did not catch the original
race condition.
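
The report above comes from the Go race detector; it can be reproduced by
running the package tests with the -race flag, for example:

    go test -race ./sweepbatcher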

Boris Nagaev 2024-06-14 23:31:23 -03:00
parent 8f985143e2
commit c34d04e2f7
2 changed files with 116 additions and 7 deletions

sweepbatcher/sweep_batch.go

@@ -193,8 +193,12 @@ type batch struct {
 	// main event loop.
 	callLeave chan struct{}

-	// stopped signals that the batch has stopped.
-	stopped chan struct{}
+	// stopping signals that the batch is stopping.
+	stopping chan struct{}
+
+	// finished signals that the batch has stopped and all child goroutines
+	// have finished.
+	finished chan struct{}

 	// quit is owned by the parent batcher and signals that the batch must
 	// stop.
@@ -273,7 +277,10 @@ func (b *batch) scheduleNextCall() (func(), error) {
 	case <-b.quit:
 		return func() {}, ErrBatcherShuttingDown

-	case <-b.stopped:
+	case <-b.stopping:
+		return func() {}, ErrBatchShuttingDown
+
+	case <-b.finished:
 		return func() {}, ErrBatchShuttingDown
 	}
@@ -297,7 +304,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
 		errChan: make(chan error, 1),
 		callEnter: make(chan struct{}),
 		callLeave: make(chan struct{}),
-		stopped: make(chan struct{}),
+		stopping: make(chan struct{}),
+		finished: make(chan struct{}),
 		quit: bk.quit,
 		batchTxid: bk.batchTxid,
 		wallet: bk.wallet,
@@ -340,7 +348,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
 		errChan: make(chan error, 1),
 		callEnter: make(chan struct{}),
 		callLeave: make(chan struct{}),
-		stopped: make(chan struct{}),
+		stopping: make(chan struct{}),
+		finished: make(chan struct{}),
 		quit: bk.quit,
 		batchTxid: bk.batchTxid,
 		batchPkScript: bk.batchPkScript,
@@ -460,7 +469,7 @@ func (b *batch) sweepExists(hash lntypes.Hash) bool {
 // Wait waits for the batch to gracefully stop.
 func (b *batch) Wait() {
 	b.log.Infof("Stopping")
-	b.wg.Wait()
+	<-b.finished
 }

 // Run is the batch's main event loop.
@@ -468,8 +477,12 @@ func (b *batch) Run(ctx context.Context) error {
 	runCtx, cancel := context.WithCancel(ctx)
 	defer func() {
 		cancel()

-		close(b.stopped)
+		close(b.stopping)
+		// Make sure not to call b.wg.Wait from any other place to avoid
+		// race condition between b.wg.Add(1) and b.wg.Wait().
+		b.wg.Wait()
+		close(b.finished)
 	}()

 	if b.muSig2SignSweep == nil {

sweepbatcher/sweep_batcher_test.go

@@ -1538,3 +1538,99 @@ func TestSweepFetcher(t *testing.T) {
 	// Make sure the batcher exited without an error.
 	checkBatcherError(t, runErr)
 }
+
+// TestSweepBatcherCloseDuringAdding tests that sweep batcher works correctly
+// if it is closed (stops running) during AddSweep call.
+func TestSweepBatcherCloseDuringAdding(t *testing.T) {
+	defer test.Guard(t)()
+
+	lnd := test.NewMockLnd()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	store := loopdb.NewStoreMock(t)
+	sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
+	require.NoError(t, err)
+
+	batcherStore := NewStoreMock()
+	batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
+		testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
+		sweepStore)
+	go func() {
+		err := batcher.Run(ctx)
+		checkBatcherError(t, err)
+	}()
+
+	// Add many swaps.
+	for i := byte(1); i < 255; i++ {
+		swapHash := lntypes.Hash{i, i, i}
+		// Create a swap contract.
+		swap := &loopdb.LoopOutContract{
+			SwapContract: loopdb.SwapContract{
+				CltvExpiry:      111,
+				AmountRequested: 111,
+			},
+			SwapInvoice: swapInvoice,
+		}
+
+		err = store.CreateLoopOut(ctx, swapHash, swap)
+		require.NoError(t, err)
+		store.AssertLoopOutStored()
+	}
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		// Add many sweeps.
+		for i := byte(1); i < 255; i++ {
+			// Create a sweep request.
+			sweepReq := SweepRequest{
+				SwapHash: lntypes.Hash{i, i, i},
+				Value:    111,
+				Outpoint: wire.OutPoint{
+					Hash:  chainhash.Hash{i, i},
+					Index: 1,
+				},
+				Notifier: &dummyNotifier,
+			}
+
+			// Deliver sweep request to batcher.
+			err := batcher.AddSweep(&sweepReq)
+			if err == ErrBatcherShuttingDown {
+				break
+			}
+			require.NoError(t, err)
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		// Close sweepbatcher during addings.
+		time.Sleep(1 * time.Millisecond)
+		cancel()
+	}()
+
+	// We don't know how many spend notification registrations will be
+	// issued, so accept them while waiting for two goroutines to stop.
+	quit := make(chan struct{})
+	registrationChan := make(chan struct{})
+	go func() {
+		defer close(registrationChan)
+		for {
+			select {
+			case <-lnd.RegisterSpendChannel:
+			case <-quit:
+				return
+			}
+		}
+	}()
+
+	wg.Wait()
+	close(quit)
+	<-registrationChan
+}