
DaskTaskRunner: Completed tasks are re-computed when workers die #15442

Open
ashtuchkin opened this issue Sep 19, 2024 · 0 comments
Labels
bug Something isn't working

Comments


Bug summary

When using DaskTaskRunner, if a Dask worker dies unexpectedly for any reason, all previously completed tasks are re-run on other workers, even though Prefect treats them as complete. This is a problem for expensive or non-idempotent tasks.

The likely reason is that the Dask scheduler believes the futures' result data is still needed on the worker that computed them; if that worker dies, it schedules a re-computation. To avoid this behavior, the futures either need to be dropped or have .release() called on them once their results are no longer needed.
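To illustrate the proposed mitigation in plain dask.distributed terms (independent of Prefect), here is a minimal sketch: once a future's result has been gathered, calling .release() drops the scheduler's reference, so a later worker restart should not trigger re-computation of that task. The `inc` task and `run_and_release` helper are hypothetical names for illustration; this assumes the `distributed` package is installed.

```python
from distributed import Client


def inc(x: int) -> int:
    return x + 1


def run_and_release(n: int) -> list[int]:
    # In-process cluster: no separate worker processes needed for this sketch.
    with Client(processes=False, n_workers=1, threads_per_worker=1) as client:
        futures = [client.submit(inc, i) for i in range(n)]
        results = client.gather(futures)  # block until all tasks complete
        for fut in futures:
            # Drop the scheduler's reference to the result. Without this,
            # the scheduler keeps the result "wanted" and will recompute it
            # on another worker if the holding worker dies.
            fut.release()
        return results
```

In the Prefect case, the task runner holds the underlying dask futures for the lifetime of the flow, so the equivalent release never happens.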

Here's a flow that shows this behavior on both prefect 2 and 3:

import os
from time import sleep

from prefect import flow, get_run_logger, task
from prefect_dask import DaskTaskRunner

@task
def my_task(id: int):
    logger = get_run_logger()
    if id == 9:
        # Simulate an unexpected worker death by killing the worker process.
        logger.info(f"Task {id}: crashing the worker")
        os._exit(0)
    else:
        logger.info(f"Task {id} running")
        sleep(1)

@flow(
    task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 1, "threads_per_worker": 1})
)
def my_flow():
    logger = get_run_logger()
    # Wait for each mapped task in order; tasks 0-8 complete before task 9
    # crashes the single worker.
    for i, fut in enumerate(my_task.map(range(10))):
        fut.wait()
        logger.info(f"Future {i} complete")

if __name__ == "__main__":
    my_flow()

When running this script, note that after the worker crashes, the tasks that had already completed are re-computed on the new worker.

For Prefect 2, PR #13536 should help. The same thing happens in Prefect 3 in my tests, though I'm not sure why.

Version info (prefect version output)

Version:             2.19.9
API version:         0.8.4
Python version:      3.11.7
Git commit:          0e5fa902
Built:               Thu, Jul 25, 2024 11:59 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

No response

@ashtuchkin ashtuchkin added the bug Something isn't working label Sep 19, 2024