POC for package decorator #1982
Overall looks good. Haven't played with it but I think it can be cleaned up a tad as well.
@@ -969,6 +975,10 @@ def start(
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

# Add package suffixes and echo to the flow object as they are used by decorators
This seems not needed anymore since you pass it to the MetaflowPackage constructor.
@@ -830,8 +830,14 @@ def before_run(obj, tags, decospecs):
# Package working directory only once per run.
# We explicitly avoid doing this in `start` since it is invoked for every
# step in the run.
# Update package to be None. The package is now created in the @package decorator in a separate thread
Comment needs updating.
environment.init_environment(echo)
for step in flow:
    for deco in step.decorators:
        deco.package_init(flow, step.__name__, environment)
self.blob = self._make()
self.logger(f"Initalized environment and packages for flow: {flow.name}")
nit: Initialized.
@@ -1552,6 +1552,9 @@ def _launch(self):
    self.task.ubf_context,
)
env.update(args.get_env())
# We add another environment variable that tells us whether we are executing in the main process or
Is this still needed?
@@ -0,0 +1,70 @@
import os
This now seems to be an empty shell, and a lot of code can go away. I wonder if we can also move the env-vars part directly to runtime.py and get rid of this extra decorator.
    self._init_thread.daemon = True
    self._init_thread.start()

def _prepare_and_upload_package(self, flow, environment, flow_datastore, echo):
hardening: wrap in a try/except block to deal with issues like this: https://www.joeshaw.org/python-daemon-threads-considered-harmful/
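One way the hardening could look, as a minimal sketch rather than the PR's actual code: the thread target catches any exception and records it, so the main thread can surface the failure instead of losing it when the daemon thread dies. The names `BackgroundPackager` and `build_and_upload` are hypothetical and only stand in for the packaging logic.

```python
import threading


class BackgroundPackager:
    """Hypothetical helper: runs a packaging callable in a daemon thread."""

    def __init__(self, build_and_upload):
        # Assumed callable that does the real packaging/upload work.
        self._build_and_upload = build_and_upload
        # Exception raised by the worker, if any.
        self.error = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        try:
            self._build_and_upload()
        except Exception as exc:
            # Record the failure rather than letting the thread die silently.
            self.error = exc

    def join(self, timeout=None):
        # Block until the worker finishes (or the timeout expires), then
        # re-raise any recorded failure in the caller's thread.
        self._thread.join(timeout)
        if self.error is not None:
            raise RuntimeError("package upload failed") from self.error
```

With this shape, a failure in the worker shows up at the point where the caller joins, which keeps the error on the main thread's stack instead of in a background traceback.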
@@ -181,6 +223,32 @@ def no_mtime(tarinfo):
    blob[4:8] = [0] * 4  # Reset 4 bytes from offset 4 to account for ts
    return blob

def wait(self, timeout=DEFAULT_PACKAGE_TIMEOUT):
I would return True/False. Also, add a fast path that checks the package-readiness bool you already have. Ideally that flag is tri-state:
- None: no state yet, so we wait for the thread
- True: done -- no need to wait on anything
- False: some error happened; no need to wait, but no package will be available.
We also need a way to report this tri-state result back from wait.
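The tri-state idea above could be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `PackageState` and `mark` are hypothetical names, and here `wait` itself returns the tri-state value, with None meaning the timeout expired before the worker reported anything.

```python
import threading


class PackageState:
    """Hypothetical tri-state readiness tracker (None / True / False)."""

    def __init__(self):
        self._done = threading.Event()
        # None: still running, True: package ready, False: packaging failed.
        self._ok = None

    def mark(self, ok):
        # Called by the worker thread once it has succeeded or failed.
        self._ok = bool(ok)
        self._done.set()

    def wait(self, timeout=None):
        # Fast path: the outcome is already known, so do not block.
        if self._ok is not None:
            return self._ok
        self._done.wait(timeout)
        # Still None here means the timeout expired before the worker finished.
        return self._ok
```

A caller can then branch on the result: True means the package is available, False means it never will be, and None means the caller may retry or give up.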
):
    if not is_cloned:
        self._save_package_once(self.flow_datastore, self.package)
# Saving of the package is now done in the package decorator
We will be able to clean up the save_package_once code as well.
class PackageDecorator(StepDecorator):
    name = "package"

    def set_remote_step(self, decorators):
We will still need something like this. Maybe the "global decorator state" is a good place to start with this.
Adds a new package decorator to package all Metaflow runs, including local ones. Additionally, this POC demonstrates how we can move the logic for packaging and uploading packages off the critical path of execution and make it non-blocking by running it in a thread.