https://dagster.io/ logo
#dagster-support
Title
# dagster-support
a

Antonio Bertino

07/30/2022, 2:21 PM
Hello guys. Is there a way to retrieve a job output through a sensor? I want to retrieve some op outputs and use it in a job that will be triggered by the sensor.
o

owen

08/01/2022, 4:39 PM
hi @Antonio Bertino, are you using a custom io manager here or the built in filesystem io manager? also, it sounds like you just need the job output to be loaded in the downstream job (and the output doesn't need to be available within the sensor), is that right? If you're using the filesystem io manager, the path that a given output is stored in is
"<base path>/<run id>/<step key>/<output name>"
, so it is possible to reconstruct this path at runtime and load from it (even if it's a bit of a pain). In this case, you can pass in the upstream run id as configuration when launching your downstream job from your run status sensor. The step key will be the name of the step whose output you want to load (this is generally just the same as the name of the op), and the output name will be "result" by default. You could do this using a root input manager, or just do that loading within the body of an op.
a

Antonio Bertino

08/01/2022, 8:14 PM
Thank you @owen!! I'll try that
r

Roei Jacobovich

08/02/2022, 7:16 AM
I’ll jump on the thread as that’s the same topic - is there a way to use GraphQL to retrieve the same information (from Dagit)? Thanks!!
g

Gabriel Montañola

08/02/2022, 6:29 PM
hi there @owen Since we're using K8s here (and we're going to use Celery executor too) I talked with @Antonio Bertino about using
s3_io_manager
. Is there any examples of how can we load the result of a past job run on another one? Ex: Job A -> outputs a pretty pandas dataframe Job B -> loads this pretty panda dataframe from s3 (AFAIK it's pickled) and do other stuff
o

owen

08/02/2022, 8:23 PM
hi @Gabriel Montañola, I think the most straightforward route would be to create a root input manager that is inspired by the s3_io_manager's load input function: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py#L68. You'll need to replace the
_get_path
function to create the correct path, and this input manager will need to have a config schema that allows you to input the run id of the upstream output that you want to load (because the path depends on that run id)
❤️ 2
and @Roei Jacobovich, there's no way to load these output values via graphQL (as that layer doesn't have access to the serialized output files produced by these steps)
g

Gabriel Montañola

08/02/2022, 8:25 PM
thanks @owen, you're awesome! i was pairing with @Antonio Bertino and this is where we're heading.
make a custom root_input_manager using code from *s3 io manage*r 🙂
🎉 2
a

Antonio Bertino

08/03/2022, 3:57 AM
Thanks, @owen!! It worked beautifully. We've managed to create a custom Input Manager, based on InputManager, that receives a run_id from resource config and step_key from ops inputs config schema. With that in hands, we managed to rebuild a s3 bucket path and do all the work 🙂 Dagster is amazing
dagster spin 1
🌈 1
D 1
g

Gabriel Montañola

08/03/2022, 12:12 PM
oh and @owen we used InputManager because while reading the code for RootInputManager I noticed a deprecation warning. https://github.com/dagster-io/dagster/blob/769cc7a3369ce49545a5f30d304a970a75ecff4[…]hon_modules/dagster/dagster/_core/storage/root_input_manager.py
o

owen

08/03/2022, 4:51 PM
Awesome! And good catch on the input manager (even I can't keep up with all the new stuff sometimes 😛)
🎉 2
3 Views