remove duplicate things in vim_test_case.py

This commit is contained in:
spacewander 2015-03-05 18:21:09 +08:00
parent 56ac14840f
commit 4d88df461f
2 changed files with 5 additions and 279 deletions

View File

@ -2,12 +2,14 @@
import os import os
import re import re
import shutil
import subprocess import subprocess
import tempfile import tempfile
import textwrap import textwrap
import time import time
from test.constant import * from test.constant import (ARR_D, ARR_L, ARR_R, ARR_U, BS, ESC, PYTHON3,
SEQUENCES)
def wait_until_file_exists(file_path, times=None, interval=0.01): def wait_until_file_exists(file_path, times=None, interval=0.01):

View File

@ -3,40 +3,14 @@
# pylint: skip-file # pylint: skip-file
import os import os
import re
import shutil
import subprocess import subprocess
import tempfile import tempfile
import textwrap import textwrap
import time import time
import unittest import unittest
from test.constant import (ARR_D, ARR_L, ARR_R, ARR_U, BS, ESC, PYTHON3,
SEQUENCES)
from test.constant import PYTHON3
def is_process_running(pid): from test.vim_interface import create_directory, TempFileManager
"""Returns true if a process with pid is running, false otherwise."""
# from
# http://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
def silent_call(cmd):
"""Calls 'cmd' and returns the exit value."""
return subprocess.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
def create_directory(dirname):
"""Creates 'dirname' and its parents if it does not exist."""
try:
os.makedirs(dirname)
except OSError:
pass
def plugin_cache_dir(): def plugin_cache_dir():
@ -44,256 +18,6 @@ def plugin_cache_dir():
return os.path.join(tempfile.gettempdir(), 'UltiSnips_test_vim_plugins') return os.path.join(tempfile.gettempdir(), 'UltiSnips_test_vim_plugins')
def read_text_file(filename):
"""Reads the contens of a text file."""
if PYTHON3:
return open(filename, 'r', encoding='utf-8').read()
else:
return open(filename, 'r').read()
def wait_until_file_exists(file_path, times=None, interval=0.01):
while times is None or times:
if os.path.exists(file_path):
return True
time.sleep(interval)
if times is not None:
times -= 1
return False
class TempFileManager(object):
"""A TempFileManager keeps a unique prefix path for temp files.
A temp file, or a name for a temp file generate by a TempFileManager
always belongs to the same directory.
"""
def __init__(self, name=''):
"""The unique prefix path is UltiSnipsTest_{name}XXXXXX."""
self._temp_dir = tempfile.mkdtemp(prefix='UltiSnipsTest_' + name)
def name_temp(self, file_path):
"""Get the absolute path of a temp file by given file path."""
return os.path.join(self._temp_dir, file_path)
def write_temp(self, file_path, content):
"""Write the content to a temp file by given file path inside the
_temp_dir, and return the absolute path of that file."""
abs_path = self.name_temp(file_path)
create_directory(os.path.dirname(abs_path))
if PYTHON3:
with open(abs_path, 'w', encoding='utf-8') as f:
f.write(content)
else:
with open(abs_path, 'w') as f:
f.write(content)
return abs_path
def unique_name_temp(self, suffix='', prefix=''):
"""Generate a unique name for a temp file with given suffix and prefix,
and return full absolute path."""
file_handler, abspath = tempfile.mkstemp(
suffix, prefix, self._temp_dir)
os.close(file_handler)
os.remove(abspath)
return abspath
def clear_temp(self):
"""Clear the temp file directory, but the directory itself is not
removed."""
shutil.rmtree(self._temp_dir)
create_directory(self._temp_dir)
class VimInterface(TempFileManager):
def __init__(self, name=''):
TempFileManager.__init__(self, name)
def get_buffer_data(self):
buffer_path = self.unique_name_temp(prefix='buffer_')
self.send(ESC + ':w! %s\n' % buffer_path)
if wait_until_file_exists(buffer_path, 50):
return read_text_file(buffer_path)[:-1]
def send(self, s):
raise NotImplementedError()
def launch(self, config=[]):
pid_file = self.name_temp('vim.pid')
done_file = self.name_temp('loading_done')
if os.path.exists(done_file):
os.remove(done_file)
post_config = []
post_config.append('%s << EOF' % ('py3' if PYTHON3 else 'py'))
post_config.append('import vim')
post_config.append(
"with open('%s', 'w') as pid_file: pid_file.write(vim.eval('getpid()'))" %
pid_file)
post_config.append(
"with open('%s', 'w') as done_file: pass" %
done_file)
post_config.append('EOF')
config_path = self.write_temp('vim_config.vim',
textwrap.dedent(os.linesep.join(config + post_config) + '\n'))
# Note the space to exclude it from shell history.
self.send(""" vim -u %s\r\n""" % config_path)
wait_until_file_exists(done_file)
self._vim_pid = int(open(pid_file, 'r').read())
def leave_with_wait(self):
self.send(3 * ESC + ':qa!\n')
while is_process_running(self._vim_pid):
time.sleep(.05)
class VimInterfaceScreen(VimInterface):
def __init__(self, session):
VimInterface.__init__(self, 'Screen')
self.session = session
self.need_screen_escapes = 0
self.detect_parsing()
def send(self, s):
if self.need_screen_escapes:
# escape characters that are special to some versions of screen
repl = lambda m: '\\' + m.group(0)
s = re.sub(r"[$^#\\']", repl, s)
if PYTHON3:
s = s.encode('utf-8')
while True:
rv = 0
if len(s) > 30:
rv |= silent_call(
['screen', '-x', self.session, '-X', 'register', 'S', s])
rv |= silent_call(
['screen', '-x', self.session, '-X', 'paste', 'S'])
else:
rv |= silent_call(
['screen', '-x', self.session, '-X', 'stuff', s])
if not rv:
break
time.sleep(.2)
def detect_parsing(self):
self.launch()
# Send a string where the interpretation will depend on version of
# screen
string = '$TERM'
self.send('i' + string + ESC)
output = self.get_buffer_data()
# If the output doesn't match the input, need to do additional escaping
if output != string:
self.need_screen_escapes = 1
self.leave_with_wait()
class VimInterfaceTmux(VimInterface):
def __init__(self, session):
self.session = session
self._check_version()
def send(self, s):
# I did not find any documentation on what needs escaping when sending
# to tmux, but it seems like this is all that is needed for now.
s = s.replace(';', r'\;')
if PYTHON3:
s = s.encode('utf-8')
silent_call(['tmux', 'send-keys', '-t', self.session, '-l', s])
def _check_version(self):
stdout, _ = subprocess.Popen(['tmux', '-V'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
if PYTHON3:
stdout = stdout.decode('utf-8')
m = re.match(r"tmux (\d+).(\d+)", stdout)
if not m or not (int(m.group(1)), int(m.group(2))) >= (1, 9):
raise RuntimeError(
'Need at least tmux 1.9, you have %s.' %
stdout.strip())
class VimInterfaceWindows(VimInterface):
BRACES = re.compile('([}{])')
WIN_ESCAPES = ['+', '^', '%', '~', '[', ']', '<', '>', '(', ')']
WIN_REPLACES = [
(BS, '{BS}'),
(ARR_L, '{LEFT}'),
(ARR_R, '{RIGHT}'),
(ARR_U, '{UP}'),
(ARR_D, '{DOWN}'),
('\t', '{TAB}'),
('\n', '~'),
(ESC, '{ESC}'),
# On my system ` waits for a second keystroke, so `+SPACE = "`". On
# most systems, `+Space = "` ". I work around this, by sending the host
# ` as `+_+BS. Awkward, but the only way I found to get this working.
('`', '`_{BS}'),
('´', '´_{BS}'),
('{^}', '{^}_{BS}'),
]
def __init__(self):
self.seq_buf = []
# import windows specific modules
import win32com.client
import win32gui
self.win32gui = win32gui
self.shell = win32com.client.Dispatch('WScript.Shell')
def is_focused(self, title=None):
cur_title = self.win32gui.GetWindowText(
self.win32gui.GetForegroundWindow())
if (title or '- GVIM') in cur_title:
return True
return False
def focus(self, title=None):
if not self.shell.AppActivate(title or '- GVIM'):
raise Exception('Failed to switch to GVim window')
time.sleep(1)
def convert_keys(self, keys):
keys = self.BRACES.sub(r"{\1}", keys)
for k in self.WIN_ESCAPES:
keys = keys.replace(k, '{%s}' % k)
for f, r in self.WIN_REPLACES:
keys = keys.replace(f, r)
return keys
def send(self, keys):
self.seq_buf.append(keys)
seq = ''.join(self.seq_buf)
for f in SEQUENCES:
if f.startswith(seq) and f != seq:
return
self.seq_buf = []
seq = self.convert_keys(seq)
if not self.is_focused():
time.sleep(2)
self.focus()
if not self.is_focused():
# This is the only way I can find to stop test execution
raise KeyboardInterrupt('Failed to focus GVIM')
self.shell.SendKeys(seq)
class VimTestCase(unittest.TestCase, TempFileManager): class VimTestCase(unittest.TestCase, TempFileManager):
snippets = () snippets = ()
files = {} files = {}