Stefan Samba
08/04/2022, 9:37 AM
With @op elements it's easy to get serial execution (link).
In my specific case I’m looking for:
• getting serial execution when running .ipynb files
For example:
import dagstermill as dm
from dagster import job

download_data = dm.define_dagstermill_op(
    "download_data",
    notebook_path="download_data.ipynb",
    output_notebook_name="download_data_output",
)

prepare_data = dm.define_dagstermill_op(
    "prepare_data",
    notebook_path="prepare_data.ipynb",
    output_notebook_name="prepare_data_output",
)

@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def dagster_main():
    download_data()
    prepare_data(download_data)
This is a small example for illustration purposes. The last line will not work because prepare_data can't take any arguments; it's only there to show that prepare_data should depend on download_data.
Would it be possible to make this serial in some way?
Q2
For ops I can see it's possible to get a dataflow going from one step to another. Would that dataflow be possible when working with ipynb files? I can imagine this is a challenge, as an ipynb file does not return a value. Any ideas here?
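For reference, a minimal sketch (not from the thread) of the plain-@op dataflow referred to above, where passing one op's output to the next both moves the data and makes execution serial:

from dagster import job, op

@op
def download_data():
    # a plain op can simply return a value
    return [1, 2, 3]

@op
def prepare_data(numbers):
    # depends on download_data because it consumes its output
    return [n * 2 for n in numbers]

@job
def plain_op_job():
    # wiring the output into the next op creates the serial dependency
    prepare_data(download_data())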
owen
08/04/2022, 4:26 PM
You can pass data into and out of notebooks defined with define_dagstermill_op using the ins and outs arguments, which I think is what you want here. For more info on passing output data from an op, see: https://docs.dagster.io/integrations/dagstermill#results-and-custom-materializations
Stefan Samba
08/05/2022, 2:26 PM
from dagster import In, Out, job
from dagster.utils import script_relative_path
import dagstermill as dm

load_data = dm.define_dagstermill_op(
    "load_data",
    notebook_path=script_relative_path("load_data.ipynb"),
    outs={"numbers": Out(list, description="list of numbers")},
)

read_data = dm.define_dagstermill_op(
    "read_data",
    notebook_path=script_relative_path("read_data.ipynb"),
    output_notebook_name="read_data_output",
    ins={"load_data_output": In(list, description="test data", default_value=[9, 9, 9])},
)

@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def main_job():
    result = load_data()
    read_data(result)
1. load_data.ipynb
import dagstermill as dm

result = [1, 2, 3, 4]

context = dm.get_context()
context.log.info(f"Load_data - results: {result}")

# Yield the result and name it "numbers";
# this should allow passing the data downstream to the next step
dm.yield_result(result, "numbers")
2. read_data.ipynb
import dagstermill as dm

# Logs a dummy string
dm.get_context().log.info("Logging works here")

# How to load/retrieve the "numbers" result from the previous notebook?
# context = dm.get_context()
claire
08/10/2022, 11:43 PM
read_data = dm.define_dagstermill_op(
    ....
    ins={"numbers": In()},
)

##### IPYNB
import dagstermill as dm

# dagstermill injects the "numbers" op input into the notebook, so it is available here as a variable
dm.get_context().log.info(numbers)
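Pulling the snippets in this thread together, a minimal sketch of the full wiring (same notebook files as above; note that the name "numbers" has to match the out on load_data, the in on read_data, and the output name passed to dm.yield_result, since dagstermill injects the op input into read_data.ipynb under that name):

import dagstermill as dm
from dagster import In, Out, job
from dagster.utils import script_relative_path

load_data = dm.define_dagstermill_op(
    "load_data",
    notebook_path=script_relative_path("load_data.ipynb"),
    # load_data.ipynb yields its value with dm.yield_result(result, "numbers")
    outs={"numbers": Out(list, description="list of numbers")},
)

read_data = dm.define_dagstermill_op(
    "read_data",
    notebook_path=script_relative_path("read_data.ipynb"),
    output_notebook_name="read_data_output",
    # inside read_data.ipynb the value is available as the variable `numbers`
    ins={"numbers": In(list, description="numbers from load_data")},
)

@job(
    resource_defs={
        "output_notebook_io_manager": dm.local_output_notebook_io_manager,
    }
)
def main_job():
    # wiring the output into read_data makes execution serial: load_data runs first
    read_data(load_data())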
Stefan Samba
08/11/2022, 7:40 AM
claire
08/11/2022, 5:03 PM