Mykola Palamarchuk
06/20/2022, 10:46 AM
from dagster import graph

@graph
def get_and_persist_page(page_idx):
    page_data = get_page(page_idx)  # fetch from external API
    persist_original(page_idx, page_data)  # persist raw data to track history
    return to_dataframe(page_idx, page_data)  # convert to a Pandas DataFrame

@graph
def my_job_graph():
    page_indexes = get_page_indexes()  # DynamicOutput: list of pages from the external API
    list_of_dfs = page_indexes.map(get_and_persist_page).collect()
    combined_df = combine(list_of_dfs)
    save_to_warehouse(combined_df)
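For readers new to dynamic graphs, here is a minimal plain-Python analogy of the fan-out/fan-in shape above (no Dagster required): the per-page step runs once per page index, the results are gathered into one list, and `combine` concatenates them. The fake API data, the `history` dict and the lists of row dicts standing in for DataFrames are all hypothetical; in Dagster the per-page steps are separate op executions that may run in parallel depending on the executor.

```python
from typing import Dict, List

# Hypothetical stand-ins for the external API and the history store.
FAKE_API: Dict[int, List[dict]] = {
    0: [{"id": 1}, {"id": 2}],
    1: [{"id": 3}],
    2: [{"id": 4}, {"id": 5}],
}
history: Dict[int, List[dict]] = {}


def get_page_indexes() -> List[int]:
    # Fan-out source: one entry per page (plays the role of the DynamicOutputs).
    return sorted(FAKE_API)


def get_and_persist_page(page_idx: int) -> List[dict]:
    page_data = FAKE_API[page_idx]   # analogue of get_page
    history[page_idx] = page_data    # analogue of persist_original
    # Analogue of to_dataframe: tag each row with its page index.
    return [{**row, "page": page_idx} for row in page_data]


def combine(list_of_dfs: List[List[dict]]) -> List[dict]:
    # Fan-in: concatenate the per-page results into one table.
    return [row for df in list_of_dfs for row in df]


def my_job_graph() -> List[dict]:
    page_indexes = get_page_indexes()
    # Analogue of page_indexes.map(...).collect()
    list_of_dfs = [get_and_persist_page(i) for i in page_indexes]
    return combine(list_of_dfs)


combined = my_job_graph()
print(len(combined))  # 5
```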
This works. I like that I can configure a retry policy for the "get_page" op, but I'm curious whether there is a more elegant way to do this.
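Dagster's `RetryPolicy` (passed as `@op(retry_policy=RetryPolicy(max_retries=..., delay=...))`) handles transient API failures per op. The sketch below is only a plain-Python illustration of what a retry policy with exponential backoff does; `call_with_retries` and `flaky_get_page` are hypothetical and are not how Dagster implements retries.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], max_retries: int = 3,
                      delay: float = 0.1, backoff: float = 2.0) -> T:
    """Call fn, retrying up to max_retries times with exponential backoff."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            time.sleep(delay * backoff ** attempt)  # wait longer after each failure
            attempt += 1


# Hypothetical flaky API call: fails twice, then succeeds.
calls = {"n": 0}


def flaky_get_page() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "page data"


result = call_with_retries(flaky_get_page, delay=0.01)
print(result)  # page data
```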
What bothers me:
• I don't know how to define a type for the list of Pandas DataFrames produced by collect().
• It produces some "noise" in Dagit for my 100 pages. Is there a way to group logs/charts for `DynamicOutput`s somehow?
Dmitry Mogilevsky
06/20/2022, 11:33 AM
…combine function, in my experience List[pd.DataFrame] is interpreted correctly and passes Dagster's internal type checks. (Not a member of the Dagster team, just trying to be helpful.)
Mykola Palamarchuk
06/20/2022, 12:53 PM
…create_dagster_pandas_dataframe_type()
Dmitry Mogilevsky
06/20/2022, 12:54 PM
sandy
06/20/2022, 4:51 PM
# Note: List here is Dagster's List, not typing.List
from dagster import op, DagsterType, job, DynamicOut, DynamicOutput, List, In

EvenDagsterType = DagsterType(
    name="EvenDagsterType",
    type_check_fn=lambda _, value: isinstance(value, int) and value % 2 == 0,
)

@op(out=DynamicOut())
def start():
    yield DynamicOutput(2, mapping_key="a")
    yield DynamicOutput(4, mapping_key="b")

@op
def middle(x):
    return x

# Each element of the collected list is checked against EvenDagsterType
@op(ins={"x": In(dagster_type=List[EvenDagsterType])})
def combine(x):
    ...

@job
def my_job():
    combine(start().map(middle).collect())
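As I understand it, a Dagster `List[...]` input type with an inner Dagster type applies the inner check to every element of the collected list, so a single bad item fails the whole input. A plain-Python sketch of that element-wise idea, reusing the even-number predicate from the example above; `check_list_of` is a hypothetical helper, not Dagster's implementation:

```python
from typing import Any, Callable, Tuple


def is_even(value: Any) -> bool:
    # Same predicate as EvenDagsterType's type_check_fn (minus the context argument).
    return isinstance(value, int) and value % 2 == 0


def check_list_of(inner_check: Callable[[Any], bool], value: Any) -> Tuple[bool, str]:
    """Return (passed, description) for a list whose items must all pass inner_check."""
    if not isinstance(value, list):
        return False, f"expected a list, got {type(value).__name__}"
    for idx, item in enumerate(value):
        if not inner_check(item):
            return False, f"item {idx} ({item!r}) failed the inner type check"
    return True, "ok"


print(check_list_of(is_even, [2, 4]))  # (True, 'ok')
print(check_list_of(is_even, [2, 3]))  # (False, 'item 1 (3) failed the inner type check')
```

The same shape would apply to the DataFrame case: swap `is_even` for a check such as "is a DataFrame with the expected columns".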
> • It produces some "noise" in Dagit for my 100 pages. Is there a way to group logs/charts for `DynamicOutput`s somehow?
Do you have thoughts on what this would ideally look like?
Mykola Palamarchuk
06/20/2022, 5:53 PM
…In(dagster_type=List[SomeDagsterType]). I'm getting this error:
UserWarning: Error loading repository location repo.py:TypeError: Parameters to generic types must be types. Got <dagster.core.types.dagster_type.DagsterType object at 0x7f4994fff490>.
I'm using the Dagster type generated by create_dagster_pandas_dataframe_type().
2.
> Do you have thoughts on what this would ideally look like?
I'd like to be able to fold/unfold the timeline view for dynamic graphs. It doesn't make much sense to show every step when running on a large set of chunks.
sandy
06/20/2022, 5:55 PM
…List in your example imported from the typing module? The code I pasted will only work if you're using the List that's imported from Dagster. Also, what version of Python are you using?
Mykola Palamarchuk
06/20/2022, 5:57 PM
…List from the Dagster module! Thanks!
…dict with mapping_keys as indexes, e.g.
data_as_dict = start().map(process).collect_as_dict()
• Reduce boilerplate for plain list DynamicOutputs.
At the moment I have to do something like this:
@op(out=DynamicOut())
def start() -> List[DynamicOutput[str]]:
    items = ["a", "b", "c"]
    return [DynamicOutput(item, mapping_key=item) for item in items]
This could perhaps be simplified to something like:
@op
def start() -> DynamicListOutput[str]:
    items = ["a", "b", "c"]
    return DynamicListOutput(items)  # default mapping_key is the item itself
The same could apply to a DynamicDictOutput, where the mapping key is the dict key itself.
sandy
06/20/2022, 6:56 PM