7bfb4c3402
By default, a ThreadPoolExecutor waits at Python interpreter shutdown for all of its threads to stop by themselves before letting the interpreter exit. We don't want that for the network-requests thread pool, because it adds shutdown latency whenever there are outstanding requests. Killing the threads in our pool is perfectly safe, so we avoid the latency by introducing an UnsafeThreadPoolExecutor.
120 lines
3.9 KiB
Python
120 lines
3.9 KiB
Python
# Copyright 2009 Brian Quinlan. All Rights Reserved.
|
|
# Licensed to PSF under a Contributor Agreement.
|
|
#
|
|
# Copyright (C) 2013 Strahinja Val Markovic <val@markovic.io>
|
|
# Changes to this file are licensed under the same terms as the original file
|
|
# (the Python Software Foundation License).
|
|
|
|
|
|
from __future__ import with_statement
|
|
import threading
|
|
import weakref
|
|
import sys
|
|
|
|
from concurrent.futures import _base
|
|
|
|
try:
|
|
import queue
|
|
except ImportError:
|
|
import Queue as queue
|
|
|
|
|
|
# This file provides an UnsafeThreadPoolExecutor, which operates exactly like
|
|
# the upstream Python version of ThreadPoolExecutor with one exception: it
|
|
# doesn't wait for worker threads to finish before shutting down the Python
|
|
# interpreter.
|
|
#
|
|
# This is dangerous for many workloads, but fine for some (like when threads
|
|
# only send network requests). The YCM workload is one of those workloads where
|
|
# it's safe (the aforementioned network requests case).
|
|
|
|
class _WorkItem(object):
|
|
def __init__(self, future, fn, args, kwargs):
|
|
self.future = future
|
|
self.fn = fn
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
|
|
def run(self):
|
|
if not self.future.set_running_or_notify_cancel():
|
|
return
|
|
|
|
try:
|
|
result = self.fn(*self.args, **self.kwargs)
|
|
except BaseException:
|
|
e = sys.exc_info()[1]
|
|
self.future.set_exception(e)
|
|
else:
|
|
self.future.set_result(result)
|
|
|
|
def _worker(executor_reference, work_queue):
|
|
try:
|
|
while True:
|
|
work_item = work_queue.get(block=True)
|
|
if work_item is not None:
|
|
work_item.run()
|
|
continue
|
|
executor = executor_reference()
|
|
# Exit if:
|
|
# - The executor that owns the worker has been collected OR
|
|
# - The executor that owns the worker has been shutdown.
|
|
if executor is None or executor._shutdown:
|
|
# Notice other workers
|
|
work_queue.put(None)
|
|
return
|
|
del executor
|
|
except BaseException:
|
|
_base.LOGGER.critical('Exception in worker', exc_info=True)
|
|
|
|
class UnsafeThreadPoolExecutor(_base.Executor):
    """Thread pool executor whose worker threads are all daemon threads.

    Workers are started lazily, one per submit() call, until max_workers
    threads exist. Because the threads are marked as daemon threads, they are
    not waited for when the Python interpreter shuts down.
    """

    def __init__(self, max_workers):
        """Initializes a new UnsafeThreadPoolExecutor instance.

        No thread is started here; see submit().

        Args:
          max_workers: The maximum number of threads that can be used to
              execute the given calls.
        """
        self._shutdown_lock = threading.Lock()
        self._shutdown = False
        self._threads = set()
        self._work_queue = queue.Queue()
        self._max_workers = max_workers

    def submit(self, fn, *args, **kwargs):
        # The lock keeps the shutdown check and the enqueue atomic with
        # respect to shutdown().
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            self._work_queue.put(_WorkItem(future, fn, args, kwargs))
            self._adjust_thread_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more daemon worker thread unless the pool is full."""
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            # When the executor gets lost, the weakref callback posts a None
            # sentinel so that blocked worker threads wake up. The queue is
            # bound as a default argument so the callback does not reference
            # the executor itself.
            def wake_workers(_, pending=self._work_queue):
                pending.put(None)

            executor_ref = weakref.ref(self, wake_workers)
            thread = threading.Thread(target=_worker,
                                      args=(executor_ref, self._work_queue))
            thread.daemon = True
            thread.start()
            self._threads.add(thread)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Sentinel that wakes a blocked worker so it can see the flag.
            self._work_queue.put(None)
        if not wait:
            return
        for thread in self._threads:
            thread.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
|
|
|