More work on porting transfer kitten

This commit is contained in:
Kovid Goyal
2023-06-05 19:42:55 +05:30
parent 8a2fca1c30
commit a72825fe83
2 changed files with 395 additions and 3 deletions

View File

@@ -333,3 +333,20 @@ func NewFileTransmissionCommand(serialized string) (ans *FileTransmissionCommand
}
return
}
func split_for_transfer(data []byte, file_id string, mark_last bool, callback func(*FileTransmissionCommand)) {
const chunk_size = 4096
for len(data) > 0 {
ac := Action_data
if mark_last && len(data) <= chunk_size {
ac = Action_end_data
}
chunk := data
if len(chunk) > chunk_size {
chunk = data[:chunk_size]
data = data[chunk_size:]
}
callback(&FileTransmissionCommand{Action: ac, File_id: file_id, Data: chunk})
}
}

View File

@@ -5,7 +5,9 @@ package transfer
import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
@@ -346,10 +348,11 @@ type SendManager struct {
file_done func(*File)
fid_map map[string]*File
all_acknowledged, all_started, has_transmitting, has_rsync bool
active_idx, current_chunk_uncompressed_size int
active_idx int
prefix, suffix string
last_progress_file *File
progress_tracker *ProgressTracker
current_chunk_uncompressed_sz int64
}
func (self *SendManager) start_transfer() string {
@@ -365,7 +368,7 @@ func (self *SendManager) initialize() {
self.fid_map[f.file_id] = f
}
self.active_idx = -1
self.current_chunk_uncompressed_size = -1
self.current_chunk_uncompressed_sz = -1
self.prefix = fmt.Sprintf("\x1b]%d;id=%s;", kitty.FileTransferCode, self.request_id)
self.suffix = "\x1b\\"
for _, f := range self.files {
@@ -610,7 +613,7 @@ func (self *SendHandler) erase_progress() {
}
}
func (self *SendHandler) refresh_progress(timer_id loop.IdType) (err error) {
func (self *SendHandler) refresh_progress(loop.IdType) (err error) {
if !self.transmit_started {
return nil
}
@@ -680,6 +683,273 @@ func (self *SendHandler) send_file_metadata() {
}
}
func (self *SendManager) update_collective_statuses() {
var found_not_started, found_not_done, has_rsync, has_transmitting bool
for _, f := range self.files {
if f.state != ACKNOWLEDGED {
found_not_done = true
}
if f.state == WAITING_FOR_START {
found_not_started = true
} else if f.state == TRANSMITTING {
has_transmitting = true
}
if f.ttype == TransmissionType_rsync {
has_rsync = true
}
}
self.all_acknowledged = !found_not_done
self.all_started = !found_not_started
self.has_rsync = has_rsync
self.has_transmitting = has_transmitting
}
func (self *SendManager) on_file_status_update(ftc *FileTransmissionCommand) error {
file := self.fid_map[ftc.File_id]
if file == nil {
return nil
}
switch ftc.Status {
case `STARTED`:
file.remote_final_path = ftc.Name
file.remote_initial_size = int64(ftc.Size)
if file.file_type == FileType_directory {
file.state = FINISHED
} else {
if ftc.Ttype == TransmissionType_rsync {
file.state = WAITING_FOR_DATA
} else {
file.state = TRANSMITTING
}
if file.state == WAITING_FOR_DATA {
panic("TODO: Implement rsync support")
}
self.update_collective_statuses()
}
case `PROGRESS`:
self.last_progress_file = file
change := int64(ftc.Size) - file.reported_progress
file.reported_progress = int64(ftc.Size)
self.progress_tracker.on_file_progress(file, change)
self.file_progress(file, int(change))
default:
if ftc.Name != "" && file.remote_final_path == "" {
file.remote_final_path = ftc.Name
}
file.state = ACKNOWLEDGED
if ftc.Status == `OK` {
if ftc.Size > 0 {
change := int64(ftc.Size) - file.reported_progress
file.reported_progress = int64(ftc.Size)
self.progress_tracker.on_file_progress(file, change)
self.file_progress(file, int(change))
}
} else {
file.err_msg = ftc.Status
}
self.progress_tracker.on_file_done(file)
self.file_done(file)
if self.active_idx > -1 && file == self.files[self.active_idx] {
self.active_idx = -1
}
self.update_collective_statuses()
}
return nil
}
func (self *SendManager) on_signature_data_received(ftc *FileTransmissionCommand) error {
file := self.fid_map[ftc.File_id]
if file == nil || file.state != WAITING_FOR_DATA {
return nil
}
panic("TODO: Implement rsync support")
}
func (self *SendManager) on_file_transfer_response(ftc *FileTransmissionCommand) error {
switch ftc.Action {
case Action_status:
if ftc.File_id != "" {
return self.on_file_status_update(ftc)
}
if ftc.Status == "OK" {
self.state = SEND_PERMISSION_GRANTED
} else {
self.state = SEND_PERMISSION_DENIED
}
case Action_data, Action_end_data:
if ftc.File_id != "" {
return self.on_signature_data_received(ftc)
}
}
return nil
}
func (self *SendHandler) on_file_transfer_response(ftc *FileTransmissionCommand) error {
if ftc.Id != self.manager.request_id {
return nil
}
if ftc.Action == Action_status && ftc.Status == "CANCELED" {
self.lp.Quit(1)
return nil
}
if self.quit_after_write_code > -1 || self.manager.state == SEND_CANCELED {
return nil
}
before := self.manager.state
err := self.manager.on_file_transfer_response(ftc)
if err != nil {
return err
}
if before == SEND_WAITING_FOR_PERMISSION {
switch self.manager.state {
case SEND_PERMISSION_DENIED:
self.lp.Println(self.ctx.Err("Permission denied for this transfer"))
self.lp.Quit(1)
return nil
case SEND_PERMISSION_GRANTED:
self.lp.Println(self.ctx.Green("Permission granted for this transfer"))
self.send_file_metadata()
}
}
return nil
}
func (self *SendManager) activate_next_ready_file() *File {
if self.active_idx > -1 && self.active_idx < len(self.files) {
self.files[self.active_idx].transmit_ended_at = time.Now()
}
for i, f := range self.files {
if f.state == TRANSMITTING {
self.active_idx = i
self.update_collective_statuses()
self.progress_tracker.change_active_file(f)
return f
}
}
self.active_idx = -1
self.update_collective_statuses()
return nil
}
func (self *SendManager) active_file() *File {
if self.active_idx > -1 && self.active_idx < len(self.files) {
return self.files[self.active_idx]
}
return nil
}
func (self *File) next_chunk() (ans string, asz int, err error) {
const sz = 1024 * 1024
switch self.file_type {
case FileType_symlink:
self.state = FINISHED
ans, asz = self.symbolic_link_target, len(self.symbolic_link_target)
return
case FileType_link:
self.state = FINISHED
ans, asz = self.hard_link_target, len(self.hard_link_target)
return
}
is_last := false
// TODO: self.delta_loader rsync support
if self.actual_file == nil {
self.actual_file, err = os.Open(self.expanded_local_path)
if err != nil {
return
}
}
chunk := make([]byte, sz)
n, err := self.actual_file.Read(chunk)
if err != nil && !errors.Is(err, io.EOF) {
return
}
if n <= 0 {
is_last = true
} else if pos, _ := self.actual_file.Seek(0, os.SEEK_CUR); pos >= self.file_size {
is_last = true
}
chunk = chunk[:n]
uncompressed_sz := len(chunk)
cchunk := self.compressor.Compress(chunk)
if is_last {
trail := self.compressor.Flush()
if len(trail) >= 0 {
cchunk = append(cchunk, trail...)
}
self.state = FINISHED
if self.actual_file != nil {
err = self.actual_file.Close()
self.actual_file = nil
if err != nil {
return
}
}
}
ans, asz = utils.UnsafeBytesToString(cchunk), uncompressed_sz
return
}
func (self *SendManager) next_chunks(callback func(string)) error {
for {
if self.active_file() == nil {
self.activate_next_ready_file()
}
af := self.active_file()
if af == nil {
return nil
}
chunk := ""
self.current_chunk_uncompressed_sz = 0
for af.state != FINISHED && len(chunk) == 0 {
c, usz, err := af.next_chunk()
if err != nil {
return err
}
self.current_chunk_uncompressed_sz += int64(usz)
chunk = c
}
is_last := af.state == FINISHED
if len(chunk) > 0 {
split_for_transfer(utils.UnsafeStringToBytes(chunk), af.file_id, is_last, func(ftc *FileTransmissionCommand) { callback(ftc.Serialize()) })
} else if is_last {
callback(FileTransmissionCommand{Action: Action_end_data, File_id: af.file_id}.Serialize())
}
}
}
func (self *SendHandler) transmit_next_chunk() (err error) {
found_chunk := false
err = self.manager.next_chunks(func(chunk string) {
self.send_payload(chunk)
found_chunk = true
})
if err != nil {
return err
}
if !found_chunk {
if self.manager.all_acknowledged {
self.transfer_finished()
}
}
return
}
func (self *SendHandler) start_transfer() (err error) {
if self.manager.active_file() == nil {
self.manager.activate_next_ready_file()
}
if self.manager.active_file() != nil {
self.transmit_started = true
self.manager.progress_tracker.start_transfer()
err = self.transmit_next_chunk()
if err != nil {
return
}
self.draw_progress()
}
return
}
func (self *SendHandler) initialize() error {
self.manager.initialize()
self.spinner = tui.NewSpinner("dots")
@@ -692,6 +962,93 @@ func (self *SendHandler) initialize() error {
return nil
}
func (self *SendHandler) transfer_finished() {
self.send_payload(FileTransmissionCommand{Action: Action_finish}.Serialize())
if len(self.failed_files) > 0 {
self.quit_after_write_code = 1
} else {
self.quit_after_write_code = 0
}
}
func (self *SendHandler) on_text(text string, from_key_event, in_bracketed_paste bool) error {
if self.quit_after_write_code > -1 {
return nil
}
if self.check_paths_printed && !self.transmit_started {
switch strings.ToLower(text) {
case "y":
err := self.start_transfer()
if err != nil {
return err
}
if self.manager.all_acknowledged {
self.refresh_progress(0)
self.transfer_finished()
}
return nil
case "n":
self.failed_files = nil
self.abort_transfer()
self.lp.Println(`Sending cancel request to terminal`)
return nil
}
self.print_continue_msg()
}
return nil
}
func (self *SendHandler) print_continue_msg() {
self.lp.Println(
`Press`, self.ctx.Green(`y`), `to continue or`, self.ctx.BrightRed(`n`), `to abort`)
}
func (self *SendHandler) abort_transfer(delay ...time.Duration) {
d := 5 * time.Second
if len(delay) > 0 {
d = delay[0]
}
self.send_payload(FileTransmissionCommand{Action: Action_cancel}.Serialize())
self.manager.state = SEND_CANCELED
self.lp.AddTimer(d, false, func(loop.IdType) error {
self.lp.Quit(1)
return nil
})
}
func (self *SendHandler) on_key_event(ev *loop.KeyEvent) error {
if self.quit_after_write_code > -1 {
return nil
}
if ev.MatchesPressOrRepeat("esc") {
ev.Handled = true
if self.check_paths_printed && !self.transmit_started {
self.failed_files = nil
self.abort_transfer()
self.lp.Println(`Sending cancel request to terminal`)
return nil
} else {
self.on_interrupt()
}
} else if ev.MatchesPressOrRepeat("ctrl+c") {
self.on_interrupt()
ev.Handled = true
}
return nil
}
func (self *SendHandler) on_interrupt() {
if self.quit_after_write_code > -1 {
return
}
if self.manager.state == SEND_CANCELED {
self.lp.Println(`Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received`)
return
}
self.lp.Println(self.ctx.BrightRed(`Interrupt requested, cancelling transfer, transferred files are in undefined state`))
self.abort_transfer()
}
func send_loop(opts *Options, files []*File) (err error) {
lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors)
if err != nil {
@@ -712,6 +1069,24 @@ func send_loop(opts *Options, files []*File) (err error) {
lp.OnInitialize = func() (string, error) {
return "", handler.initialize()
}
ftc_code := strconv.Itoa(kitty.FileTransferCode)
lp.OnEscapeCode = func(et loop.EscapeCodeType, payload []byte) error {
if et == loop.OSC {
if idx := bytes.IndexByte(payload, ';'); idx > 0 {
if utils.UnsafeBytesToString(payload[:idx]) == ftc_code {
ftc, err := NewFileTransmissionCommand(utils.UnsafeBytesToString(payload[idx+1:]))
if err != nil {
return fmt.Errorf("Received invalid FileTransmissionCommand from terminal with error: %w", err)
}
return handler.on_file_transfer_response(ftc)
}
}
}
return nil
}
lp.OnText = handler.on_text
lp.OnKeyEvent = handler.on_key_event
return
}