diff --git a/python/ycm/client/base_request.py b/python/ycm/client/base_request.py
index ad3f1fbf..b2969c5a 100644
--- a/python/ycm/client/base_request.py
+++ b/python/ycm/client/base_request.py
@@ -23,12 +23,12 @@ import requests
 import urlparse
 from retries import retries
 from requests_futures.sessions import FuturesSession
-from concurrent.futures import ThreadPoolExecutor
+from ycm.unsafe_thread_pool_executor import UnsafeThreadPoolExecutor
 from ycm import vimsupport
 from ycm.server.responses import ServerError, UnknownExtraConf
 
 HEADERS = {'content-type': 'application/json'}
-EXECUTOR = ThreadPoolExecutor( max_workers = 10 )
+EXECUTOR = UnsafeThreadPoolExecutor( max_workers = 10 )
 # Setting this to None seems to screw up the Requests/urllib3 libs.
 DEFAULT_TIMEOUT_SEC = 30
diff --git a/python/ycm/unsafe_thread_pool_executor.py b/python/ycm/unsafe_thread_pool_executor.py
new file mode 100644
index 00000000..3840dd37
--- /dev/null
+++ b/python/ycm/unsafe_thread_pool_executor.py
@@ -0,0 +1,119 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+#
+# Copyright (C) 2013 Strahinja Val Markovic
+# Changes to this file are licensed under the same terms as the original file
+# (the Python Software Foundation License).
+
+
+from __future__ import with_statement
+import threading
+import weakref
+import sys
+
+from concurrent.futures import _base
+
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+
+
+# This file provides an UnsafeThreadPoolExecutor, which operates exactly like
+# the upstream Python version of ThreadPoolExecutor with one exception: it
+# doesn't wait for worker threads to finish before shutting down the Python
+# interpreter.
+#
+# This is dangerous for many workloads, but fine for some (like when threads
+# only send network requests). The YCM workload is one of those workloads where
+# it's safe (the aforementioned network requests case).
+
+class _WorkItem(object):
+    def __init__(self, future, fn, args, kwargs):
+        self.future = future
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        if not self.future.set_running_or_notify_cancel():
+            return
+
+        try:
+            result = self.fn(*self.args, **self.kwargs)
+        except BaseException:
+            e = sys.exc_info()[1]
+            self.future.set_exception(e)
+        else:
+            self.future.set_result(result)
+
+def _worker(executor_reference, work_queue):
+    try:
+        while True:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
+                work_item.run()
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
+    except BaseException:
+        _base.LOGGER.critical('Exception in worker', exc_info=True)
+
+class UnsafeThreadPoolExecutor(_base.Executor):
+    def __init__(self, max_workers):
+        """Initializes a new ThreadPoolExecutor instance.
+
+        Args:
+            max_workers: The maximum number of threads that can be used to
+                execute the given calls.
+ """ + self._max_workers = max_workers + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. + if len(self._threads) < self._max_workers: + t = threading.Thread(target=_worker, + args=(weakref.ref(self, weakref_cb), + self._work_queue)) + t.daemon = True + t.start() + self._threads.add(t) + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown = True + self._work_queue.put(None) + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ +