# Copyright 2009 Brian Quinlan. All Rights Reserved. # Licensed to PSF under a Contributor Agreement. # # Copyright (C) 2013 Strahinja Val Markovic # Changes to this file are licensed under the same terms as the original file # (the Python Software Foundation License). from __future__ import with_statement import threading import weakref import sys from concurrent.futures import _base try: import queue except ImportError: import Queue as queue # This file provides an UnsafeThreadPoolExecutor, which operates exactly like # the upstream Python version of ThreadPoolExecutor with one exception: it # doesn't wait for worker threads to finish before shutting down the Python # interpreter. # # This is dangerous for many workloads, but fine for some (like when threads # only send network requests). The YCM workload is one of those workloads where # it's safe (the aforementioned network requests case). class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn self.args = args self.kwargs = kwargs def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException: e = sys.exc_info()[1] self.future.set_exception(e) else: self.future.set_result(result) def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() continue executor = executor_reference() # Exit if: # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) class UnsafeThreadPoolExecutor(_base.Executor): def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. """ self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue)) t.daemon = True t.start() self._threads.add(t) def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True self._work_queue.put(None) if wait: for t in self._threads: t.join() shutdown.__doc__ = _base.Executor.shutdown.__doc__