#ask-community

Mykola Palamarchuk

06/20/2022, 10:46 AM
Hi team! We have a pipeline to grab a lot of paged data, e.g.:
```python
from dagster import graph

# get_page, persist_original, to_dataframe, get_page_indexes, combine and
# save_to_warehouse are ops defined elsewhere.


@graph
def get_and_persist_page(page_idx):
    page_data = get_page(page_idx)  # from external API
    persist_original(page_idx, page_data)  # persist to track history
    return to_dataframe(page_idx, page_data)  # convert to Pandas DataFrame


@graph
def my_job_graph():
    page_indexes = get_page_indexes()  # DynamicOutput, get list of pages from external API
    list_of_dfs = page_indexes.map(get_and_persist_page).collect()
    combined_df = combine(list_of_dfs)
    save_to_warehouse(combined_df)
```
It works this way. I like that it is possible to configure a retry policy for the `get_page` op. But I'm curious whether there is a more elegant way to do this. What bothers me:
• I don't know how to define a type for the list of Pandas DataFrames produced by `collect()`.
• It produces some "noise" in Dagit for my 100 pages. Is there a way to group logs/charts for `DynamicOutput`s somehow?

Dmitry Mogilevsky

06/20/2022, 11:33 AM
Mykola, if you're asking about the appropriate type hint for the `combine` function, in my experience `List[pd.DataFrame]` is interpreted correctly and passes Dagster's internal type checks. (Not a member of the Dagster team, just trying to be helpful.)

Mykola Palamarchuk

06/20/2022, 12:53 PM
@Dmitry Mogilevsky, thanks, that's probably the best idea in this case! But I was curious about combining Dagster types and List, e.g. a type generated by `create_dagster_pandas_dataframe_type()`.

Dmitry Mogilevsky

06/20/2022, 12:54 PM
Ah yes, I haven't gotten around to trying that yet.

sandy

06/20/2022, 4:51 PM
hey @Mykola Palamarchuk - you should be able to do something like:
```python
from dagster import op, DagsterType, job, DynamicOut, DynamicOutput, List, In

EvenDagsterType = DagsterType(
    name="EvenDagsterType",
    type_check_fn=lambda _, value: isinstance(value, int) and value % 2 == 0,
)


@op(out=DynamicOut())
def start():
    yield DynamicOutput(2, mapping_key="a")
    yield DynamicOutput(4, mapping_key="b")


@op
def middle(x):
    return x


@op(ins={"x": In(dagster_type=List[EvenDagsterType])})
def combine(x):
    ...


@job
def my_job():
    combine(start().map(middle).collect())
```
> It produces some "noise" in Dagit for my 100 pages. Is there a way to group logs/charts for `DynamicOutput`s somehow?

Do you have thoughts on what this would ideally look like?

Mykola Palamarchuk

06/20/2022, 5:53 PM
@sandy, thanks!
1. I've just checked `In(dagster_type=List[SomeDagsterType])`. I'm getting this error:
```
UserWarning: Error loading repository location repo.py:TypeError: Parameters to generic types must be types. Got <dagster.core.types.dagster_type.DagsterType object at 0x7f4994fff490>.
```
I'm using a Dagster type generated by `create_dagster_pandas_dataframe_type()`.
2. > Do you have thoughts on what this would ideally look like?

I'd like to be able to fold/unfold the timeline view for dynamic graphs somehow. It does not make much sense to show them all when running on a large set of chunks.

sandy

06/20/2022, 5:55 PM
is the `List` in your example imported from the `typing` module? the code I pasted will only work if you're using the `List` that's imported from Dagster. also, what version of Python are you using?

Mykola Palamarchuk

06/20/2022, 5:57 PM
Oh, that works with `List` from the Dagster module! Thanks!
@sandy, btw, I have some ideas about improvements around dynamic outputs.
• Make it possible to collect results as a `dict` with mapping keys as indexes, e.g.
```python
data_as_dict = start().map(process).collect_as_dict()
```
• Reduce boilerplate for plain-list DynamicOutputs. At the moment I have to do something like this:
```python
@op(out=DynamicOut())
def start() -> List[DynamicOutput[str]]:
    items = ["a", "b", "c"]
    return [DynamicOutput(item, mapping_key=item) for item in items]
```
That could probably be improved to something like this:
```python
@op
def start() -> DynamicListOutput[str]:
    items = ["a", "b", "c"]
    return DynamicListOutput(items)  # default mapping_key is the item itself
```
And the same could apply to a `DynamicDictOutput`, where the mapping key is the dict key itself.

sandy

06/20/2022, 6:56 PM
thanks for the suggestions @Mykola Palamarchuk. I filed a couple of issues to track them:
• https://github.com/dagster-io/dagster/issues/8491
• https://github.com/dagster-io/dagster/issues/8492
and one more for the UI suggestion: https://github.com/dagster-io/dagster/issues/8493