#ask-community

Joe Williamson

03/19/2023, 6:24 PM
Hi, I'm a complete newcomer to this, so apologies in advance if the question is trivial. In short, I want to build a football Twitter bot that does the following:
• Scrape info from a site on a schedule and get today's matches
• Do stuff with the data
• Connect to the Twitter API and post
The issue is that I need 2 outputs from that 1st stage (url + time), and I can only get it to work with 1 at the moment. Looking through the docs, dynamic graphs seemed the way to go (but happy to be told otherwise). I've simplified the code below (otherwise it's just too long):
from dagster import op, job, DynamicOut, DynamicOutput

@op(
    out={
        "times": DynamicOut(),
        "urls": DynamicOut(),
    }
)
def get_Info():
    # Scrape stuff (table_rows, time and url come from the scraping code left out here)
    for idx, row in enumerate(table_rows):
        # Just feed in url here and say if available for certain date (today -> then yield)
        yield DynamicOutput(time, output_name="times", mapping_key=f"times_{idx}")
        yield DynamicOutput(url, output_name="urls", mapping_key=f"urls_{idx}")

@op
def do_Stuff(url):
    # Just generate datasets then plots from url we got above
    pass

@job
def materialize_Stuff():
    time, url = get_Info()
    url.map(do_Stuff)
The issue seems to be that last step: I get KeyError: 'times'. At first I assumed I had just mistyped a name in the DynamicOutput call, but that doesn't seem to be it. If I delete everything related to times, it runs fine, but I can't seem to unpack both outputs at that last stage. I've printed the outputs inside get_Info and can see it's producing the correct info. I've also tried other output names (in case they clash with any terms dagster uses), but that doesn't work either. Also, does map only work with a single variable? I.e., could I not give that 2nd function (do_Stuff) 2 dynamic inputs and pass both through in that last stage? Thanks in advance for any help anyone can give.

jamie

03/20/2023, 2:29 PM
Hey @Joe Williamson, this is achievable! Right now dagster doesn’t support mapping two dynamic outputs into a single op. Basically, we’d have to determine which entries of the first dynamic output get passed with which entries of the second dynamic output (do we zip them together? do some kind of cross product? etc.). This is something we’ve been working on a bit, on and off, but we haven’t had time to make it a top priority. However, based on your use case and code example, I assume you want the “zip” method, which we can achieve by returning a single dynamic output from get_stuff that is a tuple of the time and the url. Something like this should work:
@op(
    out={
        "data": DynamicOut(),
    }
)
def get_stuff():
    # Scrape stuff
    for idx, row in enumerate(table_rows):
        # Just feed in url here and say if available for certain date (today -> then yield)
        yield DynamicOutput((time, url), output_name="data", mapping_key=f"data{idx}")

@op
def do_Stuff(data):
    time, url = data  # unpacks the input into the time and url pieces
    # Just generate datasets then plots from url we got above

@job
def materialize_Stuff():
    data = get_stuff()
    data.map(do_Stuff)
here’s a full toy code sample that does the above that you can run to see it working
from dagster import op, job, DynamicOut, DynamicOutput

@op(
        out={
            "data": DynamicOut()
        }
)
def top_level():
    for i in range(10):
        yield DynamicOutput((i, -i), output_name="data", mapping_key=f"data_{i}")

@op
def process(data):
    pos, neg = data
    return pos * neg

@job
def my_job():
    data = top_level()
    data.map(process)
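
To make the "zip" versus "cross product" question concrete, here is a small plain-Python sketch (no dagster involved) of the two ways entries from two lists could be paired. The sample times and URLs are made up for illustration, and the `data_{idx}` keys simply mirror the mapping keys used in the toy sample; this is only meant to show why bundling each (time, url) pair into one tuple corresponds to the zip behaviour.

```python
from itertools import product

# Hypothetical sample data standing in for the scraped values.
times = ["12:30", "15:00", "17:30"]
urls = ["/match/a", "/match/b", "/match/c"]

# "Zip" pairing: the i-th time goes with the i-th url, one work item per match.
zipped = list(zip(times, urls))

# Cross-product pairing: every time is combined with every url.
crossed = list(product(times, urls))

# Each zipped pair gets its own key, similar to one mapping key per tuple.
keyed = {f"data_{idx}": pair for idx, pair in enumerate(zipped)}

print(len(zipped), "zipped pairs;", len(crossed), "cross-product pairs")
```

With three matches, zipping yields three work items while the cross product yields nine, which is why the pairing rule has to be chosen explicitly.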

Joe Williamson

03/20/2023, 5:59 PM
Thanks Jamie, this works perfectly. Appreciate your help 😀