--- /usr/lib/python3.6/concurrent/futures/thread.py 2018-06-27 14:44:17.000000000 +0000 +++ /usr/lib/python3.7/concurrent/futures/thread.py 2018-07-20 18:01:12.651692412 +0000 @@ -41,6 +41,7 @@ atexit.register(_python_exit) + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future @@ -61,7 +62,17 @@ else: self.future.set_result(result) -def _worker(executor_reference, work_queue): + +def _worker(executor_reference, work_queue, initializer, initargs): + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + executor = executor_reference() + if executor is not None: + executor._initializer_failed() + return try: while True: work_item = work_queue.get(block=True) @@ -76,6 +87,10 @@ # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown = True # Notice other workers work_queue.put(None) return @@ -83,18 +98,28 @@ except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) + +class BrokenThreadPool(_base.BrokenExecutor): + """ + Raised when a worker thread in a ThreadPoolExecutor failed initializing. + """ + + class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ - def __init__(self, max_workers=None, thread_name_prefix=''): + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=()): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. + initializer: An callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. """ if max_workers is None: # Use this number because ThreadPoolExecutor is often @@ -103,18 +128,30 @@ if max_workers <= 0: raise ValueError("max_workers must be greater than 0") + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = queue.SimpleQueue() self._threads = set() + self._broken = False self._shutdown = False self._shutdown_lock = threading.Lock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) + self._initializer = initializer + self._initargs = initargs def submit(self, fn, *args, **kwargs): with self._shutdown_lock: + if self._broken: + raise BrokenThreadPool(self._broken) + if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -137,12 +174,27 @@ num_threads) t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), - self._work_queue)) + self._work_queue, + self._initializer, + self._initargs)) t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue + def _initializer_failed(self): + with self._shutdown_lock: + self._broken = ('A thread initializer failed, the thread pool ' + 'is not usable anymore') + # Drain work queue and mark pending futures failed + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.set_exception(BrokenThreadPool(self._broken)) + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True