# ask-community
Antonio Bertino
Hello guys. Is there a way to retrieve a job's output through a sensor? I want to retrieve some op outputs and use them in a job that will be triggered by the sensor.
owen
hi @Antonio Bertino, are you using a custom IO manager here or the built-in filesystem IO manager? Also, it sounds like you just need the job output to be loaded in the downstream job (and the output doesn't need to be available within the sensor), is that right? If you're using the filesystem IO manager, the path that a given output is stored in is `<base path>/<run id>/<step key>/<output name>`, so it is possible to reconstruct this path at runtime and load from it (even if it's a bit of a pain). In this case, you can pass in the upstream run id as configuration when launching your downstream job from your run status sensor. The step key will be the name of the step whose output you want to load (generally the same as the name of the op), and the output name will be "result" by default. You could do this loading with a root input manager, or just do it within the body of an op.
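A minimal sketch of what this could look like, assuming the default filesystem IO manager rooted at `$DAGSTER_HOME/storage`; `upstream_job`, `my_op`, and the downstream op/sensor names here are hypothetical placeholders:

```python
import os
import pickle

from dagster import DagsterRunStatus, RunRequest, job, op, run_status_sensor

# The built-in filesystem IO manager writes to <DAGSTER_HOME>/storage by
# default; adjust this if you configured a different base path.
BASE_PATH = os.path.join(os.environ.get("DAGSTER_HOME", "."), "storage")


@op
def my_op():
    # Upstream op whose output we want to reuse later (placeholder).
    return {"some": "value"}


@job
def upstream_job():
    my_op()


@op(config_schema={"upstream_run_id": str})
def load_upstream_output(context):
    # Reconstruct "<base path>/<run id>/<step key>/<output name>".
    # The step key is the op name ("my_op") and the output name
    # defaults to "result".
    path = os.path.join(
        BASE_PATH, context.op_config["upstream_run_id"], "my_op", "result"
    )
    with open(path, "rb") as f:
        return pickle.load(f)


@job
def downstream_job():
    load_upstream_output()


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[upstream_job],
    request_job=downstream_job,
)
def upstream_success_sensor(context):
    # Hand the finished run's id to the downstream job as op config.
    return RunRequest(
        run_key=context.dagster_run.run_id,
        run_config={
            "ops": {
                "load_upstream_output": {
                    "config": {"upstream_run_id": context.dagster_run.run_id}
                }
            }
        },
    )
```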
Antonio Bertino
Thank you @owen!! I'll try that
Roei Jacobovich
I’ll jump on the thread as that’s the same topic - is there a way to use GraphQL to retrieve the same information (from Dagit)? Thanks!!
Gabriel Montañola
hi there @owen Since we're using K8s here (and we're going to use the Celery executor too), I talked with @Antonio Bertino about using `s3_io_manager`. Are there any examples of how to load the result of a past job run in another one? Ex: Job A -> outputs a pretty pandas DataFrame; Job B -> loads this pretty DataFrame from S3 (AFAIK it's pickled) and does other stuff.
owen
hi @Gabriel Montañola, I think the most straightforward route would be to create a root input manager inspired by the s3_io_manager's load_input function: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py#L68. You'll need to replace the `_get_path` function to create the correct path, and this input manager will need a config schema that lets you pass in the run id of the upstream output you want to load (because the path depends on that run id).
❤️ 2
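A rough sketch of such a root input manager, mirroring the `<prefix>/storage/<run id>/<step key>/<output name>` key layout of the built-in S3 pickle IO manager; the bucket, prefix, and step/output names are assumptions to adapt:

```python
import pickle

import boto3
from dagster import In, job, op, root_input_manager

# Assumptions: match these to your s3_io_manager configuration.
S3_BUCKET = "my-dagster-bucket"
S3_PREFIX = "dagster"


@root_input_manager(input_config_schema={"run_id": str})
def s3_past_run_loader(context):
    # Same key layout the built-in S3 IO manager uses:
    # <prefix>/storage/<run id>/<step key>/<output name>.
    # "upstream_op" and "result" are placeholder step/output names.
    key = "/".join(
        [S3_PREFIX, "storage", context.config["run_id"], "upstream_op", "result"]
    )
    obj = boto3.client("s3").get_object(Bucket=S3_BUCKET, Key=key)
    return pickle.loads(obj["Body"].read())


@op(ins={"df": In(root_manager_key="s3_past_run_loader")})
def do_other_stuff(df):
    # df is the unpickled upstream output (e.g. the pandas DataFrame).
    ...


@job(resource_defs={"s3_past_run_loader": s3_past_run_loader})
def job_b():
    do_other_stuff()
```

The upstream run id is then supplied as input config at launch time, e.g. `{"ops": {"do_other_stuff": {"inputs": {"df": {"run_id": "<upstream run id>"}}}}}`.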
and @Roei Jacobovich, there's no way to load these output values via GraphQL (that layer doesn't have access to the serialized output files produced by these steps)
Gabriel Montañola
thanks @owen, you're awesome! I was pairing with @Antonio Bertino and this is where we're heading: make a custom root_input_manager using code from the *s3 io manager* 🙂
🎉 2
Antonio Bertino
Thanks, @owen!! It worked beautifully. We managed to create a custom input manager, based on InputManager, that receives a run_id from resource config and a step_key from the op's input config schema. With that in hand, we rebuilt the S3 bucket path and did all the work 🙂 Dagster is amazing
:dagster-spin: 1
🌈 1
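A rough sketch of the shape described above, using the `input_manager` decorator (see the deprecation note below) with the run id in resource config and the step key in input config; bucket, prefix, and names are assumptions:

```python
import pickle

import boto3
from dagster import In, input_manager, job, op

S3_BUCKET = "my-dagster-bucket"  # assumption: match your IO manager config
S3_PREFIX = "dagster"            # assumption


@input_manager(
    config_schema={"run_id": str},          # resource config: upstream run id
    input_config_schema={"step_key": str},  # per-input: upstream step key
)
def past_run_s3_loader(context):
    # Rebuild <prefix>/storage/<run id>/<step key>/result, with "result"
    # being the default output name.
    key = "/".join(
        [
            S3_PREFIX,
            "storage",
            context.resource_config["run_id"],
            context.config["step_key"],
            "result",
        ]
    )
    obj = boto3.client("s3").get_object(Bucket=S3_BUCKET, Key=key)
    return pickle.loads(obj["Body"].read())


@op(ins={"df": In(input_manager_key="past_run_loader")})
def downstream_op(df):
    ...


@job(resource_defs={"past_run_loader": past_run_s3_loader})
def job_b():
    downstream_op()
```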
Gabriel Montañola
oh and @owen we used InputManager because while reading the code for RootInputManager I noticed a deprecation warning. https://github.com/dagster-io/dagster/blob/769cc7a3369ce49545a5f30d304a970a75ecff4[…]hon_modules/dagster/dagster/_core/storage/root_input_manager.py
owen
Awesome! And good catch on the input manager (even I can't keep up with all the new stuff sometimes 😛)
🎉 2