Davi
09/05/2022, 8:34 AM
I'm working with the resource_defs paradigm. I've defined two resources myself: redshift_resource() and load_env_variables(), where the _redshift_resource_ resource uses the _load_env_variables_ resource:
```
@resource
def load_env_variables():
    return load_dotenv(find_dotenv())

@resource(required_resource_keys={"load_env_variables"},
          config_schema={"database_name":
                         Field(str, is_required=False, default_value="oip")})
def redshift_resource(context):
    _ = context.resources.load_env_variables
    return RedshiftOperator(database=context.resource_config["database_name"])
```
Then I define an asset that uses these resources:
```
@asset(group_name="group_name",
       required_resource_keys={"redshift_resource", "load_env_variables"},
       config_schema={"database": Field(str, is_required=False, default_value="oip"),
                      "table": Field(str, is_required=True),
                      "schema": Field(str, is_required=True),
                      "method": Field(str, is_required=False, default_value="auto")})
def read_from_redshift(context) -> pd.DataFrame:
    [...]
    return df_from_redshift
```
From this asset I create several others that apply it and return a pandas DataFrame with the table retrieved from Redshift. Below is an example of one of those functions:
```
@asset(required_resource_keys={"redshift_resource", "load_env_variables"},
       group_name="group_name",
       ins={"read_from_redshift": AssetIn("read_from_redshift")})
def table_name_1(read_from_redshift) -> pd.DataFrame:
    context = build_init_resource_context(
        config={"table": "table_name", "schema": "schema_name"})
    return read_from_redshift(context=context)
```
Then I have an @op which calls all these "_table_name_" assets:
```
@op(required_resource_keys={"redshift_resource", "load_env_variables"})
def load_tables(table_name_1,
                table_name_2,
                table_name_3,
                table_name_4,
                ...)
```
And finally, in the job we define a resource_defs
dictionary in the decorator and call this op:
```
@job(resource_defs={"redshift_resource": redshift_resource,
                    "load_env_variables": load_env_variables})
def job_loading_tables():
    load_tables(table_name_1=table_name_1(),
                table_name_2=table_name_2(),
                table_name_3=table_name_3(),
                table_name_4=table_name_4(),
                ...)
```
When I run this job with the _dagit_ command, it throws an error:
UserWarning: Error loading repository location rcd_dagster:dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'load_env_variables' required by op 'read_from_redshift' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'load_env_variables', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
What am I doing wrong here? Do I really have to apply required_resource_keys={"redshift_resource", "load_env_variables"} everywhere as I did?
Thank you very much for your help!

yuhan
09/06/2022, 6:59 PM
> Do I really have to apply required_resource_keys={"redshift_resource", "load_env_variables"} everywhere as I did?
No, I don't think so. Because "load_env_variables" is a dependency of the "redshift_resource", I think you just need to specify it on that resource def.
here’s an example: https://docs.dagster.io/concepts/resources#resource-to-resource-dependencies