Speed up write performance to loop

20x speed for large numbers of queued writes by avoiding pinging between
the writer and main goroutines
This commit is contained in:
Kovid Goyal
2023-08-05 13:53:32 +05:30
parent 16132d0309
commit 2a5a89e01c
3 changed files with 44 additions and 25 deletions

View File

@@ -47,7 +47,8 @@ type Loop struct {
timers, timers_temp []*timer
timer_id_counter, write_msg_id_counter IdType
wakeup_channel chan byte
pending_writes []*write_msg
pending_writes []write_msg
tty_write_channel chan write_msg
pending_mouse_events *utils.RingBuffer[MouseEvent]
on_SIGTSTP func() error
style_cache map[string]func(...any) string
@@ -292,7 +293,7 @@ func (self *Loop) WakeupMainThread() bool {
func (self *Loop) QueueWriteString(data string) IdType {
self.write_msg_id_counter++
msg := write_msg{str: data, bytes: nil, id: self.write_msg_id_counter}
self.add_write_to_pending_queue(&msg)
self.add_write_to_pending_queue(msg)
return msg.id
}
@@ -301,7 +302,7 @@ func (self *Loop) QueueWriteString(data string) IdType {
func (self *Loop) UnsafeQueueWriteBytes(data []byte) IdType {
self.write_msg_id_counter++
msg := write_msg{bytes: data, id: self.write_msg_id_counter}
self.add_write_to_pending_queue(&msg)
self.add_write_to_pending_queue(msg)
return msg.id
}

View File

@@ -281,10 +281,13 @@ func (self *Loop) run() (err error) {
self.keep_going = true
self.pending_mouse_events = utils.NewRingBuffer[MouseEvent](4)
tty_write_channel := make(chan *write_msg, 1) // buffered so there is no race between initial queueing and startup of writer thread
// tty_write_channel is buffered so there is no race between initial
// queueing and startup of writer thread and also as a performance
// optimization to avoid copying unnecessarily to pending_writes
self.tty_write_channel = make(chan write_msg, 512)
write_done_channel := make(chan IdType)
self.wakeup_channel = make(chan byte, 256)
self.pending_writes = make([]*write_msg, 0, 256)
self.pending_writes = make([]write_msg, 0, 256)
err_channel := make(chan error, 8)
self.death_signal = SIGNULL
self.escape_code_parser.Reset()
@@ -348,12 +351,13 @@ func (self *Loop) run() (err error) {
self.QueueWriteString(self.terminal_options.ResetStateEscapeCodes())
}
// flush queued data and wait for it to be written for a timeout, then wait for writer to shutdown
flush_writer(w_w, tty_write_channel, write_done_channel, self.pending_writes, 2*time.Second)
flush_writer(w_w, self.tty_write_channel, write_done_channel, self.pending_writes, 2*time.Second)
self.pending_writes = nil
self.tty_write_channel = nil
wait_for_tty_reader_to_quit()
}()
go write_to_tty(w_r, controlling_term, tty_write_channel, err_channel, write_done_channel)
go write_to_tty(w_r, controlling_term, self.tty_write_channel, err_channel, write_done_channel)
if self.OnInitialize != nil {
finalizer, err = self.OnInitialize()
@@ -365,7 +369,7 @@ func (self *Loop) run() (err error) {
self.SuspendAndRun = func(run func() error) (err error) {
write_id := self.QueueWriteString(self.terminal_options.ResetStateEscapeCodes())
needs_reset_escape_codes = false
if err = self.wait_for_write_to_complete(write_id, tty_write_channel, write_done_channel, 2*time.Second); err != nil {
if err = self.wait_for_write_to_complete(write_id, self.tty_write_channel, write_done_channel, 2*time.Second); err != nil {
return err
}
shutdown_tty_reader()
@@ -385,13 +389,13 @@ func (self *Loop) run() (err error) {
}
write_id = self.QueueWriteString(self.terminal_options.SetStateEscapeCodes())
needs_reset_escape_codes = true
return self.wait_for_write_to_complete(write_id, tty_write_channel, write_done_channel, 2*time.Second)
return self.wait_for_write_to_complete(write_id, self.tty_write_channel, write_done_channel, 2*time.Second)
}
self.on_SIGTSTP = func() error {
write_id := self.QueueWriteString(self.terminal_options.ResetStateEscapeCodes())
needs_reset_escape_codes = false
err := self.wait_for_write_to_complete(write_id, tty_write_channel, write_done_channel, 2*time.Second)
err := self.wait_for_write_to_complete(write_id, self.tty_write_channel, write_done_channel, 2*time.Second)
if err != nil {
return err
}
@@ -405,7 +409,7 @@ func (self *Loop) run() (err error) {
}
write_id = self.QueueWriteString(self.terminal_options.SetStateEscapeCodes())
needs_reset_escape_codes = true
err = self.wait_for_write_to_complete(write_id, tty_write_channel, write_done_channel, 2*time.Second)
err = self.wait_for_write_to_complete(write_id, self.tty_write_channel, write_done_channel, 2*time.Second)
if err != nil {
return err
}
@@ -416,7 +420,7 @@ func (self *Loop) run() (err error) {
}
for self.keep_going {
self.flush_pending_writes(tty_write_channel)
self.flush_pending_writes(self.tty_write_channel)
timeout_chan := no_timeout_channel
if len(self.timers) > 0 {
now := time.Now()
@@ -446,7 +450,7 @@ func (self *Loop) run() (err error) {
}
}
case msg_id := <-write_done_channel:
self.flush_pending_writes(tty_write_channel)
self.flush_pending_writes(self.tty_write_channel)
if self.OnWriteComplete != nil {
err = self.OnWriteComplete(msg_id)
if err != nil {

View File

@@ -59,19 +59,25 @@ func writestring_ignoring_temporary_errors(f *tty.Term, buf string) (int, error)
return n, err
}
func (self *Loop) flush_pending_writes(tty_write_channel chan<- *write_msg) {
for len(self.pending_writes) > 0 {
select {
case tty_write_channel <- self.pending_writes[0]:
n := copy(self.pending_writes, self.pending_writes[1:])
func (self *Loop) flush_pending_writes(tty_write_channel chan<- write_msg) (num_sent int) {
defer func() {
if num_sent > 0 {
n := copy(self.pending_writes, self.pending_writes[num_sent:])
self.pending_writes = self.pending_writes[:n]
}
}()
for len(self.pending_writes) > num_sent {
select {
case tty_write_channel <- self.pending_writes[num_sent]:
num_sent++
default:
return
}
}
return
}
func (self *Loop) wait_for_write_to_complete(sentinel IdType, tty_write_channel chan<- *write_msg, write_done_channel <-chan IdType, timeout time.Duration) error {
func (self *Loop) wait_for_write_to_complete(sentinel IdType, tty_write_channel chan<- write_msg, write_done_channel <-chan IdType, timeout time.Duration) error {
for len(self.pending_writes) > 0 {
select {
case tty_write_channel <- self.pending_writes[0]:
@@ -96,11 +102,19 @@ func (self *Loop) wait_for_write_to_complete(sentinel IdType, tty_write_channel
return nil
}
func (self *Loop) add_write_to_pending_queue(data *write_msg) {
self.pending_writes = append(self.pending_writes, data)
func (self *Loop) add_write_to_pending_queue(data write_msg) {
if len(self.pending_writes) > 0 || self.tty_write_channel == nil {
self.pending_writes = append(self.pending_writes, data)
} else {
select {
case self.tty_write_channel <- data:
default:
self.pending_writes = append(self.pending_writes, data)
}
}
}
func create_write_dispatcher(msg *write_msg) *write_dispatcher {
func create_write_dispatcher(msg write_msg) *write_dispatcher {
self := write_dispatcher{str: msg.str, bytes: msg.bytes, is_string: msg.bytes == nil}
if self.is_string {
self.is_empty = self.str == ""
@@ -129,7 +143,7 @@ func (self *write_dispatcher) slice(n int) {
func write_to_tty(
pipe_r *os.File, term *tty.Term,
job_channel <-chan *write_msg, err_channel chan<- error, write_done_channel chan<- IdType,
job_channel <-chan write_msg, err_channel chan<- error, write_done_channel chan<- IdType,
) {
keep_going := true
defer func() {
@@ -159,7 +173,7 @@ func write_to_tty(
}
}
write_data := func(msg *write_msg) {
write_data := func(msg write_msg) {
data := create_write_dispatcher(msg)
for !data.is_empty {
wait_for_write_available()
@@ -193,7 +207,7 @@ func write_to_tty(
}
}
func flush_writer(pipe_w *os.File, tty_write_channel chan<- *write_msg, write_done_channel <-chan IdType, pending_writes []*write_msg, timeout time.Duration) {
func flush_writer(pipe_w *os.File, tty_write_channel chan<- write_msg, write_done_channel <-chan IdType, pending_writes []write_msg, timeout time.Duration) {
writer_quit := false
defer func() {
if tty_write_channel != nil {