Refactor scanning code

Use channels only for signalling; worker threads append to the results
array directly, since this is thread safe
This commit is contained in:
Kovid Goyal
2025-06-24 11:28:51 +05:30
parent 0bf7306958
commit 8b51f4da87
3 changed files with 342 additions and 286 deletions

View File

@@ -402,7 +402,7 @@ func main(_ *cli.Command, opts *Options, args []string) (rc int, err error) {
if err != nil {
return 1, err
}
handler := Handler{lp: lp, err_chan: make(chan error), rl: readline.New(lp, readline.RlInit{
handler := Handler{lp: lp, err_chan: make(chan error, 8), rl: readline.New(lp, readline.RlInit{
Prompt: "> ", ContinuationPrompt: ". ",
})}
if err = handler.set_state_from_config(conf, opts); err != nil {

View File

@@ -117,7 +117,7 @@ func icon_for(path string, x os.FileMode) string {
return ans
}
func (h *Handler) draw_column_of_matches(matches []ResultItem, current_idx int, x, available_width int) {
func (h *Handler) draw_column_of_matches(matches ResultsType, current_idx int, x, available_width int) {
for i, m := range matches {
h.lp.QueueWriteString("\r")
h.lp.MoveCursorHorizontally(x)
@@ -139,7 +139,7 @@ func (h *Handler) draw_column_of_matches(matches []ResultItem, current_idx int,
}
}
func (h *Handler) draw_list_of_results(matches []ResultItem, y, height int) int {
func (h *Handler) draw_list_of_results(matches ResultsType, y, height int) int {
if len(matches) == 0 || height < 2 {
return 0
}
@@ -198,7 +198,7 @@ func (h *Handler) draw_num_of_matches(num_shown, y int) {
}
}
func (h *Handler) draw_results(y, bottom_margin int, matches []ResultItem, in_progress bool) (height int) {
func (h *Handler) draw_results(y, bottom_margin int, matches ResultsType, in_progress bool) (height int) {
height = h.screen_size.height - y - bottom_margin
h.lp.MoveCursorTo(1, 1+y)
h.draw_frame(h.screen_size.width, height)

View File

@@ -11,6 +11,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/kovidgoyal/kitty/tools/fzf"
@@ -25,6 +26,20 @@ type ResultItem struct {
positions []int // may be nil
score CombinedScore
}
// ResultsType is a list of pointers to ResultItem values; it is the type
// handed to the drawing code and returned by FileSystemScorer.Results.
type ResultsType []*ResultItem
// SetScoreResult records the outcome of a fuzzy match on r: the matched
// positions are copied from x, and the score is stored inverted
// (math.MaxUint32 - x.Score) through r.score.Set_score.
func (r *ResultItem) SetScoreResult(x fzf.Result) {
	inverted := uint32(math.MaxUint32 - x.Score)
	r.score.Set_score(inverted)
	r.positions = x.Positions
}
// Set_relpath sets r.text to r.abspath expressed relative to root_dir.
// When no relative path can be computed, r.text falls back to r.abspath.
func (r *ResultItem) Set_relpath(root_dir string) {
	rel, err := filepath.Rel(root_dir, r.abspath)
	if err != nil {
		r.text = r.abspath
		return
	}
	r.text = rel
}
func (r ResultItem) String() string {
return fmt.Sprintf("{text: %#v, abspath: %#v, is_dir: %v, positions: %#v}", r.text, r.abspath, r.ftype.IsDir(), r.positions)
@@ -37,27 +52,304 @@ func (r *ResultItem) sorted_positions() []int {
return r.positions
}
type ScanRequest struct {
root_dir string
// FileSystemScanner recursively lists a directory tree in a background
// goroutine, accumulating one ResultItem per directory entry.
type FileSystemScanner struct {
// listeners receive the current number of results (non-blocking send) after
// each directory is scanned and are closed when the worker exits.
listeners []chan int
// in_progress is true from construction until the worker exits.
// keep_going is cleared by Cancel, or when the root directory cannot be
// read; the worker checks it before scanning each directory.
in_progress, keep_going atomic.Bool
// root_dir is the directory at which scanning starts.
root_dir string
// mutex is held when results, listeners and err are published or read by
// the accessor methods.
mutex sync.Mutex
// results holds every entry found so far, in scan order.
results []ResultItem
// dir_reader lists one directory; NewFileSystemScanner sets it to a
// wrapper around os.ReadDir that ignores level.
dir_reader func(path string, level int) ([]fs.DirEntry, error)
// err records a failure to read root_dir or a panic in the worker.
err error
}
type ScanResult struct {
root_dir string
items []ResultItem
err error
is_finished bool
// NewFileSystemScanner returns a scanner for the tree rooted at root_dir with
// notify registered as its first listener. The scanner is marked as in
// progress and allowed to run, reads directories with os.ReadDir, and has
// room for 1024 results pre-allocated. Scanning does not begin until Start.
func NewFileSystemScanner(root_dir string, notify chan int) (fss *FileSystemScanner) {
	fss = &FileSystemScanner{
		root_dir:  root_dir,
		listeners: []chan int{notify},
		results:   make([]ResultItem, 0, 1024),
		dir_reader: func(path string, level int) ([]fs.DirEntry, error) {
			return os.ReadDir(path)
		},
	}
	fss.keep_going.Store(true)
	fss.in_progress.Store(true)
	return fss
}
type ScoreRequest struct {
root_dir, query string
is_last_for_current_root_dir bool
items []ResultItem
// Scanner is the source of items consumed by FileSystemScorer. The notes on
// each method describe the FileSystemScanner implementation, the only one
// visible here.
type Scanner interface {
// Start launches the scan in a background goroutine.
Start()
// Cancel asks the scan to stop.
Cancel()
// AddListener registers a channel to be notified of new results; the
// channel is closed once the scan is over.
AddListener(chan int)
// Len reports how many results have been collected so far.
Len() int
// Batch returns the results from offset onwards, or nil if there are none.
Batch(offset int) []ResultItem
// Finished reports whether the scan is no longer in progress.
Finished() bool
// Error returns the error recorded by the scan, if any.
Error() error
}
type ScoreResult struct {
query, root_dir string
is_last_for_current_root_dir bool
items []ResultItem
// Error returns the error recorded by the scan so far (nil if none), reading
// it while holding the scanner mutex.
func (fss *FileSystemScanner) Error() error {
	fss.mutex.Lock()
	recorded := fss.err
	fss.mutex.Unlock()
	return recorded
}
// Start launches the scanning worker in its own goroutine and returns
// immediately.
func (fss *FileSystemScanner) Start() {
go fss.worker()
}
// Cancel clears keep_going; the worker checks that flag before scanning each
// directory, so the scan winds down rather than stopping instantly.
func (fss *FileSystemScanner) Cancel() {
fss.keep_going.Store(false)
}
// AddListener registers x to be notified about new results. If the scan has
// already ended, x is closed immediately instead of being registered.
func (fss *FileSystemScanner) AddListener(x chan int) {
	fss.mutex.Lock()
	defer fss.mutex.Unlock()
	if fss.in_progress.Load() {
		fss.listeners = append(fss.listeners, x)
		return
	}
	close(x)
}
// Len reports the number of results collected so far, read under the scanner
// mutex.
func (fss *FileSystemScanner) Len() int {
	fss.mutex.Lock()
	count := len(fss.results)
	fss.mutex.Unlock()
	return count
}
// Batch returns the results collected so far starting at index offset, as a
// sub-slice of the scanner's own results slice (no copy is made). It returns
// nil when offset is at or beyond the current number of results.
func (fss *FileSystemScanner) Batch(offset int) []ResultItem {
	fss.mutex.Lock()
	defer fss.mutex.Unlock()
	if offset < len(fss.results) {
		return fss.results[offset:]
	}
	return nil
}
// Finished reports whether the scan is over, i.e. in_progress has been
// cleared (the worker does this when it exits).
func (fss *FileSystemScanner) Finished() bool {
return !fss.in_progress.Load()
}
// worker performs the scan: a depth-first walk starting at fss.root_dir that
// appends one ResultItem per directory entry to fss.results. Within each
// directory, entries are sorted with directories first and then by lower-cased
// name. After each directory is added, every listener is sent the new result
// count without blocking. When the walk ends (normally, by cancellation, or by
// a panic, which is recorded in fss.err) in_progress is cleared and all
// listeners are closed.
//
// Failure to read the root directory stops the scan and is stored in fss.err;
// failures to read sub-directories are ignored.
func (fss *FileSystemScanner) worker() {
	defer func() {
		fss.mutex.Lock()
		defer fss.mutex.Unlock()
		fss.in_progress.Store(false)
		if r := recover(); r != nil {
			st, qerr := utils.Format_stacktrace_on_panic(r)
			fss.err = fmt.Errorf("%w\n%s", qerr, st)
		}
		for _, l := range fss.listeners {
			close(l)
		}
	}()
	seen_dirs := make(map[string]bool)
	// Real (symlink-resolved) paths of the directories currently being
	// descended into. A symlink resolving to one of them would send the walk
	// round in a circle, so such directories are not entered again.
	active_real_dirs := make(map[string]bool)
	var scan_dir func(dir, real_dir string, level int)
	scan_dir = func(dir, real_dir string, level int) {
		if !fss.keep_going.Load() || seen_dirs[dir] || active_real_dirs[real_dir] {
			return
		}
		seen_dirs[dir] = true
		active_real_dirs[real_dir] = true
		defer delete(active_real_dirs, real_dir)
		entries, err := fss.dir_reader(dir, level)
		if err != nil {
			if level == 0 {
				fss.keep_going.Store(false)
				fss.mutex.Lock()
				fss.err = err
				fss.mutex.Unlock()
			}
			return
		}
		// Only this goroutine ever assigns fss.results, so it can be read
		// here without the lock; the lock is taken when publishing below.
		ns := fss.results
		new_sz := len(ns) + len(entries)
		if cap(ns) < new_sz {
			ns = make([]ResultItem, len(ns), max(16*1024, new_sz, cap(ns)*2))
			copy(ns, fss.results)
		}
		// The new items are filled in beyond the published length, so readers
		// cannot see them until fss.results is replaced.
		new_items := ns[len(ns):new_sz]
		var symlink_targets map[string]string // abspath of symlinked dir -> real path
		for i, x := range entries {
			ftype := x.Type()
			abspath := filepath.Join(dir, x.Name())
			if ftype&fs.ModeSymlink != 0 {
				// DirEntry.Info() describes the link itself, so os.Stat is
				// needed to learn whether the link points at a directory.
				if st, err := os.Stat(abspath); err == nil && st.IsDir() {
					if target, err := filepath.EvalSymlinks(abspath); err == nil {
						ftype = fs.ModeDir
						if symlink_targets == nil {
							symlink_targets = make(map[string]string)
						}
						symlink_targets[abspath] = target
					}
				}
			}
			new_items[i].ftype = ftype
			new_items[i].abspath = abspath
			new_items[i].text = strings.ToLower(x.Name())
		}
		slices.SortFunc(new_items, func(a, b ResultItem) int {
			if a.ftype&fs.ModeDir == b.ftype&fs.ModeDir {
				return cmp.Compare(a.text, b.text)
			}
			if a.ftype.IsDir() {
				return -1
			}
			return 1
		})
		ns = ns[0:new_sz]
		fss.mutex.Lock()
		fss.results = ns
		listeners := fss.listeners
		num := len(fss.results)
		fss.mutex.Unlock()
		for _, l := range listeners {
			select {
			case l <- num:
			default: // listener is busy; it will pick the items up later
			}
		}
		// Index rather than copy: once published, other fields of these items
		// may be written by consumers, while ftype and abspath are not.
		for i := range new_items {
			item := &new_items[i]
			if !item.ftype.IsDir() {
				continue
			}
			real_child, is_link := symlink_targets[item.abspath]
			if !is_link {
				real_child = filepath.Join(real_dir, filepath.Base(item.abspath))
			}
			scan_dir(item.abspath, real_child, level+1)
		}
	}
	root_real, err := filepath.EvalSymlinks(fss.root_dir)
	if err != nil {
		root_real = fss.root_dir
	}
	if abs, err := filepath.Abs(root_real); err == nil {
		root_real = abs
	}
	scan_dir(fss.root_dir, root_real, 0)
}
// FileSystemScorer consumes the items produced by a Scanner, scores them
// against a query and maintains a list of results ordered by score.
type FileSystemScorer struct {
// scanner is created by the first call to Start and reused by later ones.
scanner Scanner
// keep_going is set by Start and cleared by Change_query and Cancel to stop
// the current worker. is_complete is cleared by Start and set when the
// worker exits.
keep_going, is_complete atomic.Bool
// root_dir is the directory scanned; query is the text items are scored
// against (an empty query skips scoring).
root_dir, query string
// only_dirs restricts the results to directories.
only_dirs bool
// mutex is held while renderable_results is read or replaced.
mutex sync.Mutex
// renderable_results is the list returned by Results.
renderable_results []*ResultItem
// on_results is called by the worker after each batch, with the error
// from processing that batch or nil.
on_results func(error)
// current_worker_wait lets Change_query wait for the most recently started
// worker to exit.
current_worker_wait *sync.WaitGroup
// scorer is the fuzzy matcher used to score item text against query.
scorer *fzf.FuzzyMatcher
}
// NewFileSystemScorer builds a scorer for root_dir and query that uses a fuzzy
// matcher configured with fzf.PATH_SCHEME. When only_dirs is set, only
// directories are reported. on_results is the callback the worker invokes as
// results arrive. Nothing runs until Start is called.
func NewFileSystemScorer(root_dir, query string, only_dirs bool, on_results func(error)) (ans *FileSystemScorer) {
	ans = &FileSystemScorer{scorer: fzf.NewFuzzyMatcher(fzf.PATH_SCHEME)}
	ans.root_dir, ans.query = root_dir, query
	ans.only_dirs = only_dirs
	ans.on_results = on_results
	return ans
}
// Start begins scoring in a background worker. On first use it also creates
// and starts the file system scanner; on later calls it subscribes a fresh
// notification channel to the existing scanner. The worker's WaitGroup is
// stored in current_worker_wait so Change_query can wait for it.
func (fss *FileSystemScorer) Start() {
	fss.is_complete.Store(false)
	fss.keep_going.Store(true)
	notifications := make(chan int)
	if fss.scanner != nil {
		fss.scanner.AddListener(notifications)
	} else {
		fss.scanner = NewFileSystemScanner(fss.root_dir, notifications)
		fss.scanner.Start()
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	fss.current_worker_wait = wg
	go fss.worker(notifications, wg)
}
// Change_query switches the scorer to a new query. It is a no-op when query
// equals the current one. Otherwise the running worker is told to stop and
// waited for, the results of the previous query are discarded, and a new
// worker is started that re-scores everything scanned so far.
func (fss *FileSystemScorer) Change_query(query string) {
	if fss.query == query {
		return
	}
	fss.keep_going.Store(false)
	if fss.current_worker_wait != nil {
		fss.current_worker_wait.Wait()
	}
	fss.query = query
	// The new worker starts again from the first scanned item and merges into
	// renderable_results, so the old query's entries must be dropped first or
	// every item would be listed again.
	fss.mutex.Lock()
	fss.renderable_results = nil
	fss.mutex.Unlock()
	fss.Start()
}
// worker is the scoring goroutine. Each time the scanner signals on
// on_results it fetches the items scanned since the last batch, scores them
// and merges them into fss.renderable_results, then calls fss.on_results with
// the outcome. When on_results is closed (the scan is over) one final batch is
// processed. The worker stops early once keep_going is cleared.
//
// On exit is_complete is set and worker_wait is released. Unless the worker
// was cancelled, fss.on_results is called one last time: with nil, or with a
// formatted stack trace if the worker panicked.
func (fss *FileSystemScorer) worker(on_results chan int, worker_wait *sync.WaitGroup) {
	defer func() {
		fss.is_complete.Store(true)
		defer worker_wait.Done()
		if r := recover(); r != nil {
			if fss.keep_going.Load() {
				st, qerr := utils.Format_stacktrace_on_panic(r)
				fss.on_results(fmt.Errorf("%w\n%s", qerr, st))
			}
		} else {
			if fss.keep_going.Load() {
				fss.on_results(nil)
			}
		}
	}()
	offset := 0
	root_dir := fss.root_dir
	// Score range of everything merged so far; lets a batch that sorts wholly
	// before or after the existing results skip the full merge.
	global_min_score, global_max_score := CombinedScore(math.MaxUint64), CombinedScore(0)
	var idx uint32
	// handle_batch scores one batch of scanned items and merges it into
	// renderable_results. It returns the scanner's error or a scoring error.
	handle_batch := func(results []ResultItem) (err error) {
		if err = fss.scanner.Error(); err != nil {
			return
		}
		var rp []*ResultItem
		if fss.only_dirs {
			rp = make([]*ResultItem, 0, len(results))
			for i := range results {
				item := &results[i]
				if item.ftype.IsDir() {
					item.Set_relpath(root_dir)
					item.score.Set_index(idx)
					idx++
					rp = append(rp, item)
				}
			}
		} else {
			rp = make([]*ResultItem, len(results))
			for i := range results {
				results[i].Set_relpath(root_dir)
				results[i].score.Set_index(idx)
				idx++
				rp[i] = &results[i]
			}
		}
		if fss.query != "" && len(rp) > 0 {
			scores, err := fss.scorer.ScoreWithCache(utils.Map(func(r *ResultItem) string { return r.text }, rp), fss.query)
			if err != nil {
				return err
			}
			for i, r := range rp {
				r.SetScoreResult(scores[i])
			}
		}
		min_score, max_score := CombinedScore(math.MaxUint64), CombinedScore(0)
		if len(rp) > 0 {
			slices.SortFunc(rp, func(a, b *ResultItem) int { return cmp.Compare(a.score, b.score) })
			// Index by len(rp), not len(results): with only_dirs rp can be
			// shorter than results.
			min_score, max_score = rp[0].score, rp[len(rp)-1].score
		}
		var rr []*ResultItem
		fss.mutex.Lock()
		existing := fss.renderable_results
		fss.mutex.Unlock()
		switch {
		case min_score >= global_max_score:
			// Whole batch sorts after everything already present.
			rr = append(existing, rp...)
		case max_score < global_min_score:
			// Whole batch sorts before everything already present.
			rr = make([]*ResultItem, len(existing)+len(rp))
			copy(rr, rp)
			copy(rr[len(rp):], existing)
		default:
			rr = merge_sorted_slices(existing, rp)
		}
		fss.mutex.Lock()
		fss.renderable_results = rr
		global_min_score = min(global_min_score, min_score)
		// Track the largest score seen; otherwise the fast-path checks above
		// would compare against a stale upper bound.
		global_max_score = max(global_max_score, max_score)
		fss.mutex.Unlock()
		return
	}
	for range on_results {
		if !fss.keep_going.Load() {
			break
		}
		results := fss.scanner.Batch(offset)
		if len(results) > 0 || fss.scanner.Error() != nil {
			offset += len(results)
			fss.on_results(handle_batch(results))
		}
	}
	// Notifications are sent without blocking and may have been missed, so
	// pick up whatever is left once the loop ends.
	if fss.keep_going.Load() {
		fss.on_results(handle_batch(fss.scanner.Batch(offset)))
	}
}
// Results returns the current list of renderable results together with a flag
// saying whether the scoring worker has finished. Both are read while holding
// the scorer mutex.
func (fss *FileSystemScorer) Results() (ans ResultsType, is_finished bool) {
	fss.mutex.Lock()
	ans, is_finished = fss.renderable_results, fss.is_complete.Load()
	fss.mutex.Unlock()
	return
}
// Cancel tells the scoring worker to stop and cancels the underlying scan.
// NOTE(review): fss.scanner is only assigned in Start, so this presumably
// must not be called before Start — confirm against callers.
func (fss *FileSystemScorer) Cancel() {
fss.keep_going.Store(false)
fss.scanner.Cancel()
}
type Settings interface {
@@ -67,183 +359,40 @@ type Settings interface {
}
type ResultManager struct {
current_root_dir string
current_root_dir_scan_complete bool
results_for_current_root_dir []ResultItem
scan_requests chan ScanRequest
scan_results chan ScanResult
current_query string
current_query_scoring_complete bool
matches_for_current_query []ResultItem
score_queries chan ScoreRequest
score_results chan ScoreResult
report_errors chan error
renderable_results []ResultItem
mutex sync.Mutex
scorer *fzf.FuzzyMatcher
settings Settings
report_errors chan error
WakeupMainThread func() bool
settings Settings
scorer *FileSystemScorer
mutex sync.Mutex
last_wakeup_at time.Time
}
func NewResultManager(err_chan chan error, settings Settings, WakeupMainThread func() bool) *ResultManager {
ans := &ResultManager{
scan_requests: make(chan ScanRequest, 256),
scan_results: make(chan ScanResult, 256),
score_queries: make(chan ScoreRequest, 256),
score_results: make(chan ScoreResult, 256),
report_errors: err_chan,
scorer: fzf.NewFuzzyMatcher(fzf.PATH_SCHEME),
settings: settings,
WakeupMainThread: WakeupMainThread,
}
go ans.scan_worker()
go ans.scan_result_handler()
go ans.score_worker()
go ans.sort_worker()
return ans
}
func (m *ResultManager) lock() {
m.mutex.Lock()
}
func (m *ResultManager) unlock() {
m.mutex.Unlock()
}
func (m *ResultManager) scan(dir, root_dir string, level int, idx *uint32) (err error) {
defer func() {
if r := recover(); r != nil {
st, qerr := utils.Format_stacktrace_on_panic(r)
err = fmt.Errorf("%w\n%s", qerr, st)
}
}()
items, err := os.ReadDir(dir)
func (m *ResultManager) on_results(err error) {
if err != nil {
if level == 0 {
return fmt.Errorf("failed to read directory: %s with error: %w", dir, err)
}
return nil
m.report_errors <- err
m.WakeupMainThread()
return
}
ritems := utils.Map(func(x os.DirEntry) ResultItem {
ans := ResultItem{abspath: filepath.Join(dir, x.Name()), text: strings.ToLower(x.Name()), ftype: x.Type()}
ans.score.Set_index(*idx)
*idx = *idx + 1
return ans
}, items)
slices.SortFunc(ritems, func(a, b ResultItem) int { return cmp.Compare(a.text, b.text) })
m.scan_results <- ScanResult{root_dir: root_dir, items: ritems}
for _, x := range ritems {
if x.ftype.IsDir() {
if !m.is_root_dir_current(root_dir) {
return
}
if err = m.scan(x.abspath, root_dir, level+1, idx); err != nil {
return
}
}
}
return
}
func (m *ResultManager) scan_worker() {
for r := range m.scan_requests {
st := time.Now()
var idx uint32
if err := m.scan(r.root_dir, r.root_dir, 0, &idx); err == nil {
m.scan_results <- ScanResult{root_dir: r.root_dir, is_finished: true}
}
debugprintln(111111111, time.Now().Sub(st), len(m.results_for_current_root_dir))
m.mutex.Lock()
defer m.mutex.Unlock()
if time.Since(m.last_wakeup_at) > time.Millisecond*50 {
m.WakeupMainThread()
m.last_wakeup_at = time.Now()
}
}
func (m *ResultManager) create_score_query(items []ResultItem, is_finished bool) ScoreRequest {
return ScoreRequest{root_dir: m.current_root_dir, query: m.current_query, items: utils.Map(
func(r ResultItem) ResultItem {
text, err := filepath.Rel(m.current_root_dir, r.abspath)
if err != nil {
text = r.abspath
}
return ResultItem{abspath: r.abspath, text: text, ftype: r.ftype}
}, items), is_last_for_current_root_dir: is_finished}
}
func (m *ResultManager) scan_result_handler() {
one := func(r ScanResult) {
if !m.is_root_dir_current(r.root_dir) {
return
}
var sqr ScoreRequest
has_items := len(r.items) > 0
m.lock()
if has_items {
m.results_for_current_root_dir = append(m.results_for_current_root_dir, r.items...)
}
sqr = m.create_score_query(r.items, r.is_finished)
if r.is_finished {
m.current_root_dir_scan_complete = true
}
m.unlock()
m.score_queries <- sqr
}
for r := range m.scan_results {
if r.err != nil {
m.report_errors <- r.err
continue
}
one(r)
}
}
func (r *ResultItem) SetScoreResult(x fzf.Result) {
r.positions = x.Positions
r.score.Set_score(uint32(math.MaxUint32 - x.Score))
}
func (m *ResultManager) score(r ScoreRequest) (err error) {
items := r.items
only_dirs := m.settings.OnlyDirs()
if only_dirs {
items = utils.Filter(items, func(r ResultItem) bool { return r.ftype.IsDir() })
}
res := ScoreResult{
query: r.query, items: items, root_dir: r.root_dir, is_last_for_current_root_dir: r.is_last_for_current_root_dir,
}
if r.query != "" {
var scores []fzf.Result
if scores, err = m.scorer.ScoreWithCache(utils.Map(func(r ResultItem) string { return r.text }, items), res.query); err != nil {
return
}
matched_items := make([]ResultItem, 0, len(items))
for i, x := range scores {
if x.Score > 0 {
matched_items = append(matched_items, items[i])
item := &matched_items[len(matched_items)-1]
item.SetScoreResult(x)
}
}
items = matched_items
}
m.score_results <- res
return
}
func (m *ResultManager) score_worker() {
for r := range m.score_queries {
if m.is_query_current(r.query, r.root_dir) {
if err := m.score(r); err != nil {
m.report_errors <- err
}
}
}
}
func merge_sorted_slices(a, b []ResultItem) []ResultItem {
result := make([]ResultItem, 0, len(a)+len(b))
func merge_sorted_slices(a, b []*ResultItem) []*ResultItem {
result := make([]*ResultItem, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].score <= b[j].score {
@@ -258,70 +407,6 @@ func merge_sorted_slices(a, b []ResultItem) []ResultItem {
return append(result, b[j:]...)
}
type ByRelevance []ResultItem
func (a ByRelevance) Len() int {
return len(a)
}
func (a ByRelevance) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a ByRelevance) Less(i, j int) bool {
return a[i].score < a[j].score
}
func (m *ResultManager) add_score_results(r ScoreResult) (err error) {
min_score, max_score := CombinedScore(math.MaxUint64), CombinedScore(0)
if len(r.items) > 0 {
sort.Sort(ByRelevance(r.items))
min_score = r.items[0].score
max_score = r.items[len(r.items)-1].score
}
_, _ = min_score, max_score
// renderable_results := merge_sorted_slices(m.renderable_results, r.items)
renderable_results := append(m.renderable_results, r.items...)
m.lock()
defer func() {
m.unlock()
if r := recover(); r != nil {
st, qerr := utils.Format_stacktrace_on_panic(r)
err = fmt.Errorf("%w\n%s", qerr, st)
}
}()
m.renderable_results = renderable_results
if r.is_last_for_current_root_dir {
m.current_query_scoring_complete = true
}
return
}
func (m *ResultManager) sort_worker() {
last_wakeup_at := time.Now()
for r := range m.score_results {
if m.is_query_current(r.query, r.root_dir) {
if err := m.add_score_results(r); err != nil {
m.report_errors <- err
} else {
m.lock()
is_complete := m.current_root_dir_scan_complete && m.current_query_scoring_complete
m.unlock()
if is_complete || time.Now().Sub(last_wakeup_at) > time.Millisecond*50 {
m.WakeupMainThread()
last_wakeup_at = time.Now()
}
}
}
}
}
func (m *ResultManager) is_root_dir_current(root_dir string) bool {
m.lock()
defer m.unlock()
return root_dir == m.current_root_dir
}
func (m *ResultManager) set_root_dir(root_dir string) {
var err error
if root_dir == "" || root_dir == "." {
@@ -333,54 +418,25 @@ func (m *ResultManager) set_root_dir(root_dir string) {
if root_dir, err = filepath.Abs(root_dir); err != nil {
return
}
m.lock()
defer m.unlock()
if m.current_root_dir == root_dir {
return
if m.scorer != nil {
m.scorer.Cancel()
}
m.current_root_dir = root_dir
m.results_for_current_root_dir = nil
m.matches_for_current_query = nil
m.renderable_results = nil
m.current_query_scoring_complete = false
m.current_root_dir_scan_complete = false
m.scan_requests <- ScanRequest{root_dir: m.current_root_dir}
}
func (m *ResultManager) is_query_current(query, root_dir string) bool {
m.lock()
defer m.unlock()
return root_dir == m.current_root_dir && query == m.current_query
m.scorer = NewFileSystemScorer(root_dir, "", m.settings.OnlyDirs(), m.on_results)
m.scorer.Start()
}
func (m *ResultManager) set_query(query string) {
var sqr *ScoreRequest
m.lock()
defer func() {
m.unlock()
if sqr != nil {
m.score_queries <- *sqr
}
}()
if query == m.current_query {
return
}
m.current_query = query
m.matches_for_current_query = nil
m.renderable_results = nil
m.current_query_scoring_complete = false
if m.results_for_current_root_dir != nil {
s := m.create_score_query(m.results_for_current_root_dir, m.current_root_dir_scan_complete)
sqr = &s
} else if m.current_root_dir_scan_complete {
m.current_query_scoring_complete = true
if m.scorer == nil {
m.scorer = NewFileSystemScorer(".", "", m.settings.OnlyDirs(), m.on_results)
m.scorer.Start()
} else {
m.scorer.Change_query(query)
}
}
func (h *Handler) get_results() (ans []ResultItem, in_progress bool) {
h.result_manager.lock()
defer h.result_manager.unlock()
ans = h.result_manager.renderable_results
in_progress = !h.result_manager.current_query_scoring_complete || !h.result_manager.current_root_dir_scan_complete
return
// get_results returns the results currently available for display and
// whether more may still arrive. With no scorer yet, it returns no results
// and in_progress false.
func (h *Handler) get_results() (ans ResultsType, in_progress bool) {
	scorer := h.result_manager.scorer
	if scorer == nil {
		return
	}
	// Results reports "finished"; callers of get_results want the opposite.
	ans, is_finished := scorer.Results()
	return ans, !is_finished
}