clay
02/02/2023, 2:46 PM
def dbt_op(context): as the function signature
Lorenzo
02/02/2023, 2:52 PM
dagster._core.errors.DagsterInvalidInvocationError: Resource initialization function has context argument, but no context was provided when invoking.
I then tried to use the build_op_context function, but it still doesn't work. Dagster says that the path passed to the --project-dir flag is not a dbt project. At this point I was even more confused, because I don't know how to create a context that contains the dbt flags.
jamie
02/02/2023, 3:29 PM
Lorenzo
02/02/2023, 3:34 PM
# Define the needed paths as variables
DBT_PROJECT_DIR = file_relative_path(__file__, "../UM_FOX_AU-dbt/dbt")
DBT_PROFILES_DIR = file_relative_path(__file__, "../UM_FOX_AU-dbt/dbt/config")
DBT_CONFIG = {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
# Loading dbt assets
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR)
# Resources definition
resource_defs = {
    "dbt": dbt_cli_resource.configured(DBT_CONFIG)
}
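Editor's note: the "not a dbt project" message reported earlier in the thread usually means that the directory handed to --project-dir does not contain a dbt_project.yml file. Because the paths above are resolved relative to the Python file, a quick check like the following can confirm where they point. This is a minimal sketch using only the standard library; is_dbt_project is a hypothetical helper name, not part of dagster or dbt.

```python
from pathlib import Path


def is_dbt_project(project_dir):
    """Return True if project_dir contains a dbt_project.yml file.

    Hypothetical helper for debugging; it only looks for the file
    and does not validate its contents.
    """
    return (Path(project_dir) / "dbt_project.yml").is_file()
```

Calling is_dbt_project(DBT_PROJECT_DIR) before building the resource config would show whether the relative path resolves to the intended directory.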
jamie
02/02/2023, 3:38 PM
@op
def dbt_op():
    cli_resource = dbt_cli_resource(context)
    cli_resource.run(models=["my_model"])

@job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(DBT_CONFIG)
    }
)
def dbt_job():
    dbt_op()
Lorenzo
02/02/2023, 3:49 PM
I got an error (NameError: name 'context' is not defined).
-
I then tried:
@op
def dbt_op(context):
    cli_resource = dbt_cli_resource(context)
    cli_resource.run(models=["TMP_UM_CM_B_ADDRESS_P_AU_NEW_RECORDS"])

@job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(DBT_CONFIG)
    }
)
def dbt_job():
    dbt_op()
And got back: dagster._check.ParameterCheckError: Param "context" is not a UnboundInitResourceContext. Got <dagster._core.execution.context.compute.OpExecutionContext object at 0x7f7c22ff1d00> which is type <class 'dagster._core.execution.context.compute.OpExecutionContext'>.
-
Finally, I removed the "context" from both and got: dagster._core.errors.DagsterInvalidInvocationError: Resource initialization function has context argument, but no context was provided when invoking.
jamie
02/02/2023, 3:52 PM
@op(
    required_resource_keys={"dbt"}
)
def dbt_op(context):
    cli_resource = context.resources.dbt
    cli_resource.run(models=["TMP_UM_CM_B_ADDRESS_P_AU_NEW_RECORDS"])

@job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(DBT_CONFIG)
    }
)
def dbt_job():
    dbt_op()
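Editor's note: the errors in this thread come from treating the resource definition as if it were the initialized resource. The toy model below, in plain Python, may help show the difference. It is not dagster's implementation: ResourceDefinition, InitResourceContext, OpContext, FakeDbtCli, and run_op are simplified stand-ins invented for illustration.

```python
from types import SimpleNamespace


class InitResourceContext:
    """Stand-in for the context a resource factory receives at init time."""

    def __init__(self, resource_config):
        self.resource_config = resource_config


class OpContext:
    """Stand-in for the context an op receives while it runs."""

    def __init__(self, resources):
        self.resources = resources


class ResourceDefinition:
    """A recipe for building a resource, not the resource itself."""

    def __init__(self, factory, config=None):
        self.factory = factory
        self.config = config or {}

    def configured(self, config):
        # Return a new definition with the config attached.
        return ResourceDefinition(self.factory, config)

    def __call__(self, init_context=None):
        # Calling the definition with no context, or with an op context,
        # fails: only an init context is accepted here.
        if not isinstance(init_context, InitResourceContext):
            raise TypeError(
                "expected InitResourceContext, got " + type(init_context).__name__
            )
        return self.factory(init_context)


class FakeDbtCli:
    """Fake client that returns the arguments it would run."""

    def __init__(self, project_dir):
        self.project_dir = project_dir

    def run(self, models):
        return ["dbt", "run", "--project-dir", self.project_dir, "--select", *models]


fake_dbt_resource = ResourceDefinition(
    lambda ctx: FakeDbtCli(ctx.resource_config["project_dir"])
)


def run_op(op_fn, resource_defs):
    # The "framework" initializes each resource, then hands the
    # instances to the op through context.resources.
    built = {
        key: definition(InitResourceContext(definition.config))
        for key, definition in resource_defs.items()
    }
    return op_fn(OpContext(resources=SimpleNamespace(**built)))


def bad_op(context):
    # Mirrors the failing attempt: passes an op context to the definition.
    return fake_dbt_resource(context).run(models=["my_model"])


def good_op(context):
    # Mirrors the working pattern: uses the already-initialized resource.
    return context.resources.dbt.run(models=["my_model"])
```

In this model, run_op(good_op, {"dbt": fake_dbt_resource.configured({"project_dir": "some/dir"})}) succeeds, while the same call with bad_op raises a TypeError, which parallels the ParameterCheckError quoted above.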
Lorenzo
02/02/2023, 3:57 PM