|
|
|
@ -4,6 +4,7 @@ import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"runtime"
|
|
|
|
|
"sort"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -134,10 +135,13 @@ func (m *Matcher) scan(request MatchRequest, limit int) (*Merger, bool) {
|
|
|
|
|
slices := m.sliceChunks(request.chunks)
|
|
|
|
|
numSlices := len(slices)
|
|
|
|
|
resultChan := make(chan partialResult, numSlices)
|
|
|
|
|
countChan := make(chan int, numSlices)
|
|
|
|
|
countChan := make(chan int, numChunks)
|
|
|
|
|
waitGroup := sync.WaitGroup{}
|
|
|
|
|
|
|
|
|
|
for idx, chunks := range slices {
|
|
|
|
|
waitGroup.Add(1)
|
|
|
|
|
go func(idx int, chunks []*Chunk) {
|
|
|
|
|
defer func() { waitGroup.Done() }()
|
|
|
|
|
sliceMatches := []*Item{}
|
|
|
|
|
for _, chunk := range chunks {
|
|
|
|
|
var matches []*Item
|
|
|
|
@ -159,6 +163,12 @@ func (m *Matcher) scan(request MatchRequest, limit int) (*Merger, bool) {
|
|
|
|
|
}(idx, chunks)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wait := func() bool {
|
|
|
|
|
cancelled.Set(true)
|
|
|
|
|
waitGroup.Wait()
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
count := 0
|
|
|
|
|
matchCount := 0
|
|
|
|
|
for matchesInChunk := range countChan {
|
|
|
|
@ -166,7 +176,7 @@ func (m *Matcher) scan(request MatchRequest, limit int) (*Merger, bool) {
|
|
|
|
|
matchCount += matchesInChunk
|
|
|
|
|
|
|
|
|
|
if limit > 0 && matchCount > limit {
|
|
|
|
|
return nil, true // For --select-1 and --exit-0
|
|
|
|
|
return nil, wait() // For --select-1 and --exit-0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if count == numChunks {
|
|
|
|
@ -174,8 +184,7 @@ func (m *Matcher) scan(request MatchRequest, limit int) (*Merger, bool) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !empty && m.reqBox.Peak(REQ_RESET) {
|
|
|
|
|
cancelled.Set(true)
|
|
|
|
|
return nil, true
|
|
|
|
|
return nil, wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if time.Now().Sub(startedAt) > PROGRESS_MIN_DURATION {
|
|
|
|
|