Ability to extend AssetExecutionContext #24435

Open
paluchs opened this issue Sep 12, 2024 · 0 comments
Labels
area: asset Related to Software-Defined Assets type: feature-request


What's the use case?

I'm extending the @asset decorator with some arguments that I will also need in the asset function. I'm using context to pass these down like so:

@partitioned_asset(
    partition_on=["some_partition_key"]
)
def my_partitioned_asset(context: AssetExecutionContext):
    partition_on = context.partition_on

Now I've extended AssetExecutionContext like so:

class PartitionedAssetExecutionContext(AssetExecutionContext):
    partition_on: Optional[List[str]] = None

If I now want to use PartitionedAssetExecutionContext as a type hint for context I get a DagsterInvalidDefinitionError in _validate_context_type_hint.

I can set context.partition_on in my decorator without changing the type hint, and the code will not break. However, it would be nice if the type hint could guarantee that the context has those attributes.
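To make the workaround concrete, here is a minimal sketch of a decorator that attaches its argument to the context before calling the wrapped function. It is not Dagster code: `FakeContext`, `PartitionedFakeContext`, and `partitioned` are hypothetical stand-ins so the snippet runs without Dagster installed, and the real `@asset` wiring is left out.

```python
from functools import wraps
from typing import Callable, List, Optional


class FakeContext:
    """Hypothetical stand-in for Dagster's AssetExecutionContext."""


class PartitionedFakeContext(FakeContext):
    """Subclass that declares the extra attribute so type checkers know about it."""

    partition_on: Optional[List[str]] = None


def partitioned(partition_on: List[str]) -> Callable:
    """Hypothetical decorator factory that stores partition_on on the context."""

    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def wrapper(context: FakeContext, *args, **kwargs):
            # Attach the decorator argument so the wrapped function can read it.
            context.partition_on = list(partition_on)
            return fn(context, *args, **kwargs)

        return wrapper

    return decorator


@partitioned(partition_on=["some_partition_key"])
def my_partitioned_asset(context: PartitionedFakeContext) -> List[str]:
    return context.partition_on
```

Setting the attribute works on any context object at runtime; only the subclass annotation tells a type checker that `partition_on` exists, which is the guarantee this issue asks for.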

Ideas of implementation

The implementation of _validate_context_type_hint could be changed so that it also accepts any subclasses of:

  • AssetExecutionContext,
  • OpExecutionContext,
  • AssetCheckExecutionContext
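As a rough illustration of a subclass-accepting check, the sketch below uses only the standard library. `BaseContext` and `OtherContext` are hypothetical stand-ins for the Dagster context classes listed above, and `context_annotation_is_valid` is an assumed helper name, not part of Dagster.

```python
import inspect


class BaseContext:
    """Hypothetical stand-in for one of the Dagster context classes."""


class OtherContext:
    """Second hypothetical stand-in, to show that several bases can be allowed."""


ALLOWED_CONTEXT_CLASSES = (BaseContext, OtherContext)


def context_annotation_is_valid(fn) -> bool:
    """Accept a blank annotation, an allowed class, or any subclass of one."""
    params = list(inspect.signature(fn).parameters.values())
    if not params:
        return True
    annotation = params[0].annotation
    if annotation is inspect.Parameter.empty:
        return True
    # issubclass raises TypeError for non-classes, so check for a class first.
    # It accepts a tuple of bases and returns True for the base classes themselves.
    return isinstance(annotation, type) and issubclass(
        annotation, ALLOWED_CONTEXT_CLASSES
    )


class CustomContext(BaseContext):
    partition_on = None


def uses_subclass(context: CustomContext): ...
def uses_base(context: BaseContext): ...
def unannotated(context): ...
def uses_wrong_type(context: int): ...
```

With this check, `uses_subclass`, `uses_base`, and `unannotated` pass, while `uses_wrong_type` is rejected. Annotations that arrive as strings (for example under postponed evaluation) are not classes, so this sketch would reject them unless they are resolved first.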

The function could probably be changed like so:

def _validate_context_type_hint(fn):
    from inspect import _empty as EmptyAnnotation

    from dagster._core.decorator_utils import get_function_params
    from dagster._core.definitions.decorators.op_decorator import is_context_provided
    from dagster._core.execution.context.compute import (
        AssetCheckExecutionContext,
        AssetExecutionContext,
        OpExecutionContext,
    )

    params = get_function_params(fn)
    if is_context_provided(params):
        annotation = params[0].annotation
        # issubclass accepts a tuple and returns True for the listed classes themselves.
        context_classes = (
            AssetExecutionContext,
            OpExecutionContext,
            AssetCheckExecutionContext,
        )

        # A blank annotation is always allowed. Otherwise the annotation must be one
        # of the context classes or a subclass. The isinstance guard keeps issubclass
        # from raising TypeError on non-class annotations.
        if annotation is not EmptyAnnotation and not (
            isinstance(annotation, type) and issubclass(annotation, context_classes)
        ):
            raise DagsterInvalidDefinitionError(
                f"Cannot annotate `context` parameter with type {annotation}. `context`"
                " must be annotated with AssetExecutionContext, AssetCheckExecutionContext,"
                " OpExecutionContext, a subclass of one of these, or left blank."
            )

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@garethbrickman garethbrickman added the area: asset Related to Software-Defined Assets label Sep 12, 2024