Hi team, We have the following file (referred as ...
# ask-community
a
Hi team, We have the following file (referred to as services.py) that we want to load as a code location. The dbt compile takes a while to run, which is why we have placed it inside a job and put a schedule on it. We have another code location that loads the dbt assets from the manifests generated by this job. Two questions: 1. When running dagster dev -f services.py, this job seems to start executing and the code location does not load, failing with the error 'Exception: Timed out waiting for gRPC server to start after 180s'. 2. When viewing the logs, why do we see dbt compile issued twice for each customer? Does building a dbt resource issue a compile command?
from dagster import Definitions, build_resources, job, ScheduleDefinition

import datasource_lake_defs

from custom_modules import spina_classes
from custom_modules import helper_functions
from custom_modules import dbt_functions
from custom_modules import armada_functions

customer_code_list = []

@job
def job_generate_customer_dbt_manifests():
    # get the org list from armada
    organisations = armada_functions.get_organisations()

    organisation: spina_classes.Organisation
    for organisation in organisations:
        # get the config from armada
        run_config = armada_functions.get_config_from_armada(organisation)

        if run_config.datasets:
            customer_code = helper_functions.clean_object_name(organisation.organisation_name.lower())
            customer_code_list.append(customer_code)
            customer_name = organisation.organisation_name

            with build_resources(
                resources={"dbt_res": datasource_lake_defs.configured_dbt_cli_resource},
                resource_config={
                    "dbt_res": {
                        "config": {
                            "vars": dbt_functions.get_dbt_vars_for_customer(customer_code),
                            "target_path": f"target/{customer_code}"
                        }
                    }
                }
            ) as resources:
                resources.dbt_res.compile()

schedule_job_generate_customer_dbt_manifests = ScheduleDefinition(job=job_generate_customer_dbt_manifests, cron_schedule="*/10 * * * *")


defs = Definitions(
    jobs=[job_generate_customer_dbt_manifests],
    schedules=[schedule_job_generate_customer_dbt_manifests])
@Manan P
Hey @sandy, could you help us with this?
o
hi @Abhishek Agrawal -- (I think this answers both questions), code inside an `@job`-decorated function is meant to be defining dependencies between `@op`s. So dagster will immediately run the code inside that function whenever this file is loaded, resulting in the behavior you're seeing. If you put this code inside an `@op` which is then called inside your job, I think you'd get the behavior you're expecting
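To make the load-time behavior concrete, here is a toy model that needs no Dagster install. The `toy_job` and `toy_op` decorators are hypothetical stand-ins written for this illustration, not Dagster's implementation. They only mimic the one property described above: the body of a job function runs when the file is loaded, while the body of an op runs when the job is executed.

```python
# Toy model only: these decorators are NOT Dagster's implementation.
_recording = None  # steps collected while a job body is being evaluated


def toy_op(fn):
    """Stand-in for @op: calling it inside a job body only records a step."""
    def invoke():
        _recording.append(fn)  # remember the step; do not run fn yet
    return invoke


class ToyJob:
    def __init__(self, steps):
        self.steps = steps

    def execute(self):
        # The op bodies run here, when the job is actually executed.
        for step in self.steps:
            step()


def toy_job(fn):
    """Stand-in for @job: evaluates the function body at definition time."""
    global _recording
    _recording = []
    fn()  # the body runs right here, i.e. every time the module is loaded
    steps, _recording = _recording, None
    return ToyJob(steps)


log = []


@toy_job
def work_in_job_body():
    # Plain Python in the job body: runs as soon as the file is loaded.
    log.append("compile ran at load time")


@toy_op
def compile_manifests():
    log.append("compile ran at execution time")


@toy_job
def work_in_op():
    compile_manifests()  # only wires the op into the job
```

After loading this file, `log` holds only the entry from `work_in_job_body`. The entry from `compile_manifests` appears only once `work_in_op.execute()` is called, which mirrors moving the manifest loop from the job body into an op.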
a
Thanks @owen! So once it's loaded successfully, will Dagster keep it running to periodically reload definitions or let it run as per the cron schedule..?
o
I think you'll need to explicitly reload the other code location once the manifest is compiled so that dagster will pull in the most recent changes (docs on how to do that via graphql: https://docs.dagster.io/concepts/dagit/graphql-client#reloading-all-repositories-in-a-repository-location)
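A rough sketch of such a reload request using only the standard library. The host, port, and location name are placeholders. The mutation text is an assumption based on the `reloadRepositoryLocation` mutation that the Python GraphQL client wraps, so check it against the schema of your Dagster version before relying on it.

```python
import json
import urllib.request

# Assumed mutation text; verify it against your Dagster version's schema.
RELOAD_MUTATION = """
mutation ReloadLocation($name: String!) {
  reloadRepositoryLocation(repositoryLocationName: $name) {
    __typename
  }
}
"""


def build_reload_request(location_name, host="localhost", port=3000):
    """Build (but do not send) the HTTP request that asks for a reload."""
    payload = {"query": RELOAD_MUTATION, "variables": {"name": location_name}}
    return urllib.request.Request(
        url=f"http://{host}:{port}/graphql",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def reload_code_location(location_name, host="localhost", port=3000, timeout=30):
    """Send the request and return the decoded JSON response body."""
    request = build_reload_request(location_name, host, port)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

The `dagster_graphql` package's `DagsterGraphQLClient.reload_repository_location` covers the same call if you would rather not hand-write the query.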
a
Thanks @owen, super helpful. Follow-up question: we have a file with no job/op that we use to initialise the assets and code definitions. It loops through our list of active clients and prepares a list of assets for each of them. Now, when the code is being deployed, Dagster obviously runs it, and because of that it sometimes times out or takes a really long time. Can we tell Dagster not to run it while the code is being deployed?
As we scale to more clients, the process will become slower and slower. We just want it to not run when the code is being deployed.
o
how are you currently deploying your code changes? In an ideal world, you could circumvent this process of using one dagster job to update other dagster jobs by rebuilding the dbt manifest in the same process that deploys the dbt code changes (e.g. you merge a change to the dbt repo and a github action builds a docker image with a compiled manifest). Similarly, for python/dagster code changes, the image building process might execute the script that prepares the list of assets. I understand you might not be using github actions / docker, but this general pattern of executing these sorts of scripts at the time a code change is merged is quite effective
a
Thanks @owen. This is super helpful. I will discuss this approach internally.
Hey, not sure if what I'm going to ask will even make sense, but our code location file is doing a lot of work creating assets for each of our clients using the asset factory approach. It takes too long, and even a timeout of 1200 is not enough. I thought maybe we could put the above time-consuming logic inside a job/op and create Defs once that job has run?
o
I guess what I would ask is what the source of truth is for your set of clients? If this source of truth lives within the same code that you use to generate definitions per client, then I think something like what I described would make the most sense. Basically, if your github repository contains all the information it needs in order for those files to be generated (i.e. no database calls are required), then it makes the most sense to do the slow work of turning (list of clients) + (code to turn that into definitions) into definitions only when changes are merged to your repository (i.e. you can write out a file that lives in a docker image or something of the sort). I am curious as to why it takes so long to generate this info though -- what's eating up most of the time? Is it just a huge volume of definitions (like many many thousands)? Or does each definition require non-trivial work to construct?
a
We make an API call to a different system, get all the clients, loop through them, and create tables and views if they don't exist. We separated out the part which compiles the dbt models for them, so in this process we read the manifest file from a GCS bucket for each client and create dbt assets. We also initialise sensors. So there's a lot happening, and since this code is not inside an @op, it also keeps running multiple times, which is unnecessary. We need to get the current client list, so the API call is necessary and we can't put it within the deployment process. Does that help?
@Manan P in case I missed anything
o
how often are new clients added? if it's somewhat rare (i.e. a few times a day), it still might make sense to put this inside a deployment process. you could have a separate process (even a dagster sensor!) polling for changes to the API response, and whenever something does change, you could trigger a redeploy. the reason I'm pushing on this point is just that it makes it much easier to reason about the state of your system if changes only happen between deployments. i.e. in your current setup, if a run starts, the first process might load in a completely different state of the world from a downstream process, if the API is updated before the second process starts
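A minimal sketch of the polling idea, assuming the API response can be reduced to a list of client identifiers. The function names and the state-file approach are illustrative choices, not something Dagster provides. Fetching the clients and triggering the redeploy are left to the caller.

```python
import hashlib
import json
from pathlib import Path


def fingerprint(client_ids):
    """Order-independent hash of the current client list."""
    canonical = json.dumps(sorted(client_ids)).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


def clients_changed(client_ids, state_file):
    """Return True (and record the new state) if the client list changed."""
    state_file = Path(state_file)
    new = fingerprint(client_ids)
    old = state_file.read_text() if state_file.exists() else None
    if new == old:
        return False
    state_file.write_text(new)  # remember what was seen for the next poll
    return True
```

A sensor or cron job could call `clients_changed(...)` on each poll and kick off the redeploy only when it returns `True`, so definitions change only between deployments.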
a
Hmm, understood. So the code where we are calling the API and building assets and sensors would be a separate piece of code? I'm not able to wrap my head around this, but it does sound logical. Where would this code actually sit? Do we run it as a .py file in the Dockerfile? Whenever a k8s pod pulls and runs the image, will it not get run then? Maybe an example would help here..
hey @owen, could you help me with an example please? am keen to try this approach..
o
hey! sorry this got lost in my backlog -- my rough feeling is that you could have some script (regular .py file) that lives in the same repo as your regular code. This script would call the API, do whatever expensive work that needs to be done, and write out the results to a file. I was thinking this could be a .yaml file of some sort. Then, you could construct your definitions by reading this .yaml file. My assumption is that this construction process would be much faster than the original process, as you're just reading from a static file. In the docker file, you could call this script, so that any time you built an image, that image would contain an up-to-date version of this yaml file, and therefore the definitions that you build would be up to date. Roughly:
# my_script.py

def write_yaml_file():
    api = call_api()
    loop_through_response(...)
    write_yaml_file_representing_definitions_you_want(...)

if __name__ == "__main__":
    write_yaml_file()
then in your `defs.py` file:
def get_definitions():
    yaml = read_yaml_file()
    assets = []
    sensors = []
    for thing in yaml:
        sensors.append(make_sensor(thing))
        assets.append(make_asset(thing))
    return Definitions(assets=assets, sensors=sensors, ...)

defs = get_definitions()
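A runnable sketch of the same two-step split, using only the standard library. JSON stands in for the .yaml file here so that no extra package is needed. The function names, the record fields, and the naming scheme are all hypothetical. In a real setup the load step would pass each record to your own asset and sensor factories instead of building name strings.

```python
import json
from pathlib import Path


def write_snapshot(client_names, path):
    """Build-time step: run once per image build, after the slow API call."""
    records = [
        {"customer_name": name, "customer_code": name.lower().replace(" ", "_")}
        for name in client_names
    ]
    Path(path).write_text(json.dumps(records, indent=2))
    return records


def load_definition_specs(path):
    """Load-time step: reads the static file only, with no API calls."""
    records = json.loads(Path(path).read_text())
    return {
        # Placeholders for what make_asset / make_sensor would return.
        "assets": [f"{r['customer_code']}_dbt_assets" for r in records],
        "sensors": [f"{r['customer_code']}_sensor" for r in records],
    }
```

The Dockerfile would run the script containing `write_snapshot` during the image build, and the code location would only ever call `load_definition_specs`, so loading stays fast no matter how slow the API is.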
a
Thanks a lot @owen! I will find some time to test this out..
Hey @owen, a bit related to this - is it possible to reload definitions as part of a job? I saw there's a way to do it via the GraphQL Python Client, but can we do it within a job too? Whenever the job is being run, I want to first load the assets again..
o
So you can execute that GraphQL query from inside of a job, but that would probably only work if this was a separate job from the job that's materializing your assets. Basically, the gRPC server (the thing that you send requests to in order to kick off jobs) is the thing that gets reloaded by that GraphQL query. If it kicks off a job, that job will already have a serialized representation of the job that it plans to execute, which it passes along to the process that executes the run. If the first step of that job reloads the definitions and therefore changes the structure of that job, the behavior there is somewhat undefined (depending on your architecture / where things are running, it might proceed with the old version of the job, or potentially get itself into a weird state)
a
I see your point. We will use GraphQL client then.
Hey @owen, I am thinking of giving the above a try - reloading the repository as part of a job. Once that's done, I will kick off a different one. To do that, do I still have to create a client object by connecting to localhost?