Levan
09/15/2022, 5:54 PM
# (code shared by Levan, 09/15/2022, 5:54 PM)
@op
def op1():
    """Fan out one DynamicOutput per dataframe for downstream mapping.

    NOTE(review): ``df1``/``df2``/``df3`` and ``ticker_name`` are not
    defined in this snippet — presumably supplied elsewhere. As written,
    every iteration yields the same ``mapping_key=ticker_name``; Dagster
    requires mapping keys to be unique per dynamic output, so this would
    fail at runtime unless ``ticker_name`` changes per dataframe — confirm.
    """
    list_of_df = [df1, df2, df3]
    for df in list_of_df:
        yield DynamicOutput(
            df, mapping_key=ticker_name
        )
@op
def op2(context, df):
    """Per-dataframe transform applied to each mapped dynamic output.

    The body is a placeholder from the original snippet (``...``);
    the real processing happens where this was copied from.
    """
    df = ...  # do some processing
    return df
@op(
    name="test",
    out={
        "test": Out(
            # Hand the result to the Snowflake IO manager; partition_expr
            # tells it which column encodes the partition window.
            io_manager_key="snowflake_io_manager",
            metadata={"partition_expr": "_updated_at"},
        )
    },
)
def op3(context, df):
    """Terminal op: returns the collected dataframe so the
    snowflake_io_manager can persist it."""
    return df
# NOTE(review): `weekly_partitioned_config` is referenced here but defined
# *below* this job in the snippet — as ordered, this raises NameError at
# import time. Move the config definition above the job.
@job(name="dynamic", metadata={"partition_expr": "_updated_at"}, config=weekly_partitioned_config)
def dynamic_graph_job():
    """Dynamic fan-out/fan-in: op1 yields dataframes, op2 maps over each,
    op3 receives the collected list."""
    result = op1().map(op2)
    op3(result.collect())  # ?? (original author's open question)
# NOTE(review): the decorated function reuses the name of dagster's
# `weekly_partitioned_config` decorator, shadowing the import after this
# line. It works once, but renaming (e.g. `my_weekly_config`) would be
# clearer — the `@job(..., config=...)` reference above would need the
# same rename.
@weekly_partitioned_config(start_date=datetime(2022, 8, 21))
def weekly_partitioned_config(start: datetime, _end: datetime):
    """Build the weekly run config, passing the partition's start date
    (as YYYY-MM-DD) to op1 and op2."""
    date_str = start.strftime("%Y-%m-%d")
    return {
        "ops": {
            "op1": {"config": {"date": date_str}},
            "op2": {"config": {"date": date_str}},
        }
    }
sean
09/15/2022, 9:02 PM
`snowflake_io_manager` currently only handles partitions when used with assets

Levan
09/15/2022, 9:35 PM
`result.collect()` to asset?
Levan
09/15/2022, 9:41 PM
`context` still has `_partition_key` available, and if `context.has_asset_key` is `False`, why not assign `time_window` based on `partition_key` (if provided)? That could solve this issue.
sean
09/15/2022, 10:03 PM
make `snowflake_io_manager` use the partition key for ops

sean
09/15/2022, 10:04 PM
`snowflake_io_manager`

Dagster Bot
09/15/2022, 10:04 PM