Gustavo Carvalho
05/13/2023, 5:14 AM
I tried to use context.asset_partitions_time_window inside an op which is part of my graph_asset, but got an error.
Code to reproduce:
from dagster import (
    DailyPartitionsDefinition,
    Definitions,
    OpExecutionContext,
    asset,
    graph_asset,
    op,
)

day_partitions_def = DailyPartitionsDefinition(start_date="2023-05-01")


@op
def extract(context: OpExecutionContext):
    # Error happens here!
    tw = context.asset_partitions_time_window_for_output()
    context.log.info(f"Extracting Data: {tw.start} - {tw.end}")
    x = 1
    context.log.info(f"Extracted Data: {x}")
    return x


@op
def transform(context: OpExecutionContext, x):
    context.log.info(f"Transform Input: {x}")
    y = x + 1
    context.log.info(f"Transform Output: {y}")
    return y


@op
def load(context: OpExecutionContext, y):
    context.log.info(f"Loaded Data: {y}")


# The graph-backed asset carries the partitions definition; the ops do not.
@graph_asset(partitions_def=day_partitions_def)
def my_graph_asset():
    x = extract()
    y = transform(x)
    return load(y)


defs = Definitions(
    assets=[my_graph_asset],
)
I found a thread where @owen suggests using context.partition_key inside the op, but it does not work with the single-run backfill.
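To illustrate why a single key is not enough here: a single-run backfill covers a range of partitions in one run, so what the op needs is the window spanned by the first and last key. Below is a minimal sketch in plain Python of that mapping for daily partitions. It uses no Dagster APIs; the helper name time_window_for_key_range and the "%Y-%m-%d" key format are assumptions made for illustration.

```python
from datetime import datetime, timedelta
from typing import Tuple

DATE_FORMAT = "%Y-%m-%d"  # assumed key format for daily partitions


def time_window_for_key_range(first_key: str, last_key: str) -> Tuple[datetime, datetime]:
    """Return the half-open [start, end) window covered by a range of daily keys.

    Hypothetical helper, not part of Dagster.
    """
    start = datetime.strptime(first_key, DATE_FORMAT)
    # The last key's own day is included, so the window ends one day after it.
    end = datetime.strptime(last_key, DATE_FORMAT) + timedelta(days=1)
    if end <= start:
        raise ValueError("last_key must not precede first_key")
    return start, end


# A normal run targets one partition...
print(time_window_for_key_range("2023-05-01", "2023-05-01"))
# ...while a single-run backfill targets several at once.
print(time_window_for_key_range("2023-05-01", "2023-05-03"))
```

With one key the window is a single day; with a range it grows to cover every day in the backfill, which is the information the time-window call in extract is meant to provide.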