diff -Nru python3.5-3.5.0/Lib/asyncio/base_events.py python3.5-3.5.1/Lib/asyncio/base_events.py --- python3.5-3.5.0/Lib/asyncio/base_events.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/base_events.py 2015-12-07 01:39:07.000000000 +0000 @@ -18,6 +18,7 @@ import concurrent.futures import heapq import inspect +import itertools import logging import os import socket @@ -69,10 +70,6 @@ return repr(fd) -class _StopError(BaseException): - """Raised to stop the event loop.""" - - def _check_resolved_address(sock, address): # Ensure that the address is already resolved to avoid the trap of hanging # the entire event loop when the address requires doing a DNS lookup. @@ -117,9 +114,6 @@ "got host %r: %s" % (host, err)) -def _raise_stop_error(*args): - raise _StopError - def _run_until_complete_cb(fut): exc = fut._exception @@ -128,7 +122,7 @@ # Issue #22429: run_forever() already finished, no need to # stop it. return - _raise_stop_error() + fut._loop.stop() class Server(events.AbstractServer): @@ -183,6 +177,7 @@ def __init__(self): self._timer_cancelled_count = 0 self._closed = False + self._stopping = False self._ready = collections.deque() self._scheduled = [] self._default_executor = None @@ -297,11 +292,11 @@ self._thread_id = threading.get_ident() try: while True: - try: - self._run_once() - except _StopError: + self._run_once() + if self._stopping: break finally: + self._stopping = False self._thread_id = None self._set_coroutine_wrapper(False) @@ -344,11 +339,10 @@ def stop(self): """Stop running the event loop. - Every callback scheduled before stop() is called will run. Callbacks - scheduled after stop() is called will not run. However, those callbacks - will run if run_forever is called again later. + Every callback already scheduled will still run. This simply informs + run_forever to stop looping after a complete iteration. """ - self.call_soon(_raise_stop_error) + self._stopping = True def close(self): """Close the event loop. @@ -699,75 +693,109 @@ @coroutine def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, - family=0, proto=0, flags=0): + family=0, proto=0, flags=0, + reuse_address=None, reuse_port=None, + allow_broadcast=None, sock=None): """Create datagram connection.""" - if not (local_addr or remote_addr): - if family == 0: - raise ValueError('unexpected address family') - addr_pairs_info = (((family, proto), (None, None)),) - else: - # join address by (family, protocol) - addr_infos = collections.OrderedDict() - for idx, addr in ((0, local_addr), (1, remote_addr)): - if addr is not None: - assert isinstance(addr, tuple) and len(addr) == 2, ( - '2-tuple is expected') - - infos = yield from self.getaddrinfo( - *addr, family=family, type=socket.SOCK_DGRAM, - proto=proto, flags=flags) - if not infos: - raise OSError('getaddrinfo() returned empty list') - - for fam, _, pro, _, address in infos: - key = (fam, pro) - if key not in addr_infos: - addr_infos[key] = [None, None] - addr_infos[key][idx] = address - - # each addr has to have info for each (family, proto) pair - addr_pairs_info = [ - (key, addr_pair) for key, addr_pair in addr_infos.items() - if not ((local_addr and addr_pair[0] is None) or - (remote_addr and addr_pair[1] is None))] - - if not addr_pairs_info: - raise ValueError('can not get address information') - - exceptions = [] - - for ((family, proto), - (local_address, remote_address)) in addr_pairs_info: - sock = None + if sock is not None: + if (local_addr or remote_addr or + family or proto or flags or + reuse_address or reuse_port or allow_broadcast): + # show the problematic kwargs in exception msg + opts = dict(local_addr=local_addr, remote_addr=remote_addr, + family=family, proto=proto, flags=flags, + reuse_address=reuse_address, reuse_port=reuse_port, + allow_broadcast=allow_broadcast) + problems = ', '.join( + '{}={}'.format(k, v) for k, v in opts.items() if v) + raise ValueError( + 'socket modifier keyword arguments can not be used ' + 'when sock is specified. ({})'.format(problems)) + sock.setblocking(False) r_addr = None - try: - sock = socket.socket( - family=family, type=socket.SOCK_DGRAM, proto=proto) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setblocking(False) - - if local_addr: - sock.bind(local_address) - if remote_addr: - yield from self.sock_connect(sock, remote_address) - r_addr = remote_address - except OSError as exc: - if sock is not None: - sock.close() - exceptions.append(exc) - except: - if sock is not None: - sock.close() - raise - else: - break else: - raise exceptions[0] + if not (local_addr or remote_addr): + if family == 0: + raise ValueError('unexpected address family') + addr_pairs_info = (((family, proto), (None, None)),) + else: + # join address by (family, protocol) + addr_infos = collections.OrderedDict() + for idx, addr in ((0, local_addr), (1, remote_addr)): + if addr is not None: + assert isinstance(addr, tuple) and len(addr) == 2, ( + '2-tuple is expected') + + infos = yield from self.getaddrinfo( + *addr, family=family, type=socket.SOCK_DGRAM, + proto=proto, flags=flags) + if not infos: + raise OSError('getaddrinfo() returned empty list') + + for fam, _, pro, _, address in infos: + key = (fam, pro) + if key not in addr_infos: + addr_infos[key] = [None, None] + addr_infos[key][idx] = address + + # each addr has to have info for each (family, proto) pair + addr_pairs_info = [ + (key, addr_pair) for key, addr_pair in addr_infos.items() + if not ((local_addr and addr_pair[0] is None) or + (remote_addr and addr_pair[1] is None))] + + if not addr_pairs_info: + raise ValueError('can not get address information') + + exceptions = [] + + if reuse_address is None: + reuse_address = os.name == 'posix' and sys.platform != 'cygwin' + + for ((family, proto), + (local_address, remote_address)) in addr_pairs_info: + sock = None + r_addr = None + try: + sock = socket.socket( + family=family, type=socket.SOCK_DGRAM, proto=proto) + if reuse_address: + sock.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if reuse_port: + if not hasattr(socket, 'SO_REUSEPORT'): + raise ValueError( + 'reuse_port not supported by socket module') + else: + sock.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if allow_broadcast: + sock.setsockopt( + socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.setblocking(False) + + if local_addr: + sock.bind(local_address) + if remote_addr: + yield from self.sock_connect(sock, remote_address) + r_addr = remote_address + except OSError as exc: + if sock is not None: + sock.close() + exceptions.append(exc) + except: + if sock is not None: + sock.close() + raise + else: + break + else: + raise exceptions[0] protocol = protocol_factory() waiter = futures.Future(loop=self) - transport = self._make_datagram_transport(sock, protocol, r_addr, - waiter) + transport = self._make_datagram_transport( + sock, protocol, r_addr, waiter) if self._debug: if local_addr: logger.info("Datagram endpoint local_addr=%r remote_addr=%r " @@ -787,6 +815,15 @@ return transport, protocol @coroutine + def _create_server_getaddrinfo(self, host, port, family, flags): + infos = yield from self.getaddrinfo(host, port, family=family, + type=socket.SOCK_STREAM, + flags=flags) + if not infos: + raise OSError('getaddrinfo({!r}) returned empty list'.format(host)) + return infos + + @coroutine def create_server(self, protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, @@ -794,8 +831,15 @@ sock=None, backlog=100, ssl=None, - reuse_address=None): - """Create a TCP server bound to host and port. + reuse_address=None, + reuse_port=None): + """Create a TCP server. + + The host parameter can be a string, in that case the TCP server is bound + to host and port. + + The host parameter can also be a sequence of strings and in that case + the TCP server is bound to all hosts of the sequence. Return a Server object which can be used to stop the service. @@ -813,13 +857,18 @@ reuse_address = os.name == 'posix' and sys.platform != 'cygwin' sockets = [] if host == '': - host = None + hosts = [None] + elif (isinstance(host, str) or + not isinstance(host, collections.Iterable)): + hosts = [host] + else: + hosts = host - infos = yield from self.getaddrinfo( - host, port, family=family, - type=socket.SOCK_STREAM, proto=0, flags=flags) - if not infos: - raise OSError('getaddrinfo() returned empty list') + fs = [self._create_server_getaddrinfo(host, port, family=family, + flags=flags) + for host in hosts] + infos = yield from tasks.gather(*fs, loop=self) + infos = itertools.chain.from_iterable(infos) completed = False try: @@ -836,8 +885,15 @@ continue sockets.append(sock) if reuse_address: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, - True) + sock.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + if reuse_port: + if not hasattr(socket, 'SO_REUSEPORT'): + raise ValueError( + 'reuse_port not supported by socket module') + else: + sock.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEPORT, True) # Disable IPv4/IPv6 dual stack support (enabled by # default on Linux) which makes a single socket # listen on both address families. @@ -1131,7 +1187,7 @@ handle._scheduled = False timeout = None - if self._ready: + if self._ready or self._stopping: timeout = 0 elif self._scheduled: # Compute the desired timeout. diff -Nru python3.5-3.5.0/Lib/asyncio/base_subprocess.py python3.5-3.5.1/Lib/asyncio/base_subprocess.py --- python3.5-3.5.0/Lib/asyncio/base_subprocess.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/base_subprocess.py 2015-12-07 01:39:07.000000000 +0000 @@ -87,6 +87,9 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): raise NotImplementedError + def is_closing(self): + return self._closed + def close(self): if self._closed: return diff -Nru python3.5-3.5.0/Lib/asyncio/coroutines.py python3.5-3.5.1/Lib/asyncio/coroutines.py --- python3.5-3.5.0/Lib/asyncio/coroutines.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/coroutines.py 2015-12-07 01:39:07.000000000 +0000 @@ -140,7 +140,13 @@ if compat.PY35: - __await__ = __iter__ # make compatible with 'await' expression + def __await__(self): + cr_await = getattr(self.gen, 'cr_await', None) + if cr_await is not None: + raise RuntimeError( + "Cannot await on coroutine {!r} while it's " + "awaiting for {!r}".format(self.gen, cr_await)) + return self @property def gi_yieldfrom(self): diff -Nru python3.5-3.5.0/Lib/asyncio/events.py python3.5-3.5.1/Lib/asyncio/events.py --- python3.5-3.5.0/Lib/asyncio/events.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/events.py 2015-12-07 01:39:07.000000000 +0000 @@ -297,7 +297,8 @@ def create_server(self, protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, - sock=None, backlog=100, ssl=None, reuse_address=None): + sock=None, backlog=100, ssl=None, reuse_address=None, + reuse_port=None): """A coroutine which creates a TCP server bound to host and port. The return value is a Server object which can be used to stop @@ -305,7 +306,8 @@ If host is an empty string or None all interfaces are assumed and a list of multiple sockets will be returned (most likely - one for IPv4 and another one for IPv6). + one for IPv4 and another one for IPv6). The host parameter can also be a + sequence (e.g. list) of hosts to bind to. family can be set to either AF_INET or AF_INET6 to force the socket to use IPv4 or IPv6. If not set it will be determined @@ -326,6 +328,11 @@ TIME_WAIT state, without waiting for its natural timeout to expire. If not specified will automatically be set to True on UNIX. + + reuse_port tells the kernel to allow this endpoint to be bound to + the same port as other existing endpoints are bound to, so long as + they all set this flag when being created. This option is not + supported on Windows. """ raise NotImplementedError @@ -357,7 +364,37 @@ def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, - family=0, proto=0, flags=0): + family=0, proto=0, flags=0, + reuse_address=None, reuse_port=None, + allow_broadcast=None, sock=None): + """A coroutine which creates a datagram endpoint. + + This method will try to establish the endpoint in the background. + When successful, the coroutine returns a (transport, protocol) pair. + + protocol_factory must be a callable returning a protocol instance. + + socket family AF_INET or socket.AF_INET6 depending on host (or + family if specified), socket type SOCK_DGRAM. + + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified it will automatically be set to True on + UNIX. + + reuse_port tells the kernel to allow this endpoint to be bound to + the same port as other existing endpoints are bound to, so long as + they all set this flag when being created. This option is not + supported on Windows and some UNIX's. If the + :py:data:`~socket.SO_REUSEPORT` constant is not defined then this + capability is unsupported. + + allow_broadcast tells the kernel to allow this endpoint to send + messages to the broadcast address. + + sock can optionally be specified in order to use a preexisting + socket object. + """ raise NotImplementedError # Pipes and subprocesses. diff -Nru python3.5-3.5.0/Lib/asyncio/futures.py python3.5-3.5.1/Lib/asyncio/futures.py --- python3.5-3.5.0/Lib/asyncio/futures.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/futures.py 2015-12-07 01:39:07.000000000 +0000 @@ -154,7 +154,7 @@ if self._loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) - def _format_callbacks(self): + def __format_callbacks(self): cb = self._callbacks size = len(cb) if not size: @@ -184,7 +184,7 @@ result = reprlib.repr(self._result) info.append('result={}'.format(result)) if self._callbacks: - info.append(self._format_callbacks()) + info.append(self.__format_callbacks()) if self._source_traceback: frame = self._source_traceback[-1] info.append('created at %s:%s' % (frame[0], frame[1])) @@ -319,12 +319,6 @@ # So-called internal methods (note: no set_running_or_notify_cancel()). - def _set_result_unless_cancelled(self, result): - """Helper setting the result only if the future was not cancelled.""" - if self.cancelled(): - return - self.set_result(result) - def set_result(self, result): """Mark the future done and set its result. @@ -358,27 +352,6 @@ # have had a chance to call result() or exception(). self._loop.call_soon(self._tb_logger.activate) - # Truly internal methods. - - def _copy_state(self, other): - """Internal helper to copy state from another Future. - - The other Future may be a concurrent.futures.Future. - """ - assert other.done() - if self.cancelled(): - return - assert not self.done() - if other.cancelled(): - self.cancel() - else: - exception = other.exception() - if exception is not None: - self.set_exception(exception) - else: - result = other.result() - self.set_result(result) - def __iter__(self): if not self.done(): self._blocking = True @@ -390,22 +363,91 @@ __await__ = __iter__ # make compatible with 'await' expression -def wrap_future(fut, *, loop=None): +def _set_result_unless_cancelled(fut, result): + """Helper setting the result only if the future was not cancelled.""" + if fut.cancelled(): + return + fut.set_result(result) + + +def _set_concurrent_future_state(concurrent, source): + """Copy state from a future to a concurrent.futures.Future.""" + assert source.done() + if source.cancelled(): + concurrent.cancel() + if not concurrent.set_running_or_notify_cancel(): + return + exception = source.exception() + if exception is not None: + concurrent.set_exception(exception) + else: + result = source.result() + concurrent.set_result(result) + + +def _copy_future_state(source, dest): + """Internal helper to copy state from another Future. + + The other Future may be a concurrent.futures.Future. + """ + assert source.done() + if dest.cancelled(): + return + assert not dest.done() + if source.cancelled(): + dest.cancel() + else: + exception = source.exception() + if exception is not None: + dest.set_exception(exception) + else: + result = source.result() + dest.set_result(result) + + +def _chain_future(source, destination): + """Chain two futures so that when one completes, so does the other. + + The result (or exception) of source will be copied to destination. + If destination is cancelled, source gets cancelled too. + Compatible with both asyncio.Future and concurrent.futures.Future. + """ + if not isinstance(source, (Future, concurrent.futures.Future)): + raise TypeError('A future is required for source argument') + if not isinstance(destination, (Future, concurrent.futures.Future)): + raise TypeError('A future is required for destination argument') + source_loop = source._loop if isinstance(source, Future) else None + dest_loop = destination._loop if isinstance(destination, Future) else None + + def _set_state(future, other): + if isinstance(future, Future): + _copy_future_state(other, future) + else: + _set_concurrent_future_state(future, other) + + def _call_check_cancel(destination): + if destination.cancelled(): + if source_loop is None or source_loop is dest_loop: + source.cancel() + else: + source_loop.call_soon_threadsafe(source.cancel) + + def _call_set_state(source): + if dest_loop is None or dest_loop is source_loop: + _set_state(destination, source) + else: + dest_loop.call_soon_threadsafe(_set_state, destination, source) + + destination.add_done_callback(_call_check_cancel) + source.add_done_callback(_call_set_state) + + +def wrap_future(future, *, loop=None): """Wrap concurrent.futures.Future object.""" - if isinstance(fut, Future): - return fut - assert isinstance(fut, concurrent.futures.Future), \ - 'concurrent.futures.Future is expected, got {!r}'.format(fut) - if loop is None: - loop = events.get_event_loop() + if isinstance(future, Future): + return future + assert isinstance(future, concurrent.futures.Future), \ + 'concurrent.futures.Future is expected, got {!r}'.format(future) new_future = Future(loop=loop) - - def _check_cancel_other(f): - if f.cancelled(): - fut.cancel() - - new_future.add_done_callback(_check_cancel_other) - fut.add_done_callback( - lambda future: loop.call_soon_threadsafe( - new_future._copy_state, future)) + _chain_future(future, new_future) return new_future diff -Nru python3.5-3.5.0/Lib/asyncio/locks.py python3.5-3.5.1/Lib/asyncio/locks.py --- python3.5-3.5.0/Lib/asyncio/locks.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/locks.py 2015-12-07 01:39:07.000000000 +0000 @@ -411,6 +411,13 @@ extra = '{},waiters:{}'.format(extra, len(self._waiters)) return '<{} [{}]>'.format(res[1:-1], extra) + def _wake_up_next(self): + while self._waiters: + waiter = self._waiters.popleft() + if not waiter.done(): + waiter.set_result(None) + return + def locked(self): """Returns True if semaphore can not be acquired immediately.""" return self._value == 0 @@ -425,18 +432,19 @@ called release() to make it larger than 0, and then return True. """ - if not self._waiters and self._value > 0: - self._value -= 1 - return True - - fut = futures.Future(loop=self._loop) - self._waiters.append(fut) - try: - yield from fut - self._value -= 1 - return True - finally: - self._waiters.remove(fut) + while self._value <= 0: + fut = futures.Future(loop=self._loop) + self._waiters.append(fut) + try: + yield from fut + except: + # See the similar code in Queue.get. + fut.cancel() + if self._value > 0 and not fut.cancelled(): + self._wake_up_next() + raise + self._value -= 1 + return True def release(self): """Release a semaphore, incrementing the internal counter by one. @@ -444,10 +452,7 @@ become larger than zero again, wake up that coroutine. """ self._value += 1 - for waiter in self._waiters: - if not waiter.done(): - waiter.set_result(True) - break + self._wake_up_next() class BoundedSemaphore(Semaphore): diff -Nru python3.5-3.5.0/Lib/asyncio/proactor_events.py python3.5-3.5.1/Lib/asyncio/proactor_events.py --- python3.5-3.5.0/Lib/asyncio/proactor_events.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/proactor_events.py 2015-12-07 01:39:07.000000000 +0000 @@ -41,7 +41,8 @@ self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: # only wake up the waiter when connection_made() has been called - self._loop.call_soon(waiter._set_result_unless_cancelled, None) + self._loop.call_soon(futures._set_result_unless_cancelled, + waiter, None) def __repr__(self): info = [self.__class__.__name__] @@ -65,6 +66,9 @@ def _set_extra(self, sock): self._extra['pipe'] = sock + def is_closing(self): + return self._closing + def close(self): if self._closing: return diff -Nru python3.5-3.5.0/Lib/asyncio/queues.py python3.5-3.5.1/Lib/asyncio/queues.py --- python3.5-3.5.0/Lib/asyncio/queues.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/queues.py 2015-12-07 01:39:07.000000000 +0000 @@ -47,7 +47,7 @@ # Futures. self._getters = collections.deque() - # Futures + # Futures. self._putters = collections.deque() self._unfinished_tasks = 0 self._finished = locks.Event(loop=self._loop) @@ -67,10 +67,13 @@ # End of the overridable methods. - def __put_internal(self, item): - self._put(item) - self._unfinished_tasks += 1 - self._finished.clear() + def _wakeup_next(self, waiters): + # Wake up the next waiter (if any) that isn't cancelled. + while waiters: + waiter = waiters.popleft() + if not waiter.done(): + waiter.set_result(None) + break def __repr__(self): return '<{} at {:#x} {}>'.format( @@ -91,16 +94,6 @@ result += ' tasks={}'.format(self._unfinished_tasks) return result - def _consume_done_getters(self): - # Delete waiters at the head of the get() queue who've timed out. - while self._getters and self._getters[0].done(): - self._getters.popleft() - - def _consume_done_putters(self): - # Delete waiters at the head of the put() queue who've timed out. - while self._putters and self._putters[0].done(): - self._putters.popleft() - def qsize(self): """Number of items in the queue.""" return len(self._queue) @@ -134,47 +127,31 @@ This method is a coroutine. """ - self._consume_done_getters() - if self._getters: - assert not self._queue, ( - 'queue non-empty, why are getters waiting?') - - getter = self._getters.popleft() - self.__put_internal(item) - - # getter cannot be cancelled, we just removed done getters - getter.set_result(self._get()) - - elif self._maxsize > 0 and self._maxsize <= self.qsize(): - waiter = futures.Future(loop=self._loop) - - self._putters.append(waiter) - yield from waiter - self._put(item) - - else: - self.__put_internal(item) + while self.full(): + putter = futures.Future(loop=self._loop) + self._putters.append(putter) + try: + yield from putter + except: + putter.cancel() # Just in case putter is not done yet. + if not self.full() and not putter.cancelled(): + # We were woken up by get_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._putters) + raise + return self.put_nowait(item) def put_nowait(self, item): """Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull. """ - self._consume_done_getters() - if self._getters: - assert not self._queue, ( - 'queue non-empty, why are getters waiting?') - - getter = self._getters.popleft() - self.__put_internal(item) - - # getter cannot be cancelled, we just removed done getters - getter.set_result(self._get()) - - elif self._maxsize > 0 and self._maxsize <= self.qsize(): + if self.full(): raise QueueFull - else: - self.__put_internal(item) + self._put(item) + self._unfinished_tasks += 1 + self._finished.clear() + self._wakeup_next(self._getters) @coroutine def get(self): @@ -184,77 +161,30 @@ This method is a coroutine. """ - self._consume_done_putters() - if self._putters: - assert self.full(), 'queue not full, why are putters waiting?' - putter = self._putters.popleft() - - # When a getter runs and frees up a slot so this putter can - # run, we need to defer the put for a tick to ensure that - # getters and putters alternate perfectly. See - # ChannelTest.test_wait. - self._loop.call_soon(putter._set_result_unless_cancelled, None) - - return self._get() - - elif self.qsize(): - return self._get() - else: - waiter = futures.Future(loop=self._loop) - self._getters.append(waiter) + while self.empty(): + getter = futures.Future(loop=self._loop) + self._getters.append(getter) try: - return (yield from waiter) - except futures.CancelledError: - # if we get CancelledError, it means someone cancelled this - # get() coroutine. But there is a chance that the waiter - # already is ready and contains an item that has just been - # removed from the queue. In this case, we need to put the item - # back into the front of the queue. This get() must either - # succeed without fault or, if it gets cancelled, it must be as - # if it never happened. - if waiter.done(): - self._put_it_back(waiter.result()) + yield from getter + except: + getter.cancel() # Just in case getter is not done yet. + if not self.empty() and not getter.cancelled(): + # We were woken up by put_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._getters) raise - - def _put_it_back(self, item): - """ - This is called when we have a waiter to get() an item and this waiter - gets cancelled. In this case, we put the item back: wake up another - waiter or put it in the _queue. - """ - self._consume_done_getters() - if self._getters: - assert not self._queue, ( - 'queue non-empty, why are getters waiting?') - - getter = self._getters.popleft() - self.__put_internal(item) - - # getter cannot be cancelled, we just removed done getters - getter.set_result(item) - else: - self._queue.appendleft(item) + return self.get_nowait() def get_nowait(self): """Remove and return an item from the queue. Return an item if one is immediately available, else raise QueueEmpty. """ - self._consume_done_putters() - if self._putters: - assert self.full(), 'queue not full, why are putters waiting?' - putter = self._putters.popleft() - # Wake putter on next tick. - - # getter cannot be cancelled, we just removed done putters - putter.set_result(None) - - return self._get() - - elif self.qsize(): - return self._get() - else: + if self.empty(): raise QueueEmpty + item = self._get() + self._wakeup_next(self._putters) + return item def task_done(self): """Indicate that a formerly enqueued task is complete. diff -Nru python3.5-3.5.0/Lib/asyncio/selector_events.py python3.5-3.5.1/Lib/asyncio/selector_events.py --- python3.5-3.5.0/Lib/asyncio/selector_events.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/selector_events.py 2015-12-07 01:39:07.000000000 +0000 @@ -556,6 +556,9 @@ def abort(self): self._force_close(None) + def is_closing(self): + return self._closing + def close(self): if self._closing: return @@ -633,7 +636,8 @@ self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called - self._loop.call_soon(waiter._set_result_unless_cancelled, None) + self._loop.call_soon(futures._set_result_unless_cancelled, + waiter, None) def pause_reading(self): if self._closing: @@ -843,6 +847,7 @@ self._extra.update(peercert=peercert, cipher=self._sock.cipher(), compression=self._sock.compression(), + ssl_object=self._sock, ) self._read_wants_write = False @@ -986,7 +991,8 @@ self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called - self._loop.call_soon(waiter._set_result_unless_cancelled, None) + self._loop.call_soon(futures._set_result_unless_cancelled, + waiter, None) def get_write_buffer_size(self): return sum(len(data) for data, _ in self._buffer) diff -Nru python3.5-3.5.0/Lib/asyncio/sslproto.py python3.5-3.5.1/Lib/asyncio/sslproto.py --- python3.5-3.5.0/Lib/asyncio/sslproto.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/sslproto.py 2015-12-07 01:39:07.000000000 +0000 @@ -295,6 +295,7 @@ def __init__(self, loop, ssl_protocol, app_protocol): self._loop = loop + # SSLProtocol instance self._ssl_protocol = ssl_protocol self._app_protocol = app_protocol self._closed = False @@ -303,6 +304,9 @@ """Get optional transport information.""" return self._ssl_protocol._get_extra_info(name, default) + def is_closing(self): + return self._closed + def close(self): """Close the transport. @@ -348,7 +352,7 @@ high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the - high-water limit is given, the low-water limit defaults to a + high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes pause_writing() to be called whenever the @@ -425,10 +429,12 @@ self._app_protocol = app_protocol self._app_transport = _SSLProtocolTransport(self._loop, self, self._app_protocol) + # _SSLPipe instance (None until the connection is made) self._sslpipe = None self._session_established = False self._in_handshake = False self._in_shutdown = False + # transport, ex: SelectorSocketTransport self._transport = None def _wakeup_waiter(self, exc=None): @@ -591,6 +597,7 @@ self._extra.update(peercert=peercert, cipher=sslobj.cipher(), compression=sslobj.compression(), + ssl_object=sslobj, ) self._app_protocol.connection_made(self._app_transport) self._wakeup_waiter() diff -Nru python3.5-3.5.0/Lib/asyncio/streams.py python3.5-3.5.1/Lib/asyncio/streams.py --- python3.5-3.5.0/Lib/asyncio/streams.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/streams.py 2015-12-07 01:39:07.000000000 +0000 @@ -255,7 +255,7 @@ def __init__(self, transport, protocol, reader, loop): self._transport = transport self._protocol = protocol - # drain() expects that the reader has a exception() method + # drain() expects that the reader has an exception() method assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop @@ -301,6 +301,15 @@ exc = self._reader.exception() if exc is not None: raise exc + if self._transport is not None: + if self._transport.is_closing(): + # Yield to the event loop so connection_lost() may be + # called. Without this, _drain_helper() would return + # immediately, and code that calls + # write(...); yield from drain() + # in a loop would never call connection_lost(), so it + # would not see an error when the socket is closed. + yield yield from self._protocol._drain_helper() @@ -324,7 +333,7 @@ def __repr__(self): info = ['StreamReader'] if self._buffer: - info.append('%d bytes' % len(info)) + info.append('%d bytes' % len(self._buffer)) if self._eof: info.append('eof') if self._limit != _DEFAULT_LIMIT: diff -Nru python3.5-3.5.0/Lib/asyncio/tasks.py python3.5-3.5.1/Lib/asyncio/tasks.py --- python3.5-3.5.0/Lib/asyncio/tasks.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/tasks.py 2015-12-07 01:39:07.000000000 +0000 @@ -3,7 +3,7 @@ __all__ = ['Task', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'wait', 'wait_for', 'as_completed', 'sleep', 'async', - 'gather', 'shield', 'ensure_future', + 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', ] import concurrent.futures @@ -220,9 +220,9 @@ self._must_cancel = True return True - def _step(self, value=None, exc=None): + def _step(self, exc=None): assert not self.done(), \ - '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc) + '_step(): already done: {!r}, {!r}'.format(self, exc) if self._must_cancel: if not isinstance(exc, futures.CancelledError): exc = futures.CancelledError() @@ -231,12 +231,14 @@ self._fut_waiter = None self.__class__._current_tasks[self._loop] = self - # Call either coro.throw(exc) or coro.send(value). + # Call either coro.throw(exc) or coro.send(None). try: - if exc is not None: - result = coro.throw(exc) + if exc is None: + # We use the `send` method directly, because coroutines + # don't have `__iter__` and `__next__` methods. + result = coro.send(None) else: - result = coro.send(value) + result = coro.throw(exc) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: @@ -258,7 +260,7 @@ self._must_cancel = False else: self._loop.call_soon( - self._step, None, + self._step, RuntimeError( 'yield was used instead of yield from ' 'in task {!r} with {!r}'.format(self, result))) @@ -268,7 +270,7 @@ elif inspect.isgenerator(result): # Yielding a generator is just wrong. self._loop.call_soon( - self._step, None, + self._step, RuntimeError( 'yield was used instead of yield from for ' 'generator in task {!r} with {}'.format( @@ -276,7 +278,7 @@ else: # Yielding something else is an error. self._loop.call_soon( - self._step, None, + self._step, RuntimeError( 'Task got bad yield: {!r}'.format(result))) finally: @@ -285,12 +287,18 @@ def _wakeup(self, future): try: - value = future.result() + future.result() except Exception as exc: # This may also be a cancellation. - self._step(None, exc) + self._step(exc) else: - self._step(value, None) + # Don't pass the value of `future.result()` explicitly, + # as `Future.__iter__` and `Future.__await__` don't need it. + # If we call `_step(value, None)` instead of `_step()`, + # Python eval loop would use `.send(value)` method call, + # instead of `__next__()`, which is slower for futures + # that return non-generator iterators from their `__iter__`. + self._step() self = None # Needed to break cycles when an exception occurs. @@ -488,9 +496,14 @@ @coroutine def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" + if delay == 0: + yield + return result + future = futures.Future(loop=loop) h = future._loop.call_later(delay, - future._set_result_unless_cancelled, result) + futures._set_result_unless_cancelled, + future, result) try: return (yield from future) finally: @@ -512,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None): - """Wrap a coroutine in a future. + """Wrap a coroutine or an awaitable in a future. If the argument is a Future, it is returned directly. """ @@ -527,8 +540,20 @@ if task._source_traceback: del task._source_traceback[-1] return task + elif compat.PY35 and inspect.isawaitable(coro_or_future): + return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) else: - raise TypeError('A Future or coroutine is required') + raise TypeError('A Future, a coroutine or an awaitable is required') + + +@coroutine +def _wrap_awaitable(awaitable): + """Helper for asyncio.ensure_future(). + + Wraps awaitable (an object with __await__) into a coroutine + that will later be wrapped in a Task by ensure_future(). + """ + return (yield from awaitable.__await__()) class _GatheringFuture(futures.Future): @@ -680,3 +705,24 @@ inner.add_done_callback(_done_callback) return outer + + +def run_coroutine_threadsafe(coro, loop): + """Submit a coroutine object to a given event loop. + + Return a concurrent.futures.Future to access the result. + """ + if not coroutines.iscoroutine(coro): + raise TypeError('A coroutine object is required') + future = concurrent.futures.Future() + + def callback(): + try: + futures._chain_future(ensure_future(coro, loop=loop), future) + except Exception as exc: + if future.set_running_or_notify_cancel(): + future.set_exception(exc) + raise + + loop.call_soon_threadsafe(callback) + return future diff -Nru python3.5-3.5.0/Lib/asyncio/test_utils.py python3.5-3.5.1/Lib/asyncio/test_utils.py --- python3.5-3.5.0/Lib/asyncio/test_utils.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/test_utils.py 2015-12-07 01:39:07.000000000 +0000 @@ -24,6 +24,7 @@ ssl = None from . import base_events +from . import compat from . import events from . import futures from . import selectors @@ -71,12 +72,13 @@ def run_once(loop): - """loop.stop() schedules _raise_stop_error() - and run_forever() runs until _raise_stop_error() callback. - this wont work if test waits for some IO events, because - _raise_stop_error() runs before any of io events callbacks. + """Legacy API to run once through the event loop. + + This is the recommended pattern for test code. It will poll the + selector once and run all callbacks scheduled in response to I/O + events. """ - loop.stop() + loop.call_soon(loop.stop) loop.run_forever() @@ -420,6 +422,16 @@ # in an except block of a generator self.assertEqual(sys.exc_info(), (None, None, None)) + if not compat.PY34: + # Python 3.3 compatibility + def subTest(self, *args, **kwargs): + class EmptyCM: + def __enter__(self): + pass + def __exit__(self, *exc): + pass + return EmptyCM() + @contextlib.contextmanager def disable_logger(): diff -Nru python3.5-3.5.0/Lib/asyncio/transports.py python3.5-3.5.1/Lib/asyncio/transports.py --- python3.5-3.5.0/Lib/asyncio/transports.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/transports.py 2015-12-07 01:39:07.000000000 +0000 @@ -19,6 +19,10 @@ """Get optional transport information.""" return self._extra.get(name, default) + def is_closing(self): + """Return True if the transport is closing or closed.""" + raise NotImplementedError + def close(self): """Close the transport. @@ -62,7 +66,7 @@ high-water limit. Neither value can be negative. The defaults are implementation-specific. If only the - high-water limit is given, the low-water limit defaults to a + high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes pause_writing() to be called whenever the diff -Nru python3.5-3.5.0/Lib/asyncio/unix_events.py python3.5-3.5.1/Lib/asyncio/unix_events.py --- python3.5-3.5.0/Lib/asyncio/unix_events.py 2015-09-13 12:41:20.000000000 +0100 +++ python3.5-3.5.1/Lib/asyncio/unix_events.py 2015-12-07 01:39:07.000000000 +0000 @@ -319,7 +319,8 @@ self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called - self._loop.call_soon(waiter._set_result_unless_cancelled, None) + self._loop.call_soon(futures._set_result_unless_cancelled, + waiter, None) def __repr__(self): info = [self.__class__.__name__] @@ -364,6 +365,9 @@ def resume_reading(self): self._loop.add_reader(self._fileno, self._read_ready) + def is_closing(self): + return self._closing + def close(self): if not self._closing: self._close(None) @@ -439,7 +443,8 @@ if waiter is not None: # only wake up the waiter when connection_made() has been called - self._loop.call_soon(waiter._set_result_unless_cancelled, None) + self._loop.call_soon(futures._set_result_unless_cancelled, + waiter, None) def __repr__(self): info = [self.__class__.__name__] @@ -548,6 +553,9 @@ self._loop.remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, None) + def is_closing(self): + return self._closing + def close(self): if self._pipe is not None and not self._closing: # write_eof is all what we needed to close the write pipe