diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py
index 1e05476c43..9f293b3b66 100644
--- a/metaflow/plugins/argo/argo_workflows.py
+++ b/metaflow/plugins/argo/argo_workflows.py
@@ -2522,10 +2522,29 @@ def _heartbeat_daemon_template(self):
         # Use all the affordances available to _parameters task
         executable = self.environment.executable("_parameters")
         run_id = "argo-{{workflow.name}}"
-        entrypoint = [executable, "-m metaflow.plugins.argo.daemon"]
-        heartbeat_cmds = "{entrypoint} --flow_name {flow_name} --run_id {run_id} {tags} heartbeat".format(
+        script_name = os.path.basename(sys.argv[0])
+        entrypoint = [executable, script_name]
+        # FlowDecorators can define their own top-level options. These might affect run level information
+        # so it is important to pass these to the heartbeat process as well, as it might be the first task to register a run.
+        top_opts_dict = {}
+        for deco in flow_decorators(self.flow):
+            top_opts_dict.update(deco.get_top_level_options())
+
+        top_level = list(dict_to_cli_options(top_opts_dict)) + [
+            "--quiet",
+            "--metadata=%s" % self.metadata.TYPE,
+            "--environment=%s" % self.environment.TYPE,
+            "--datastore=%s" % self.flow_datastore.TYPE,
+            "--datastore-root=%s" % self.flow_datastore.datastore_root,
+            "--event-logger=%s" % self.event_logger.TYPE,
+            "--monitor=%s" % self.monitor.TYPE,
+            "--no-pylint",
+            "--with=argo_workflows_internal:auto-emit-argo-events=%i"
+            % self.auto_emit_argo_events,
+        ]
+        heartbeat_cmds = "{entrypoint} {top_level} argo-workflows heartbeat --run_id {run_id} {tags}".format(
             entrypoint=" ".join(entrypoint),
-            flow_name=self.flow.name,
+            top_level=" ".join(top_level) if top_level else "",
             run_id=run_id,
             tags=" ".join(["--tag %s" % t for t in self.tags]) if self.tags else "",
         )
@@ -2576,12 +2595,16 @@ def _heartbeat_daemon_template(self):
             "METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
             "METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
             "METAFLOW_USER": "argo-workflows",
+            "METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
+            "METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
             "METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
             "METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
+            "METAFLOW_CARD_S3ROOT": CARD_S3ROOT,
             "METAFLOW_KUBERNETES_WORKLOAD": 1,
+            "METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
             "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
             "METAFLOW_OWNER": self.username,
-            "METAFLOW_PRODUCTION_TOKEN": self.production_token,
+            "METAFLOW_PRODUCTION_TOKEN": self.production_token,  # Used in identity resolving. This affects system tags.
         }
         # support Metaflow sandboxes
         env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT
diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py
index a69f991f2e..7ce3a39025 100644
--- a/metaflow/plugins/argo/argo_workflows_cli.py
+++ b/metaflow/plugins/argo/argo_workflows_cli.py
@@ -4,6 +4,7 @@
 import re
 import sys
 from hashlib import sha1
+from time import sleep
 
 from metaflow import JSONType, Run, current, decorators, parameters
 from metaflow._vendor import click
@@ -959,6 +960,31 @@ def list_workflow_templates(obj, all=None):
         obj.echo_always(template_name)
 
 
+# Internal CLI command to run a heartbeat daemon in an Argo Workflows Daemon container.
+@argo_workflows.command(hidden=True, help="start heartbeat process for a run")
+@click.option("--run_id", required=True)
+@click.option(
+    "--tag",
+    "tags",
+    multiple=True,
+    default=None,
+    help="Annotate all objects produced by Argo Workflows runs "
+    "with the given tag. You can specify this option multiple "
+    "times to attach multiple tags.",
+)
+@click.pass_obj
+def heartbeat(obj, run_id, tags=None):
+    # Try to register a run in case the start task has not taken care of it yet.
+    obj.metadata.register_run_id(run_id, tags)
+    # Start run heartbeat
+    obj.metadata.start_run_heartbeat(obj.flow.name, run_id)
+    # Keepalive loop
+    while True:
+        # Do not pollute daemon logs with anything unnecessary,
+        # as they might be extremely long running.
+        sleep(10)
+
+
 def validate_run_id(
     workflow_name, token_prefix, authorize, run_id, instructions_fn=None
 ):
diff --git a/metaflow/plugins/argo/daemon.py b/metaflow/plugins/argo/daemon.py
deleted file mode 100644
index 4bc0b45cc8..0000000000
--- a/metaflow/plugins/argo/daemon.py
+++ /dev/null
@@ -1,59 +0,0 @@
-from collections import namedtuple
-from time import sleep
-from metaflow.metaflow_config import DEFAULT_METADATA
-from metaflow.metaflow_environment import MetaflowEnvironment
-from metaflow.plugins import METADATA_PROVIDERS
-from metaflow._vendor import click
-
-
-class CliState:
-    pass
-
-
-@click.group()
-@click.option("--flow_name", required=True)
-@click.option("--run_id", required=True)
-@click.option(
-    "--tag",
-    "tags",
-    multiple=True,
-    default=None,
-    help="Annotate all objects produced by Argo Workflows runs "
-    "with the given tag. You can specify this option multiple "
-    "times to attach multiple tags.",
-)
-@click.pass_context
-def cli(ctx, flow_name, run_id, tags=None):
-    ctx.obj = CliState()
-    ctx.obj.flow_name = flow_name
-    ctx.obj.run_id = run_id
-    ctx.obj.tags = tags
-    # Use a dummy flow to initialize the environment and metadata service,
-    # as we only need a name for the flow object.
-    flow = namedtuple("DummyFlow", "name")
-    dummyflow = flow(flow_name)
-
-    # Initialize a proper metadata service instance
-    environment = MetaflowEnvironment(dummyflow)
-
-    ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == DEFAULT_METADATA][0](
-        environment, dummyflow, None, None
-    )
-
-
-@cli.command(help="start heartbeat process for a run")
-@click.pass_obj
-def heartbeat(obj):
-    # Try to register a run in case the start task has not taken care of it yet.
-    obj.metadata.register_run_id(obj.run_id, obj.tags)
-    # Start run heartbeat
-    obj.metadata.start_run_heartbeat(obj.flow_name, obj.run_id)
-    # Keepalive loop
-    while True:
-        # Do not pollute daemon logs with anything unnecessary,
-        # as they might be extremely long running.
-        sleep(10)
-
-
-if __name__ == "__main__":
-    cli()
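
For illustration only, the daemon template above would render `heartbeat_cmds` into a single command roughly like the following; the flow script name, datastore root, tag, and the event-logger/monitor values are placeholder assumptions, not values taken from this diff:

    # illustrative sketch: flow.py, s3://my-bucket/metaflow and the tag are assumed, not from the diff
    python flow.py --quiet --metadata=service --environment=local --datastore=s3 \
        --datastore-root=s3://my-bucket/metaflow --event-logger=nullSidecarLogger \
        --monitor=nullSidecarMonitor --no-pylint \
        --with=argo_workflows_internal:auto-emit-argo-events=1 \
        argo-workflows heartbeat --run_id argo-{{workflow.name}} --tag example-tag

Because the heartbeat daemon now goes through the regular flow script entrypoint with the same top-level options as other tasks (including flow-decorator options, since it may be the first task to register the run), the standalone metaflow/plugins/argo/daemon.py module is no longer needed and is deleted above.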