Skip to content

Commit

Permalink
fix: argo workflows daemon not propagating certain system tags (#2047)
Browse files Browse the repository at this point in the history
* move argo heartbeat cli command under argo-workflows in order to gain access to a fully initialized flow object, which has correct tags set.

* add top-level options to the argo daemon
  • Loading branch information
saikonen committed Sep 19, 2024
1 parent cc32b5f commit 8cb94c4
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 63 deletions.
31 changes: 27 additions & 4 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2522,10 +2522,29 @@ def _heartbeat_daemon_template(self):
# Use all the affordances available to _parameters task
executable = self.environment.executable("_parameters")
run_id = "argo-{{workflow.name}}"
entrypoint = [executable, "-m metaflow.plugins.argo.daemon"]
heartbeat_cmds = "{entrypoint} --flow_name {flow_name} --run_id {run_id} {tags} heartbeat".format(
script_name = os.path.basename(sys.argv[0])
entrypoint = [executable, script_name]
# FlowDecorators can define their own top-level options. These might affect run level information
# so it is important to pass these to the heartbeat process as well, as it might be the first task to register a run.
top_opts_dict = {}
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

top_level = list(dict_to_cli_options(top_opts_dict)) + [
"--quiet",
"--metadata=%s" % self.metadata.TYPE,
"--environment=%s" % self.environment.TYPE,
"--datastore=%s" % self.flow_datastore.TYPE,
"--datastore-root=%s" % self.flow_datastore.datastore_root,
"--event-logger=%s" % self.event_logger.TYPE,
"--monitor=%s" % self.monitor.TYPE,
"--no-pylint",
"--with=argo_workflows_internal:auto-emit-argo-events=%i"
% self.auto_emit_argo_events,
]
heartbeat_cmds = "{entrypoint} {top_level} argo-workflows heartbeat --run_id {run_id} {tags}".format(
entrypoint=" ".join(entrypoint),
flow_name=self.flow.name,
top_level=" ".join(top_level) if top_level else "",
run_id=run_id,
tags=" ".join(["--tag %s" % t for t in self.tags]) if self.tags else "",
)
Expand Down Expand Up @@ -2576,12 +2595,16 @@ def _heartbeat_daemon_template(self):
"METAFLOW_SERVICE_URL": SERVICE_INTERNAL_URL,
"METAFLOW_SERVICE_HEADERS": json.dumps(SERVICE_HEADERS),
"METAFLOW_USER": "argo-workflows",
"METAFLOW_DATASTORE_SYSROOT_S3": DATASTORE_SYSROOT_S3,
"METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": DEFAULT_METADATA,
"METAFLOW_CARD_S3ROOT": CARD_S3ROOT,
"METAFLOW_KUBERNETES_WORKLOAD": 1,
"METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_OWNER": self.username,
"METAFLOW_PRODUCTION_TOKEN": self.production_token,
"METAFLOW_PRODUCTION_TOKEN": self.production_token, # Used in identity resolving. This affects system tags.
}
# support Metaflow sandboxes
env["METAFLOW_INIT_SCRIPT"] = KUBERNETES_SANDBOX_INIT_SCRIPT
Expand Down
26 changes: 26 additions & 0 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re
import sys
from hashlib import sha1
from time import sleep

from metaflow import JSONType, Run, current, decorators, parameters
from metaflow._vendor import click
Expand Down Expand Up @@ -959,6 +960,31 @@ def list_workflow_templates(obj, all=None):
obj.echo_always(template_name)


# Hidden CLI command backing the heartbeat daemon container that Argo
# Workflows runs alongside a workflow execution.
@argo_workflows.command(hidden=True, help="start heartbeat process for a run")
@click.option("--run_id", required=True)
@click.option(
    "--tag",
    "tags",
    multiple=True,
    default=None,
    help="Annotate all objects produced by Argo Workflows runs "
    "with the given tag. You can specify this option multiple "
    "times to attach multiple tags.",
)
@click.pass_obj
def heartbeat(obj, run_id, tags=None):
    """Maintain a run-level heartbeat for an Argo Workflows run.

    Registers the run first (this daemon may be the first task to do so,
    before the start task gets a chance), starts the heartbeat for it, and
    then idles indefinitely to keep the daemon container alive.
    """
    idle_seconds = 10
    # This daemon can win the race against the start task, so register the
    # run here to guarantee the heartbeat has a run to attach to.
    obj.metadata.register_run_id(run_id, tags)
    obj.metadata.start_run_heartbeat(obj.flow.name, run_id)
    # Stay alive forever without emitting anything: daemon containers can be
    # extremely long running, and their logs should not be polluted.
    while True:
        sleep(idle_seconds)


def validate_run_id(
workflow_name, token_prefix, authorize, run_id, instructions_fn=None
):
Expand Down
59 changes: 0 additions & 59 deletions metaflow/plugins/argo/daemon.py

This file was deleted.

0 comments on commit 8cb94c4

Please sign in to comment.