Jacob Roe
05/17/2022, 7:16 AM
Tung Dang
05/17/2022, 8:18 AM
Jacob Roe
05/17/2022, 8:48 AM
Tung Dang
05/17/2022, 9:33 AM
Jacob Roe
05/17/2022, 11:44 AM
create_shell_command_op
can you please point me to the documentation that explains how that works?
jamie
05/17/2022, 3:11 PM
import uuid
from dagster import In, Nothing, job, op
from dagster_dbt import dbt_cli_resource
TEMP_DIR = f"/path/to/tmp/{uuid.uuid4()}"
@op
def fetch_code():
    # make the directory using TEMP_DIR
    # get the code from git
    ...
@op(
    ins={"start": In(Nothing)},  # ordering-only input, carries no data
    required_resource_keys={"dbt_cli_resource"},
)
def do_dbt(context):
    # whatever you want to do with dbt
    ...
@job(
    resource_defs={"dbt_cli_resource": dbt_cli_resource.configured({"project_dir": TEMP_DIR})}
)
def my_job():
    nothing = fetch_code()
    do_dbt(start=nothing)
You'll also need to use Nothing (the start input on do_dbt) to ensure that the dbt step runs after the code is pulled (docs)
Jacob Roe
05/17/2022, 11:51 PM
import uuid
from dagster import job
from dagster_shell import create_shell_command_op
from dagster_dbt import dbt_cli_resource, dbt_compile_op
RANDOM_ID = str(uuid.uuid4())
TMP_PATH = f'<<base path>>/tmp/{RANDOM_ID}/dbt/'
TMP_PATH_DBT = f'{TMP_PATH}lake/'
@job(resource_defs={"dbt": dbt_cli_resource.configured(
    {
        "project_dir": TMP_PATH_DBT,
    }
)})
def dbt_run():
    git_pull = create_shell_command_op(f'git clone <<git repo>> {TMP_PATH}', name="git_pull")
    dbt_compile_op(start_after=git_pull())
jamie
05/18/2022, 1:24 PM
dbt_run job or is it different between the create_shell_command_op and dbt_compile_op? If it's different per job, that's expected because it's a fresh execution so the RANDOM_ID will be regenerated. If you need to use the same path for every execution of the job, you'll need to set TMP_PATH to something consistent like TMP_PATH = f'<<base path>>/tmp/dagster_dbt_dir/dbt/'. If it's different between ops we'll need to dig in a bit more
Jacob Roe
05/18/2022, 1:34 PM
daniel
05/18/2022, 10:35 PM
RANDOM_ID = str(uuid.uuid4()) gets executed within each op's process, so each process produces its own result.
If you put this as config on your job:
execution:
  config:
    in_process:
it will run the whole job in a single process, and that UUID will be generated once
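The behaviour described above can be reproduced without Dagster. The following is a minimal stdlib-only sketch, not Dagster's executor: `job_module`, `OP_RUNNER`, `run_ops`, and `simulate` are hypothetical names made up for illustration. Each simulated op imports a module that sets `RANDOM_ID` at import time, either in its own fresh process or all in one process.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import List

# Hypothetical stand-in for the file that defines the job and RANDOM_ID.
MODULE_SOURCE = "import uuid\nRANDOM_ID = str(uuid.uuid4())\n"

# Script run in a child interpreter: argv[1] is the module directory,
# argv[2] is how many simulated ops run inside this one process.
OP_RUNNER = """
import importlib, sys
sys.path.insert(0, sys.argv[1])
for _ in range(int(sys.argv[2])):
    # After the first import the module is cached, so RANDOM_ID is reused.
    print(importlib.import_module("job_module").RANDOM_ID)
"""


def run_ops(module_dir: str, ops_in_this_process: int) -> List[str]:
    """Start one fresh interpreter and return the RANDOM_ID each op saw."""
    result = subprocess.run(
        [sys.executable, "-c", OP_RUNNER, module_dir, str(ops_in_this_process)],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.split()


def simulate(one_process: bool, op_count: int = 2) -> List[str]:
    """Return the RANDOM_ID seen by each op under the chosen process layout."""
    with tempfile.TemporaryDirectory() as module_dir:
        Path(module_dir, "job_module.py").write_text(MODULE_SOURCE)
        if one_process:
            # All ops share one interpreter: the module is loaded once.
            return run_ops(module_dir, op_count)
        ids: List[str] = []
        for _ in range(op_count):
            # One interpreter per op: the module is loaded again each time.
            ids.extend(run_ops(module_dir, 1))
        return ids


if __name__ == "__main__":
    print("process per op:", simulate(one_process=False))
    print("single process:", simulate(one_process=True))
```

With one process per op, the two IDs differ, which mirrors the git clone step and the dbt step ending up with different TMP_PATH values. With a single process, both ops see the same ID, which is the effect the in_process config is meant to have.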