-
-
Notifications
You must be signed in to change notification settings - Fork 332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to break parking lots, stop locks from stalling #3081
base: main
Are you sure you want to change the base?
Changes from 14 commits
3bd67a5
4dfa1ad
543a087
c36cdad
127c5fc
1f75d44
6835e87
3b86e80
94ff9a2
eb7a451
e7d7205
c89fb2a
277c7da
21cf0d6
45f78f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.BrokenResourceError` when :meth:`trio.Lock.acquire` would previously stall due to the owner of the lock having exited without releasing the lock. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,10 +72,12 @@ | |
from __future__ import annotations | ||
|
||
import math | ||
import warnings | ||
from collections import OrderedDict | ||
from typing import TYPE_CHECKING | ||
|
||
import attrs | ||
import outcome | ||
|
||
from .. import _core | ||
from .._util import final | ||
|
@@ -86,6 +88,33 @@ | |
from ._run import Task | ||
|
||
|
||
GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {} | ||
|
||
|
||
def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None: | ||
"""Register a task as a breaker for a lot. If this task exits without being removed | ||
as a breaker, the lot will break. This will cause an error to be raised for all | ||
tasks currently parked in the lot, as well as any future tasks that attempt to | ||
park in it. | ||
""" | ||
if task not in GLOBAL_PARKING_LOT_BREAKER: | ||
GLOBAL_PARKING_LOT_BREAKER[task] = [lot] | ||
else: | ||
GLOBAL_PARKING_LOT_BREAKER[task].append(lot) | ||
|
||
|
||
def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None: | ||
"""Deregister a task as a breaker for a lot. See :func:`add_parking_lot_breaker`.""" | ||
try: | ||
GLOBAL_PARKING_LOT_BREAKER[task].remove(lot) | ||
except (KeyError, ValueError): | ||
raise RuntimeError( | ||
"Attempted to remove task as breaker for a lot it is not registered for", | ||
) from None | ||
if not GLOBAL_PARKING_LOT_BREAKER[task]: | ||
del GLOBAL_PARKING_LOT_BREAKER[task] | ||
|
||
|
||
@attrs.frozen | ||
class ParkingLotStatistics: | ||
"""An object containing debugging information for a ParkingLot. | ||
|
@@ -118,6 +147,7 @@ class ParkingLot: | |
# {task: None}, we just want a deque where we can quickly delete random | ||
# items | ||
_parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False) | ||
broken_by: Task | None = None | ||
|
||
def __len__(self) -> int: | ||
"""Returns the number of parked tasks.""" | ||
|
@@ -136,7 +166,15 @@ async def park(self) -> None: | |
"""Park the current task until woken by a call to :meth:`unpark` or | ||
:meth:`unpark_all`. | ||
|
||
Raises: | ||
BrokenResourceError: if attempting to park in a broken lot, or the lot | ||
breaks before we get to unpark. | ||
|
||
""" | ||
if self.broken_by is not None: | ||
raise _core.BrokenResourceError( | ||
f"Attempted to park in parking lot broken by {self.broken_by}", | ||
) | ||
task = _core.current_task() | ||
self._parked[task] = None | ||
task.custom_sleep_data = self | ||
|
@@ -234,6 +272,34 @@ def repark_all(self, new_lot: ParkingLot) -> None: | |
""" | ||
return self.repark(new_lot, count=len(self)) | ||
|
||
def break_lot(self, task: Task | None = None) -> None: | ||
"""Break this lot, causing all parked tasks to raise an error, and any | ||
future tasks attempting to park to error. Unpark & repark become no-ops as the | ||
parking lot is empty. | ||
The error raised contains a reference to the task sent as a parameter. | ||
""" | ||
Comment on lines
+276
to
+280
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Summary line here too. Also "The error raised contains a reference to the task sent as a parameter." should probably just mention that |
||
if task is None: | ||
task = _core.current_task() | ||
if self.broken_by is not None: | ||
if self.broken_by != task: | ||
warnings.warn( | ||
RuntimeWarning( | ||
f"{task} attempted to break parking lot {self} already broken by {self.broken_by}", | ||
), | ||
stacklevel=2, | ||
) | ||
return | ||
self.broken_by = task | ||
A5rocks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for parked_task in self._parked: | ||
_core.reschedule( | ||
parked_task, | ||
outcome.Error( | ||
_core.BrokenResourceError(f"Parking lot broken by {task}"), | ||
), | ||
) | ||
self._parked.clear() | ||
|
||
def statistics(self) -> ParkingLotStatistics: | ||
"""Return an object containing debugging information. | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,6 +4,9 @@ | |||||
|
||||||
import pytest | ||||||
|
||||||
import trio.lowlevel | ||||||
from trio.testing import Matcher, RaisesGroup | ||||||
|
||||||
from ... import _core | ||||||
from ...testing import wait_all_tasks_blocked | ||||||
from .._parking_lot import ParkingLot | ||||||
|
@@ -215,3 +218,99 @@ async def test_parking_lot_repark_with_count() -> None: | |||||
"wake 2", | ||||||
] | ||||||
lot1.unpark_all() | ||||||
|
||||||
|
||||||
async def test_parking_lot_breaker_basic() -> None: | ||||||
lot = ParkingLot() | ||||||
task = trio.lowlevel.current_task() | ||||||
|
||||||
with pytest.raises( | ||||||
RuntimeError, | ||||||
match="Attempted to remove task as breaker for a lot it is not registered for", | ||||||
): | ||||||
trio.lowlevel.remove_parking_lot_breaker(task, lot) | ||||||
|
||||||
# check that a task can be registered as breaker for the same lot multiple times | ||||||
trio.lowlevel.add_parking_lot_breaker(task, lot) | ||||||
trio.lowlevel.add_parking_lot_breaker(task, lot) | ||||||
trio.lowlevel.remove_parking_lot_breaker(task, lot) | ||||||
trio.lowlevel.remove_parking_lot_breaker(task, lot) | ||||||
jakkdl marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
with pytest.raises( | ||||||
RuntimeError, | ||||||
match="Attempted to remove task as breaker for a lot it is not registered for", | ||||||
): | ||||||
trio.lowlevel.remove_parking_lot_breaker(task, lot) | ||||||
|
||||||
# defaults to current task | ||||||
lot.break_lot() | ||||||
assert lot.broken_by == task | ||||||
|
||||||
# breaking the lot again with the same task is a no-op | ||||||
lot.break_lot() | ||||||
|
||||||
# but with a different task it gives a warning | ||||||
async def dummy_task( | ||||||
task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED, | ||||||
) -> None: | ||||||
task_status.started(_core.current_task()) | ||||||
|
||||||
# The nursery is only to create a task we can pass to lot.break_lot | ||||||
# and has no effect on the test otherwise. | ||||||
async with trio.open_nursery() as nursery: | ||||||
child_task = await nursery.start(dummy_task) | ||||||
with pytest.warns( | ||||||
RuntimeWarning, | ||||||
match="attempted to break parking .* already broken by .*", | ||||||
): | ||||||
lot.break_lot(child_task) | ||||||
nursery.cancel_scope.cancel() | ||||||
|
||||||
# and doesn't change broken_by | ||||||
assert lot.broken_by == task | ||||||
|
||||||
|
||||||
async def test_parking_lot_breaker() -> None: | ||||||
async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None: | ||||||
trio.lowlevel.add_parking_lot_breaker(trio.lowlevel.current_task(), lot) | ||||||
with scope: | ||||||
await trio.sleep_forever() | ||||||
|
||||||
lot = ParkingLot() | ||||||
cs = _core.CancelScope() | ||||||
|
||||||
# check that parked task errors | ||||||
with RaisesGroup( | ||||||
Matcher(_core.BrokenResourceError, match="^Parking lot broken by"), | ||||||
): | ||||||
async with _core.open_nursery() as nursery: | ||||||
nursery.start_soon(bad_parker, lot, cs) | ||||||
await wait_all_tasks_blocked() | ||||||
|
||||||
nursery.start_soon(lot.park) | ||||||
await wait_all_tasks_blocked() | ||||||
|
||||||
cs.cancel() | ||||||
|
||||||
# check that trying to park in broken lot errors | ||||||
with pytest.raises(_core.BrokenResourceError): | ||||||
await lot.park() | ||||||
|
||||||
|
||||||
async def test_parking_lot_weird() -> None: | ||||||
"""break a parking lot, where the breakee is parked. Doing this is weird, but should probably be supported?? | ||||||
Although the message makes less sense""" | ||||||
|
||||||
async def return_me_and_park( | ||||||
lot: ParkingLot, | ||||||
*, | ||||||
task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED, | ||||||
) -> None: | ||||||
task_status.started(_core.current_task()) | ||||||
await lot.park() | ||||||
|
||||||
lot = ParkingLot() | ||||||
with RaisesGroup(Matcher(_core.BrokenResourceError, match="Parking lot broken by")): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
for consistency |
||||||
async with _core.open_nursery() as nursery: | ||||||
task = await nursery.start(return_me_and_park, lot) | ||||||
lot.break_lot(task) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ | |||||
|
||||||
from . import _core | ||||||
from ._core import Abort, ParkingLot, RaiseCancelT, enable_ki_protection | ||||||
from ._core._parking_lot import add_parking_lot_breaker, remove_parking_lot_breaker | ||||||
Comment on lines
11
to
+12
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could import the functions from |
||||||
from ._util import final | ||||||
|
||||||
if TYPE_CHECKING: | ||||||
|
@@ -576,20 +577,30 @@ def acquire_nowait(self) -> None: | |||||
elif self._owner is None and not self._lot: | ||||||
# No-one owns it | ||||||
self._owner = task | ||||||
add_parking_lot_breaker(task, self._lot) | ||||||
else: | ||||||
raise trio.WouldBlock | ||||||
|
||||||
@enable_ki_protection | ||||||
async def acquire(self) -> None: | ||||||
"""Acquire the lock, blocking if necessary.""" | ||||||
"""Acquire the lock, blocking if necessary. | ||||||
|
||||||
Raises: | ||||||
BrokenResourceError: if the owner of the lock exits without releasing. | ||||||
""" | ||||||
await trio.lowlevel.checkpoint_if_cancelled() | ||||||
try: | ||||||
self.acquire_nowait() | ||||||
except trio.WouldBlock: | ||||||
# NOTE: it's important that the contended acquire path is just | ||||||
# "_lot.park()", because that's how Condition.wait() acquires the | ||||||
# lock as well. | ||||||
await self._lot.park() | ||||||
try: | ||||||
# NOTE: it's important that the contended acquire path is just | ||||||
# "_lot.park()", because that's how Condition.wait() acquires the | ||||||
# lock as well. | ||||||
await self._lot.park() | ||||||
except trio.BrokenResourceError: | ||||||
raise trio.BrokenResourceError( | ||||||
"Owner of this lock exited without releasing: {self._owner}", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
) from None | ||||||
else: | ||||||
await trio.lowlevel.cancel_shielded_checkpoint() | ||||||
|
||||||
|
@@ -604,8 +615,10 @@ def release(self) -> None: | |||||
task = trio.lowlevel.current_task() | ||||||
if task is not self._owner: | ||||||
raise RuntimeError("can't release a Lock you don't own") | ||||||
remove_parking_lot_breaker(self._owner, self._lot) | ||||||
if self._lot: | ||||||
(self._owner,) = self._lot.unpark(count=1) | ||||||
add_parking_lot_breaker(self._owner, self._lot) | ||||||
else: | ||||||
self._owner = None | ||||||
|
||||||
|
@@ -767,7 +780,11 @@ def acquire_nowait(self) -> None: | |||||
return self._lock.acquire_nowait() | ||||||
|
||||||
async def acquire(self) -> None: | ||||||
"""Acquire the underlying lock, blocking if necessary.""" | ||||||
"""Acquire the underlying lock, blocking if necessary. | ||||||
|
||||||
Raises: | ||||||
BrokenResourceError: if the owner of the lock exits without releasing. | ||||||
Zac-HD marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
""" | ||||||
await self._lock.acquire() | ||||||
|
||||||
def release(self) -> None: | ||||||
|
@@ -796,6 +813,7 @@ async def wait(self) -> None: | |||||
|
||||||
Raises: | ||||||
RuntimeError: if the calling task does not hold the lock. | ||||||
BrokenResourceError: if the owner of the lock exits without releasing, when attempting to re-acquire. | ||||||
|
||||||
""" | ||||||
if trio.lowlevel.current_task() is not self._lock._owner: | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should have a summary.
I think it might be better to link to the
break_lot
to explain the effects of a task exiting without removing themselves.