mirror of
https://github.com/kovidgoyal/kitty
synced 2026-06-08 22:28:24 +02:00
Add tests for container transmission
This commit is contained in:
@@ -157,10 +157,14 @@ class TarExtractor:
|
|||||||
targetpath = resolve_name(tinfo.name, dest)
|
targetpath = resolve_name(tinfo.name, dest)
|
||||||
if targetpath is None:
|
if targetpath is None:
|
||||||
continue
|
continue
|
||||||
|
upperdirs = os.path.dirname(targetpath)
|
||||||
|
os.makedirs(upperdirs, exist_ok=True)
|
||||||
if tinfo.isdir():
|
if tinfo.isdir():
|
||||||
self.tf.makedir(tinfo, targetpath)
|
self.tf.makedir(tinfo, targetpath)
|
||||||
directories.append((targetpath, copy.copy(tinfo)))
|
directories.append((targetpath, copy.copy(tinfo)))
|
||||||
continue
|
continue
|
||||||
|
if tinfo.islnk():
|
||||||
|
tinfo._link_target = os.path.join(upperdirs, tinfo.linkname) # type: ignore
|
||||||
if tinfo.isfile():
|
if tinfo.isfile():
|
||||||
self.tf.makefile(tinfo, targetpath)
|
self.tf.makefile(tinfo, targetpath)
|
||||||
elif tinfo.isfifo():
|
elif tinfo.isfifo():
|
||||||
@@ -189,9 +193,8 @@ class ZipExtractor:
|
|||||||
def __call__(self, dest: str) -> None:
|
def __call__(self, dest: str) -> None:
|
||||||
for zinfo in self.zf.infolist():
|
for zinfo in self.zf.infolist():
|
||||||
targetpath = resolve_name(zinfo.filename, dest)
|
targetpath = resolve_name(zinfo.filename, dest)
|
||||||
if targetpath is None:
|
if targetpath is not None:
|
||||||
continue
|
self.zf.extract(zinfo, dest)
|
||||||
self.zf.extract(zinfo, targetpath)
|
|
||||||
|
|
||||||
|
|
||||||
class ActiveCommand:
|
class ActiveCommand:
|
||||||
@@ -357,8 +360,10 @@ class FileTransmission:
|
|||||||
Container.extractor_for_container_fmt(cmd.file, cmd.ftc.container_fmt)(cmd.dest)
|
Container.extractor_for_container_fmt(cmd.file, cmd.ftc.container_fmt)(cmd.dest)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
self.send_fail_on_os_error(cmd.ftc, e, 'Failed to extract files from container')
|
self.send_fail_on_os_error(cmd.ftc, e, 'Failed to extract files from container')
|
||||||
|
raise
|
||||||
except Exception:
|
except Exception:
|
||||||
self.send_response(cmd.ftc, status='EFAIL:Failed to extract files from container')
|
self.send_response(cmd.ftc, status='EFAIL:Failed to extract files from container')
|
||||||
|
raise
|
||||||
if not cmd.ftc.quiet:
|
if not cmd.ftc.quiet:
|
||||||
self.send_response(cmd.ftc, status='COMPLETED')
|
self.send_response(cmd.ftc, status='COMPLETED')
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -5,7 +5,11 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
import tarfile
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import zipfile
|
||||||
|
import zlib
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
from kitty.file_transmission import (
|
from kitty.file_transmission import (
|
||||||
Action, Compression, Container, FileTransmissionCommand,
|
Action, Compression, Container, FileTransmissionCommand,
|
||||||
@@ -15,6 +19,12 @@ from kitty.file_transmission import (
|
|||||||
from . import BaseTest
|
from . import BaseTest
|
||||||
|
|
||||||
|
|
||||||
|
def names_in(path):
|
||||||
|
for dirpath, dirnames, filenames in os.walk(path):
|
||||||
|
for d in dirnames + filenames:
|
||||||
|
yield os.path.relpath(os.path.join(dirpath, d), path)
|
||||||
|
|
||||||
|
|
||||||
def serialized_cmd(**fields) -> str:
|
def serialized_cmd(**fields) -> str:
|
||||||
for k, A in (('action', Action), ('container_fmt', Container), ('compression', Compression)):
|
for k, A in (('action', Action), ('container_fmt', Container), ('compression', Compression)):
|
||||||
if k in fields:
|
if k in fields:
|
||||||
@@ -35,6 +45,10 @@ class TestFileTransmission(BaseTest):
|
|||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
shutil.rmtree(self.tdir)
|
shutil.rmtree(self.tdir)
|
||||||
|
|
||||||
|
def clean_tdir(self):
|
||||||
|
shutil.rmtree(self.tdir)
|
||||||
|
self.tdir = tempfile.mkdtemp()
|
||||||
|
|
||||||
def assertPathEqual(self, a, b):
|
def assertPathEqual(self, a, b):
|
||||||
a = os.path.abspath(os.path.realpath(a))
|
a = os.path.abspath(os.path.realpath(a))
|
||||||
b = os.path.abspath(os.path.realpath(b))
|
b = os.path.abspath(os.path.realpath(b))
|
||||||
@@ -74,3 +88,77 @@ class TestFileTransmission(BaseTest):
|
|||||||
self.ae(ft.test_responses, [{'status': 'OK'}])
|
self.ae(ft.test_responses, [{'status': 'OK'}])
|
||||||
self.assertFalse(os.path.exists(dest))
|
self.assertFalse(os.path.exists(dest))
|
||||||
self.assertFalse(ft.active_cmds)
|
self.assertFalse(ft.active_cmds)
|
||||||
|
# compress with zlib
|
||||||
|
ft = FileTransmission()
|
||||||
|
dest = os.path.join(self.tdir, '3.bin')
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='send', dest=dest, compression='zlib'))
|
||||||
|
self.ae(ft.test_responses, [{'status': 'OK'}])
|
||||||
|
odata = 'abcd' * 1024
|
||||||
|
data = zlib.compress(odata.encode('ascii'))
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='data', data=data[:len(data)//2]))
|
||||||
|
self.assertTrue(os.path.exists(dest))
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='end_data', data=data[len(data)//2:]))
|
||||||
|
with open(dest) as f:
|
||||||
|
self.ae(f.read(), odata)
|
||||||
|
self.ae(ft.test_responses, [{'status': 'OK'}, {'status': 'COMPLETED'}])
|
||||||
|
del odata
|
||||||
|
del data
|
||||||
|
|
||||||
|
# zip send
|
||||||
|
self.clean_tdir()
|
||||||
|
buf = BytesIO()
|
||||||
|
with zipfile.ZipFile(buf, 'w') as zf:
|
||||||
|
zf.writestr('one.txt', '1' * 1111)
|
||||||
|
zf.writestr('two/one', '2' * 2222)
|
||||||
|
zf.writestr('onex/../../three', '3333')
|
||||||
|
zf.writestr('/onex', '3333')
|
||||||
|
ft = FileTransmission()
|
||||||
|
dest = os.path.join(self.tdir, 'zf')
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='send', dest=dest, container_fmt='zip'))
|
||||||
|
self.ae(ft.test_responses, [{'status': 'OK'}])
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='end_data', data=buf.getvalue()))
|
||||||
|
self.ae(ft.test_responses, [{'status': 'OK'}, {'status': 'COMPLETED'}])
|
||||||
|
with open(os.path.join(dest, 'one.txt')) as f:
|
||||||
|
self.ae(f.read(), '1' * 1111)
|
||||||
|
with open(os.path.join(dest, 'two', 'one')) as f:
|
||||||
|
self.ae(f.read(), '2' * 2222)
|
||||||
|
self.ae({'zf', 'zf/two', 'zf/one.txt', 'zf/two/one'}, set(names_in(self.tdir)))
|
||||||
|
|
||||||
|
# tar send
|
||||||
|
for mode in ('', 'gz', 'bz2', 'xz'):
|
||||||
|
buf = BytesIO()
|
||||||
|
with tarfile.open(fileobj=buf, mode=f'w:{mode}') as tf:
|
||||||
|
def a(name, data, mode=0o717, lt=None):
|
||||||
|
ti = tarfile.TarInfo(name)
|
||||||
|
ti.mtime = 13
|
||||||
|
ti.size = len(data)
|
||||||
|
ti.mode = mode
|
||||||
|
if lt:
|
||||||
|
ti.linkname = data
|
||||||
|
ti.type = lt
|
||||||
|
tf.addfile(ti)
|
||||||
|
else:
|
||||||
|
tf.addfile(ti, BytesIO(data.encode('utf-8')))
|
||||||
|
a('a.txt', 'abcd')
|
||||||
|
a('/b.txt', 'abcd')
|
||||||
|
a('../c.txt', 'abcd')
|
||||||
|
a('sym', 'a.txt', lt=tarfile.SYMTYPE)
|
||||||
|
a('asym', '/abstarget', lt=tarfile.SYMTYPE)
|
||||||
|
a('link', 'a.txt', lt=tarfile.LNKTYPE)
|
||||||
|
self.clean_tdir()
|
||||||
|
ft = FileTransmission()
|
||||||
|
dest = os.path.join(self.tdir, 'tf')
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='send', dest=dest, container_fmt='t' + (mode or 'ar')))
|
||||||
|
self.ae(ft.test_responses, [{'status': 'OK'}])
|
||||||
|
ft.handle_serialized_command(serialized_cmd(action='end_data', data=buf.getvalue()))
|
||||||
|
self.ae(ft.test_responses, [{'status': 'OK'}, {'status': 'COMPLETED'}])
|
||||||
|
with open(os.path.join(dest, 'a.txt')) as f:
|
||||||
|
self.ae(f.read(), 'abcd')
|
||||||
|
st = os.stat(f.name)
|
||||||
|
self.ae(st.st_mode & 0b111111111, 0o717)
|
||||||
|
self.ae(st.st_mtime, 13)
|
||||||
|
self.ae(os.path.realpath(os.path.join(dest, 'sym')), f.name)
|
||||||
|
self.ae(os.path.realpath(os.path.join(dest, 'asym')), '/abstarget')
|
||||||
|
self.assertTrue(os.path.samefile(f.name, os.path.join(dest, 'link')))
|
||||||
|
self.ae({'tf', 'tf/a.txt', 'tf/sym', 'tf/asym', 'tf/link'}, set(names_in(self.tdir)))
|
||||||
|
self.ae(len(os.listdir(self.tdir)), 1)
|
||||||
|
|||||||
Reference in New Issue
Block a user