Istvan Darvas
11/19/2020, 2:15 PM@pipeline(
mode_defs=[
ModeDefinition("Live", resource_defs={"df_reader": dataframe_reader_for_pgsql}),
],
input_defs=[InputDefinition(name="candidate", dagster_type=dict)],
)
def ignition_aggregate_pipeline(candidate):
candidate = { # This would be my preset
"device_inventory_id": 1,
"device_id_type": 2,
"device_id_name": "58909",
"proc_until": "2020-11-18 15:38:23+01",
"receive_time": "2020-11-19 12:35:34+01",
}
ignition_agg_base = new_ignitions_in_range(candidate)
@solid(
required_resource_keys={"df_reader"},
input_defs=[InputDefinition(name="candidate", dagster_type=dict)],
output_defs=[OutputDefinition(name="debug", dagster_type=int)],
)
def new_ignitions_in_range(context, candidate: dict) -> int:
sandy
11/19/2020, 2:51 PMexecute_pipeline
? via the command line? in dagit?Istvan Darvas
11/19/2020, 2:53 PMif __name__ == "__main__":
run_config = {}
with open("dagster-test2_resources.yaml", 'r') as yaml_cfg:
try:
run_config = yaml.safe_load(yaml_cfg)
except yaml.YAMLError as exc:
print(exc)
execute_pipeline(ignition_aggregate_pipeline, run_config=run_config)
sandy
11/19/2020, 4:19 PM@pipeline(
mode_defs=[ModeDefinition("Live", resource_defs={"df_reader": dataframe_reader_for_pgsql}),],
)
def ignition_aggregate_pipeline():
ignition_agg_base = new_ignitions_in_range()
@solid(
required_resource_keys={"df_reader"},
output_defs=[OutputDefinition(name="debug", dagster_type=int)],
config_schema={
"device_inventory_id": int,
"device_id_type": int,
"device_id_name": str,
"proc_until": str,
"receive_time": str,
},
)
def new_ignitions_in_range(context) -> int:
candidate = context.solid_config
execute_pipeline(
ignition_aggregate_pipeline,
run_config={
"solids": {
"new_ignitions_in_range": {
"device_inventory_id": 1,
"device_id_type": 2,
"device_id_name": "58909",
"proc_until": "2020-11-18 15:38:23+01",
"receive_time": "2020-11-19 12:35:34+01",
}
}
},
)
Istvan Darvas
11/19/2020, 5:06 PMcat
11/19/2020, 5:28 PMIstvan Darvas
11/19/2020, 5:32 PMsandy
11/19/2020, 5:40 PMIstvan Darvas
11/19/2020, 5:51 PMcat
11/19/2020, 8:58 PM