# Copyright 2009 Brian Quinlan. All Rights Reserved. # Licensed to PSF under a Contributor Agreement. # # Copyright (C) 2013 Google Inc. # Changes to this file are licensed under the same terms as the original file # (the Python Software Foundation License). from __future__ import with_statement import threading import weakref import sys from concurrent.futures import _base try: import queue except ImportError: import Queue as queue # This file provides an UnsafeThreadPoolExecutor, which operates exactly like # the upstream Python version of ThreadPoolExecutor with one exception: it # doesn't wait for worker threads to finish before shutting down the Python # interpreter. # # This is dangerous for many workloads, but fine for some (like when threads # only send network requests). The YCM workload is one of those workloads where # it's safe (the aforementioned network requests case). class _WorkItem( object ): def __init__( self, future, fn, args, kwargs ): self.future = future self.fn = fn self.args = args self.kwargs = kwargs def run( self ): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn( *self.args, **self.kwargs ) except BaseException: e = sys.exc_info()[ 1 ] self.future.set_exception( e ) else: self.future.set_result( result ) def _worker( executor_reference, work_queue ): try: while True: work_item = work_queue.get( block=True ) if work_item is not None: work_item.run() continue executor = executor_reference() # Exit if: # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if executor is None or executor._shutdown: # Notice other workers work_queue.put( None ) return del executor except BaseException: _base.LOGGER.critical( 'Exception in worker', exc_info=True ) class UnsafeThreadPoolExecutor( _base.Executor ): def __init__( self, max_workers ): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. """ self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() def submit( self, fn, *args, **kwargs ): with self._shutdown_lock: if self._shutdown: raise RuntimeError( 'cannot schedule new futures after shutdown' ) f = _base.Future() w = _WorkItem( f, fn, args, kwargs ) self._work_queue.put( w ) self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count( self ): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb( _, q=self._work_queue ): q.put( None ) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len( self._threads ) < self._max_workers: t = threading.Thread( target=_worker, args=( weakref.ref( self, weakref_cb ), self._work_queue ) ) t.daemon = True t.start() self._threads.add( t ) def shutdown( self, wait=True ): with self._shutdown_lock: self._shutdown = True self._work_queue.put( None ) if wait: for t in self._threads: t.join() shutdown.__doc__ = _base.Executor.shutdown.__doc__