Benny Renkens
# 04/30/2023, 9:18 PM
def some_api_to_read(machine_id, start_date, end_date) -> pd.DataFrame:
    """Stand-in for the real machine-data API.

    The arguments are accepted but not used: every call returns the same
    four-row, single-column frame.

    Args:
        machine_id: Identifier of the machine to read (currently ignored).
        start_date: Start of the requested range (currently ignored).
        end_date: End of the requested range (currently ignored).

    Returns:
        A DataFrame holding the values 1, 2, 3, 4 in one column.
    """
    return pd.DataFrame([1, 2, 3, 4])
class ReaderConfig(Config):
    """Run configuration for ``read_machine_data``.

    All three values are handed unchanged to ``some_api_to_read``.
    """

    # Identifier of the machine whose data should be read.
    machine_id: str
    # Range boundaries, carried as strings.
    # NOTE(review): the expected date format is not visible here - confirm.
    start_date: str
    end_date: str
@op
def read_machine_data(context: OpExecutionContext, config: ReaderConfig) -> pd.DataFrame:
    """Read one machine's data for the date range given in the op config.

    Args:
        context: Execution context; used for logging and to look up the
            time window of the partition being materialized.
        config: Machine id and start/end dates forwarded to the reader API.

    Returns:
        Whatever ``some_api_to_read`` returns for the configured values.
    """
    # The op can see the graph's partition through ``context``, but it keeps
    # its own config because other jobs reuse it with explicit values.
    # NOTE(review): assumes the run is partitioned with a time dimension, so
    # that ``partition_time_window`` is available - confirm.
    context.log.info(f"graph partition: {context.partition_time_window[0]}")
    return some_api_to_read(config.machine_id, config.start_date, config.end_date)
# NOTE(review): the static dimension is keyed "color" although its values are
# machine names; renaming it would change partition keys, so it is left as is.
@graph_asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "color": StaticPartitionsDefinition(["machine_a", "machine_b", "machine_c"]),
        }
    )
)
def machine_dataset() -> pd.DataFrame:
    """Graph-backed asset that reads machine data, partitioned by day and machine.

    The graph wraps a pre-configured copy of ``read_machine_data`` and returns
    its output as the asset value.
    """
    # NOTE(review): the three config values below are still placeholders. This
    # function body receives no context argument, so partition details cannot
    # be filled in here; presumably they must be resolved inside the op via
    # its execution context - confirm against the Dagster documentation.
    read_machine_a_sometime = read_machine_data.configured(
        {
            "machine_id": "<cannot access partition info here?>",
            "start_date": "<cannot access partition info here?>",
            "end_date": "<cannot access partition info here?>",
        },
        name="read_of_a_machine",
    )
    return read_machine_a_sometime()