diff --git a/kittens/choose_files/main.go b/kittens/choose_files/main.go index 4579642e9..8881cd7fd 100644 --- a/kittens/choose_files/main.go +++ b/kittens/choose_files/main.go @@ -402,7 +402,7 @@ func main(_ *cli.Command, opts *Options, args []string) (rc int, err error) { if err != nil { return 1, err } - handler := Handler{lp: lp, err_chan: make(chan error), rl: readline.New(lp, readline.RlInit{ + handler := Handler{lp: lp, err_chan: make(chan error, 8), rl: readline.New(lp, readline.RlInit{ Prompt: "> ", ContinuationPrompt: ". ", })} if err = handler.set_state_from_config(conf, opts); err != nil { diff --git a/kittens/choose_files/results.go b/kittens/choose_files/results.go index 14630b695..49c46ea79 100644 --- a/kittens/choose_files/results.go +++ b/kittens/choose_files/results.go @@ -117,7 +117,7 @@ func icon_for(path string, x os.FileMode) string { return ans } -func (h *Handler) draw_column_of_matches(matches []ResultItem, current_idx int, x, available_width int) { +func (h *Handler) draw_column_of_matches(matches ResultsType, current_idx int, x, available_width int) { for i, m := range matches { h.lp.QueueWriteString("\r") h.lp.MoveCursorHorizontally(x) @@ -139,7 +139,7 @@ func (h *Handler) draw_column_of_matches(matches []ResultItem, current_idx int, } } -func (h *Handler) draw_list_of_results(matches []ResultItem, y, height int) int { +func (h *Handler) draw_list_of_results(matches ResultsType, y, height int) int { if len(matches) == 0 || height < 2 { return 0 } @@ -198,7 +198,7 @@ func (h *Handler) draw_num_of_matches(num_shown, y int) { } } -func (h *Handler) draw_results(y, bottom_margin int, matches []ResultItem, in_progress bool) (height int) { +func (h *Handler) draw_results(y, bottom_margin int, matches ResultsType, in_progress bool) (height int) { height = h.screen_size.height - y - bottom_margin h.lp.MoveCursorTo(1, 1+y) h.draw_frame(h.screen_size.width, height) diff --git a/kittens/choose_files/scan.go b/kittens/choose_files/scan.go index 8676a5777..7a674ed4b 100644 --- a/kittens/choose_files/scan.go +++ b/kittens/choose_files/scan.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/kovidgoyal/kitty/tools/fzf" @@ -25,6 +26,20 @@ type ResultItem struct { positions []int // may be nil score CombinedScore } +type ResultsType []*ResultItem + +func (r *ResultItem) SetScoreResult(x fzf.Result) { + r.positions = x.Positions + r.score.Set_score(uint32(math.MaxUint32 - x.Score)) +} + +func (r *ResultItem) Set_relpath(root_dir string) { + if ans, err := filepath.Rel(root_dir, r.abspath); err == nil { + r.text = ans + } else { + r.text = r.abspath + } +} func (r ResultItem) String() string { return fmt.Sprintf("{text: %#v, abspath: %#v, is_dir: %v, positions: %#v}", r.text, r.abspath, r.ftype.IsDir(), r.positions) @@ -37,27 +52,304 @@ func (r *ResultItem) sorted_positions() []int { return r.positions } -type ScanRequest struct { - root_dir string +type FileSystemScanner struct { + listeners []chan int + in_progress, keep_going atomic.Bool + root_dir string + mutex sync.Mutex + results []ResultItem + dir_reader func(path string, level int) ([]fs.DirEntry, error) + err error } -type ScanResult struct { - root_dir string - items []ResultItem - err error - is_finished bool +func NewFileSystemScanner(root_dir string, notify chan int) (fss *FileSystemScanner) { + ans := &FileSystemScanner{root_dir: root_dir, listeners: []chan int{notify}, results: make([]ResultItem, 0, 1024)} + ans.in_progress.Store(true) + ans.keep_going.Store(true) + ans.dir_reader = func(path string, level int) ([]fs.DirEntry, error) { + return os.ReadDir(path) + } + return ans } -type ScoreRequest struct { - root_dir, query string - is_last_for_current_root_dir bool - items []ResultItem +type Scanner interface { + Start() + Cancel() + AddListener(chan int) + Len() int + Batch(offset int) []ResultItem + Finished() bool + Error() error } -type ScoreResult struct { - query, root_dir string - is_last_for_current_root_dir bool - items []ResultItem +func (fss *FileSystemScanner) Error() error { + fss.mutex.Lock() + defer fss.mutex.Unlock() + return fss.err +} + +func (fss *FileSystemScanner) Start() { + go fss.worker() +} + +func (fss *FileSystemScanner) Cancel() { + fss.keep_going.Store(false) +} + +func (fss *FileSystemScanner) AddListener(x chan int) { + fss.mutex.Lock() + defer fss.mutex.Unlock() + if !fss.in_progress.Load() { + close(x) + } else { + fss.listeners = append(fss.listeners, x) + } +} + +func (fss *FileSystemScanner) Len() int { + fss.mutex.Lock() + defer fss.mutex.Unlock() + return len(fss.results) +} + +func (fss *FileSystemScanner) Batch(offset int) []ResultItem { + fss.mutex.Lock() + defer fss.mutex.Unlock() + if offset >= len(fss.results) { + return nil + } + return fss.results[offset:] +} + +func (fss *FileSystemScanner) Finished() bool { + return !fss.in_progress.Load() +} + +func (fss *FileSystemScanner) worker() { + defer func() { + fss.mutex.Lock() + defer fss.mutex.Unlock() + fss.in_progress.Store(false) + if r := recover(); r != nil { + st, qerr := utils.Format_stacktrace_on_panic(r) + fss.err = fmt.Errorf("%w\n%s", qerr, st) + } + for _, l := range fss.listeners { + close(l) + } + }() + var scan_dir func(string, int) + seen_dirs := make(map[string]bool) + scan_dir = func(dir string, level int) { + if !fss.keep_going.Load() || seen_dirs[dir] { + return + } + seen_dirs[dir] = true + entries, err := fss.dir_reader(dir, level) + if err != nil { + if level == 0 { + fss.keep_going.Store(false) + fss.mutex.Lock() + fss.err = err + fss.mutex.Unlock() + } + return + } + ns := fss.results + new_sz := len(ns) + len(entries) + if cap(ns) < new_sz { + ns = make([]ResultItem, len(ns), max(16*1024, new_sz, cap(ns)*2)) + copy(ns, fss.results) + } + new_items := ns[len(ns):new_sz] + for i, x := range entries { + ftype := x.Type() + if ftype&fs.ModeSymlink != 0 { + if st, err := x.Info(); err == nil && st.IsDir() { + ftype = fs.ModeDir + } + } + new_items[i].ftype = ftype + new_items[i].abspath = filepath.Join(dir, x.Name()) + new_items[i].text = strings.ToLower(x.Name()) + } + slices.SortFunc(new_items, func(a, b ResultItem) int { + if a.ftype&fs.ModeDir == b.ftype&fs.ModeDir { + return cmp.Compare(a.text, b.text) + } + if a.ftype.IsDir() { + return -1 + } + return 1 + }) + ns = ns[0:new_sz] + fss.mutex.Lock() + fss.results = ns + listeners := fss.listeners + num := len(fss.results) + fss.mutex.Unlock() + for _, l := range listeners { + select { + case l <- num: + default: + } + } + for _, x := range new_items { + if x.ftype.IsDir() { + scan_dir(x.abspath, level+1) + } + } + } + scan_dir(fss.root_dir, 0) +} + +type FileSystemScorer struct { + scanner Scanner + keep_going, is_complete atomic.Bool + root_dir, query string + only_dirs bool + mutex sync.Mutex + renderable_results []*ResultItem + on_results func(error) + current_worker_wait *sync.WaitGroup + scorer *fzf.FuzzyMatcher +} + +func NewFileSystemScorer(root_dir, query string, only_dirs bool, on_results func(error)) (ans *FileSystemScorer) { + return &FileSystemScorer{ + query: query, root_dir: root_dir, only_dirs: only_dirs, on_results: on_results, + scorer: fzf.NewFuzzyMatcher(fzf.PATH_SCHEME)} +} + +func (fss *FileSystemScorer) Start() { + on_results := make(chan int) + fss.is_complete.Store(false) + fss.keep_going.Store(true) + if fss.scanner == nil { + fss.scanner = NewFileSystemScanner(fss.root_dir, on_results) + fss.scanner.Start() + } else { + fss.scanner.AddListener(on_results) + } + fss.current_worker_wait = &sync.WaitGroup{} + fss.current_worker_wait.Add(1) + go fss.worker(on_results, fss.current_worker_wait) +} + +func (fss *FileSystemScorer) Change_query(query string) { + if fss.query == query { + return + } + fss.keep_going.Store(false) + if fss.current_worker_wait != nil { + fss.current_worker_wait.Wait() + } + fss.query = query + fss.Start() +} + +func (fss *FileSystemScorer) worker(on_results chan int, worker_wait *sync.WaitGroup) { + defer func() { + fss.is_complete.Store(true) + defer worker_wait.Done() + if r := recover(); r != nil { + if fss.keep_going.Load() { + st, qerr := utils.Format_stacktrace_on_panic(r) + fss.on_results(fmt.Errorf("%w\n%s", qerr, st)) + } + } else { + if fss.keep_going.Load() { + fss.on_results(nil) + } + } + }() + offset := 0 + root_dir := fss.root_dir + global_min_score, global_max_score := CombinedScore(math.MaxUint64), CombinedScore(0) + var idx uint32 + handle_batch := func(results []ResultItem) (err error) { + if err = fss.scanner.Error(); err != nil { + return + } + var rp []*ResultItem + if fss.only_dirs { + rp = make([]*ResultItem, 0, len(results)) + for i, r := range results { + if r.ftype.IsDir() { + results[i].Set_relpath(root_dir) + results[i].score.Set_index(idx) + idx++ + rp = append(rp, &results[i]) + } + } + } else { + rp = make([]*ResultItem, len(results)) + for i := range len(rp) { + results[i].Set_relpath(root_dir) + results[i].score.Set_index(idx) + idx++ + rp[i] = &results[i] + } + } + if fss.query != "" && len(rp) > 0 { + scores, err := fss.scorer.ScoreWithCache(utils.Map(func(r *ResultItem) string { return r.text }, rp), fss.query) + if err != nil { + return err + } + for i, r := range rp { + r.SetScoreResult(scores[i]) + } + } + min_score, max_score := CombinedScore(math.MaxUint64), CombinedScore(0) + if len(rp) > 0 { + slices.SortFunc(rp, func(a, b *ResultItem) int { return cmp.Compare(a.score, b.score) }) + min_score, max_score = rp[0].score, rp[len(results)-1].score + } + var rr []*ResultItem + fss.mutex.Lock() + existing := fss.renderable_results + fss.mutex.Unlock() + switch { + case min_score >= global_max_score: + rr = append(existing, rp...) + case max_score < global_min_score: + rr = make([]*ResultItem, len(existing)+len(rp)) + copy(rr, rp) + copy(rr[len(rp):], existing) + default: + rr = merge_sorted_slices(existing, rp) + } + fss.mutex.Lock() + fss.renderable_results = rr + global_min_score = min(global_min_score, min_score) + global_max_score = min(global_max_score, max_score) + fss.mutex.Unlock() + return + } + for range on_results { + if !fss.keep_going.Load() { + break + } + results := fss.scanner.Batch(offset) + if len(results) > 0 || fss.scanner.Error() != nil { + offset += len(results) + fss.on_results(handle_batch(results)) + } + } + if fss.keep_going.Load() { + fss.on_results(handle_batch(fss.scanner.Batch(offset))) + } +} + +func (fss *FileSystemScorer) Results() (ans ResultsType, is_finished bool) { + fss.mutex.Lock() + defer fss.mutex.Unlock() + return fss.renderable_results, fss.is_complete.Load() +} + +func (fss *FileSystemScorer) Cancel() { + fss.keep_going.Store(false) + fss.scanner.Cancel() } type Settings interface { @@ -67,183 +359,40 @@ type Settings interface { } type ResultManager struct { - current_root_dir string - current_root_dir_scan_complete bool - results_for_current_root_dir []ResultItem - scan_requests chan ScanRequest - scan_results chan ScanResult - - current_query string - current_query_scoring_complete bool - matches_for_current_query []ResultItem - score_queries chan ScoreRequest - score_results chan ScoreResult - report_errors chan error - - renderable_results []ResultItem - - mutex sync.Mutex - scorer *fzf.FuzzyMatcher - settings Settings - + report_errors chan error WakeupMainThread func() bool + settings Settings + + scorer *FileSystemScorer + mutex sync.Mutex + last_wakeup_at time.Time } func NewResultManager(err_chan chan error, settings Settings, WakeupMainThread func() bool) *ResultManager { ans := &ResultManager{ - scan_requests: make(chan ScanRequest, 256), - scan_results: make(chan ScanResult, 256), - score_queries: make(chan ScoreRequest, 256), - score_results: make(chan ScoreResult, 256), report_errors: err_chan, - scorer: fzf.NewFuzzyMatcher(fzf.PATH_SCHEME), settings: settings, WakeupMainThread: WakeupMainThread, } - go ans.scan_worker() - go ans.scan_result_handler() - go ans.score_worker() - go ans.sort_worker() return ans } -func (m *ResultManager) lock() { - m.mutex.Lock() -} - -func (m *ResultManager) unlock() { - m.mutex.Unlock() -} - -func (m *ResultManager) scan(dir, root_dir string, level int, idx *uint32) (err error) { - defer func() { - if r := recover(); r != nil { - st, qerr := utils.Format_stacktrace_on_panic(r) - err = fmt.Errorf("%w\n%s", qerr, st) - } - }() - items, err := os.ReadDir(dir) +func (m *ResultManager) on_results(err error) { if err != nil { - if level == 0 { - return fmt.Errorf("failed to read directory: %s with error: %w", dir, err) - } - return nil + m.report_errors <- err + m.WakeupMainThread() + return } - ritems := utils.Map(func(x os.DirEntry) ResultItem { - ans := ResultItem{abspath: filepath.Join(dir, x.Name()), text: strings.ToLower(x.Name()), ftype: x.Type()} - ans.score.Set_index(*idx) - *idx = *idx + 1 - return ans - }, items) - slices.SortFunc(ritems, func(a, b ResultItem) int { return cmp.Compare(a.text, b.text) }) - m.scan_results <- ScanResult{root_dir: root_dir, items: ritems} - for _, x := range ritems { - if x.ftype.IsDir() { - if !m.is_root_dir_current(root_dir) { - return - } - if err = m.scan(x.abspath, root_dir, level+1, idx); err != nil { - return - } - } - } - return -} - -func (m *ResultManager) scan_worker() { - for r := range m.scan_requests { - st := time.Now() - var idx uint32 - if err := m.scan(r.root_dir, r.root_dir, 0, &idx); err == nil { - m.scan_results <- ScanResult{root_dir: r.root_dir, is_finished: true} - } - debugprintln(111111111, time.Now().Sub(st), len(m.results_for_current_root_dir)) + m.mutex.Lock() + defer m.mutex.Unlock() + if time.Since(m.last_wakeup_at) > time.Millisecond*50 { + m.WakeupMainThread() + m.last_wakeup_at = time.Now() } } -func (m *ResultManager) create_score_query(items []ResultItem, is_finished bool) ScoreRequest { - return ScoreRequest{root_dir: m.current_root_dir, query: m.current_query, items: utils.Map( - func(r ResultItem) ResultItem { - text, err := filepath.Rel(m.current_root_dir, r.abspath) - if err != nil { - text = r.abspath - } - return ResultItem{abspath: r.abspath, text: text, ftype: r.ftype} - }, items), is_last_for_current_root_dir: is_finished} -} - -func (m *ResultManager) scan_result_handler() { - one := func(r ScanResult) { - if !m.is_root_dir_current(r.root_dir) { - return - } - var sqr ScoreRequest - has_items := len(r.items) > 0 - m.lock() - if has_items { - m.results_for_current_root_dir = append(m.results_for_current_root_dir, r.items...) - } - sqr = m.create_score_query(r.items, r.is_finished) - if r.is_finished { - m.current_root_dir_scan_complete = true - } - m.unlock() - m.score_queries <- sqr - } - for r := range m.scan_results { - if r.err != nil { - m.report_errors <- r.err - continue - } - one(r) - } -} - -func (r *ResultItem) SetScoreResult(x fzf.Result) { - r.positions = x.Positions - r.score.Set_score(uint32(math.MaxUint32 - x.Score)) -} - -func (m *ResultManager) score(r ScoreRequest) (err error) { - items := r.items - only_dirs := m.settings.OnlyDirs() - if only_dirs { - items = utils.Filter(items, func(r ResultItem) bool { return r.ftype.IsDir() }) - } - res := ScoreResult{ - query: r.query, items: items, root_dir: r.root_dir, is_last_for_current_root_dir: r.is_last_for_current_root_dir, - } - if r.query != "" { - var scores []fzf.Result - if scores, err = m.scorer.ScoreWithCache(utils.Map(func(r ResultItem) string { return r.text }, items), res.query); err != nil { - return - } - matched_items := make([]ResultItem, 0, len(items)) - for i, x := range scores { - if x.Score > 0 { - matched_items = append(matched_items, items[i]) - item := &matched_items[len(matched_items)-1] - item.SetScoreResult(x) - } - } - items = matched_items - } - m.score_results <- res - return -} - -func (m *ResultManager) score_worker() { - for r := range m.score_queries { - if m.is_query_current(r.query, r.root_dir) { - if err := m.score(r); err != nil { - m.report_errors <- err - } - } - } -} - -func merge_sorted_slices(a, b []ResultItem) []ResultItem { - result := make([]ResultItem, 0, len(a)+len(b)) +func merge_sorted_slices(a, b []*ResultItem) []*ResultItem { + result := make([]*ResultItem, 0, len(a)+len(b)) i, j := 0, 0 for i < len(a) && j < len(b) { if a[i].score <= b[j].score { @@ -258,70 +407,6 @@ func merge_sorted_slices(a, b []ResultItem) []ResultItem { return append(result, b[j:]...) } -type ByRelevance []ResultItem - -func (a ByRelevance) Len() int { - return len(a) -} - -func (a ByRelevance) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -func (a ByRelevance) Less(i, j int) bool { - return a[i].score < a[j].score -} - -func (m *ResultManager) add_score_results(r ScoreResult) (err error) { - min_score, max_score := CombinedScore(math.MaxUint64), CombinedScore(0) - if len(r.items) > 0 { - sort.Sort(ByRelevance(r.items)) - min_score = r.items[0].score - max_score = r.items[len(r.items)-1].score - } - _, _ = min_score, max_score - // renderable_results := merge_sorted_slices(m.renderable_results, r.items) - renderable_results := append(m.renderable_results, r.items...) - m.lock() - defer func() { - m.unlock() - if r := recover(); r != nil { - st, qerr := utils.Format_stacktrace_on_panic(r) - err = fmt.Errorf("%w\n%s", qerr, st) - } - }() - m.renderable_results = renderable_results - if r.is_last_for_current_root_dir { - m.current_query_scoring_complete = true - } - return -} - -func (m *ResultManager) sort_worker() { - last_wakeup_at := time.Now() - for r := range m.score_results { - if m.is_query_current(r.query, r.root_dir) { - if err := m.add_score_results(r); err != nil { - m.report_errors <- err - } else { - m.lock() - is_complete := m.current_root_dir_scan_complete && m.current_query_scoring_complete - m.unlock() - if is_complete || time.Now().Sub(last_wakeup_at) > time.Millisecond*50 { - m.WakeupMainThread() - last_wakeup_at = time.Now() - } - } - } - } -} - -func (m *ResultManager) is_root_dir_current(root_dir string) bool { - m.lock() - defer m.unlock() - return root_dir == m.current_root_dir -} - func (m *ResultManager) set_root_dir(root_dir string) { var err error if root_dir == "" || root_dir == "." { @@ -333,54 +418,25 @@ func (m *ResultManager) set_root_dir(root_dir string) { if root_dir, err = filepath.Abs(root_dir); err != nil { return } - m.lock() - defer m.unlock() - if m.current_root_dir == root_dir { - return + if m.scorer != nil { + m.scorer.Cancel() } - m.current_root_dir = root_dir - m.results_for_current_root_dir = nil - m.matches_for_current_query = nil - m.renderable_results = nil - m.current_query_scoring_complete = false - m.current_root_dir_scan_complete = false - m.scan_requests <- ScanRequest{root_dir: m.current_root_dir} -} - -func (m *ResultManager) is_query_current(query, root_dir string) bool { - m.lock() - defer m.unlock() - return root_dir == m.current_root_dir && query == m.current_query + m.scorer = NewFileSystemScorer(root_dir, "", m.settings.OnlyDirs(), m.on_results) + m.scorer.Start() } func (m *ResultManager) set_query(query string) { - var sqr *ScoreRequest - m.lock() - defer func() { - m.unlock() - if sqr != nil { - m.score_queries <- *sqr - } - }() - if query == m.current_query { - return - } - m.current_query = query - m.matches_for_current_query = nil - m.renderable_results = nil - m.current_query_scoring_complete = false - if m.results_for_current_root_dir != nil { - s := m.create_score_query(m.results_for_current_root_dir, m.current_root_dir_scan_complete) - sqr = &s - } else if m.current_root_dir_scan_complete { - m.current_query_scoring_complete = true + if m.scorer == nil { + m.scorer = NewFileSystemScorer(".", "", m.settings.OnlyDirs(), m.on_results) + m.scorer.Start() + } else { + m.scorer.Change_query(query) } } -func (h *Handler) get_results() (ans []ResultItem, in_progress bool) { - h.result_manager.lock() - defer h.result_manager.unlock() - ans = h.result_manager.renderable_results - in_progress = !h.result_manager.current_query_scoring_complete || !h.result_manager.current_root_dir_scan_complete - return +func (h *Handler) get_results() (ans ResultsType, in_progress bool) { + if h.result_manager.scorer == nil { + return + } + return h.result_manager.scorer.Results() }