UltiSnips/test/vim_interface.py

248 lines
7.9 KiB
Python
Raw Normal View History

# encoding: utf-8
import os
import re
import subprocess
import tempfile
import textwrap
import time
from test.constant import *
def wait_until_file_exists(file_path, times=None, interval=0.01):
while times is None or times:
if os.path.exists(file_path):
return True
time.sleep(interval)
if times is not None:
times -= 1
return False
def read_text_file(filename):
"""Reads the contens of a text file."""
if PYTHON3:
return open(filename,"r", encoding="utf-8").read()
else:
return open(filename,"r").read()
def is_process_running(pid):
"""Returns true if a process with pid is running, false otherwise."""
# from http://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
def silent_call(cmd):
"""Calls 'cmd' and returns the exit value."""
return subprocess.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
def create_directory(dirname):
"""Creates 'dirname' and its parents if it does not exist."""
try:
os.makedirs(dirname)
except OSError:
pass
class TempFileManager(object):
def __init__(self, name=""):
self._temp_dir = tempfile.mkdtemp(prefix="UltiSnipsTest_" + name)
def name_temp(self, file_path):
return os.path.join(self._temp_dir, file_path)
def write_temp(self, file_path, content):
abs_path = self.name_temp(file_path)
create_directory(os.path.dirname(abs_path))
if PYTHON3:
with open(abs_path, "w", encoding="utf-8") as f:
f.write(content)
else:
with open(abs_path, "w") as f:
f.write(content)
return abs_path
def unique_name_temp(self, suffix="", prefix=""):
file_handler, abspath = tempfile.mkstemp(suffix, prefix, self._temp_dir)
os.close(file_handler)
os.remove(abspath)
return abspath
def clear_temp(self):
shutil.rmtree(self._temp_dir)
create_directory(self._temp_dir)
class VimInterface(TempFileManager):
def __init__(self, name=""):
TempFileManager.__init__(self, name)
def get_buffer_data(self):
buffer_path = self.unique_name_temp(prefix="buffer_")
self.send(ESC + ":w! %s\n" % buffer_path)
if wait_until_file_exists(buffer_path, 50):
return read_text_file(buffer_path)[:-1]
def send(self, s):
raise NotImplementedError()
def launch(self, config=[]):
pid_file = self.name_temp("vim.pid")
done_file = self.name_temp("loading_done")
if os.path.exists(done_file):
os.remove(done_file)
post_config = []
post_config.append("%s << EOF" % ("py3" if PYTHON3 else "py"))
post_config.append("import vim")
post_config.append("with open('%s', 'w') as pid_file: pid_file.write(vim.eval('getpid()'))" % pid_file)
post_config.append("with open('%s', 'w') as done_file: pass" % done_file)
post_config.append("EOF")
config_path = self.write_temp("vim_config.vim",
textwrap.dedent(os.linesep.join(config + post_config) + "\n"))
# Note the space to exclude it from shell history.
self.send(""" vim -u %s\r\n""" % config_path)
wait_until_file_exists(done_file)
self._vim_pid = int(open(pid_file, "r").read())
def leave_with_wait(self):
self.send(3*ESC + ":qa!\n")
while is_process_running(self._vim_pid):
time.sleep(.05)
class VimInterfaceScreen(VimInterface):
def __init__(self, session):
VimInterface.__init__(self, "Screen")
self.session = session
self.need_screen_escapes = 0
self.detect_parsing()
def send(self, s):
if self.need_screen_escapes:
# escape characters that are special to some versions of screen
repl = lambda m: '\\' + m.group(0)
s = re.sub( r"[$^#\\']", repl, s )
if PYTHON3:
s = s.encode("utf-8")
while True:
rv = 0
if len(s) > 30:
rv |= silent_call(["screen", "-x", self.session, "-X", "register", "S", s])
rv |= silent_call(["screen", "-x", self.session, "-X", "paste", "S"])
else:
rv |= silent_call(["screen", "-x", self.session, "-X", "stuff", s])
if not rv: break
time.sleep(.2)
def detect_parsing(self):
self.launch()
# Send a string where the interpretation will depend on version of screen
string = "$TERM"
self.send("i" + string + ESC)
output = self.get_buffer_data()
# If the output doesn't match the input, need to do additional escaping
if output != string:
self.need_screen_escapes = 1
self.leave_with_wait()
class VimInterfaceTmux(VimInterface):
def __init__(self, session):
VimInterface.__init__(self, "Tmux")
self.session = session
self._check_version()
def send(self, s):
# I did not find any documentation on what needs escaping when sending
# to tmux, but it seems like this is all that is needed for now.
s = s.replace(';', r'\;')
if PYTHON3:
s = s.encode("utf-8")
silent_call(["tmux", "send-keys", "-t", self.session, "-l", s])
def _check_version(self):
stdout, _ = subprocess.Popen(["tmux", "-V"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
if PYTHON3:
stdout = stdout.decode("utf-8")
m = re.match(r"tmux (\d+).(\d+)", stdout)
if not m or not (int(m.group(1)), int(m.group(2))) >= (1, 8):
raise RuntimeError("Need at least tmux 1.8, you have %s." % stdout.strip())
class VimInterfaceWindows(VimInterface):
BRACES = re.compile("([}{])")
WIN_ESCAPES = ["+", "^", "%", "~", "[", "]", "<", ">", "(", ")"]
WIN_REPLACES = [
(BS, "{BS}"),
(ARR_L, "{LEFT}"),
(ARR_R, "{RIGHT}"),
(ARR_U, "{UP}"),
(ARR_D, "{DOWN}"),
("\t", "{TAB}"),
("\n", "~"),
(ESC, "{ESC}"),
# On my system ` waits for a second keystroke, so `+SPACE = "`". On
# most systems, `+Space = "` ". I work around this, by sending the host
# ` as `+_+BS. Awkward, but the only way I found to get this working.
("`", "`_{BS}"),
("´", "´_{BS}"),
("{^}", "{^}_{BS}"),
]
def __init__(self):
self.seq_buf = []
# import windows specific modules
import win32com.client, win32gui
self.win32gui = win32gui
self.shell = win32com.client.Dispatch("WScript.Shell")
def is_focused(self, title=None):
cur_title = self.win32gui.GetWindowText(self.win32gui.GetForegroundWindow())
if (title or "- GVIM") in cur_title:
return True
return False
def focus(self, title=None):
if not self.shell.AppActivate(title or "- GVIM"):
raise Exception("Failed to switch to GVim window")
time.sleep(1)
def convert_keys(self, keys):
keys = self.BRACES.sub(r"{\1}", keys)
for k in self.WIN_ESCAPES:
keys = keys.replace(k, "{%s}" % k)
for f, r in self.WIN_REPLACES:
keys = keys.replace(f, r)
return keys
def send(self, keys):
self.seq_buf.append(keys)
seq = "".join(self.seq_buf)
for f in SEQUENCES:
if f.startswith(seq) and f != seq:
return
self.seq_buf = []
seq = self.convert_keys(seq)
if not self.is_focused():
time.sleep(2)
self.focus()
if not self.is_focused():
# This is the only way I can find to stop test execution
raise KeyboardInterrupt("Failed to focus GVIM")
self.shell.SendKeys(seq)