424 lines
15 KiB
Python
424 lines
15 KiB
Python
|
# encoding: utf-8
|
|||
|
|
|||
|
# pylint: skip-file
|
|||
|
|
|||
|
import os
|
|||
|
import re
|
|||
|
import shutil
|
|||
|
import subprocess
|
|||
|
import sys
|
|||
|
import tempfile
|
|||
|
import textwrap
|
|||
|
import time
|
|||
|
import unittest
|
|||
|
|
|||
|
from constant import *
|
|||
|
|
|||
|
def is_process_running(pid):
|
|||
|
"""Returns true if a process with pid is running, false otherwise."""
|
|||
|
# from http://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid
|
|||
|
try:
|
|||
|
os.kill(pid, 0)
|
|||
|
except OSError:
|
|||
|
return False
|
|||
|
else:
|
|||
|
return True
|
|||
|
|
|||
|
def silent_call(cmd):
|
|||
|
"""Calls 'cmd' and returns the exit value."""
|
|||
|
return subprocess.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
|||
|
|
|||
|
def create_directory(dirname):
|
|||
|
"""Creates 'dirname' and its parents if it does not exist."""
|
|||
|
try:
|
|||
|
os.makedirs(dirname)
|
|||
|
except OSError:
|
|||
|
pass
|
|||
|
|
|||
|
def plugin_cache_dir():
|
|||
|
"""The directory that we check out our bundles to."""
|
|||
|
return os.path.join(tempfile.gettempdir(), "UltiSnips_test_vim_plugins")
|
|||
|
|
|||
|
def read_text_file(filename):
|
|||
|
"""Reads the contens of a text file."""
|
|||
|
if PYTHON3:
|
|||
|
return open(filename,"r", encoding="utf-8").read()
|
|||
|
else:
|
|||
|
return open(filename,"r").read()
|
|||
|
|
|||
|
def wait_until_file_exists(file_path, times=None, interval=0.01):
|
|||
|
while times is None or times:
|
|||
|
if os.path.exists(file_path):
|
|||
|
return True
|
|||
|
time.sleep(interval)
|
|||
|
if times is not None:
|
|||
|
times -= 1
|
|||
|
return False
|
|||
|
|
|||
|
class TempFileManager(object):
|
|||
|
"""A TempFileManager keeps a unique prefix path for temp
|
|||
|
files. A temp file, or a name for a temp file generate by a
|
|||
|
TempFileManager always belongs to the same directory.
|
|||
|
"""
|
|||
|
def __init__(self, name=""):
|
|||
|
"""The unique prefix path is UltiSnipsTest_{name}XXXXXX.
|
|||
|
"""
|
|||
|
self._temp_dir = tempfile.mkdtemp(prefix="UltiSnipsTest_" + name)
|
|||
|
|
|||
|
def name_temp(self, file_path):
|
|||
|
"""Get the absolute path of a temp file by given file path.
|
|||
|
"""
|
|||
|
return os.path.join(self._temp_dir, file_path)
|
|||
|
|
|||
|
def write_temp(self, file_path, content):
|
|||
|
"""Write the content to a temp file by given file path inside
|
|||
|
the _temp_dir, and return the absolute path of that file.
|
|||
|
"""
|
|||
|
abs_path = self.name_temp(file_path)
|
|||
|
create_directory(os.path.dirname(abs_path))
|
|||
|
if PYTHON3:
|
|||
|
with open(abs_path, "w", encoding="utf-8") as f:
|
|||
|
f.write(content)
|
|||
|
else:
|
|||
|
with open(abs_path, "w") as f:
|
|||
|
f.write(content)
|
|||
|
return abs_path
|
|||
|
|
|||
|
def unique_name_temp(self, suffix="", prefix=""):
|
|||
|
"""Generate a unique name for a temp file with given suffix and
|
|||
|
prefix, and return full absolute path.
|
|||
|
"""
|
|||
|
file_handler, abspath = tempfile.mkstemp(suffix, prefix, self._temp_dir)
|
|||
|
os.close(file_handler)
|
|||
|
os.remove(abspath)
|
|||
|
return abspath
|
|||
|
|
|||
|
def clear_temp(self):
|
|||
|
"""Clear the temp file directory, but the directory itself is
|
|||
|
not removed.
|
|||
|
"""
|
|||
|
shutil.rmtree(self._temp_dir)
|
|||
|
create_directory(self._temp_dir)
|
|||
|
|
|||
|
|
|||
|
class VimInterface(TempFileManager):
|
|||
|
def __init__(self, name=""):
|
|||
|
TempFileManager.__init__(self, name)
|
|||
|
|
|||
|
def get_buffer_data(self):
|
|||
|
buffer_path = self.unique_name_temp(prefix="buffer_")
|
|||
|
self.send(ESC + ":w! %s\n" % buffer_path)
|
|||
|
if wait_until_file_exists(buffer_path, 50):
|
|||
|
return read_text_file(buffer_path)[:-1]
|
|||
|
|
|||
|
def send(self, s):
|
|||
|
raise NotImplementedError()
|
|||
|
|
|||
|
def launch(self, config=[]):
|
|||
|
pid_file = self.name_temp("vim.pid")
|
|||
|
done_file = self.name_temp("loading_done")
|
|||
|
if os.path.exists(done_file):
|
|||
|
os.remove(done_file)
|
|||
|
|
|||
|
post_config = []
|
|||
|
post_config.append("%s << EOF" % ("py3" if PYTHON3 else "py"))
|
|||
|
post_config.append("import vim")
|
|||
|
post_config.append("with open('%s', 'w') as pid_file: pid_file.write(vim.eval('getpid()'))" % pid_file)
|
|||
|
post_config.append("with open('%s', 'w') as done_file: pass" % done_file)
|
|||
|
post_config.append("EOF")
|
|||
|
|
|||
|
config_path = self.write_temp("vim_config.vim",
|
|||
|
textwrap.dedent(os.linesep.join(config + post_config) + "\n"))
|
|||
|
|
|||
|
# Note the space to exclude it from shell history.
|
|||
|
self.send(""" vim -u %s\r\n""" % config_path)
|
|||
|
wait_until_file_exists(done_file)
|
|||
|
self._vim_pid = int(open(pid_file, "r").read())
|
|||
|
|
|||
|
def leave_with_wait(self):
|
|||
|
self.send(3*ESC + ":qa!\n")
|
|||
|
while is_process_running(self._vim_pid):
|
|||
|
time.sleep(.05)
|
|||
|
|
|||
|
|
|||
|
class VimInterfaceScreen(VimInterface):
|
|||
|
def __init__(self, session):
|
|||
|
VimInterface.__init__(self, "Screen")
|
|||
|
self.session = session
|
|||
|
self.need_screen_escapes = 0
|
|||
|
self.detect_parsing()
|
|||
|
|
|||
|
def send(self, s):
|
|||
|
if self.need_screen_escapes:
|
|||
|
# escape characters that are special to some versions of screen
|
|||
|
repl = lambda m: '\\' + m.group(0)
|
|||
|
s = re.sub( r"[$^#\\']", repl, s )
|
|||
|
|
|||
|
if PYTHON3:
|
|||
|
s = s.encode("utf-8")
|
|||
|
|
|||
|
while True:
|
|||
|
rv = 0
|
|||
|
if len(s) > 30:
|
|||
|
rv |= silent_call(["screen", "-x", self.session, "-X", "register", "S", s])
|
|||
|
rv |= silent_call(["screen", "-x", self.session, "-X", "paste", "S"])
|
|||
|
else:
|
|||
|
rv |= silent_call(["screen", "-x", self.session, "-X", "stuff", s])
|
|||
|
if not rv: break
|
|||
|
time.sleep(.2)
|
|||
|
|
|||
|
def detect_parsing(self):
|
|||
|
self.launch()
|
|||
|
# Send a string where the interpretation will depend on version of screen
|
|||
|
string = "$TERM"
|
|||
|
self.send("i" + string + ESC)
|
|||
|
output = self.get_buffer_data()
|
|||
|
# If the output doesn't match the input, need to do additional escaping
|
|||
|
if output != string:
|
|||
|
self.need_screen_escapes = 1
|
|||
|
self.leave_with_wait()
|
|||
|
|
|||
|
class VimInterfaceTmux(VimInterface):
|
|||
|
def __init__(self, session):
|
|||
|
self.session = session
|
|||
|
self._check_version()
|
|||
|
|
|||
|
def send(self, s):
|
|||
|
# I did not find any documentation on what needs escaping when sending
|
|||
|
# to tmux, but it seems like this is all that is needed for now.
|
|||
|
s = s.replace(';', r'\;')
|
|||
|
|
|||
|
if PYTHON3:
|
|||
|
s = s.encode("utf-8")
|
|||
|
silent_call(["tmux", "send-keys", "-t", self.session, "-l", s])
|
|||
|
|
|||
|
def _check_version(self):
|
|||
|
stdout, _ = subprocess.Popen(["tmux", "-V"],
|
|||
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
|
|||
|
if PYTHON3:
|
|||
|
stdout = stdout.decode("utf-8")
|
|||
|
m = re.match(r"tmux (\d+).(\d+)", stdout)
|
|||
|
if not m or not (int(m.group(1)), int(m.group(2))) >= (1, 9):
|
|||
|
raise RuntimeError("Need at least tmux 1.9, you have %s." % stdout.strip())
|
|||
|
|
|||
|
class VimInterfaceWindows(VimInterface):
|
|||
|
BRACES = re.compile("([}{])")
|
|||
|
WIN_ESCAPES = ["+", "^", "%", "~", "[", "]", "<", ">", "(", ")"]
|
|||
|
WIN_REPLACES = [
|
|||
|
(BS, "{BS}"),
|
|||
|
(ARR_L, "{LEFT}"),
|
|||
|
(ARR_R, "{RIGHT}"),
|
|||
|
(ARR_U, "{UP}"),
|
|||
|
(ARR_D, "{DOWN}"),
|
|||
|
("\t", "{TAB}"),
|
|||
|
("\n", "~"),
|
|||
|
(ESC, "{ESC}"),
|
|||
|
|
|||
|
# On my system ` waits for a second keystroke, so `+SPACE = "`". On
|
|||
|
# most systems, `+Space = "` ". I work around this, by sending the host
|
|||
|
# ` as `+_+BS. Awkward, but the only way I found to get this working.
|
|||
|
("`", "`_{BS}"),
|
|||
|
("´", "´_{BS}"),
|
|||
|
("{^}", "{^}_{BS}"),
|
|||
|
]
|
|||
|
|
|||
|
def __init__(self):
|
|||
|
self.seq_buf = []
|
|||
|
# import windows specific modules
|
|||
|
import win32com.client, win32gui
|
|||
|
self.win32gui = win32gui
|
|||
|
self.shell = win32com.client.Dispatch("WScript.Shell")
|
|||
|
|
|||
|
def is_focused(self, title=None):
|
|||
|
cur_title = self.win32gui.GetWindowText(self.win32gui.GetForegroundWindow())
|
|||
|
if (title or "- GVIM") in cur_title:
|
|||
|
return True
|
|||
|
return False
|
|||
|
|
|||
|
def focus(self, title=None):
|
|||
|
if not self.shell.AppActivate(title or "- GVIM"):
|
|||
|
raise Exception("Failed to switch to GVim window")
|
|||
|
time.sleep(1)
|
|||
|
|
|||
|
def convert_keys(self, keys):
|
|||
|
keys = self.BRACES.sub(r"{\1}", keys)
|
|||
|
for k in self.WIN_ESCAPES:
|
|||
|
keys = keys.replace(k, "{%s}" % k)
|
|||
|
for f, r in self.WIN_REPLACES:
|
|||
|
keys = keys.replace(f, r)
|
|||
|
return keys
|
|||
|
|
|||
|
def send(self, keys):
|
|||
|
self.seq_buf.append(keys)
|
|||
|
seq = "".join(self.seq_buf)
|
|||
|
|
|||
|
for f in SEQUENCES:
|
|||
|
if f.startswith(seq) and f != seq:
|
|||
|
return
|
|||
|
self.seq_buf = []
|
|||
|
|
|||
|
seq = self.convert_keys(seq)
|
|||
|
|
|||
|
if not self.is_focused():
|
|||
|
time.sleep(2)
|
|||
|
self.focus()
|
|||
|
if not self.is_focused():
|
|||
|
# This is the only way I can find to stop test execution
|
|||
|
raise KeyboardInterrupt("Failed to focus GVIM")
|
|||
|
|
|||
|
self.shell.SendKeys(seq)
|
|||
|
|
|||
|
|
|||
|
class VimTestCase(unittest.TestCase, TempFileManager):
|
|||
|
snippets = ()
|
|||
|
files = {}
|
|||
|
text_before = " --- some text before --- \n\n"
|
|||
|
text_after = "\n\n --- some text after --- "
|
|||
|
expected_error = ""
|
|||
|
wanted = ""
|
|||
|
keys = ""
|
|||
|
sleeptime = 0.00
|
|||
|
output = ""
|
|||
|
plugins = []
|
|||
|
# Skip this test for the given reason or None for not skipping it.
|
|||
|
skip_if = lambda self: None
|
|||
|
version = None # Will be set to vim --version output
|
|||
|
|
|||
|
def __init__(self, *args, **kwargs):
|
|||
|
unittest.TestCase.__init__(self, *args, **kwargs)
|
|||
|
TempFileManager.__init__(self, "Case")
|
|||
|
|
|||
|
def runTest(self):
|
|||
|
# Only checks the output. All work is done in setUp().
|
|||
|
wanted = self.text_before + self.wanted + self.text_after
|
|||
|
if self.expected_error:
|
|||
|
self.assertRegexpMatches(self.output, self.expected_error)
|
|||
|
return
|
|||
|
for i in range(self.retries):
|
|||
|
if self.output != wanted:
|
|||
|
# Redo this, but slower
|
|||
|
self.sleeptime += 0.02
|
|||
|
self.tearDown()
|
|||
|
self.setUp()
|
|||
|
self.assertEqual(self.output, wanted)
|
|||
|
|
|||
|
def _extra_options_pre_init(self, vim_config):
|
|||
|
"""Adds extra lines to the vim_config list."""
|
|||
|
pass
|
|||
|
|
|||
|
def _extra_options_post_init(self, vim_config):
|
|||
|
"""Adds extra lines to the vim_config list."""
|
|||
|
pass
|
|||
|
|
|||
|
def _before_test(self):
|
|||
|
"""Send these keys before the test runs. Used for buffer local
|
|||
|
variables and other options."""
|
|||
|
pass
|
|||
|
|
|||
|
def _create_file(self, file_path, content):
|
|||
|
"""Creates a file in the runtimepath that is created for this test.
|
|||
|
Returns the absolute path to the file."""
|
|||
|
return self.write_temp(file_path, textwrap.dedent(content + "\n"))
|
|||
|
|
|||
|
def _link_file(self, source, relative_destination):
|
|||
|
"""Creates a link from 'source' to the 'relative_destination' in our temp dir."""
|
|||
|
absdir = self.name_temp(relative_destination)
|
|||
|
create_directory(absdir)
|
|||
|
os.symlink(source, os.path.join(absdir, os.path.basename(source)))
|
|||
|
|
|||
|
def setUp(self):
|
|||
|
if not VimTestCase.version:
|
|||
|
VimTestCase.version, _ = subprocess.Popen(["vim", "--version"],
|
|||
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
|
|||
|
if PYTHON3:
|
|||
|
VimTestCase.version = VimTestCase.version.decode("utf-8")
|
|||
|
|
|||
|
if self.plugins and not self.test_plugins:
|
|||
|
return self.skipTest("Not testing integration with other plugins.")
|
|||
|
reason_for_skipping = self.skip_if()
|
|||
|
if reason_for_skipping is not None:
|
|||
|
return self.skipTest(reason_for_skipping)
|
|||
|
|
|||
|
vim_config = []
|
|||
|
vim_config.append('set nocompatible')
|
|||
|
vim_config.append('set runtimepath=$VIMRUNTIME,.,%s' % self._temp_dir)
|
|||
|
|
|||
|
if self.plugins:
|
|||
|
self._link_file(os.path.join(plugin_cache_dir(), "vim-pathogen", "autoload"), ".")
|
|||
|
for plugin in self.plugins:
|
|||
|
self._link_file(os.path.join(plugin_cache_dir(), os.path.basename(plugin)), "bundle")
|
|||
|
vim_config.append("execute pathogen#infect()")
|
|||
|
|
|||
|
# Vim parameters.
|
|||
|
vim_config.append('syntax on')
|
|||
|
vim_config.append('filetype plugin indent on')
|
|||
|
vim_config.append('set clipboard=""')
|
|||
|
vim_config.append('set encoding=utf-8')
|
|||
|
vim_config.append('set fileencoding=utf-8')
|
|||
|
vim_config.append('set buftype=nofile')
|
|||
|
vim_config.append('set shortmess=at')
|
|||
|
vim_config.append('let g:UltiSnipsExpandTrigger="<tab>"')
|
|||
|
vim_config.append('let g:UltiSnipsJumpForwardTrigger="?"')
|
|||
|
vim_config.append('let g:UltiSnipsJumpBackwardTrigger="+"')
|
|||
|
vim_config.append('let g:UltiSnipsListSnippets="@"')
|
|||
|
vim_config.append('let g:UltiSnipsUsePythonVersion="%i"' % (3 if PYTHON3 else 2))
|
|||
|
vim_config.append('let g:UltiSnipsSnippetDirectories=["us"]')
|
|||
|
|
|||
|
self._extra_options_pre_init(vim_config)
|
|||
|
|
|||
|
# Now activate UltiSnips.
|
|||
|
vim_config.append('call UltiSnips#bootstrap#Bootstrap()')
|
|||
|
|
|||
|
self._extra_options_post_init(vim_config)
|
|||
|
|
|||
|
# Finally, add the snippets and some configuration for the test.
|
|||
|
vim_config.append("%s << EOF" % ("py3" if PYTHON3 else "py"))
|
|||
|
|
|||
|
if len(self.snippets) and not isinstance(self.snippets[0],tuple):
|
|||
|
self.snippets = ( self.snippets, )
|
|||
|
for s in self.snippets:
|
|||
|
sv, content = s[:2]
|
|||
|
description = ""
|
|||
|
options = ""
|
|||
|
priority = 0
|
|||
|
if len(s) > 2:
|
|||
|
description = s[2]
|
|||
|
if len(s) > 3:
|
|||
|
options = s[3]
|
|||
|
if len(s) > 4:
|
|||
|
priority = s[4]
|
|||
|
vim_config.append("UltiSnips_Manager.add_snippet(%r, %r, %r, %r, priority=%i)" % (
|
|||
|
sv, content, description, options, priority))
|
|||
|
|
|||
|
# fill buffer with default text and place cursor in between.
|
|||
|
prefilled_text = (self.text_before + self.text_after).splitlines()
|
|||
|
vim_config.append("vim.current.buffer[:] = %r\n" % prefilled_text)
|
|||
|
vim_config.append("vim.current.window.cursor = (max(len(vim.current.buffer)//2, 1), 0)")
|
|||
|
|
|||
|
# End of python stuff.
|
|||
|
vim_config.append("EOF")
|
|||
|
|
|||
|
for name, content in self.files.items():
|
|||
|
self._create_file(name, content)
|
|||
|
|
|||
|
self.vim.launch(vim_config)
|
|||
|
|
|||
|
self._before_test()
|
|||
|
|
|||
|
if not self.interrupt:
|
|||
|
# Go into insert mode and type the keys but leave Vim some time to
|
|||
|
# react.
|
|||
|
for c in 'i' + self.keys:
|
|||
|
self.vim.send(c)
|
|||
|
time.sleep(self.sleeptime)
|
|||
|
self.output = self.vim.get_buffer_data()
|
|||
|
|
|||
|
def tearDown(self):
|
|||
|
if self.interrupt:
|
|||
|
print("Working directory: %s" % (self._temp_dir))
|
|||
|
return
|
|||
|
self.vim.leave_with_wait()
|
|||
|
self.clear_temp()
|
|||
|
|
|||
|
# vim:fileencoding=utf-8:foldmarker={{{#,#}}}:
|