Python Pull Request

Python 3.14 support & synchronize with python-qemu-qmp repo
 -----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCgAdFiEE+ber27ys35W+dsvQfe+BBqr8OQ4FAmjJjxIACgkQfe+BBqr8
 OQ48aA/+JRRIEN8LMbNDRvPTTkvCxstSAb2q8yA+8ccWg0H+EGcewjd+oCoPOqjC
 SwIMAGYJ6Dv2LW6c+rK6VjKw1Da8J9WgEpKmfoWu+1Pef8odU5PoRhAvvZdMq+Eh
 Kqk0r1f87fTiZK1gCBhBUIO0oTroOYxDvIYV0B6UFDPArL8jJ5eTpGLCVAYuk8tH
 MuzQD0IcxCBoraOx9vqVMbKIHwMH/m9pJ2IqINzIStpLoFgT1d5V9CoKXImMVXmF
 XovcMWQzFz1a/lm0ybSAzhgXcpW/vNjstb1IcrigYjQWXU6S+/bRpq17c2WqAJtG
 78Dal7heSjpvWyyCCii+QO+BegH53Mgz3W+aQN7+fkcepjivVYy8tnxOrSjJR+pX
 DqRhMNSc4CrLvJH4BOHKUsJaWMxjd4oJiNhUmhJ7MxZhPTHZvERsOo9kpoJo4eTw
 GhRV98FnJbotgs2kjQpSBF8FDj9LZqPwTfMuEU2NUsIB9o7/Iqj36RDe9L+2r9Ch
 2UKhnUg58y4eYFoC4CO8yCfjsR6HzLdqiVaDhcu5pdQM0Dw1pxrSIHb6faNmSLL5
 v0brhgJGujWt6wAc2c3ASMf8qpWkBrlVfHybodOB2cUDcRgNk85M/s41PnGShqBZ
 Qq7VW9zR4sejwof9dTwYKuwsNzxzFdS2nLwPPkud5aDngrLsNn0=
 =jZpa
 -----END PGP SIGNATURE-----

Merge tag 'python-pull-request' of https://gitlab.com/jsnow/qemu into staging

Python Pull Request

Python 3.14 support & synchronize with python-qemu-qmp repo

# -----BEGIN PGP SIGNATURE-----
#
# iQIzBAABCgAdFiEE+ber27ys35W+dsvQfe+BBqr8OQ4FAmjJjxIACgkQfe+BBqr8
# OQ48aA/+JRRIEN8LMbNDRvPTTkvCxstSAb2q8yA+8ccWg0H+EGcewjd+oCoPOqjC
# SwIMAGYJ6Dv2LW6c+rK6VjKw1Da8J9WgEpKmfoWu+1Pef8odU5PoRhAvvZdMq+Eh
# Kqk0r1f87fTiZK1gCBhBUIO0oTroOYxDvIYV0B6UFDPArL8jJ5eTpGLCVAYuk8tH
# MuzQD0IcxCBoraOx9vqVMbKIHwMH/m9pJ2IqINzIStpLoFgT1d5V9CoKXImMVXmF
# XovcMWQzFz1a/lm0ybSAzhgXcpW/vNjstb1IcrigYjQWXU6S+/bRpq17c2WqAJtG
# 78Dal7heSjpvWyyCCii+QO+BegH53Mgz3W+aQN7+fkcepjivVYy8tnxOrSjJR+pX
# DqRhMNSc4CrLvJH4BOHKUsJaWMxjd4oJiNhUmhJ7MxZhPTHZvERsOo9kpoJo4eTw
# GhRV98FnJbotgs2kjQpSBF8FDj9LZqPwTfMuEU2NUsIB9o7/Iqj36RDe9L+2r9Ch
# 2UKhnUg58y4eYFoC4CO8yCfjsR6HzLdqiVaDhcu5pdQM0Dw1pxrSIHb6faNmSLL5
# v0brhgJGujWt6wAc2c3ASMf8qpWkBrlVfHybodOB2cUDcRgNk85M/s41PnGShqBZ
# Qq7VW9zR4sejwof9dTwYKuwsNzxzFdS2nLwPPkud5aDngrLsNn0=
# =jZpa
# -----END PGP SIGNATURE-----
# gpg: Signature made Tue 16 Sep 2025 09:23:46 AM PDT
# gpg:                using RSA key F9B7ABDBBCACDF95BE76CBD07DEF8106AAFC390E
# gpg: Good signature from "John Snow (John Huston) <jsnow@redhat.com>" [unknown]
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg:          There is no indication that the signature belongs to the owner.
# Primary key fingerprint: FAEB 9711 A12C F475 812F  18F2 88A9 064D 1835 61EB
#      Subkey fingerprint: F9B7 ABDB BCAC DF95 BE76  CBD0 7DEF 8106 AAFC 390E

* tag 'python-pull-request' of https://gitlab.com/jsnow/qemu:
  iotests/check: always enable all python warnings
  iotests/151: ensure subprocesses are cleaned up
  iotests/147: ensure temporary sockets are closed before exiting
  python: ensure QEMUQtestProtocol closes its socket
  iotests: drop compat for old version context manager
  python: synchronize qemu.qmp documentation
  python: backport 'avoid creating additional event loops per thread'
  python: backport 'Remove deprecated get_event_loop calls'
  python: backport 'qmp-tui: Do not crash if optional dependencies are not met'
  python: backport 'qmp-shell-wrap: handle missing binary gracefully'
  python: backport 'make require() preserve async-ness'
  python: backport 'feat: allow setting read buffer limit'
  python: backport 'qmp-shell: add common_parser()'
  python: backport 'Use @asynciocontextmanager'
  python: backport 'drop Python3.6 workarounds'
  python: backport 'protocol: adjust logging name when changing client name'
  python: backport 'kick event queue on legacy event_pull()'
  python: backport 'EventListener: add __repr__ method'
  python: backport 'Change error classes to have better repr methods'

Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
This commit is contained in:
Richard Henderson 2025-09-16 10:10:29 -07:00
commit 41511ed734
18 changed files with 529 additions and 326 deletions

View file

@ -177,6 +177,8 @@ class QEMUQtestMachine(QEMUMachine):
self._qtest_sock_pair[0].close()
self._qtest_sock_pair[1].close()
self._qtest_sock_pair = None
if self._qtest is not None:
self._qtest.close()
super()._post_shutdown()
def qtest(self, cmd: str) -> str:

View file

@ -39,7 +39,8 @@ from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
logging.getLogger('qemu.qmp').addHandler(logging.NullHandler())
# The order of these fields impact the Sphinx documentation order.
# IMPORTANT: When modifying this list, update the Sphinx overview docs.
# Anything visible in the qemu.qmp namespace should be on the overview page.
__all__ = (
# Classes, most to least important
'QMPClient',

View file

@ -44,7 +44,10 @@ class ProtocolError(QMPError):
:param error_message: Human-readable string describing the error.
"""
def __init__(self, error_message: str):
super().__init__(error_message)
def __init__(self, error_message: str, *args: object):
super().__init__(error_message, *args)
#: Human-readable error message, without any prefix.
self.error_message: str = error_message
def __str__(self) -> str:
return self.error_message

View file

@ -12,7 +12,14 @@ EventListener Tutorial
----------------------
In all of the following examples, we assume that we have a `QMPClient`
instantiated named ``qmp`` that is already connected.
instantiated named ``qmp`` that is already connected. For example:
.. code:: python
from qemu.qmp import QMPClient
qmp = QMPClient('example-vm')
await qmp.connect('127.0.0.1', 1234)
`listener()` context blocks with one name
@ -87,7 +94,9 @@ This is analogous to the following code:
event = listener.get()
print(f"Event arrived: {event['event']}")
This event stream will never end, so these blocks will never terminate.
This event stream will never end, so these blocks will never
terminate. Even if the QMP connection errors out prematurely, this
listener will go silent without raising an error.
Using asyncio.Task to concurrently retrieve events
@ -227,16 +236,20 @@ Clearing listeners
.. code:: python
await qmp.execute('stop')
qmp.events.clear()
discarded = qmp.events.clear()
await qmp.execute('cont')
event = await qmp.events.get()
assert event['event'] == 'RESUME'
assert discarded[0]['event'] == 'STOP'
`EventListener` objects are FIFO queues. If events are not consumed,
they will remain in the queue until they are witnessed or discarded via
`clear()`. FIFO queues will be drained automatically upon leaving a
context block, or when calling `remove_listener()`.
Any events removed from the queue in this fashion will be returned by
the clear call.
Accessing listener history
~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -350,6 +363,12 @@ While `listener()` is only capable of creating a single listener,
break
Note that in the above example, we explicitly wait on jobA to conclude
first, and then wait for jobB to do the same. All we have guaranteed is
that the code that waits for jobA will not accidentally consume the
event intended for the jobB waiter.
Extending the `EventListener` class
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -407,13 +426,13 @@ Experimental Interfaces & Design Issues
These interfaces are not ones I am sure I will keep or otherwise modify
heavily.
qmp.listener()s type signature
qmp.listen()s type signature
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
`listener()` does not return anything, because it was assumed the caller
`listen()` does not return anything, because it was assumed the caller
already had a handle to the listener. However, for
``qmp.listener(EventListener())`` forms, the caller will not have saved
a handle to the listener.
``qmp.listen(EventListener())`` forms, the caller will not have saved a
handle to the listener.
Because this function can accept *many* listeners, I found it hard to
accurately type in a way where it could be used in both one or many
@ -497,6 +516,21 @@ class EventListener:
#: Optional, secondary event filter.
self.event_filter: Optional[EventFilter] = event_filter
def __repr__(self) -> str:
args: List[str] = []
if self.names:
args.append(f"names={self.names!r}")
if self.event_filter:
args.append(f"event_filter={self.event_filter!r}")
if self._queue.qsize():
state = f"<pending={self._queue.qsize()}>"
else:
state = ''
argstr = ", ".join(args)
return f"{type(self).__name__}{state}({argstr})"
@property
def history(self) -> Tuple[Message, ...]:
"""
@ -618,7 +652,7 @@ class Events:
def __init__(self) -> None:
self._listeners: List[EventListener] = []
#: Default, all-events `EventListener`.
#: Default, all-events `EventListener`. See `qmp.events` for more info.
self.events: EventListener = EventListener()
self.register_listener(self.events)

View file

@ -38,6 +38,7 @@ from typing import (
from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient
from .util import get_or_create_event_loop
#: QMPMessage is an entire QMP message of any kind.
@ -86,10 +87,13 @@ class QEMUMonitorProtocol:
"server argument should be False when passing a socket")
self._qmp = QMPClient(nickname)
self._aloop = asyncio.get_event_loop()
self._address = address
self._timeout: Optional[float] = None
# This is a sync shim intended for use in fully synchronous
# programs. Create and set an event loop if necessary.
self._aloop = get_or_create_event_loop()
if server:
assert not isinstance(self._address, socket.socket)
self._sync(self._qmp.start_server(self._address))
@ -231,6 +235,9 @@ class QEMUMonitorProtocol:
:return: The first available QMP event, or None.
"""
# Kick the event loop to allow events to accumulate
self._sync(asyncio.sleep(0))
if not wait:
# wait is False/0: "do not wait, do not except."
if self._qmp.events.empty():
@ -286,8 +293,8 @@ class QEMUMonitorProtocol:
"""
Set the timeout for QMP RPC execution.
This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
The `accept`, `pull_event` and `get_event` methods have their
This timeout affects the `cmd`, `cmd_obj`, and `cmd_raw` methods.
The `accept`, `pull_event` and `get_events` methods have their
own configurable timeouts.
:param timeout:
@ -303,17 +310,30 @@ class QEMUMonitorProtocol:
self._qmp.send_fd_scm(fd)
def __del__(self) -> None:
if self._qmp.runstate == Runstate.IDLE:
return
if self._qmp.runstate != Runstate.IDLE:
self._qmp.logger.warning(
"QEMUMonitorProtocol object garbage collected without a prior "
"call to close()"
)
if not self._aloop.is_running():
self.close()
else:
# Garbage collection ran while the event loop was running.
# Nothing we can do about it now, but if we don't raise our
# own error, the user will be treated to a lot of traceback
# they might not understand.
if self._qmp.runstate != Runstate.IDLE:
# If the user neglected to close the QMP session and we
# are not currently running in an asyncio context, we
# have the opportunity to close the QMP session. If we
# do not do this, the error messages presented over
# dangling async resources may not make any sense to the
# user.
self.close()
if self._qmp.runstate != Runstate.IDLE:
# If QMP is still not quiesced, it means that the garbage
# collector ran from a context within the event loop and we
# are simply too late to take any corrective action. Raise
# our own error to give meaningful feedback to the user in
# order to prevent pages of asyncio stacktrace jargon.
raise QMPError(
"QEMUMonitorProtocol.close()"
" was not called before object was garbage collected"
"QEMUMonitorProtocol.close() was not called before object was "
"garbage collected, and could not be closed due to GC running "
"in the event loop"
)

View file

@ -28,7 +28,8 @@ class Message(MutableMapping[str, object]):
be instantiated from either another mapping (like a `dict`), or from
raw `bytes` that still need to be deserialized.
Once instantiated, it may be treated like any other MutableMapping::
Once instantiated, it may be treated like any other
:py:obj:`~collections.abc.MutableMapping`::
>>> msg = Message(b'{"hello": "world"}')
>>> assert msg['hello'] == 'world'
@ -50,12 +51,19 @@ class Message(MutableMapping[str, object]):
>>> dict(msg)
{'hello': 'world'}
Or pretty-printed::
>>> print(str(msg))
{
"hello": "world"
}
:param value: Initial value, if any.
:param eager:
When `True`, attempt to serialize or deserialize the initial value
immediately, so that conversion exceptions are raised during
the call to ``__init__()``.
"""
# pylint: disable=too-many-ancestors
@ -178,15 +186,15 @@ class DeserializationError(ProtocolError):
:param raw: The raw `bytes` that prompted the failure.
"""
def __init__(self, error_message: str, raw: bytes):
super().__init__(error_message)
super().__init__(error_message, raw)
#: The raw `bytes` that were not understood as JSON.
self.raw: bytes = raw
def __str__(self) -> str:
return "\n".join([
return "\n".join((
super().__str__(),
f" raw bytes were: {str(self.raw)}",
])
))
class UnexpectedTypeError(ProtocolError):
@ -197,13 +205,13 @@ class UnexpectedTypeError(ProtocolError):
:param value: The deserialized JSON value that wasn't an object.
"""
def __init__(self, error_message: str, value: object):
super().__init__(error_message)
super().__init__(error_message, value)
#: The JSON value that was expected to be an object.
self.value: object = value
def __str__(self) -> str:
strval = json.dumps(self.value, indent=2)
return "\n".join([
return "\n".join((
super().__str__(),
f" json value was: {strval}",
])
))

View file

@ -54,7 +54,7 @@ class Model:
class Greeting(Model):
"""
Defined in qmp-spec.rst, section "Server Greeting".
Defined in `interop/qmp-spec`, "Server Greeting" section.
:param raw: The raw Greeting object.
:raise KeyError: If any required fields are absent.
@ -82,7 +82,7 @@ class Greeting(Model):
class QMPGreeting(Model):
"""
Defined in qmp-spec.rst, section "Server Greeting".
Defined in `interop/qmp-spec`, "Server Greeting" section.
:param raw: The raw QMPGreeting object.
:raise KeyError: If any required fields are absent.
@ -104,7 +104,7 @@ class QMPGreeting(Model):
class ErrorResponse(Model):
"""
Defined in qmp-spec.rst, section "Error".
Defined in `interop/qmp-spec`, "Error" section.
:param raw: The raw ErrorResponse object.
:raise KeyError: If any required fields are absent.
@ -126,7 +126,7 @@ class ErrorResponse(Model):
class ErrorInfo(Model):
"""
Defined in qmp-spec.rst, section "Error".
Defined in `interop/qmp-spec`, "Error" section.
:param raw: The raw ErrorInfo object.
:raise KeyError: If any required fields are absent.

View file

@ -15,13 +15,16 @@ class.
import asyncio
from asyncio import StreamReader, StreamWriter
from contextlib import asynccontextmanager
from enum import Enum
from functools import wraps
from inspect import iscoroutinefunction
import logging
import socket
from ssl import SSLContext
from typing import (
Any,
AsyncGenerator,
Awaitable,
Callable,
Generic,
@ -36,13 +39,10 @@ from typing import (
from .error import QMPError
from .util import (
bottom_half,
create_task,
exception_summary,
flush,
is_closing,
pretty_traceback,
upper_half,
wait_closed,
)
@ -54,6 +54,9 @@ InternetAddrT = Tuple[str, int]
UnixAddrT = str
SocketAddrT = Union[UnixAddrT, InternetAddrT]
# Maximum allowable size of read buffer, default
_DEFAULT_READBUFLEN = 64 * 1024
class Runstate(Enum):
"""Protocol session runstate."""
@ -76,11 +79,17 @@ class ConnectError(QMPError):
This Exception always wraps a "root cause" exception that can be
interrogated for additional information.
For example, when connecting to a non-existent socket::
await qmp.connect('not_found.sock')
# ConnectError: Failed to establish connection:
# [Errno 2] No such file or directory
:param error_message: Human-readable string describing the error.
:param exc: The root-cause exception.
"""
def __init__(self, error_message: str, exc: Exception):
super().__init__(error_message)
super().__init__(error_message, exc)
#: Human-readable error string
self.error_message: str = error_message
#: Wrapped root cause exception
@ -99,8 +108,8 @@ class StateError(QMPError):
An API command (connect, execute, etc) was issued at an inappropriate time.
This error is raised when a command like
:py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
time.
:py:meth:`~AsyncProtocol.connect()` is called when the client is
already connected.
:param error_message: Human-readable string describing the state violation.
:param state: The actual `Runstate` seen at the time of the violation.
@ -108,11 +117,14 @@ class StateError(QMPError):
"""
def __init__(self, error_message: str,
state: Runstate, required: Runstate):
super().__init__(error_message)
super().__init__(error_message, state, required)
self.error_message = error_message
self.state = state
self.required = required
def __str__(self) -> str:
return self.error_message
F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name
@ -125,6 +137,25 @@ def require(required_state: Runstate) -> Callable[[F], F]:
:param required_state: The `Runstate` required to invoke this method.
:raise StateError: When the required `Runstate` is not met.
"""
def _check(proto: 'AsyncProtocol[Any]') -> None:
name = type(proto).__name__
if proto.runstate == required_state:
return
if proto.runstate == Runstate.CONNECTING:
emsg = f"{name} is currently connecting."
elif proto.runstate == Runstate.DISCONNECTING:
emsg = (f"{name} is disconnecting."
" Call disconnect() to return to IDLE state.")
elif proto.runstate == Runstate.RUNNING:
emsg = f"{name} is already connected and running."
elif proto.runstate == Runstate.IDLE:
emsg = f"{name} is disconnected and idle."
else:
assert False
raise StateError(emsg, proto.runstate, required_state)
def _decorator(func: F) -> F:
# _decorator is the decorator that is built by calling the
# require() decorator factory; e.g.:
@ -135,29 +166,20 @@ def require(required_state: Runstate) -> Callable[[F], F]:
@wraps(func)
def _wrapper(proto: 'AsyncProtocol[Any]',
*args: Any, **kwargs: Any) -> Any:
# _wrapper is the function that gets executed prior to the
# decorated method.
name = type(proto).__name__
if proto.runstate != required_state:
if proto.runstate == Runstate.CONNECTING:
emsg = f"{name} is currently connecting."
elif proto.runstate == Runstate.DISCONNECTING:
emsg = (f"{name} is disconnecting."
" Call disconnect() to return to IDLE state.")
elif proto.runstate == Runstate.RUNNING:
emsg = f"{name} is already connected and running."
elif proto.runstate == Runstate.IDLE:
emsg = f"{name} is disconnected and idle."
else:
assert False
raise StateError(emsg, proto.runstate, required_state)
# No StateError, so call the wrapped method.
_check(proto)
return func(proto, *args, **kwargs)
# Return the decorated method;
# Transforming Func to Decorated[Func].
@wraps(func)
async def _async_wrapper(proto: 'AsyncProtocol[Any]',
*args: Any, **kwargs: Any) -> Any:
_check(proto)
return await func(proto, *args, **kwargs)
# Return the decorated method; F => Decorated[F]
# Use an async version when applicable, which
# preserves async signature generation in sphinx.
if iscoroutinefunction(func):
return cast(F, _async_wrapper)
return cast(F, _wrapper)
# Return the decorator instance from the decorator factory. Phew!
@ -200,24 +222,26 @@ class AsyncProtocol(Generic[T]):
will log to 'qemu.qmp.protocol', but each individual connection
can be given its own logger by giving it a name; messages will
then log to 'qemu.qmp.protocol.${name}'.
:param readbuflen:
The maximum read buffer length of the underlying StreamReader
instance.
"""
# pylint: disable=too-many-instance-attributes
#: Logger object for debugging messages from this connection.
logger = logging.getLogger(__name__)
# Maximum allowable size of read buffer
_limit = 64 * 1024
# -------------------------
# Section: Public interface
# -------------------------
def __init__(self, name: Optional[str] = None) -> None:
#: The nickname for this connection, if any.
self.name: Optional[str] = name
if self.name is not None:
self.logger = self.logger.getChild(self.name)
def __init__(
self, name: Optional[str] = None,
readbuflen: int = _DEFAULT_READBUFLEN
) -> None:
self._name: Optional[str]
self.name = name
self.readbuflen = readbuflen
# stream I/O
self._reader: Optional[StreamReader] = None
@ -254,6 +278,24 @@ class AsyncProtocol(Generic[T]):
tokens.append(f"runstate={self.runstate.name}")
return f"<{cls_name} {' '.join(tokens)}>"
@property
def name(self) -> Optional[str]:
"""
The nickname for this connection, if any.
This name is used for differentiating instances in debug output.
"""
return self._name
@name.setter
def name(self, name: Optional[str]) -> None:
logger = logging.getLogger(__name__)
if name:
self.logger = logger.getChild(name)
else:
self.logger = logger
self._name = name
@property # @upper_half
def runstate(self) -> Runstate:
"""The current `Runstate` of the connection."""
@ -262,7 +304,7 @@ class AsyncProtocol(Generic[T]):
@upper_half
async def runstate_changed(self) -> Runstate:
"""
Wait for the `runstate` to change, then return that runstate.
Wait for the `runstate` to change, then return that `Runstate`.
"""
await self._runstate_event.wait()
return self.runstate
@ -276,9 +318,9 @@ class AsyncProtocol(Generic[T]):
"""
Accept a connection and begin processing message queues.
If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
This method is precisely equivalent to calling `start_server()`
followed by `accept()`.
If this call fails, `runstate` is guaranteed to be set back to
`IDLE`. This method is precisely equivalent to calling
`start_server()` followed by :py:meth:`~AsyncProtocol.accept()`.
:param address:
Address to listen on; UNIX socket path or TCP address/port.
@ -291,7 +333,8 @@ class AsyncProtocol(Generic[T]):
This exception will wrap a more concrete one. In most cases,
the wrapped exception will be `OSError` or `EOFError`. If a
protocol-level failure occurs while establishing a new
session, the wrapped error may also be an `QMPError`.
session, the wrapped error may also be a `QMPError`.
"""
await self.start_server(address, ssl)
await self.accept()
@ -307,8 +350,8 @@ class AsyncProtocol(Generic[T]):
This method starts listening for an incoming connection, but
does not block waiting for a peer. This call will return
immediately after binding and listening on a socket. A later
call to `accept()` must be made in order to finalize the
incoming connection.
call to :py:meth:`~AsyncProtocol.accept()` must be made in order
to finalize the incoming connection.
:param address:
Address to listen on; UNIX socket path or TCP address/port.
@ -321,9 +364,8 @@ class AsyncProtocol(Generic[T]):
This exception will wrap a more concrete one. In most cases,
the wrapped exception will be `OSError`.
"""
await self._session_guard(
self._do_start_server(address, ssl),
'Failed to establish connection')
async with self._session_guard('Failed to establish connection'):
await self._do_start_server(address, ssl)
assert self.runstate == Runstate.CONNECTING
@upper_half
@ -332,10 +374,12 @@ class AsyncProtocol(Generic[T]):
"""
Accept an incoming connection and begin processing message queues.
If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
Used after a previous call to `start_server()` to accept an
incoming connection. If this call fails, `runstate` is
guaranteed to be set back to `IDLE`.
:raise StateError: When the `Runstate` is not `CONNECTING`.
:raise QMPError: When `start_server()` was not called yet.
:raise QMPError: When `start_server()` was not called first.
:raise ConnectError:
When a connection or session cannot be established.
@ -346,12 +390,10 @@ class AsyncProtocol(Generic[T]):
"""
if self._accepted is None:
raise QMPError("Cannot call accept() before start_server().")
await self._session_guard(
self._do_accept(),
'Failed to establish connection')
await self._session_guard(
self._establish_session(),
'Failed to establish session')
async with self._session_guard('Failed to establish connection'):
await self._do_accept()
async with self._session_guard('Failed to establish session'):
await self._establish_session()
assert self.runstate == Runstate.RUNNING
@upper_half
@ -376,12 +418,10 @@ class AsyncProtocol(Generic[T]):
protocol-level failure occurs while establishing a new
session, the wrapped error may also be an `QMPError`.
"""
await self._session_guard(
self._do_connect(address, ssl),
'Failed to establish connection')
await self._session_guard(
self._establish_session(),
'Failed to establish session')
async with self._session_guard('Failed to establish connection'):
await self._do_connect(address, ssl)
async with self._session_guard('Failed to establish session'):
await self._establish_session()
assert self.runstate == Runstate.RUNNING
@upper_half
@ -392,7 +432,11 @@ class AsyncProtocol(Generic[T]):
If there was an exception that caused the reader/writers to
terminate prematurely, it will be raised here.
:raise Exception: When the reader or writer terminate unexpectedly.
:raise Exception:
When the reader or writer terminate unexpectedly. You can
expect to see `EOFError` if the server hangs up, or
`OSError` for connection-related issues. If there was a QMP
protocol-level problem, `ProtocolError` will be seen.
"""
self.logger.debug("disconnect() called.")
self._schedule_disconnect()
@ -402,7 +446,8 @@ class AsyncProtocol(Generic[T]):
# Section: Session machinery
# --------------------------
async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None:
@asynccontextmanager
async def _session_guard(self, emsg: str) -> AsyncGenerator[None, None]:
"""
Async guard function used to roll back to `IDLE` on any error.
@ -419,10 +464,9 @@ class AsyncProtocol(Generic[T]):
:raise ConnectError:
When any other error is encountered in the guarded block.
"""
# Note: After Python 3.6 support is removed, this should be an
# @asynccontextmanager instead of accepting a callback.
try:
await coro
# Caller's code runs here.
yield
except BaseException as err:
self.logger.error("%s: %s", emsg, exception_summary(err))
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
@ -561,7 +605,7 @@ class AsyncProtocol(Generic[T]):
port=address[1],
ssl=ssl,
backlog=1,
limit=self._limit,
limit=self.readbuflen,
)
else:
coro = asyncio.start_unix_server(
@ -569,7 +613,7 @@ class AsyncProtocol(Generic[T]):
path=address,
ssl=ssl,
backlog=1,
limit=self._limit,
limit=self.readbuflen,
)
# Allow runstate watchers to witness 'CONNECTING' state; some
@ -624,7 +668,7 @@ class AsyncProtocol(Generic[T]):
"fd=%d, family=%r, type=%r",
address.fileno(), address.family, address.type)
connect = asyncio.open_connection(
limit=self._limit,
limit=self.readbuflen,
ssl=ssl,
sock=address,
)
@ -634,14 +678,14 @@ class AsyncProtocol(Generic[T]):
address[0],
address[1],
ssl=ssl,
limit=self._limit,
limit=self.readbuflen,
)
else:
self.logger.debug("Connecting to file://%s ...", address)
connect = asyncio.open_unix_connection(
path=address,
ssl=ssl,
limit=self._limit,
limit=self.readbuflen,
)
self._reader, self._writer = await connect
@ -663,8 +707,8 @@ class AsyncProtocol(Generic[T]):
reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
self._reader_task = create_task(reader_coro)
self._writer_task = create_task(writer_coro)
self._reader_task = asyncio.create_task(reader_coro)
self._writer_task = asyncio.create_task(writer_coro)
self._bh_tasks = asyncio.gather(
self._reader_task,
@ -689,7 +733,7 @@ class AsyncProtocol(Generic[T]):
if not self._dc_task:
self._set_state(Runstate.DISCONNECTING)
self.logger.debug("Scheduling disconnect.")
self._dc_task = create_task(self._bh_disconnect())
self._dc_task = asyncio.create_task(self._bh_disconnect())
@upper_half
async def _wait_disconnect(self) -> None:
@ -825,13 +869,13 @@ class AsyncProtocol(Generic[T]):
if not self._writer:
return
if not is_closing(self._writer):
if not self._writer.is_closing():
self.logger.debug("Closing StreamWriter.")
self._writer.close()
self.logger.debug("Waiting for StreamWriter to close ...")
try:
await wait_closed(self._writer)
await self._writer.wait_closed()
except Exception: # pylint: disable=broad-except
# It's hard to tell if the Stream is already closed or
# not. Even if one of the tasks has failed, it may have

View file

@ -41,7 +41,7 @@ class _WrappedProtocolError(ProtocolError):
:param exc: The root-cause exception.
"""
def __init__(self, error_message: str, exc: Exception):
super().__init__(error_message)
super().__init__(error_message, exc)
self.exc = exc
def __str__(self) -> str:
@ -70,21 +70,38 @@ class ExecuteError(QMPError):
"""
Exception raised by `QMPClient.execute()` on RPC failure.
This exception is raised when the server received, interpreted, and
replied to a command successfully; but the command itself returned a
failure status.
For example::
await qmp.execute('block-dirty-bitmap-add',
{'node': 'foo', 'name': 'my_bitmap'})
# qemu.qmp.qmp_client.ExecuteError:
# Cannot find device='foo' nor node-name='foo'
:param error_response: The RPC error response object.
:param sent: The sent RPC message that caused the failure.
:param received: The raw RPC error reply received.
"""
def __init__(self, error_response: ErrorResponse,
sent: Message, received: Message):
super().__init__(error_response.error.desc)
super().__init__(error_response, sent, received)
#: The sent `Message` that caused the failure
self.sent: Message = sent
#: The received `Message` that indicated failure
self.received: Message = received
#: The parsed error response
self.error: ErrorResponse = error_response
#: The QMP error class
self.error_class: str = error_response.error.class_
@property
def error_class(self) -> str:
"""The QMP error class"""
return self.error.error.class_
def __str__(self) -> str:
return self.error.error.desc
class ExecInterruptedError(QMPError):
@ -93,9 +110,22 @@ class ExecInterruptedError(QMPError):
This error is raised when an `execute()` statement could not be
completed. This can occur because the connection itself was
terminated before a reply was received.
terminated before a reply was received. The true cause of the
interruption will be available via `disconnect()`.
The true cause of the interruption will be available via `disconnect()`.
The QMP protocol does not make it possible to know if a command
succeeded or failed after such an event; the client will need to
query the server to determine the state of the server on a
case-by-case basis.
For example, ECONNRESET might look like this::
try:
await qmp.execute('query-block')
# ExecInterruptedError: Disconnected
except ExecInterruptedError:
await qmp.disconnect()
# ConnectionResetError: [Errno 104] Connection reset by peer
"""
@ -110,8 +140,8 @@ class _MsgProtocolError(ProtocolError):
:param error_message: Human-readable string describing the error.
:param msg: The QMP `Message` that caused the error.
"""
def __init__(self, error_message: str, msg: Message):
super().__init__(error_message)
def __init__(self, error_message: str, msg: Message, *args: object):
super().__init__(error_message, msg, *args)
#: The received `Message` that caused the error.
self.msg: Message = msg
@ -150,30 +180,44 @@ class BadReplyError(_MsgProtocolError):
:param sent: The message that was sent that prompted the error.
"""
def __init__(self, error_message: str, msg: Message, sent: Message):
super().__init__(error_message, msg)
super().__init__(error_message, msg, sent)
#: The sent `Message` that caused the failure
self.sent = sent
class QMPClient(AsyncProtocol[Message], Events):
"""
Implements a QMP client connection.
"""Implements a QMP client connection.
QMP can be used to establish a connection as either the transport
client or server, though this class always acts as the QMP client.
`QMPClient` can be used to either connect or listen to a QMP server,
but always acts as the QMP client.
:param name: Optional nickname for the connection, used for logging.
:param name:
Optional nickname for the connection, used to differentiate
instances when logging.
:param readbuflen:
The maximum buffer length for reads and writes to and from the QMP
server, in bytes. Default is 10MB. If `QMPClient` is used to
connect to a guest agent to transfer files via ``guest-file-read``/
``guest-file-write``, increasing this value may be required.
Basic script-style usage looks like this::
qmp = QMPClient('my_virtual_machine_name')
await qmp.connect(('127.0.0.1', 1234))
...
res = await qmp.execute('block-query')
...
await qmp.disconnect()
import asyncio
from qemu.qmp import QMPClient
Basic async client-style usage looks like this::
async def main():
qmp = QMPClient('my_virtual_machine_name')
await qmp.connect(('127.0.0.1', 1234))
...
res = await qmp.execute('query-block')
...
await qmp.disconnect()
asyncio.run(main())
A more advanced example that starts to take advantage of asyncio
might look like this::
class Client:
def __init__(self, name: str):
@ -193,25 +237,32 @@ class QMPClient(AsyncProtocol[Message], Events):
await self.disconnect()
See `qmp.events` for more detail on event handling patterns.
"""
#: Logger object used for debugging messages.
logger = logging.getLogger(__name__)
# Read buffer limit; 10MB like libvirt default
_limit = 10 * 1024 * 1024
# Read buffer default limit; 10MB like libvirt default
_readbuflen = 10 * 1024 * 1024
# Type alias for pending execute() result items
_PendingT = Union[Message, ExecInterruptedError]
def __init__(self, name: Optional[str] = None) -> None:
super().__init__(name)
def __init__(
self,
name: Optional[str] = None,
readbuflen: int = _readbuflen
) -> None:
super().__init__(name, readbuflen)
Events.__init__(self)
#: Whether or not to await a greeting after establishing a connection.
#: Defaults to True; QGA servers expect this to be False.
self.await_greeting: bool = True
#: Whether or not to perform capabilities negotiation upon connection.
#: Implies `await_greeting`.
#: Whether or not to perform capabilities negotiation upon
#: connection. Implies `await_greeting`. Defaults to True; QGA
#: servers expect this to be False.
self.negotiate: bool = True
# Cached Greeting, if one was awaited.
@ -228,7 +279,13 @@ class QMPClient(AsyncProtocol[Message], Events):
@property
def greeting(self) -> Optional[Greeting]:
"""The `Greeting` from the QMP server, if any."""
"""
The `Greeting` from the QMP server, if any.
Defaults to ``None``, and will be set after a greeting is
received during the connection process. It is reset at the start
of each connection attempt.
"""
return self._greeting
@upper_half
@ -369,7 +426,7 @@ class QMPClient(AsyncProtocol[Message], Events):
# This is very likely a server parsing error.
# It doesn't inherently belong to any pending execution.
# Instead of performing clever recovery, just terminate.
# See "NOTE" in qmp-spec.rst, section "Error".
# See "NOTE" in interop/qmp-spec, "Error" section.
raise ServerParseError(
("Server sent an error response without an ID, "
"but there are no ID-less executions pending. "
@ -377,7 +434,7 @@ class QMPClient(AsyncProtocol[Message], Events):
msg
)
# qmp-spec.rst, section "Commands Responses":
# qmp-spec.rst, "Commands Responses" section:
# 'Clients should drop all the responses
# that have an unknown "id" field.'
self.logger.log(
@ -550,7 +607,7 @@ class QMPClient(AsyncProtocol[Message], Events):
@require(Runstate.RUNNING)
async def execute_msg(self, msg: Message) -> object:
"""
Execute a QMP command and return its value.
Execute a QMP command on the server and return its value.
:param msg: The QMP `Message` to execute.
@ -562,7 +619,9 @@ class QMPClient(AsyncProtocol[Message], Events):
If the QMP `Message` does not have either the 'execute' or
'exec-oob' fields set.
:raise ExecuteError: When the server returns an error response.
:raise ExecInterruptedError: if the connection was terminated early.
:raise ExecInterruptedError:
If the connection was disrupted before
receiving a reply from the server.
"""
if not ('execute' in msg or 'exec-oob' in msg):
raise ValueError("Requires 'execute' or 'exec-oob' message")
@ -601,9 +660,11 @@ class QMPClient(AsyncProtocol[Message], Events):
:param cmd: QMP command name.
:param arguments: Arguments (if any). Must be JSON-serializable.
:param oob: If `True`, execute "out of band".
:param oob:
If `True`, execute "out of band". See `interop/qmp-spec`
section "Out-of-band execution".
:return: An executable QMP `Message`.
:return: A QMP `Message` that can be executed with `execute_msg()`.
"""
msg = Message({'exec-oob' if oob else 'execute': cmd})
if arguments is not None:
@ -615,18 +676,22 @@ class QMPClient(AsyncProtocol[Message], Events):
arguments: Optional[Mapping[str, object]] = None,
oob: bool = False) -> object:
"""
Execute a QMP command and return its value.
Execute a QMP command on the server and return its value.
:param cmd: QMP command name.
:param arguments: Arguments (if any). Must be JSON-serializable.
:param oob: If `True`, execute "out of band".
:param oob:
If `True`, execute "out of band". See `interop/qmp-spec`
section "Out-of-band execution".
:return:
The command execution return value from the server. The type of
object returned depends on the command that was issued,
though most in QEMU return a `dict`.
:raise ExecuteError: When the server returns an error response.
:raise ExecInterruptedError: if the connection was terminated early.
:raise ExecInterruptedError:
If the connection was disrupted before
receiving a reply from the server.
"""
msg = self.make_execute_msg(cmd, arguments, oob=oob)
return await self.execute_msg(msg)
@ -634,8 +699,20 @@ class QMPClient(AsyncProtocol[Message], Events):
@upper_half
@require(Runstate.RUNNING)
def send_fd_scm(self, fd: int) -> None:
"""
Send a file descriptor to the remote via SCM_RIGHTS.
"""Send a file descriptor to the remote via SCM_RIGHTS.
This method does not close the file descriptor.
:param fd: The file descriptor to send to QEMU.
This is an advanced feature of QEMU where file descriptors can
be passed from client to server. This is usually used as a
security measure to isolate the QEMU process from being able to
open its own files. See the QMP commands ``getfd`` and
``add-fd`` for more information.
See `socket.socket.sendmsg` for more information on the Python
implementation for sending file descriptors over a UNIX socket.
"""
assert self._writer is not None
sock = self._writer.transport.get_extra_info('socket')

View file

@ -10,9 +10,15 @@
#
"""
Low-level QEMU shell on top of QMP.
qmp-shell - An interactive QEMU shell powered by QMP
usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
qmp-shell offers a simple shell with a convenient shorthand syntax as an
alternative to typing JSON by hand. This syntax is not standardized and
is not meant to be used as a scriptable interface. This shorthand *may*
change incompatibly in the future, and it is strongly encouraged to use
the QMP library to provide API-stable scripting when needed.
usage: qmp-shell [-h] [-H] [-v] [-p] [-l LOGFILE] [-N] qmp_server
positional arguments:
qmp_server < UNIX socket path | TCP address:port >
@ -20,41 +26,52 @@ positional arguments:
optional arguments:
-h, --help show this help message and exit
-H, --hmp Use HMP interface
-N, --skip-negotiation
Skip negotiate (for qemu-ga)
-v, --verbose Verbose (echo commands sent and received)
-p, --pretty Pretty-print JSON
-l LOGFILE, --logfile LOGFILE
Save log of all QMP messages to PATH
-N, --skip-negotiation
Skip negotiate (for qemu-ga)
Usage
-----
Start QEMU with:
First, start QEMU with::
# qemu [...] -qmp unix:./qmp-sock,server
> qemu [...] -qmp unix:./qmp-sock,server=on[,wait=off]
Run the shell:
Then run the shell, passing the address of the socket::
$ qmp-shell ./qmp-sock
> qmp-shell ./qmp-sock
Commands have the following format:
Syntax
------
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
Commands have the following format::
For example:
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
(QEMU) device_add driver=e1000 id=net1
{'return': {}}
(QEMU)
For example, to add a network device::
key=value pairs also support Python or JSON object literal subset notations,
without spaces. Dictionaries/objects {} are supported as are arrays [].
(QEMU) device_add driver=e1000 id=net1
{'return': {}}
(QEMU)
example-command arg-name1={'key':'value','obj'={'prop':"value"}}
key=value pairs support either Python or JSON object literal notations,
**without spaces**. Dictionaries/objects ``{}`` are supported, as are
arrays ``[]``::
Both JSON and Python formatting should work, including both styles of
string literal quotes. Both paradigms of literal values should work,
including null/true/false for JSON and None/True/False for Python.
example-command arg-name1={'key':'value','obj'={'prop':"value"}}
Either JSON or Python formatting for compound values works, including
both styles of string literal quotes (either single or double
quotes). Both paradigms of literal values are accepted, including
``null/true/false`` for JSON and ``None/True/False`` for Python.
Transactions have the following multi-line format:
Transactions
------------
Transactions have the following multi-line format::
transaction(
action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
@ -62,11 +79,11 @@ Transactions have the following multi-line format:
action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
)
One line transactions are also supported:
One line transactions are also supported::
transaction( action-name1 ... )
For example:
For example::
(QEMU) transaction(
TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
@ -75,9 +92,35 @@ For example:
{"return": {}}
(QEMU)
Use the -v and -p options to activate the verbose and pretty-print options,
which will echo back the properly formatted JSON-compliant QMP that is being
sent to QEMU, which is useful for debugging and documentation generation.
Commands
--------
Autocomplete of command names using <tab> is supported. Pressing <tab>
at a blank CLI prompt will show you a list of all available commands
that the connected QEMU instance supports.
For documentation on QMP commands and their arguments, please see
`qmp ref`.
Events
------
qmp-shell will display events received from the server, but this version
does not do so asynchronously. To check for new events from the server,
press <enter> on a blank line::
(QEMU)
{'timestamp': {'seconds': 1660071944, 'microseconds': 184667},
'event': 'STOP'}
Display options
---------------
Use the -v and -p options to activate the verbose and pretty-print
options, which will echo back the properly formatted JSON-compliant QMP
that is being sent to QEMU. This is useful for debugging to see the
wire-level QMP data being exchanged, and generating output for use in
writing documentation for QEMU.
"""
import argparse
@ -514,21 +557,29 @@ def die(msg: str) -> NoReturn:
sys.exit(1)
def main() -> None:
"""
qmp-shell entry point: parse command line arguments and start the REPL.
"""
def common_parser() -> argparse.ArgumentParser:
"""Build common parsing options used by qmp-shell and qmp-shell-wrap."""
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--hmp', action='store_true',
help='Use HMP interface')
parser.add_argument('-N', '--skip-negotiation', action='store_true',
help='Skip negotiate (for qemu-ga)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose (echo commands sent and received)')
parser.add_argument('-p', '--pretty', action='store_true',
help='Pretty-print JSON')
parser.add_argument('-l', '--logfile',
help='Save log of all QMP messages to PATH')
# NOTE: When changing arguments, update both this module docstring
# and the manpage synopsis in docs/man/qmp_shell.rst.
return parser
def main() -> None:
"""
qmp-shell entry point: parse command line arguments and start the REPL.
"""
parser = common_parser()
parser.add_argument('-N', '--skip-negotiation', action='store_true',
help='Skip negotiate (for qemu-ga)')
default_server = os.environ.get('QMP_SOCKET')
parser.add_argument('qmp_server', action='store',
@ -561,19 +612,37 @@ def main() -> None:
def main_wrap() -> None:
"""
qmp-shell-wrap entry point: parse command line arguments and
start the REPL.
"""
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--hmp', action='store_true',
help='Use HMP interface')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose (echo commands sent and received)')
parser.add_argument('-p', '--pretty', action='store_true',
help='Pretty-print JSON')
parser.add_argument('-l', '--logfile',
help='Save log of all QMP messages to PATH')
qmp-shell-wrap - QEMU + qmp-shell launcher utility
Launch QEMU and connect to it with `qmp-shell` in a single command.
CLI arguments will be forwarded to qemu, with additional arguments
added to allow `qmp-shell` to then connect to the recently launched
QEMU instance.
usage: qmp-shell-wrap [-h] [-H] [-v] [-p] [-l LOGFILE] ...
positional arguments:
command QEMU command line to invoke
optional arguments:
-h, --help show this help message and exit
-H, --hmp Use HMP interface
-v, --verbose Verbose (echo commands sent and received)
-p, --pretty Pretty-print JSON
-l LOGFILE, --logfile LOGFILE
Save log of all QMP messages to PATH
Usage
-----
Prepend "qmp-shell-wrap" to your usual QEMU command line::
> qmp-shell-wrap qemu-system-x86_64 -M q35 -m 4096 -display none
Welcome to the QMP low-level shell!
Connected
(QEMU)
"""
parser = common_parser()
parser.add_argument('command', nargs=argparse.REMAINDER,
help='QEMU command line to invoke')
@ -610,6 +679,8 @@ def main_wrap() -> None:
for _ in qemu.repl():
pass
except FileNotFoundError:
sys.stderr.write(f"ERROR: QEMU executable '{cmd[0]}' not found.\n")
finally:
os.unlink(sockpath)

View file

@ -21,6 +21,7 @@ import json
import logging
from logging import Handler, LogRecord
import signal
import sys
from typing import (
List,
Optional,
@ -30,17 +31,27 @@ from typing import (
cast,
)
from pygments import lexers
from pygments import token as Token
import urwid
import urwid_readline
try:
from pygments import lexers
from pygments import token as Token
import urwid
import urwid_readline
except ModuleNotFoundError as exc:
print(
f"Module '{exc.name}' not found.",
"You need the optional 'tui' group: pip install qemu.qmp[tui]",
sep='\n',
file=sys.stderr,
)
sys.exit(1)
from .error import ProtocolError
from .legacy import QEMUMonitorProtocol, QMPBadPortError
from .message import DeserializationError, Message, UnexpectedTypeError
from .protocol import ConnectError, Runstate
from .qmp_client import ExecInterruptedError, QMPClient
from .util import create_task, pretty_traceback
from .util import get_or_create_event_loop, pretty_traceback
# The name of the signal that is used to update the history list
@ -225,7 +236,7 @@ class App(QMPClient):
"""
try:
msg = Message(bytes(raw_msg, encoding='utf-8'))
create_task(self._send_to_server(msg))
asyncio.create_task(self._send_to_server(msg))
except (DeserializationError, UnexpectedTypeError) as err:
raw_msg = format_json(raw_msg)
logging.info('Invalid message: %s', err.error_message)
@ -246,7 +257,7 @@ class App(QMPClient):
Initiates killing of app. A bridge between asynchronous and synchronous
code.
"""
create_task(self._kill_app())
asyncio.create_task(self._kill_app())
async def _kill_app(self) -> None:
"""
@ -376,8 +387,7 @@ class App(QMPClient):
"""
screen = urwid.raw_display.Screen()
screen.set_terminal_properties(256)
self.aloop = asyncio.get_event_loop()
self.aloop = get_or_create_event_loop()
self.aloop.set_debug(debug)
# Gracefully handle SIGTERM and SIGINT signals
@ -393,7 +403,7 @@ class App(QMPClient):
handle_mouse=True,
event_loop=event_loop)
create_task(self.manage_connection(), self.aloop)
self.aloop.create_task(self.manage_connection())
try:
main_loop.run()
except Exception as err:

View file

@ -1,25 +1,16 @@
"""
Miscellaneous Utilities
This module provides asyncio utilities and compatibility wrappers for
Python 3.6 to provide some features that otherwise become available in
Python 3.7+.
Various logging and debugging utilities are also provided, such as
`exception_summary()` and `pretty_traceback()`, used primarily for
adding information into the logging stream.
This module provides asyncio and various logging and debugging
utilities, such as `exception_summary()` and `pretty_traceback()`, used
primarily for adding information into the logging stream.
"""
import asyncio
import sys
import traceback
from typing import (
Any,
Coroutine,
Optional,
TypeVar,
cast,
)
from typing import TypeVar, cast
import warnings
T = TypeVar('T')
@ -30,9 +21,35 @@ T = TypeVar('T')
# --------------------------
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
"""
Return this thread's current event loop, or create a new one.
This function behaves similarly to asyncio.get_event_loop() in
Python<=3.13, where if there is no event loop currently associated
with the current context, it will create and register one. It should
generally not be used in any asyncio-native applications.
"""
try:
with warnings.catch_warnings():
# Python <= 3.13 will trigger deprecation warnings if no
# event loop is set, but will create and set a new loop.
warnings.simplefilter("ignore")
loop = asyncio.get_event_loop()
except RuntimeError:
# Python 3.14+: No event loop set for this thread,
# create and set one.
loop = asyncio.new_event_loop()
# Set this loop as the current thread's loop, to be returned
# by calls to get_event_loop() in the future.
asyncio.set_event_loop(loop)
return loop
async def flush(writer: asyncio.StreamWriter) -> None:
"""
Utility function to ensure a StreamWriter is *fully* drained.
Utility function to ensure an `asyncio.StreamWriter` is *fully* drained.
`asyncio.StreamWriter.drain` only promises we will return to below
the "high-water mark". This function ensures we flush the entire
@ -72,102 +89,13 @@ def bottom_half(func: T) -> T:
These methods do not, in general, have the ability to directly
report information to a callers context and will usually be
collected as a Task result instead.
collected as an `asyncio.Task` result instead.
They must not call upper-half functions directly.
"""
return func
# -------------------------------
# Section: Compatibility Wrappers
# -------------------------------
def create_task(coro: Coroutine[Any, Any, T],
loop: Optional[asyncio.AbstractEventLoop] = None
) -> 'asyncio.Future[T]':
"""
Python 3.6-compatible `asyncio.create_task` wrapper.
:param coro: The coroutine to execute in a task.
:param loop: Optionally, the loop to create the task in.
:return: An `asyncio.Future` object.
"""
if sys.version_info >= (3, 7):
if loop is not None:
return loop.create_task(coro)
return asyncio.create_task(coro) # pylint: disable=no-member
# Python 3.6:
return asyncio.ensure_future(coro, loop=loop)
def is_closing(writer: asyncio.StreamWriter) -> bool:
"""
Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
:param writer: The `asyncio.StreamWriter` object.
:return: `True` if the writer is closing, or closed.
"""
if sys.version_info >= (3, 7):
return writer.is_closing()
# Python 3.6:
transport = writer.transport
assert isinstance(transport, asyncio.WriteTransport)
return transport.is_closing()
async def wait_closed(writer: asyncio.StreamWriter) -> None:
"""
Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
:param writer: The `asyncio.StreamWriter` to wait on.
"""
if sys.version_info >= (3, 7):
await writer.wait_closed()
return
# Python 3.6
transport = writer.transport
assert isinstance(transport, asyncio.WriteTransport)
while not transport.is_closing():
await asyncio.sleep(0)
# This is an ugly workaround, but it's the best I can come up with.
sock = transport.get_extra_info('socket')
if sock is None:
# Our transport doesn't have a socket? ...
# Nothing we can reasonably do.
return
while sock.fileno() != -1:
await asyncio.sleep(0)
def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
"""
Python 3.6-compatible `asyncio.run` wrapper.
:param coro: A coroutine to execute now.
:return: The return value from the coroutine.
"""
if sys.version_info >= (3, 7):
return asyncio.run(coro, debug=debug)
# Python 3.6
loop = asyncio.get_event_loop()
loop.set_debug(debug)
ret = loop.run_until_complete(coro)
loop.close()
return ret
# ----------------------------
# Section: Logging & Debugging
# ----------------------------
@ -177,8 +105,11 @@ def exception_summary(exc: BaseException) -> str:
"""
Return a summary string of an arbitrary exception.
It will be of the form "ExceptionType: Error Message", if the error
It will be of the form "ExceptionType: Error Message" if the error
string is non-empty, and just "ExceptionType" otherwise.
This code is based on CPython's implementation of
`traceback.TracebackException.format_exception_only`.
"""
name = type(exc).__qualname__
smod = type(exc).__module__

View file

@ -8,7 +8,6 @@ import avocado
from qemu.qmp import ConnectError, Runstate
from qemu.qmp.protocol import AsyncProtocol, StateError
from qemu.qmp.util import asyncio_run, create_task
class NullProtocol(AsyncProtocol[None]):
@ -124,7 +123,7 @@ def run_as_task(coro, allow_cancellation=False):
if allow_cancellation:
return
raise
return create_task(_runner())
return asyncio.create_task(_runner())
@contextmanager
@ -228,7 +227,7 @@ class TestBase(avocado.Test):
Decorator; adds SetUp and TearDown to async tests.
"""
async def _wrapper(self, *args, **kwargs):
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
loop.set_debug(True)
await self._asyncSetUp()
@ -271,7 +270,7 @@ class TestBase(avocado.Test):
msg=f"Expected state '{state.name}'",
)
self.runstate_watcher = create_task(_watcher())
self.runstate_watcher = asyncio.create_task(_watcher())
# Kick the loop and force the task to block on the event.
await asyncio.sleep(0)
@ -589,7 +588,8 @@ class SimpleSession(TestBase):
async def testSmoke(self):
with TemporaryDirectory(suffix='.qmp') as tmpdir:
sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
server_task = create_task(self.server.start_server_and_accept(sock))
server_task = asyncio.create_task(
self.server.start_server_and_accept(sock))
# give the server a chance to start listening [...]
await asyncio.sleep(0)

View file

@ -277,6 +277,7 @@ class BuiltinNBD(NBDBlockdevAddBase):
} }
self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
sockfd.close()
self._server_down()

View file

@ -263,6 +263,11 @@ class TestThrottledWithNbdExportBase(iotests.QMPTestCase):
break
except subprocess.TimeoutExpired:
self.vm.qtest(f'clock_step {1 * 1000 * 1000 * 1000}')
try:
p.kill()
p.stdout.close()
except:
pass
except IndexError:
pass

View file

@ -21,6 +21,7 @@ import sys
import argparse
import shutil
from pathlib import Path
import warnings
from findtests import TestFinder
from testenv import TestEnv
@ -137,6 +138,9 @@ def make_argparser() -> argparse.ArgumentParser:
if __name__ == '__main__':
warnings.simplefilter("default")
os.environ["PYTHONWARNINGS"] = "default"
args = make_argparser().parse_args()
env = TestEnv(source_dir=args.source_dir,

View file

@ -22,15 +22,12 @@ import tempfile
from pathlib import Path
import shutil
import collections
import contextlib
import random
import subprocess
import glob
from typing import List, Dict, Any, Optional
if sys.version_info >= (3, 9):
from contextlib import AbstractContextManager as ContextManager
else:
from typing import ContextManager
DEF_GDB_OPTIONS = 'localhost:12345'
@ -58,7 +55,7 @@ def get_default_machine(qemu_prog: str) -> str:
return default_machine
class TestEnv(ContextManager['TestEnv']):
class TestEnv(contextlib.AbstractContextManager['TestEnv']):
"""
Manage system environment for running tests

View file

@ -30,11 +30,6 @@ from multiprocessing import Pool
from typing import List, Optional, Any, Sequence, Dict
from testenv import TestEnv
if sys.version_info >= (3, 9):
from contextlib import AbstractContextManager as ContextManager
else:
from typing import ContextManager
def silent_unlink(path: Path) -> None:
try:
@ -57,7 +52,7 @@ def file_diff(file1: str, file2: str) -> List[str]:
return res
class LastElapsedTime(ContextManager['LastElapsedTime']):
class LastElapsedTime(contextlib.AbstractContextManager['LastElapsedTime']):
""" Cache for elapsed time for tests, to show it during new test run
It is safe to use get() at any time. To use update(), you must either
@ -112,7 +107,7 @@ class TestResult:
self.interrupted = interrupted
class TestRunner(ContextManager['TestRunner']):
class TestRunner(contextlib.AbstractContextManager['TestRunner']):
shared_self = None
@staticmethod