From a72825fe83f04057d3c9e654bfccc1f862faea99 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 5 Jun 2023 19:42:55 +0530 Subject: [PATCH] More work on porting transfer kitten --- kittens/transfer/ftc.go | 17 ++ kittens/transfer/send.go | 381 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 395 insertions(+), 3 deletions(-) diff --git a/kittens/transfer/ftc.go b/kittens/transfer/ftc.go index 5c9b3aaf8..7166ed303 100644 --- a/kittens/transfer/ftc.go +++ b/kittens/transfer/ftc.go @@ -333,3 +333,20 @@ func NewFileTransmissionCommand(serialized string) (ans *FileTransmissionCommand } return } + +func split_for_transfer(data []byte, file_id string, mark_last bool, callback func(*FileTransmissionCommand)) { + const chunk_size = 4096 + for len(data) > 0 { + ac := Action_data + if mark_last && len(data) <= chunk_size { + ac = Action_end_data + } + chunk := data + if len(chunk) > chunk_size { + chunk = data[:chunk_size] + data = data[chunk_size:] + } + callback(&FileTransmissionCommand{Action: ac, File_id: file_id, Data: chunk}) + + } +} diff --git a/kittens/transfer/send.go b/kittens/transfer/send.go index b515170b9..e47cb149e 100644 --- a/kittens/transfer/send.go +++ b/kittens/transfer/send.go @@ -5,7 +5,9 @@ package transfer import ( "bytes" "compress/zlib" + "errors" "fmt" + "io" "io/fs" "os" "path/filepath" @@ -346,10 +348,11 @@ type SendManager struct { file_done func(*File) fid_map map[string]*File all_acknowledged, all_started, has_transmitting, has_rsync bool - active_idx, current_chunk_uncompressed_size int + active_idx int prefix, suffix string last_progress_file *File progress_tracker *ProgressTracker + current_chunk_uncompressed_sz int64 } func (self *SendManager) start_transfer() string { @@ -365,7 +368,7 @@ func (self *SendManager) initialize() { self.fid_map[f.file_id] = f } self.active_idx = -1 - self.current_chunk_uncompressed_size = -1 + self.current_chunk_uncompressed_sz = -1 self.prefix = fmt.Sprintf("\x1b]%d;id=%s;", kitty.FileTransferCode, self.request_id) self.suffix = "\x1b\\" for _, f := range self.files { @@ -610,7 +613,7 @@ func (self *SendHandler) erase_progress() { } } -func (self *SendHandler) refresh_progress(timer_id loop.IdType) (err error) { +func (self *SendHandler) refresh_progress(loop.IdType) (err error) { if !self.transmit_started { return nil } @@ -680,6 +683,273 @@ func (self *SendHandler) send_file_metadata() { } } +func (self *SendManager) update_collective_statuses() { + var found_not_started, found_not_done, has_rsync, has_transmitting bool + for _, f := range self.files { + if f.state != ACKNOWLEDGED { + found_not_done = true + } + if f.state == WAITING_FOR_START { + found_not_started = true + } else if f.state == TRANSMITTING { + has_transmitting = true + } + if f.ttype == TransmissionType_rsync { + has_rsync = true + } + } + self.all_acknowledged = !found_not_done + self.all_started = !found_not_started + self.has_rsync = has_rsync + self.has_transmitting = has_transmitting +} + +func (self *SendManager) on_file_status_update(ftc *FileTransmissionCommand) error { + file := self.fid_map[ftc.File_id] + if file == nil { + return nil + } + switch ftc.Status { + case `STARTED`: + file.remote_final_path = ftc.Name + file.remote_initial_size = int64(ftc.Size) + if file.file_type == FileType_directory { + file.state = FINISHED + } else { + if ftc.Ttype == TransmissionType_rsync { + file.state = WAITING_FOR_DATA + } else { + file.state = TRANSMITTING + } + if file.state == WAITING_FOR_DATA { + panic("TODO: Implement rsync support") + } + self.update_collective_statuses() + } + case `PROGRESS`: + self.last_progress_file = file + change := int64(ftc.Size) - file.reported_progress + file.reported_progress = int64(ftc.Size) + self.progress_tracker.on_file_progress(file, change) + self.file_progress(file, int(change)) + default: + if ftc.Name != "" && file.remote_final_path == "" { + file.remote_final_path = ftc.Name + } + file.state = ACKNOWLEDGED + if ftc.Status == `OK` { + if ftc.Size > 0 { + change := int64(ftc.Size) - file.reported_progress + file.reported_progress = int64(ftc.Size) + self.progress_tracker.on_file_progress(file, change) + self.file_progress(file, int(change)) + } + } else { + file.err_msg = ftc.Status + } + self.progress_tracker.on_file_done(file) + self.file_done(file) + if self.active_idx > -1 && file == self.files[self.active_idx] { + self.active_idx = -1 + } + self.update_collective_statuses() + } + return nil +} + +func (self *SendManager) on_signature_data_received(ftc *FileTransmissionCommand) error { + file := self.fid_map[ftc.File_id] + if file == nil || file.state != WAITING_FOR_DATA { + return nil + } + panic("TODO: Implement rsync support") +} + +func (self *SendManager) on_file_transfer_response(ftc *FileTransmissionCommand) error { + switch ftc.Action { + case Action_status: + if ftc.File_id != "" { + return self.on_file_status_update(ftc) + } + if ftc.Status == "OK" { + self.state = SEND_PERMISSION_GRANTED + } else { + self.state = SEND_PERMISSION_DENIED + } + case Action_data, Action_end_data: + if ftc.File_id != "" { + return self.on_signature_data_received(ftc) + } + } + return nil +} + +func (self *SendHandler) on_file_transfer_response(ftc *FileTransmissionCommand) error { + if ftc.Id != self.manager.request_id { + return nil + } + if ftc.Action == Action_status && ftc.Status == "CANCELED" { + self.lp.Quit(1) + return nil + } + if self.quit_after_write_code > -1 || self.manager.state == SEND_CANCELED { + return nil + } + before := self.manager.state + err := self.manager.on_file_transfer_response(ftc) + if err != nil { + return err + } + if before == SEND_WAITING_FOR_PERMISSION { + switch self.manager.state { + case SEND_PERMISSION_DENIED: + self.lp.Println(self.ctx.Err("Permission denied for this transfer")) + self.lp.Quit(1) + return nil + case SEND_PERMISSION_GRANTED: + self.lp.Println(self.ctx.Green("Permission granted for this transfer")) + self.send_file_metadata() + } + } + return nil +} + +func (self *SendManager) activate_next_ready_file() *File { + if self.active_idx > -1 && self.active_idx < len(self.files) { + self.files[self.active_idx].transmit_ended_at = time.Now() + } + for i, f := range self.files { + if f.state == TRANSMITTING { + self.active_idx = i + self.update_collective_statuses() + self.progress_tracker.change_active_file(f) + return f + } + } + self.active_idx = -1 + self.update_collective_statuses() + return nil +} + +func (self *SendManager) active_file() *File { + if self.active_idx > -1 && self.active_idx < len(self.files) { + return self.files[self.active_idx] + } + return nil +} + +func (self *File) next_chunk() (ans string, asz int, err error) { + const sz = 1024 * 1024 + switch self.file_type { + case FileType_symlink: + self.state = FINISHED + ans, asz = self.symbolic_link_target, len(self.symbolic_link_target) + return + case FileType_link: + self.state = FINISHED + ans, asz = self.hard_link_target, len(self.hard_link_target) + return + } + is_last := false + // TODO: self.delta_loader rsync support + if self.actual_file == nil { + self.actual_file, err = os.Open(self.expanded_local_path) + if err != nil { + return + } + } + chunk := make([]byte, sz) + n, err := self.actual_file.Read(chunk) + if err != nil && !errors.Is(err, io.EOF) { + return + } + if n <= 0 { + is_last = true + } else if pos, _ := self.actual_file.Seek(0, os.SEEK_CUR); pos >= self.file_size { + is_last = true + } + chunk = chunk[:n] + uncompressed_sz := len(chunk) + cchunk := self.compressor.Compress(chunk) + if is_last { + trail := self.compressor.Flush() + if len(trail) >= 0 { + cchunk = append(cchunk, trail...) + } + self.state = FINISHED + if self.actual_file != nil { + err = self.actual_file.Close() + self.actual_file = nil + if err != nil { + return + } + } + } + ans, asz = utils.UnsafeBytesToString(cchunk), uncompressed_sz + return +} + +func (self *SendManager) next_chunks(callback func(string)) error { + for { + if self.active_file() == nil { + self.activate_next_ready_file() + } + af := self.active_file() + if af == nil { + return nil + } + chunk := "" + self.current_chunk_uncompressed_sz = 0 + for af.state != FINISHED && len(chunk) == 0 { + c, usz, err := af.next_chunk() + if err != nil { + return err + } + self.current_chunk_uncompressed_sz += int64(usz) + chunk = c + } + is_last := af.state == FINISHED + if len(chunk) > 0 { + split_for_transfer(utils.UnsafeStringToBytes(chunk), af.file_id, is_last, func(ftc *FileTransmissionCommand) { callback(ftc.Serialize()) }) + } else if is_last { + callback(FileTransmissionCommand{Action: Action_end_data, File_id: af.file_id}.Serialize()) + } + } +} + +func (self *SendHandler) transmit_next_chunk() (err error) { + found_chunk := false + err = self.manager.next_chunks(func(chunk string) { + self.send_payload(chunk) + found_chunk = true + }) + if err != nil { + return err + } + if !found_chunk { + if self.manager.all_acknowledged { + self.transfer_finished() + } + } + return +} + +func (self *SendHandler) start_transfer() (err error) { + if self.manager.active_file() == nil { + self.manager.activate_next_ready_file() + } + if self.manager.active_file() != nil { + self.transmit_started = true + self.manager.progress_tracker.start_transfer() + err = self.transmit_next_chunk() + if err != nil { + return + } + self.draw_progress() + } + return +} + func (self *SendHandler) initialize() error { self.manager.initialize() self.spinner = tui.NewSpinner("dots") @@ -692,6 +962,93 @@ func (self *SendHandler) initialize() error { return nil } +func (self *SendHandler) transfer_finished() { + self.send_payload(FileTransmissionCommand{Action: Action_finish}.Serialize()) + if len(self.failed_files) > 0 { + self.quit_after_write_code = 1 + } else { + self.quit_after_write_code = 0 + } +} + +func (self *SendHandler) on_text(text string, from_key_event, in_bracketed_paste bool) error { + if self.quit_after_write_code > -1 { + return nil + } + if self.check_paths_printed && !self.transmit_started { + switch strings.ToLower(text) { + case "y": + err := self.start_transfer() + if err != nil { + return err + } + if self.manager.all_acknowledged { + self.refresh_progress(0) + self.transfer_finished() + } + return nil + case "n": + self.failed_files = nil + self.abort_transfer() + self.lp.Println(`Sending cancel request to terminal`) + return nil + } + self.print_continue_msg() + } + return nil +} + +func (self *SendHandler) print_continue_msg() { + self.lp.Println( + `Press`, self.ctx.Green(`y`), `to continue or`, self.ctx.BrightRed(`n`), `to abort`) +} + +func (self *SendHandler) abort_transfer(delay ...time.Duration) { + d := 5 * time.Second + if len(delay) > 0 { + d = delay[0] + } + self.send_payload(FileTransmissionCommand{Action: Action_cancel}.Serialize()) + self.manager.state = SEND_CANCELED + self.lp.AddTimer(d, false, func(loop.IdType) error { + self.lp.Quit(1) + return nil + }) +} + +func (self *SendHandler) on_key_event(ev *loop.KeyEvent) error { + if self.quit_after_write_code > -1 { + return nil + } + if ev.MatchesPressOrRepeat("esc") { + ev.Handled = true + if self.check_paths_printed && !self.transmit_started { + self.failed_files = nil + self.abort_transfer() + self.lp.Println(`Sending cancel request to terminal`) + return nil + } else { + self.on_interrupt() + } + } else if ev.MatchesPressOrRepeat("ctrl+c") { + self.on_interrupt() + ev.Handled = true + } + return nil +} + +func (self *SendHandler) on_interrupt() { + if self.quit_after_write_code > -1 { + return + } + if self.manager.state == SEND_CANCELED { + self.lp.Println(`Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received`) + return + } + self.lp.Println(self.ctx.BrightRed(`Interrupt requested, cancelling transfer, transferred files are in undefined state`)) + self.abort_transfer() +} + func send_loop(opts *Options, files []*File) (err error) { lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors) if err != nil { @@ -712,6 +1069,24 @@ func send_loop(opts *Options, files []*File) (err error) { lp.OnInitialize = func() (string, error) { return "", handler.initialize() } + ftc_code := strconv.Itoa(kitty.FileTransferCode) + lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error { + if et == loop.OSC { + if idx := bytes.IndexByte(payload, ';'); idx > 0 { + if utils.UnsafeBytesToString(payload[:idx]) == ftc_code { + ftc, err := NewFileTransmissionCommand(utils.UnsafeBytesToString(payload[idx+1:])) + if err != nil { + return fmt.Errorf("Received invalid FileTransmissionCommand from terminal with error: %w", err) + } + return handler.on_file_transfer_response(ftc) + } + } + } + return nil + } + lp.OnText = handler.on_text + lp.OnKeyEvent = handler.on_key_event + return }