Commit

Move packaging logic from decorator to runtime

talsperre committed Aug 26, 2024
1 parent 6b014ff commit 0d33b88
Showing 5 changed files with 100 additions and 111 deletions.
9 changes: 8 additions & 1 deletion metaflow/cli.py
@@ -831,7 +831,14 @@ def before_run(obj, tags, decospecs):
# We explicitly avoid doing this in `start` since it is invoked for every
# step in the run.
# Update package to be None. The package is now created in the @package decorator in a separate thread
obj.package = None
obj.package = MetaflowPackage(
obj.flow,
obj.environment,
obj.echo,
obj.package_suffixes,
obj.flow_datastore,
obj.logger,
)


@cli.command(help="Print the Metaflow version")
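For context, a minimal sketch (not part of the commit) of how the runtime now obtains the package: `MetaflowPackage.__init__` starts packaging in a background thread, so `before_run` returns immediately, and consumers block on `wait()` only when they actually need the upload results. The 600-second timeout mirrors the new `DEFAULT_PACKAGE_TIMEOUT` below.

```python
# Sketch, assuming the constructor signature shown in this diff.
package = MetaflowPackage(
    obj.flow,
    obj.environment,
    obj.echo,
    obj.package_suffixes,
    obj.flow_datastore,
    obj.logger,
)
# ... later, once package_url / package_sha are actually needed:
package.wait(timeout=600)  # raises TimeoutError if packaging or upload stalls
print(package.package_url, package.package_sha)
```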
1 change: 1 addition & 0 deletions metaflow/metaflow_config.py
@@ -36,6 +36,7 @@
DEFAULT_METADATA = from_conf("DEFAULT_METADATA", "local")
DEFAULT_MONITOR = from_conf("DEFAULT_MONITOR", "nullSidecarMonitor")
DEFAULT_PACKAGE_SUFFIXES = from_conf("DEFAULT_PACKAGE_SUFFIXES", ".py,.R,.RDS")
DEFAULT_PACKAGE_TIMEOUT = from_conf("DEFAULT_PACKAGE_TIMEOUT", 600)
DEFAULT_AWS_CLIENT_PROVIDER = from_conf("DEFAULT_AWS_CLIENT_PROVIDER", "boto3")
DEFAULT_AZURE_CLIENT_PROVIDER = from_conf(
"DEFAULT_AZURE_CLIENT_PROVIDER", "azure-default"
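Assuming `from_conf` follows the same convention as the other defaults in this file (falling back to a `METAFLOW_`-prefixed environment variable), the new timeout can be overridden without code changes. The variable name below is inferred from that convention, not taken from this diff:

```python
import os

# Inferred from the from_conf("DEFAULT_PACKAGE_TIMEOUT", 600) convention above.
os.environ["METAFLOW_DEFAULT_PACKAGE_TIMEOUT"] = "1200"  # wait up to 20 minutes
```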
76 changes: 72 additions & 4 deletions metaflow/package.py
@@ -1,3 +1,4 @@
import threading
import importlib
import os
import sys
@@ -7,7 +8,7 @@
from io import BytesIO

from .extension_support import EXT_PKG, package_mfext_all
from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES
from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES, DEFAULT_PACKAGE_TIMEOUT
from .exception import MetaflowException
from .util import to_unicode
from . import R, INFO_FILE
@@ -59,19 +60,60 @@ def _recurse(root):


class MetaflowPackage(object):
def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST):
def __init__(
self,
flow,
environment,
echo,
suffixes=DEFAULT_SUFFIXES_LIST,
flow_datastore=None,
logger=None,
):
self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST))
self.environment = environment
self.metaflow_root = os.path.dirname(__file__)

self.flow_name = flow.name
self._flow = flow
self.flow_datastore = flow_datastore
self.logger = logger
self.create_time = time.time()
self._is_package_available = False
self.blob = None
self.package_url = None
self.package_sha = None

# Make package creation and upload asynchronous
self._init_thread = threading.Thread(
target=self._prepare_and_upload_package,
args=(flow, environment, flow_datastore, echo),
)
self._init_thread.daemon = True
self._init_thread.start()

def _prepare_and_upload_package(self, flow, environment, flow_datastore, echo):
self.logger(f"Creating package for flow: {flow.name}")
environment.init_environment(echo)
for step in flow:
for deco in step.decorators:
deco.package_init(flow, step.__name__, environment)
self.blob = self._make()
self.logger(f"Initalized environment and packages for flow: {flow.name}")
self.blob = self._create_package()
self.logger(f"Package created for flow: {flow.name}")

if flow_datastore:
self.package_url, self.package_sha = flow_datastore.save_data(
[self.blob], len_hint=1
)[0]

self._is_package_available = True
self.logger(
f"Package created and saved successfully, URL: {self.package_url}, SHA: {self.package_sha}, is_package_available: {self.is_package_available}"
)

@property
def is_package_available(self):
return self._is_package_available

def _walk(self, root, exclude_hidden=True, suffixes=None):
if suffixes is None:
@@ -161,7 +203,7 @@ def _add_info(self, tar):
info.mtime = 1575360000
tar.addfile(info, buf)

def _make(self):
def _create_package(self):
def no_mtime(tarinfo):
# a modification time change should not change the hash of
# the package. Only content modifications will.
@@ -181,6 +223,32 @@ def no_mtime(tarinfo):
blob[4:8] = [0] * 4 # Reset 4 bytes from offset 4 to account for ts
return blob

def wait(self, timeout=DEFAULT_PACKAGE_TIMEOUT):
"""
Wait for the package preparation and upload to complete.
Parameters
----------
timeout : int, default DEFAULT_PACKAGE_TIMEOUT
The maximum time to wait for the package preparation and upload to complete.
Returns
-------
bool
True if the package preparation and upload is complete.
Raises
------
TimeoutError
If the package preparation and upload does not complete within the specified timeout.
"""
self._init_thread.join(timeout)
if self._init_thread.is_alive():
raise TimeoutError(
f"Package preparation and upload did not complete within {timeout} seconds."
)
return True

def __str__(self):
return "<code package for flow %s (created @ %s)>" % (
self.flow_name,
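The core of this file's change is a build-in-background pattern: do the expensive tarball creation and datastore upload in a daemon thread, expose a readiness flag, and let callers block with a bounded `join()`. A self-contained sketch of that pattern, with illustrative names:

```python
import threading
import time

class AsyncArtifact:
    """Builds an artifact in a background thread, like MetaflowPackage above."""

    def __init__(self):
        self.result = None
        self._ready = False
        self._thread = threading.Thread(target=self._build)
        self._thread.daemon = True  # a hung build never blocks interpreter exit
        self._thread.start()

    def _build(self):
        time.sleep(1)  # stand-in for tarball creation + datastore upload
        self.result = "s3://bucket/package.tgz"  # illustrative URL
        self._ready = True

    @property
    def is_ready(self):
        return self._ready

    def wait(self, timeout=600):
        self._thread.join(timeout)
        if self._thread.is_alive():
            raise TimeoutError(f"build did not complete within {timeout} seconds")
        return True

artifact = AsyncArtifact()
artifact.wait(timeout=5)
print(artifact.result)
```

One trade-off of the daemon-thread approach: an exception raised inside the thread does not propagate to the caller, which is why `wait()` can only report a timeout rather than the underlying failure.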
14 changes: 9 additions & 5 deletions metaflow/plugins/aws/batch/batch_decorator.py
@@ -210,8 +210,7 @@ def runtime_init(self, flow, graph, package, run_id):
# Set some more internal state.
self.flow = flow
self.graph = graph
# Package is provided by the package decorator
# self.package = package
self.package = package
self.run_id = run_id

# Saving of the package is now done in the package decorator
@@ -229,9 +228,14 @@ def runtime_step_cli(
# to execute on AWS Batch anymore. We can execute possible fallback
# code locally.
cli_args.commands = ["batch", "step"]
# The package_url and package_sha are now added by the package decorator for that step
# cli_args.command_args.append(self.package_sha)
# cli_args.command_args.append(self.package_url)
if not self.package.is_package_available:
self.logger(f"Waiting for package to be available for {self.step}...")
self.package.wait()
self.logger(
f"Package is now available for {self.step} with URL: {self.package.package_url} and SHA: {self.package.package_sha}"
)
cli_args.command_args.append(self.package.package_sha)
cli_args.command_args.append(self.package.package_url)
cli_args.command_options.update(self.attributes)
cli_args.command_options["run-time-limit"] = self.run_time_limit
if not R.use_r():
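The guard above is the consumer side of the new API. Factored as a standalone helper (a sketch; `ensure_package` is a hypothetical name, and any object exposing `is_package_available`, `wait()`, `package_sha`, and `package_url` would work):

```python
def ensure_package(package, logger, step_name, timeout=600):
    """Block until the code package is uploaded, then return its identifiers."""
    if not package.is_package_available:
        logger(f"Waiting for package to be available for {step_name}...")
        package.wait(timeout=timeout)  # raises TimeoutError on expiry
    return package.package_sha, package.package_url
```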
111 changes: 10 additions & 101 deletions metaflow/plugins/package/package_decorator.py
@@ -10,20 +10,6 @@

class PackageDecorator(StepDecorator):
name = "package"
# Instance variables as they need to be shared across all instances of the decorator
# in the main/initial process.
package_url = None
package_sha = None
_lock = threading.Lock()
_package_created = False

@staticmethod
def _is_executing_remotely():
return os.environ.get("TITUS_TASK_ID", None) is not None

@staticmethod
def _is_subprocess():
return os.environ.get("METAFLOW_SUBPROCESS", "0") == "1"

def set_remote_step(self, decorators):
for deco in decorators:
@@ -60,102 +46,25 @@ def _create_package(
def step_init(
self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
self._flow_datastore = flow_datastore
self._environment = environment
self._flow_datastore = flow_datastore
self._logger = logger
self.package_suffixes = self.attributes.get(
"package_suffixes", flow.package_suffixes
)
self.set_remote_step(decorators)

self._logger("-" * 100)
self._logger(
f"Flow: {flow.name}, Step: {step_name}, Package suffixes: {self.package_suffixes}"
)
self._logger(
f"Is subprocess: {self._is_subprocess()}, Is remote step: {self.is_remote_step}, Is executing remotely: {self._is_executing_remotely()}"
)
if not self._is_subprocess() and not self._is_executing_remotely():
if self._lock.acquire(blocking=False):
self.lock_acquired = True
self._logger(f"{step_name} acquired lock.")
if not self._package_created:
self._logger(f"{step_name} is creating package.")
self.package_thread = threading.Thread(
target=self._create_package,
args=(
flow,
environment,
flow.echo,
self.package_suffixes,
flow_datastore,
self._logger,
),
)
self.package_thread.start()
else:
self._logger(f"{step_name} found package already created.")
else:
self._logger(
f"{step_name} couldn't acquire lock. Another thread is already packaging the flow."
)
def runtime_init(self, flow, graph, package, run_id):
# Set some more internal state.
self.flow = flow
self.graph = graph
self.package = package
self.run_id = run_id

def runtime_step_cli(
self, cli_args, retry_count, max_user_code_retries, ubf_context
):
self._logger("-" * 100)
self._logger(f"PackageDecorator, Runtime step CLI: {cli_args}")
step_name = cli_args.command_args[-1]
self._logger(
f"Is subprocess: {self._is_subprocess()}, Step Name: {step_name}, requires_package: {self.is_remote_step}"
)
self._logger(
f"Package URL: {self.package_url}, Package SHA: {self.package_sha}"
)

if retry_count <= max_user_code_retries:
max_wait_time = 300
wait_interval = 5
total_wait_time = 0

if self.is_remote_step:
while (
not self.package_url or not self.package_sha
) and total_wait_time < max_wait_time:
time.sleep(wait_interval)
total_wait_time += wait_interval
self._logger(
f"Waiting for package to be created... ({total_wait_time}s)"
)

if not self.package_url or not self.package_sha:
raise MetaflowException(
"Package creation failed. Please check the logs for more information."
)

self._logger(
f"After Waiting Package URL: {self.package_url}, Package SHA: {self.package_sha}"
)
if self.package_url and self.package_sha:
cli_args.env["METAFLOW_CODE_SHA"] = self.package_sha
cli_args.env["METAFLOW_CODE_URL"] = self.package_url
if self.package.is_package_available and not self.is_remote_step:
cli_args.env["METAFLOW_CODE_SHA"] = self.package.package_sha
cli_args.env["METAFLOW_CODE_URL"] = self.package.package_url
cli_args.env["METAFLOW_CODE_DS"] = self._flow_datastore.TYPE
if self.is_remote_step:
cli_args.command_args.append(self.package_sha)
cli_args.command_args.append(self.package_url)

self._logger(f"Updated CLI args according to PackageDecorator: {cli_args}")
self._logger("-" * 100)

def task_finished(
self, step_name, flow, graph, is_task_ok, retry_count, max_user_code_retries
):
# Join only after the task is finished, so that other decorators do not
# acquire the lock and start creating the package again.
if self.package_thread:
self.package_thread.join()
print(f"Package thread joined. Output: {self.thread_output}")

if self.lock_acquired:
self._lock.release()
print(f"{step_name} released lock in task_finished.")
