# ask-community
a
I have a question regarding trying to define an asset that is downstream of multiple fanning ops. I've been wrestling with this for days and I never seem to get anywhere.
dagster._core.errors.DagsterInvalidDefinitionError: op 'collect_foo_values' cannot be downstream of more than one dynamic output. It is downstream of both "get_foo_by_bar:foo_link" and "yield_bar_links:bar_link"
How are you supposed to resolve assets downstream from multiple fanning processes?
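A stripped-down sketch of the shape that triggers this, with toy op bodies and the op/output names taken from the error above (the second DynamicOut sits inside the first .map(...), so the collect ends up downstream of both fan-outs):

from dagster import DynamicOut, DynamicOutput, graph, op


@op(out={"bar_link": DynamicOut()})
def yield_bar_links():
    for bar in ["bar_a", "bar_b"]:
        yield DynamicOutput(bar, mapping_key=bar)


@op(out={"foo_link": DynamicOut()})
def get_foo_by_bar(bar_link):
    # a second DynamicOut inside the first .map(...) is the nesting Dagster rejects
    for i in range(3):
        key = f"{bar_link}_foo_{i}"
        yield DynamicOutput(key, mapping_key=key)


@op
def collect_foo_values(values):
    return values


@graph
def nested_fan_out():
    # collect_foo_values is downstream of both dynamic outputs,
    # which raises the DagsterInvalidDefinitionError shown above
    collect_foo_values(yield_bar_links().map(get_foo_by_bar).collect())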
w
Can you share an example of your fanning strategy?
a
@asset(partitions_def=monthly_site_partition_def, code_version="0.1")
def scheduled_state_partition(context) -> Output[dict]:
    partition_keys = context.partition_key.keys_by_dimension
    date_partition = partition_keys.get("date")
    state_partition = partition_keys.get("state")
    state_link = f"<https://www.website.com/sitemap/{state_partition}/>"

    return Output(
        {"state_link": state_link},
        metadata={"date_partition": date_partition, "state_partition": state_partition},
    )


@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.5,  # 500ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    ),
)
def get_foo_by_bar(foo_bar_link):
    res = get_page(foo_bar_link.get("state_link"), proxy=True)
    soup = get_soup(res.text)
    grid = soup.find_all("div", {"class": "linkGrid"})
    try:
        all_links = grid[0].find_all("a", href=True)
        foo_links = [
            foo_link.get("href")
            for foo_link in all_links
            if "sitemap" in foo_link.get("href")
        ]
        return foo_links

    # if there is no length, it is likely a failed scrape. Wait 60 seconds for new proxy
    except Exception as e:
        raise RetryRequested(max_retries=3, seconds_to_wait=60) from e


@asset(
    code_version="0.1",
    partitions_def=monthly_site_partition_def,
    ins={
        "state": AssetIn(
            AssetKey("scheduled_state_partition"),
        )
    },
)
def foo_links(state):
    cities_links = get_foo_by_bar(state)

    return foo_links


@op(out={"foo_link": DynamicOut(String)})
def yield_foo_links(foo_links):
    for idx, l in enumerate(foo_links):
        yield DynamicOutput(l, mapping_key=l)


@op(
    ins={"foo_link": In()},
    out={"zipzap_link": DynamicOut(String)},
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.5,  # 500ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    ),
)
def get_zipzap_by_foo(context, foo_link):

    res = get_page(foo_link, proxy=True)
    soup = get_soup(res.text)
    grid = soup.find_all("div", {"class": "linkGrid"})
    zipzap_links = [
        link.get("href")
        for link in grid[0].find_all("a", href=True)
        if "https" in link.get("href")
    ]
    return zipzap_links


@op
def collect_zipzap_values(val):
    return val


@graph
def zipzap_links_graph(foo_links):
    zipzap_links = yield_city_links(foo_links).map(get_zipzap_by_foo)

    return collect_zipzap_values(zipzap_links.collect())


zipzap_links = AssetsDefinition.from_graph(
    zipzap_links_graph, partitions_def=monthly_site_partition_def
)
w
Is foo_links supposed to return itself? Or cities_links?
The simplest strategy is to switch your ops to assets.
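A minimal sketch of that direction (an illustration, not code from the thread), reusing the names and the get_page / get_soup helpers from the snippet above: collapse the dynamic fan-out into plain partitioned assets that loop over the links, and hang the retry policy on the asset itself.

from dagster import RetryPolicy, asset


@asset(partitions_def=monthly_site_partition_def, code_version="0.1")
def foo_links(scheduled_state_partition) -> list:
    # one state sitemap scrape per partition, returning the discovered links
    res = get_page(scheduled_state_partition["state_link"], proxy=True)
    soup = get_soup(res.text)
    grid = soup.find_all("div", {"class": "linkGrid"})
    return [
        a.get("href")
        for a in grid[0].find_all("a", href=True)
        if "sitemap" in a.get("href")
    ]


@asset(
    partitions_def=monthly_site_partition_def,
    code_version="0.1",
    retry_policy=RetryPolicy(max_retries=3),  # retries the whole asset, not one link
)
def zipzap_links(foo_links) -> list:
    # plain loop instead of DynamicOut / .map(): simpler wiring, coarser-grained retries
    links = []
    for foo_link in foo_links:
        res = get_page(foo_link, proxy=True)
        soup = get_soup(res.text)
        grid = soup.find_all("div", {"class": "linkGrid"})
        links += [
            a.get("href")
            for a in grid[0].find_all("a", href=True)
            if "https" in a.get("href")
        ]
    return links

The trade-off is that a retry re-runs the whole loop for a partition rather than a single link.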
a
If I do that, though, I can’t rerun or retry failed operations as easily. And no, I don’t think it’s supposed to return itself; I may have renamed some things badly in this example.
It’s parsing through a nested sitemap that branches into other links, ideally ending in a list of links, each of which I then go parse.
I’ve never seen anyone manage a scraper with Dagster; it’s always an incredibly simple API or pre-existing datasets.
w
I'm scraping through Dagster
My strategy is recursive:
1. scrape a known page [asset] and write links to datastore
2. sensor listens on datastore, yields a run request (asset materialization from step 1) for each link
In practice the execution is much more complex, but assets and sensors allow me to monitor and rerun on failure trivially. I'm paying the cost in terms of increased parallelization overhead.
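A rough sketch of that pattern (an illustration, not code from the thread; read_unscraped_links is a hypothetical stand-in for whatever query the sensor runs against the datastore, and get_page / get_soup are the helpers from the snippet above):

from dagster import Config, RunConfig, RunRequest, asset, define_asset_job, sensor


class PageConfig(Config):
    url: str


@asset
def page_content(config: PageConfig):
    # the per-link work: fetch and parse one discovered link per run
    res = get_page(config.url, proxy=True)
    return get_soup(res.text)


page_job = define_asset_job("page_job", selection=[page_content])


@sensor(job=page_job)
def new_link_sensor(context):
    # poll the datastore that the step-1 asset wrote links into
    for link in read_unscraped_links():  # hypothetical helper
        yield RunRequest(
            run_key=link,  # dedupes: at most one run request per link
            run_config=RunConfig(ops={"page_content": PageConfig(url=link)}),
        )

The run_key keeps the sensor from re-requesting a link it has already launched, and a failed run can simply be re-executed, which covers the "rerun on failure" part.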
a
I’ll try this style. Thanks!