Andrew
01/13/2023, 12:32 AM
dagster._core.errors.DagsterInvalidDefinitionError: op 'collect_foo_values' cannot be downstream of more than one dynamic output. It is downstream of both "get_foo_by_bar:foo_link" and "yield_bar_links:bar_link"
How are you supposed to resolve assets downstream from multiple fanning processes?
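
(For reference: the error means an op may be downstream of at most one dynamic output, so nested fan-outs have to be flattened to a single level. A minimal sketch of that pattern, with hypothetical op names: the second fan-out becomes an ordinary loop inside the mapped op, and the collector flattens the resulting list of lists.)

from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def yield_links():
    # the first (and only) dynamic fan-out
    for key in ["a", "b"]:
        yield DynamicOutput(key, mapping_key=key)

@op
def expand_link(link):
    # the second fan-out stays in-process: return a plain list
    # instead of yielding another DynamicOutput per item
    return [f"{link}_{i}" for i in range(3)]

@op
def collect_values(link_lists):
    # link_lists holds one list per mapped invocation; flatten it
    return [link for links in link_lists for link in links]

@job
def single_fanout_job():
    collect_values(yield_links().map(expand_link).collect())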

Will Holley
01/13/2023, 12:38 AM

Andrew
01/13/2023, 12:49 AM
@asset(partitions_def=monthly_site_partition_def, code_version="0.1")
def scheduled_state_partition(context) -> Output[dict]:
    partition_keys = context.partition_key.keys_by_dimension
    date_partition = partition_keys.get("date")
    state_partition = partition_keys.get("state")
    state_link = f"https://www.website.com/sitemap/{state_partition}/"
    return Output(
        {"state_link": state_link},
        metadata={"date_partition": date_partition, "state_partition": state_partition},
    )

@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.5,  # 500ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    ),
)
def get_foo_by_bar(foo_bar_link):
    res = get_page(foo_bar_link.get("state_link"), proxy=True)
    soup = get_soup(res.text)
    grid = soup.find_all("div", {"class": "linkGrid"})
    try:
        all_links = grid[0].find_all("a", href=True)
        foo_links = [
            foo_link.get("href")
            for foo_link in all_links
            if "sitemap" in foo_link.get("href")
        ]
        return foo_links
    # if the grid is empty, it is likely a failed scrape; wait 60 seconds for a new proxy
    except Exception as e:
        raise RetryRequested(max_retries=3, seconds_to_wait=60) from e

@asset(
    code_version="0.1",
    partitions_def=monthly_site_partition_def,
    ins={
        "state": AssetIn(
            AssetKey("scheduled_state_partition"),
        )
    },
)
def foo_links(state):
    cities_links = get_foo_by_bar(state)
    return foo_links
@op(out={"foo_link": DynamicOut(String)})
def yield_foo_links(foo_links):
for idx, l in enumerate(foo_links):
yield DynamicOutput(l, mapping_key=l)

@op(
    ins={"foo_link": In()},
    out={"zipzap_link": DynamicOut(String)},
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.5,  # 500ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    ),
)
def get_zipzap_by_foo(context, foo_link):
    res = get_page(foo_link, proxy=True)
    soup = get_soup(res.text)
    grid = soup.find_all("div", {"class": "linkGrid"})
    zipzap_links = [
        link.get("href")
        for link in grid[0].find_all("a", href=True)
        if "https" in link.get("href")
    ]
    return zipzap_links

@op
def collect_zipzap_values(val):
    return val

@graph
def zipzap_links_graph(foo_links):
    zipzap_links = yield_foo_links(foo_links).map(get_zipzap_by_foo)
    return collect_zipzap_values(zipzap_links.collect())

zipzap_links = AssetsDefinition.from_graph(
    zipzap_links_graph, partitions_def=monthly_site_partition_def
)
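
(One plausible restructuring of the graph above, sketched against the thread's own names rather than a confirmed fix: get_zipzap_by_foo already returns a plain list, so giving it a regular Out instead of a DynamicOut leaves yield_foo_links as the only dynamic fan-out, and collect_zipzap_values just flattens the collected list of lists.)

from dagster import Out, graph, op

# hypothetical redeclaration: same body as get_zipzap_by_foo above,
# but with a plain (non-dynamic) output:
#   @op(ins={"foo_link": In()}, out={"zipzap_links": Out()}, retry_policy=...)

@op
def collect_zipzap_values(zipzap_link_lists):
    # one list per mapped foo_link; flatten into a single list of links
    return [link for links in zipzap_link_lists for link in links]

@graph
def zipzap_links_graph(foo_links):
    return collect_zipzap_values(
        yield_foo_links(foo_links).map(get_zipzap_by_foo).collect()
    )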

Will Holley
01/13/2023, 12:56 AM
Is foo_links supposed to return itself? Or should it return cities_links?
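
(If the intent was to return the scraped list, the asset would presumably read:)

@asset(
    code_version="0.1",
    partitions_def=monthly_site_partition_def,
    ins={"state": AssetIn(AssetKey("scheduled_state_partition"))},
)
def foo_links(state):
    # return the computed list, not the enclosing function object
    cities_links = get_foo_by_bar(state)
    return cities_links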

Andrew
01/13/2023, 1:04 AM

Will Holley
01/13/2023, 1:36 AM

Andrew
01/13/2023, 2:08 AM