b1bba2e201
When I initially released this project, I released it under my own copyright. I have since then worked on it in my 20% time at Google (and want to continue doing this), and my life becomes much simpler if the copyright is Google's. From the perspective of how this project is run and managed, **NOTHING** changes. YCM is not a Google product, merely a project run by someone who just happens to work for Google. Please note that the license of the project is **NOT** changing. People sending in future pull requests will have to sign the Google [CLA](https://developers.google.com/open-source/cla/individual) (you can sign online at the bottom of that page) before those pull requests can be merged in. People who sent in pull requests that were merged in the past will get an email from me asking them to sign the CLA as well.
120 lines
3.9 KiB
Python
120 lines
3.9 KiB
Python
# Copyright 2009 Brian Quinlan. All Rights Reserved.
|
|
# Licensed to PSF under a Contributor Agreement.
|
|
#
|
|
# Copyright (C) 2013 Google Inc.
|
|
# Changes to this file are licensed under the same terms as the original file
|
|
# (the Python Software Foundation License).
|
|
|
|
|
|
from __future__ import with_statement
|
|
import threading
|
|
import weakref
|
|
import sys
|
|
|
|
from concurrent.futures import _base
|
|
|
|
try:
|
|
import queue
|
|
except ImportError:
|
|
import Queue as queue
|
|
|
|
|
|
# This file provides an UnsafeThreadPoolExecutor, which operates exactly like
|
|
# the upstream Python version of ThreadPoolExecutor with one exception: it
|
|
# doesn't wait for worker threads to finish before shutting down the Python
|
|
# interpreter.
|
|
#
|
|
# This is dangerous for many workloads, but fine for some (like when threads
|
|
# only send network requests). The YCM workload is one of those workloads where
|
|
# it's safe (the aforementioned network requests case).
|
|
|
|
class _WorkItem(object):
    """Binds a Future to the callable that will produce its result.

    Instances are placed on the executor's work queue; a worker thread
    pops one and invokes run(), which resolves the future either with
    the callable's return value or with the exception it raised.
    """

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # A cancelled future must not be executed; set_running_or_notify_cancel
        # returns False in that case and has already notified waiters.
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            outcome = self.fn(*self.args, **self.kwargs)
        except BaseException:
            # sys.exc_info() is used (rather than "except ... as e") to keep
            # the original file's Python 2.5-compatible style.
            self.future.set_exception(sys.exc_info()[1])
        else:
            self.future.set_result(outcome)
|
def _worker(executor_reference, work_queue):
    """Worker-thread main loop: drain work items from work_queue.

    A None item is a wake-up sentinel: on receiving one, the worker checks
    (via the weak reference) whether its executor has been collected or shut
    down, and if so re-posts the sentinel so sibling workers also exit.
    """
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                # Sentinel received: decide whether this worker should exit.
                executor = executor_reference()
                # Exit if:
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                should_exit = executor is None or executor._shutdown
                # Drop the strong reference before blocking again so the
                # executor can be garbage collected while workers wait.
                del executor
                if should_exit:
                    work_queue.put(None)  # notify the other workers
                    return
            else:
                item.run()
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
|
|
|
class UnsafeThreadPoolExecutor(_base.Executor):
    """A ThreadPoolExecutor whose workers are daemon threads.

    Behaves like the upstream ThreadPoolExecutor except that it does not
    wait for worker threads to finish before the interpreter shuts down
    (the threads are daemons). Safe only for workloads where abruptly
    killed workers are acceptable, e.g. outstanding network requests.
    """

    def __init__(self, max_workers):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.

        Raises:
            ValueError: If max_workers is not strictly positive.
        """
        # Match upstream ThreadPoolExecutor: a non-positive worker count
        # would otherwise create an executor that never starts a thread,
        # leaving every submitted future to hang forever.
        if max_workers <= 0:
            raise ValueError('max_workers must be greater than 0')

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads (by posting a sentinel) so they can exit.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            # Daemon threads are what make this executor "unsafe": the
            # interpreter will not wait for them at exit.
            t.daemon = True
            t.start()
            self._threads.add(t)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__