Abhishek Agrawal
03/12/2023, 10:06 AMfrom dagster import Definitions, build_resources, job, ScheduleDefinition
import datasource_lake_defs
from custom_modules import spina_classes
from custom_modules import helper_functions
from custom_modules import dbt_functions
from custom_modules import armada_functions
customer_code_list = []
@job
def job_generate_customer_dbt_manifests():
# get the org list from armada
organisations = armada_functions.get_organisations()
organisation: spina_classes.Organisation
for organisation in organisations:
# get the config from armada]
run_config = armada_functions.get_config_from_armada(organisation)
if run_config.datasets:
customer_code = helper_functions.clean_object_name(organisation.organisation_name.lower())
customer_code_list.append(customer_code)
customer_name = organisation.organisation_name
with build_resources(
resources={"dbt_res": datasource_lake_defs.configured_dbt_cli_resource},
resource_config = {
"dbt_res": {
"config": {
"vars": dbt_functions.get_dbt_vars_for_customer(customer_code),
"target_path": f"target/{customer_code}"
}
}
}
) as resources:
resources.dbt_res.compile()
schedule_job_generate_customer_dbt_manifests = ScheduleDefinition(job=job_generate_customer_dbt_manifests, cron_schedule="*/10 * * * *")
defs = Definitions(
jobs=[job_generate_customer_dbt_manifests],
schedules=[schedule_job_generate_customer_dbt_manifests])
@Manan PAbhishek Agrawal
03/14/2023, 3:56 AMowen
03/14/2023, 5:35 PM@job
decorated function is meant to be defining dependencies between `@op`s. So dagster will immediately run the code inside that function whenever this file is loaded, resulting in the behavior you're seeing. If you put this code inside an @op
which is then called inside your job, I think you'd get the behavior you're expectingAbhishek Agrawal
03/14/2023, 8:32 PMowen
03/14/2023, 8:34 PMAbhishek Agrawal
03/15/2023, 2:16 PMAbhishek Agrawal
03/15/2023, 2:17 PMowen
03/15/2023, 4:22 PMAbhishek Agrawal
03/16/2023, 10:25 AMAbhishek Agrawal
03/16/2023, 1:45 PMowen
03/16/2023, 4:18 PMAbhishek Agrawal
03/16/2023, 8:50 PMAbhishek Agrawal
03/16/2023, 9:13 PMowen
03/16/2023, 9:51 PMAbhishek Agrawal
03/16/2023, 10:09 PMAbhishek Agrawal
03/22/2023, 2:38 PMowen
03/22/2023, 5:48 PM# my_script.py
def write_yaml_file():
api = call_api()
loop_through_response(...)
write_yaml_file_representing_definitions_you_want(...)
if __name__ == "__main__":
write_yaml_file()
then in your defs.py
file:
def get_definitions():
yaml = read_yaml_file()
assets = []
sensors = []
for thing in yaml:
sensors.append(make_sensor(thing))
assets.append(make_asset(thing))
return Definitions(assets=assets, sensors=sensors, ...)
defs = get_definitions()
Abhishek Agrawal
03/22/2023, 11:34 PMAbhishek Agrawal
03/27/2023, 10:50 PMowen
03/28/2023, 4:32 PMAbhishek Agrawal
03/28/2023, 10:29 PMAbhishek Agrawal
04/12/2023, 10:53 PM