Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for package decorator #1982

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,14 @@ def before_run(obj, tags, decospecs):
# Package working directory only once per run.
# We explicitly avoid doing this in `start` since it is invoked for every
# step in the run.
# Update package to be None. The package is now created in the @package decorator in a separate thread
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment needs updating.

obj.package = MetaflowPackage(
obj.flow, obj.environment, obj.echo, obj.package_suffixes
obj.flow,
obj.environment,
obj.echo,
obj.package_suffixes,
obj.flow_datastore,
obj.logger,
)


Expand Down Expand Up @@ -969,6 +975,10 @@ def start(
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

# Add package suffixes and echo to the flow object as they are used by decorators
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems not needed anymore since you pass it to the MetaflowPackage constructor.

ctx.obj.flow.package_suffixes = ctx.obj.package_suffixes
ctx.obj.flow.echo = echo

ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
Expand Down Expand Up @@ -1031,7 +1041,6 @@ def start(
decorators._attach_decorators(ctx.obj.flow, all_decospecs)
# Regenerate graph if we attached more decorators
ctx.obj.graph = FlowGraph(ctx.obj.flow.__class__)

decorators._init_step_decorators(
ctx.obj.flow,
ctx.obj.graph,
Expand Down
1 change: 1 addition & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
DEFAULT_METADATA = from_conf("DEFAULT_METADATA", "local")
DEFAULT_MONITOR = from_conf("DEFAULT_MONITOR", "nullSidecarMonitor")
DEFAULT_PACKAGE_SUFFIXES = from_conf("DEFAULT_PACKAGE_SUFFIXES", ".py,.R,.RDS")
DEFAULT_PACKAGE_TIMEOUT = from_conf("DEFAULT_PACKAGE_TIMEOUT", 600)
DEFAULT_AWS_CLIENT_PROVIDER = from_conf("DEFAULT_AWS_CLIENT_PROVIDER", "boto3")
DEFAULT_AZURE_CLIENT_PROVIDER = from_conf(
"DEFAULT_AZURE_CLIENT_PROVIDER", "azure-default"
Expand Down
76 changes: 72 additions & 4 deletions metaflow/package.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import importlib
import os
import sys
Expand All @@ -7,7 +8,7 @@
from io import BytesIO

from .extension_support import EXT_PKG, package_mfext_all
from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES
from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES, DEFAULT_PACKAGE_TIMEOUT
from .exception import MetaflowException
from .util import to_unicode
from . import R, INFO_FILE
Expand Down Expand Up @@ -59,19 +60,60 @@ def _recurse(root):


class MetaflowPackage(object):
def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST):
def __init__(
    self,
    flow,
    environment,
    echo,
    suffixes=DEFAULT_SUFFIXES_LIST,
    flow_datastore=None,
    logger=None,
):
    """Create a code package for *flow* asynchronously.

    Packaging (and, when *flow_datastore* is given, upload) happens on a
    background thread started here; callers must check
    `is_package_available` or call `wait()` before using `package_url` /
    `package_sha`.

    Parameters
    ----------
    flow : FlowSpec
        The flow whose working directory is packaged.
    environment : MetaflowEnvironment
        Environment used to initialize per-step package hooks.
    echo : callable
        Output function passed to environment initialization.
    suffixes : list, default DEFAULT_SUFFIXES_LIST
        Extra file suffixes to include; merged with the defaults.
    flow_datastore : FlowDatastore, optional
        When given, the package blob is uploaded to this datastore.
    logger : callable, optional
        Progress logger; the background thread calls it unconditionally,
        so it must not be None when the thread runs.
    """
    self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST))
    self.environment = environment
    self.metaflow_root = os.path.dirname(__file__)

    self.flow_name = flow.name
    self._flow = flow
    self.flow_datastore = flow_datastore
    self.logger = logger
    self.create_time = time.time()
    # Filled in by the background thread; until then the package is not
    # usable by consumers.
    self._is_package_available = False
    self.blob = None
    self.package_url = None
    self.package_sha = None

    # Make package creation and upload asynchronous
    # NOTE(review): a daemon thread can be killed mid-upload at interpreter
    # shutdown and exceptions inside it surface confusingly — confirm the
    # thread target is hardened against this.
    self._init_thread = threading.Thread(
        target=self._prepare_and_upload_package,
        args=(flow, environment, flow_datastore, echo),
    )
    self._init_thread.daemon = True
    self._init_thread.start()

def _prepare_and_upload_package(self, flow, environment, flow_datastore, echo):
    """Thread target: build the code package and optionally upload it.

    Runs per-step `package_init` hooks, builds the tarball, and — when a
    datastore was provided — uploads the blob, recording `package_url` and
    `package_sha`. Sets `_is_package_available` only on full success.

    Parameters
    ----------
    flow : FlowSpec
        Flow being packaged.
    environment : MetaflowEnvironment
        Environment whose `init_environment` is invoked first.
    flow_datastore : FlowDatastore or None
        Upload target; skipped when None.
    echo : callable
        Output function for environment initialization.
    """
    # This runs on a daemon thread: an escaping exception would either be
    # lost or crash noisily at interpreter shutdown, so catch everything
    # and report through the logger instead.
    try:
        self.logger(f"Creating package for flow: {flow.name}")
        environment.init_environment(echo)
        for step in flow:
            for deco in step.decorators:
                deco.package_init(flow, step.__name__, environment)
        # Fixed typo: "Initalized" -> "Initialized" (flagged in review).
        self.logger(f"Initialized environment and packages for flow: {flow.name}")
        self.blob = self._create_package()
        self.logger(f"Package created for flow: {flow.name}")

        if flow_datastore:
            self.package_url, self.package_sha = flow_datastore.save_data(
                [self.blob], len_hint=1
            )[0]

        self._is_package_available = True
        self.logger(
            f"Package created and saved successfully, URL: {self.package_url}, SHA: {self.package_sha}, is_package_available: {self.is_package_available}"
        )
    except Exception as ex:
        # Leave _is_package_available False so waiters know no package
        # will arrive; surface the cause for debugging.
        self.logger(f"Package preparation failed for flow: {flow.name}: {ex}")

@property
def is_package_available(self):
    # True once the background thread has finished building (and, when a
    # datastore was configured, uploading) the package.
    return self._is_package_available

def _walk(self, root, exclude_hidden=True, suffixes=None):
if suffixes is None:
Expand Down Expand Up @@ -161,7 +203,7 @@ def _add_info(self, tar):
info.mtime = 1575360000
tar.addfile(info, buf)

def _make(self):
def _create_package(self):
def no_mtime(tarinfo):
# a modification time change should not change the hash of
# the package. Only content modifications will.
Expand All @@ -181,6 +223,32 @@ def no_mtime(tarinfo):
blob[4:8] = [0] * 4 # Reset 4 bytes from offset 4 to account for ts
return blob

def wait(self, timeout=DEFAULT_PACKAGE_TIMEOUT):
    """
    Block until the background package preparation/upload completes.

    Parameters
    ----------
    timeout : int, default DEFAULT_PACKAGE_TIMEOUT
        Maximum number of seconds to wait for the background thread.

    Returns
    -------
    bool
        True once preparation and upload have finished.

    Raises
    ------
    TimeoutError
        If the background thread is still running after `timeout` seconds.
    """
    self._init_thread.join(timeout)
    # join() returns silently on timeout, so liveness is the only signal.
    if not self._init_thread.is_alive():
        return True
    raise TimeoutError(
        f"Package preparation and upload did not complete within {timeout} seconds."
    )

def __str__(self):
return "<code package for flow %s (created @ %s)>" % (
self.flow_name,
Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
("timeout", ".timeout_decorator.TimeoutDecorator"),
("environment", ".environment_decorator.EnvironmentDecorator"),
("secrets", ".secrets.secrets_decorator.SecretsDecorator"),
("package", ".package.package_decorator.PackageDecorator"),
("parallel", ".parallel_decorator.ParallelDecorator"),
("retry", ".retry_decorator.RetryDecorator"),
("resources", ".resources_decorator.ResourcesDecorator"),
Expand Down
21 changes: 14 additions & 7 deletions metaflow/plugins/aws/batch/batch_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,12 @@ def runtime_init(self, flow, graph, package, run_id):
self.package = package
self.run_id = run_id

def runtime_task_created(
self, task_datastore, task_id, split_index, input_paths, is_cloned, ubf_context
):
if not is_cloned:
self._save_package_once(self.flow_datastore, self.package)
# Saving of the package is now done in the package decorator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be able to clean up save_package_once code as well.

# def runtime_task_created(
# self, task_datastore, task_id, split_index, input_paths, is_cloned, ubf_context
# ):
# if not is_cloned:
# self._save_package_once(self.flow_datastore, self.package)

def runtime_step_cli(
self, cli_args, retry_count, max_user_code_retries, ubf_context
Expand All @@ -227,8 +228,14 @@ def runtime_step_cli(
# to execute on AWS Batch anymore. We can execute possible fallback
# code locally.
cli_args.commands = ["batch", "step"]
cli_args.command_args.append(self.package_sha)
cli_args.command_args.append(self.package_url)
if not self.package.is_package_available:
self.logger(f"Waiting for package to be available for {self.step}...")
self.package.wait()
self.logger(
f"Package is now available for {self.step} with URL: {self.package.package_url} and SHA: {self.package.package_sha}"
)
cli_args.command_args.append(self.package.package_sha)
cli_args.command_args.append(self.package.package_url)
cli_args.command_options.update(self.attributes)
cli_args.command_options["run-time-limit"] = self.run_time_limit
if not R.use_r():
Expand Down
Empty file.
70 changes: 70 additions & 0 deletions metaflow/plugins/package/package_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now seems to be an empty hull and a lot of code can go away. I wonder if we can also move the env-vars part directly to runtime.py and get rid of this extra decorator.

import sys
import threading
import time

from metaflow.decorators import StepDecorator
from metaflow.exception import MetaflowException
from metaflow.package import MetaflowPackage


class PackageDecorator(StepDecorator):
name = "package"

def set_remote_step(self, decorators):
    """Flag this step as remote when a "titus" decorator is attached."""
    # Only ever promotes to True; an already-set flag is never cleared
    # when no matching decorator is found.
    if any(deco.name == "titus" for deco in decorators):
        self.is_remote_step = True

def __init__(self, attributes=None, statically_defined=False):
    """Set up empty packaging state; real work happens during runtime init."""
    super(PackageDecorator, self).__init__(attributes, statically_defined)
    # Nothing is packaged yet: no package object, no worker thread, no
    # captured thread output, and the step is assumed local until a remote
    # decorator is detected.
    self.package = None
    self.package_thread = None
    self.thread_output = None
    self.lock_acquired = False
    self.is_remote_step = False

@classmethod
def _create_package(
    cls, flow, environment, echo, package_suffixes, flow_datastore, logger
):
    """Build the code package for *flow* and save it to the datastore.

    On success records the package URL/SHA and a success message; on any
    failure only an error message is kept in ``thread_output``.
    """
    try:
        logger(f"Creating package for flow: {flow.name}")
        package = MetaflowPackage(flow, environment, echo, package_suffixes)
        # NOTE(review): these assign CLASS attributes (cls.*), shared by
        # every instance and shadowing same-named instance attributes —
        # confirm this is intentional and not meant to be per-instance.
        cls.package_url, cls.package_sha = flow_datastore.save_data(
            [package.blob], len_hint=1
        )[0]
        logger(
            f"Package created and saved successfully, URL: {cls.package_url}, SHA: {cls.package_sha}"
        )
        cls._package_created = True
        cls.thread_output = "Package created and saved successfully."
    except Exception as e:
        # NOTE(review): broad except keeps only the message — the traceback
        # is lost; consider logging it as well.
        cls.thread_output = f"Package creation failed: {str(e)}"

def step_init(
    self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
    """Record per-step context and detect whether the step runs remotely."""
    # Keep the handles that runtime_step_cli needs later.
    self._logger = logger
    self._flow_datastore = flow_datastore
    self._environment = environment
    self.set_remote_step(decorators)

def runtime_init(self, flow, graph, package, run_id):
    """Stash the runtime-wide objects handed over by the local runtime."""
    self.run_id = run_id
    self.package = package
    self.graph = graph
    self.flow = flow

def runtime_step_cli(
    self, cli_args, retry_count, max_user_code_retries, ubf_context
):
    # Export the code-package location to the step subprocess via env vars.
    if retry_count <= max_user_code_retries:
        # Remote steps receive the package through their own plugin
        # machinery; only local steps need these, and only once the
        # background packaging thread has finished.
        if self.package.is_package_available and not self.is_remote_step:
            cli_args.env["METAFLOW_CODE_SHA"] = self.package.package_sha
            cli_args.env["METAFLOW_CODE_URL"] = self.package.package_url
            cli_args.env["METAFLOW_CODE_DS"] = self._flow_datastore.TYPE

    # NOTE(review): indentation was lost in this rendering — assuming this
    # log line runs on every invocation; confirm against the actual diff.
    self._logger(f"Updated CLI args according to PackageDecorator: {cli_args}")
3 changes: 3 additions & 0 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,9 @@ def _launch(self):
self.task.ubf_context,
)
env.update(args.get_env())
# We add another environment variable that tells us whether we are executing in the main process or
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?

# in a subprocess
env["METAFLOW_SUBPROCESS"] = "1"
env["PYTHONUNBUFFERED"] = "x"
tracing.inject_tracing_vars(env)
# NOTE bufsize=1 below enables line buffering which is required
Expand Down
Loading