# ask-community
j
Hi, I am new to Dagster and am struggling to get my head around it, as it makes some different choices from Airflow and Azkaban, which I am more used to. What I am trying to implement is a three-step job that does the following:
• Pulls a git repo into a tmp folder with a generated random id for a name, e.g. tmp/{uuid}/...
• Runs dbt or a different shell command that references that folder
• Deletes the tmp folder
The issue I am running into is how I can generate a random uuid and then use it in the resource def for dbt and in the two ops that pull down the repo and delete the folder. Anything that could point me in the right direction would be appreciated.
t
My first thought: you could define an op for generating the uuid and pass it as output to the next steps
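For example, a rough sketch of that idea (the op name and path here are just illustrative):
```python
import uuid

from dagster import op


@op
def make_tmp_path() -> str:
    # runs once at execution time; downstream ops receive the path as an input
    return f"/tmp/{uuid.uuid4()}"
```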
j
Is there a way to generate the dbt_cli_resource resource def using the output of an upstream op?
t
Ah, got it, I thought you wanted to run dbt using a shell command op. You can still pre-generate the UUID globally and use it for both the dbt resource def and the git pull op (as input, for example). Just make sure that there is an execution order between the git pull op (so the models directory is there) and the dbt step.
j
Ok, that is what I want to do, I am just not sure of the syntax for defining a global variable in the job and having it pass through to the resource def and create_shell_command_op. Can you please point me to the documentation that explains how that works?
j
Hey @Jacob Roe, I think Tung Dang's suggestion is reasonable. I think they are saying that you can set a plain Python global variable that both the op and the resource have access to:
```python
import uuid

from dagster import In, Nothing, job, op
from dagster_dbt import dbt_cli_resource

TEMP_DIR = f"/path/to/tmp/{uuid.uuid4()}"

@op
def fetch_code():
    # make the directory using TEMP_DIR
    # get the code from git
    ...

@op(
    ins={"start": In(Nothing)},
    required_resource_keys={"dbt_cli_resource"},
)
def do_dbt(context):
    # whatever you want to do with dbt
    ...

@job(
    resource_defs={"dbt_cli_resource": dbt_cli_resource.configured({"project_dir": TEMP_DIR})}
)
def my_job():
    nothing = fetch_code()
    do_dbt(nothing)
```
You'll also need to use Nothing to ensure that the dbt step runs after the code is pulled (docs).
j
@jamie thanks for the reply. This is the code I am currently using, but the issue is that the uuid is regenerated for every op:
```python
import uuid

from dagster import job
from dagster_shell import create_shell_command_op
from dagster_dbt import dbt_cli_resource, dbt_compile_op


RANDOM_ID = str(uuid.uuid4())

TMP_PATH = f'<<base path>>/tmp/{RANDOM_ID}/dbt/'
TMP_PATH_DBT = f'{TMP_PATH}lake/'


@job(resource_defs={"dbt": dbt_cli_resource.configured(
    {
        "project_dir": TMP_PATH_DBT,
    }
)})
def dbt_run():
    git_pull = create_shell_command_op(f'git clone <<git repo>> {TMP_PATH}', name="git_pull")
    dbt_compile_op(start_after=git_pull())
```
j
Is the temp directory changing every time you execute the dbt_run job, or is it different between create_shell_command_op and dbt_compile_op? If it's different per job, that's expected: it's a fresh execution, so RANDOM_ID will be regenerated. If you need to use the same path for every execution of the job, you'll need to set TMP_PATH to something consistent like TMP_PATH = f'<<base path>>/tmp/dagster_dbt_dir/dbt/'. If it's different between ops, we'll need to dig in a bit more.
j
It was different between the two ops within the same job execution
d
Hi Jacob - my guess is the reason it's different for each op is that you're using the default multiprocess executor, which runs each op in its own process. So
```python
RANDOM_ID = str(uuid.uuid4())
```
gets executed separately in each op's process, so each op sees a different value. If you put this as config on your job:
```yaml
execution:
  config:
    in_process:
```
it will run the whole job in a single process, and that UUID will be generated once.
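If threading run config through is awkward, another option is to pin the executor on the job itself. A sketch reusing the snippet above (paths and repo are still placeholders, and this assumes a Dagster version that exports in_process_executor):
```python
import uuid

from dagster import in_process_executor, job
from dagster_shell import create_shell_command_op
from dagster_dbt import dbt_cli_resource, dbt_compile_op

RANDOM_ID = str(uuid.uuid4())
TMP_PATH = f'<<base path>>/tmp/{RANDOM_ID}/dbt/'
TMP_PATH_DBT = f'{TMP_PATH}lake/'

# same job as before, but pinned to the in-process executor so the module
# (and therefore RANDOM_ID) is only evaluated once per execution
@job(
    executor_def=in_process_executor,
    resource_defs={"dbt": dbt_cli_resource.configured({"project_dir": TMP_PATH_DBT})},
)
def dbt_run():
    git_pull = create_shell_command_op(f'git clone <<git repo>> {TMP_PATH}', name="git_pull")
    dbt_compile_op(start_after=git_pull())
```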
If you want to keep using multiprocess, my advice would be to generate that UUID in an initial op and pass the output through to any other ops that need it.
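A rough sketch of that pattern (op names and paths are illustrative, and dbt is shelled out directly here rather than configured via dbt_cli_resource, since resources are configured before any op output exists):
```python
import subprocess
import uuid

from dagster import job, op


@op
def make_tmp_path() -> str:
    # generated once per run, in whichever process this op happens to land
    return f"/tmp/dagster_dbt/{uuid.uuid4()}"


@op
def git_pull(tmp_path: str) -> str:
    # clone the repo into the generated folder (repo URL is a placeholder)
    subprocess.run(["git", "clone", "<<git repo>>", tmp_path], check=True)
    return tmp_path


@op
def run_dbt(project_dir: str) -> str:
    # run dbt against the cloned project directory
    subprocess.run(["dbt", "run", "--project-dir", project_dir], check=True)
    return project_dir


@op
def cleanup(tmp_path: str) -> None:
    # remove the temp folder once the dbt step has finished
    subprocess.run(["rm", "-rf", tmp_path], check=True)


@job
def clone_run_cleanup():
    cleanup(run_dbt(git_pull(make_tmp_path())))
```
This also covers the delete step from the original three-step plan, since the cleanup op only runs after the dbt step has finished.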