Joe Williamson
03/19/2023, 6:24 PM
@op(
    out={
        "times": DynamicOut(),
        "urls": DynamicOut(),
    }
)
def get_stuff():
    # Scrape stuff
    for idx, row in enumerate(table_rows):
        # Just feed in url here and say if available for certain date (today -> then yield)
        yield DynamicOutput(time, output_name="times", mapping_key=f"times_{idx}")
        yield DynamicOutput(url, output_name="urls", mapping_key=f"urls_{idx}")

@op
def do_Stuff(url):
    # Just generate datasets then plots from url we got above
    ...

@job
def materialize_Stuff():
    time, url = get_stuff()
    url.map(do_Stuff)
The issue is seemingly that last step: I get KeyError: 'times'. I assumed at first I had just mistyped that mapping_name, but that doesn't seem to be it. If I delete everything related to the times output then it runs fine, but I can't seem to unpack the two outputs at that last stage. I've printed the values inside that first op and can see it's producing the correct info. I've also tried calling the output other things (in case it clashes with any terms Dagster uses) but that doesn't work either.
Also, does the map function only work with single variables? I.e. could I not make the 2nd function (do_Stuff) take 2 dynamic inputs that I pass through in that last stage?
Thanks in advance for any help anyone can give.
jamie
03/20/2023, 2:29 PM
get_stuff can yield a single dynamic output that is a tuple of the time and the url. Something like this should work
@op(
    out={
        "data": DynamicOut(),
    }
)
def get_stuff():
    # Scrape stuff
    for idx, row in enumerate(table_rows):
        # Just feed in url here and say if available for certain date (today -> then yield)
        yield DynamicOutput((time, url), output_name="data", mapping_key=f"data{idx}")

@op
def do_Stuff(data):
    time, url = data  # unpacks the input into the time and url pieces
    # Just generate datasets then plots from url we got above

@job
def materialize_Stuff():
    data = get_stuff()
    data.map(do_Stuff)
Here's a full toy code sample that does the above; you can run it to see it working:
from dagster import op, job, DynamicOut, DynamicOutput

@op(
    out={
        "data": DynamicOut()
    }
)
def top_level():
    for i in range(10):
        yield DynamicOutput((i, -i), output_name="data", mapping_key=f"data_{i}")

@op
def process(data):
    pos, neg = data
    return pos * neg

@job
def my_job():
    data = top_level()
    data.map(process)
Joe Williamson
03/20/2023, 5:59 PM