Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to break parking lots, stop locks from stalling #3081

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/3035.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.StalledLockError` when ``acquire()`` would previously stall due to the owner of the lock having exited without releasing the lock.
1 change: 1 addition & 0 deletions newsfragments/3081.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally. Breaking a parking lot raises :exc:`trio.BrokenResourceError` for all tasks currently parked in the lot, and any tasks attempting to park in an already broken parking lot will also error. The breakage status of a lot can be viewed and manually modified with the ``trio.lowlevel.ParkingLot.broken_by`` attribute.
jakkdl marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions src/trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
Lock as Lock,
LockStatistics as LockStatistics,
Semaphore as Semaphore,
StalledLockError as StalledLockError,
StrictFIFOLock as StrictFIFOLock,
)
from ._timeouts import (
Expand Down
7 changes: 6 additions & 1 deletion src/trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
from ._ki import currently_ki_protected, disable_ki_protection, enable_ki_protection
from ._local import RunVar, RunVarToken
from ._mock_clock import MockClock
from ._parking_lot import ParkingLot, ParkingLotStatistics
from ._parking_lot import (
ParkingLot,
ParkingLotStatistics,
add_parking_lot_breaker,
remove_parking_lot_breaker,
)

# Imports that always exist
from ._run import (
Expand Down
53 changes: 53 additions & 0 deletions src/trio/_core/_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
from typing import TYPE_CHECKING

import attrs
import outcome

from .. import _core
from .._util import final
Expand All @@ -86,6 +87,31 @@
from ._run import Task


GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {}


def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
    """Register ``task`` as a breaker for ``lot``.

    The registration is recorded in ``GLOBAL_PARKING_LOT_BREAKER``. If the task
    exits while it is still registered, the lot will break and raise an error
    for all tasks parked in the lot, as well as for any future tasks that
    attempt to park in it.
    """
    # One list of lots per task. Repeated registrations of the same lot are
    # stored as separate entries rather than collapsed.
    GLOBAL_PARKING_LOT_BREAKER.setdefault(task, []).append(lot)


def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
    """Deregister a task as a breaker for a lot. See :func:`add_parking_lot_breaker`.

    Exactly one registration of ``lot`` is removed per call; once the task has
    no registrations left, its entry is dropped from the global table.

    Raises:
      RuntimeError: if ``task`` is not currently registered as a breaker for ``lot``.
    """
    registered_lots = GLOBAL_PARKING_LOT_BREAKER.get(task)
    if registered_lots is None or lot not in registered_lots:
        raise RuntimeError(
            "Attempted to remove task as breaker for a lot it is not registered for",
        ) from None
    registered_lots.remove(lot)
    # Don't leave empty lists behind, so the table only holds live registrations.
    if not registered_lots:
        del GLOBAL_PARKING_LOT_BREAKER[task]


@attrs.frozen
class ParkingLotStatistics:
"""An object containing debugging information for a ParkingLot.
Expand Down Expand Up @@ -118,6 +144,7 @@ class ParkingLot:
# {task: None}, we just want a deque where we can quickly delete random
# items
_parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False)
broken_by: Task | None = None

def __len__(self) -> int:
"""Returns the number of parked tasks."""
Expand All @@ -136,7 +163,15 @@ async def park(self) -> None:
"""Park the current task until woken by a call to :meth:`unpark` or
:meth:`unpark_all`.

Raises:
BrokenResourceError: if attempting to park in a broken lot, or the lot
breaks before we get to unpark.

"""
if self.broken_by is not None:
raise _core.BrokenResourceError(
f"Attempted to park in parking lot broken by {self.broken_by}",
)
task = _core.current_task()
self._parked[task] = None
task.custom_sleep_data = self
Expand Down Expand Up @@ -234,6 +269,24 @@ def repark_all(self, new_lot: ParkingLot) -> None:
"""
return self.repark(new_lot, count=len(self))

def break_lot(self, task: Task | None = None) -> None:
    """Break this lot, waking every parked task with an error.

    ``task`` is stored in ``broken_by`` and named in the error message; if it
    is omitted, the current task is used. Every task currently parked in the
    lot is rescheduled with a :exc:`BrokenResourceError`, and the lot is then
    emptied. Because ``broken_by`` stays set, later calls to :meth:`park`
    raise as well.
    """
    breaker = _core.current_task() if task is None else task
    self.broken_by = breaker

    for sleeper in self._parked:
        # each parked task gets its own exception instance
        error = _core.BrokenResourceError(f"Parking lot broken by {breaker}")
        _core.reschedule(sleeper, outcome.Error(error))
    self._parked.clear()

def statistics(self) -> ParkingLotStatistics:
"""Return an object containing debugging information.

Expand Down
7 changes: 7 additions & 0 deletions src/trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ._exceptions import Cancelled, RunFinishedError, TrioInternalError
from ._instrumentation import Instruments
from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED, KIManager, enable_ki_protection
from ._parking_lot import GLOBAL_PARKING_LOT_BREAKER
from ._thread_cache import start_thread_soon
from ._traps import (
Abort,
Expand Down Expand Up @@ -1859,6 +1860,12 @@ def task_exited(self, task: Task, outcome: Outcome[Any]) -> None:
assert task._parent_nursery is not None, task
task._parent_nursery._child_finished(task, outcome)

# before or after the other stuff in this function?
if task in GLOBAL_PARKING_LOT_BREAKER:
for lot in GLOBAL_PARKING_LOT_BREAKER[task]:
lot.break_lot(task)
Zac-HD marked this conversation as resolved.
Show resolved Hide resolved
del GLOBAL_PARKING_LOT_BREAKER[task]

if "task_exited" in self.instruments:
self.instruments.call("task_exited", task)

Expand Down
73 changes: 73 additions & 0 deletions src/trio/_core/_tests/test_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import pytest

import trio.lowlevel
from trio.testing import Matcher, RaisesGroup

from ... import _core
from ...testing import wait_all_tasks_blocked
from .._parking_lot import ParkingLot
Expand Down Expand Up @@ -215,3 +218,73 @@ async def test_parking_lot_repark_with_count() -> None:
"wake 2",
]
lot1.unpark_all()


async def test_parking_lot_breaker_basic() -> None:
    """Check breaker registration bookkeeping and the default ``break_lot`` task."""
    not_registered = (
        "Attempted to remove task as breaker for a lot it is not registered for"
    )
    lot = ParkingLot()
    task = trio.lowlevel.current_task()

    # removing before anything has been registered is an error
    with pytest.raises(RuntimeError, match=not_registered):
        trio.lowlevel.remove_parking_lot_breaker(task, lot)

    # the same (task, lot) pair can be registered twice, and is then removed twice
    for _ in range(2):
        trio.lowlevel.add_parking_lot_breaker(task, lot)
    for _ in range(2):
        trio.lowlevel.remove_parking_lot_breaker(task, lot)

    # a third removal has nothing left to remove
    with pytest.raises(RuntimeError, match=not_registered):
        trio.lowlevel.remove_parking_lot_breaker(task, lot)

    # without an argument, break_lot records the current task as the breaker
    lot.break_lot()
    assert lot.broken_by == task


async def test_parking_lot_breaker() -> None:
    """A registered breaker that exits breaks the lot for parked and later parkers."""

    async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None:
        # register as a breaker, then block until cancelled and exit without
        # ever deregistering
        me = trio.lowlevel.current_task()
        trio.lowlevel.add_parking_lot_breaker(me, lot)
        with scope:
            await trio.sleep_forever()

    lot = ParkingLot()
    breaker_scope = _core.CancelScope()

    # check that parked task errors
    expected = Matcher(_core.BrokenResourceError, match="^Parking lot broken by")
    with RaisesGroup(expected):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(bad_parker, lot, breaker_scope)
            await wait_all_tasks_blocked()

            nursery.start_soon(lot.park)
            await wait_all_tasks_blocked()

            breaker_scope.cancel()

    # check that trying to park in a broken lot errors
    with pytest.raises(_core.BrokenResourceError):
        await lot.park()


async def test_parking_lot_weird() -> None:
    """Break a parking lot while the task named as its breaker is parked in it.

    Doing this is weird, but should probably be supported, although the error
    message makes less sense in that case.
    """

    async def return_me_and_park(
        lot: ParkingLot,
        *,
        task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
    ) -> None:
        # hand our own task object back to the starter, then park in the lot
        task_status.started(_core.current_task())
        await lot.park()

    lot = ParkingLot()
    # the pattern is anchored with "^" to match the other parking-lot breaker test
    with RaisesGroup(
        Matcher(_core.BrokenResourceError, match="^Parking lot broken by"),
    ):
        async with _core.open_nursery() as nursery:
            task = await nursery.start(return_me_and_park, lot)
            lot.break_lot(task)
35 changes: 29 additions & 6 deletions src/trio/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from . import _core
from ._core import Abort, ParkingLot, RaiseCancelT, enable_ki_protection
from ._core._parking_lot import add_parking_lot_breaker, remove_parking_lot_breaker
Comment on lines 11 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could import the functions from ._core for a nicer import

from ._util import final

if TYPE_CHECKING:
Expand All @@ -18,6 +19,11 @@
from ._core._parking_lot import ParkingLotStatistics


class StalledLockError(Exception):
    """Error raised when the owner of a lock exits without releasing it.

    Raised by :meth:`Lock.acquire` and :meth:`StrictFIFOLock.acquire` if the owner
    exits, or has previously exited, without releasing the lock.
    """
A5rocks marked this conversation as resolved.
Show resolved Hide resolved


@attrs.frozen
class EventStatistics:
"""An object containing debugging information.
Expand Down Expand Up @@ -576,20 +582,30 @@ def acquire_nowait(self) -> None:
elif self._owner is None and not self._lot:
# No-one owns it
self._owner = task
add_parking_lot_breaker(task, self._lot)
else:
raise trio.WouldBlock

@enable_ki_protection
async def acquire(self) -> None:
    """Acquire the lock, blocking if necessary.

    Raises:
      StalledLockError: if the owner of the lock exits without releasing.
    """
    await trio.lowlevel.checkpoint_if_cancelled()
    try:
        self.acquire_nowait()
    except trio.WouldBlock:
        try:
            # NOTE: it's important that the contended acquire path is just
            # "_lot.park()", because that's how Condition.wait() acquires the
            # lock as well.
            await self._lot.park()
        except trio.BrokenResourceError:
            # The owner is registered as a breaker for the lot, so a broken lot
            # means the owner exited while still holding the lock. The message
            # must be an f-string so that the owner is actually interpolated.
            raise StalledLockError(
                f"Owner of this lock exited without releasing: {self._owner}",
            ) from None
    else:
        # uncontended path: still execute a schedule point
        await trio.lowlevel.cancel_shielded_checkpoint()

Expand All @@ -604,8 +620,10 @@ def release(self) -> None:
task = trio.lowlevel.current_task()
if task is not self._owner:
raise RuntimeError("can't release a Lock you don't own")
remove_parking_lot_breaker(self._owner, self._lot)
if self._lot:
(self._owner,) = self._lot.unpark(count=1)
add_parking_lot_breaker(self._owner, self._lot)
else:
self._owner = None

Expand Down Expand Up @@ -767,7 +785,11 @@ def acquire_nowait(self) -> None:
return self._lock.acquire_nowait()

async def acquire(self) -> None:
"""Acquire the underlying lock, blocking if necessary."""
"""Acquire the underlying lock, blocking if necessary.

Raises:
StalledLockError: if the owner of the lock exits without releasing.
"""
await self._lock.acquire()

def release(self) -> None:
Expand Down Expand Up @@ -796,6 +818,7 @@ async def wait(self) -> None:

Raises:
RuntimeError: if the calling task does not hold the lock.
StalledLockError: if the owner of the lock exits without releasing, when attempting to re-acquire.

"""
if trio.lowlevel.current_task() is not self._lock._owner:
Expand Down
59 changes: 59 additions & 0 deletions src/trio/_tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

import pytest

from trio.testing import Matcher, RaisesGroup

from .. import _core
from .._core._parking_lot import GLOBAL_PARKING_LOT_BREAKER
from .._sync import *
from .._timeouts import sleep_forever
from ..testing import assert_checkpoints, wait_all_tasks_blocked
Expand Down Expand Up @@ -586,3 +589,59 @@ async def lock_taker() -> None:
await wait_all_tasks_blocked()
assert record == ["started"]
lock_like.release()


async def test_lock_acquire_unowned_lock() -> None:
    """Test that trying to acquire a lock whose owner has exited raises an error.

    See https://github.com/python-trio/trio/issues/3035
    """
    # no breaker registrations may be left over from earlier tests
    assert not GLOBAL_PARKING_LOT_BREAKER
    lock = trio.Lock()
    # the child task acquires the lock and then exits without releasing it
    async with trio.open_nursery() as nursery:
        nursery.start_soon(lock.acquire)
    with pytest.raises(
        trio.StalledLockError,
        match="^Owner of this lock exited without releasing",
    ):
        await lock.acquire()
    # the exited owner's registration must have been cleaned up
    assert not GLOBAL_PARKING_LOT_BREAKER


async def test_lock_multiple_acquire() -> None:
    """Two tasks acquire the same lock and neither releases it.

    The nursery is expected to raise a group containing exactly one
    ``StalledLockError``.
    """
    assert not GLOBAL_PARKING_LOT_BREAKER
    lock = trio.Lock()
    with RaisesGroup(
        Matcher(
            trio.StalledLockError,
            match="^Owner of this lock exited without releasing",
        ),
    ):
        async with trio.open_nursery() as nursery:
            # neither child ever calls release()
            nursery.start_soon(lock.acquire)
            nursery.start_soon(lock.acquire)
    # no registrations may remain once both children have exited
    assert not GLOBAL_PARKING_LOT_BREAKER


async def test_lock_handover() -> None:
    """Check that the breaker registration follows the lock when it is handed over."""
    assert not GLOBAL_PARKING_LOT_BREAKER
    lock = trio.Lock()
    lock.acquire_nowait()
    child_task: Task | None = None
    # acquiring registers the current task as a breaker for the lock's lot
    assert GLOBAL_PARKING_LOT_BREAKER == {
        _core.current_task(): [
            lock._lot,
        ],
    }

    async with trio.open_nursery() as nursery:
        # the child blocks waiting for the lock held by this task
        nursery.start_soon(lock.acquire)
        await wait_all_tasks_blocked()

        lock.release()

        # after release, the only registration left belongs to the child
        assert len(GLOBAL_PARKING_LOT_BREAKER) == 1
        child_task = next(iter(GLOBAL_PARKING_LOT_BREAKER))
        assert GLOBAL_PARKING_LOT_BREAKER[child_task] == [lock._lot]

    # the child has exited without releasing, so it is recorded as the breaker
    assert lock._lot.broken_by == child_task
    assert not GLOBAL_PARKING_LOT_BREAKER
2 changes: 2 additions & 0 deletions src/trio/lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
UnboundedQueue as UnboundedQueue,
UnboundedQueueStatistics as UnboundedQueueStatistics,
add_instrument as add_instrument,
add_parking_lot_breaker as add_parking_lot_breaker,
cancel_shielded_checkpoint as cancel_shielded_checkpoint,
checkpoint as checkpoint,
checkpoint_if_cancelled as checkpoint_if_cancelled,
Expand All @@ -40,6 +41,7 @@
permanently_detach_coroutine_object as permanently_detach_coroutine_object,
reattach_detached_coroutine_object as reattach_detached_coroutine_object,
remove_instrument as remove_instrument,
remove_parking_lot_breaker as remove_parking_lot_breaker,
reschedule as reschedule,
spawn_system_task as spawn_system_task,
start_guest_run as start_guest_run,
Expand Down
Loading