Daniel Kim
07/16/2022, 5:41 PM
from dagster import asset

@asset(group_name="nhtsa_mfr")
def fetch_manufacturers():
    ...  # blah...blah...

@asset(config_schema=..., group_name="nhtsa_mfr")  # config_schema elided in the original ("blah...blah")
def upload_mfrs_to_snowflake(fetch_manufacturers):
    ...  # blah...blah...
The upload_mfrs_to_snowflake asset has configuration. I have defined my @repository, but I feel like I did it in a long-winded, verbose way, and there is probably a less verbose way that I am not aware of.
Daniel Kim
07/16/2022, 5:44 PM
I defined a @job first:
from dagster import job

@job
def fetch_and_upload_mfrs_job():
    upload_mfrs_to_snowflake(fetch_manufacturers())
Why did I create a job? Well, I tried a few different things, and this was the only way I could make the pipeline work: because upload_mfrs_to_snowflake has configuration, I couldn't find any other way to run it.
Then I defined my @repository like so:
from dagster import define_asset_job, repository

# SF_ACCOUNT, SF_WAREHOUSE, etc. are assumed to be constants defined elsewhere
@repository
def nhtsa_repo():
    return [
        fetch_manufacturers,
        upload_mfrs_to_snowflake,
        define_asset_job(
            name="fetch_and_upload_mfrs_job",
            selection=["fetch_manufacturers", "upload_mfrs_to_snowflake"],
            config={
                "ops": {
                    "upload_mfrs_to_snowflake": {
                        "config": {
                            'sf_account': SF_ACCOUNT,
                            'sf_warehouse': SF_WAREHOUSE,
                            'sf_database': SF_DATABASE,
                            'sf_schema': 'nhtsa',
                            'sf_role': SF_ROLE,
                            'sf_authenticator': SF_AUTHENTICATOR,
                            'sf_username': SF_USERNAME,
                            'sf_password': SF_PASSWORD,
                            'table_name': 'manufacturers',
                        }
                    }
                }
            },
        )
    ]
So the above works as intended, but I feel like there must be another way that doesn't require defining a job first and then also using the define_asset_job() function. Furthermore, I end up with two artifacts in Dagit: the "fetch_and_upload_mfrs_job" job and a "nhtsa_mfr" asset group. I'm not sure why I need or have both, since it seems redundant, but I guess it's because of my repository definition.
I feel like the above is verbose. I was thinking I could instead do something along the lines of:
@repository
def nhtsa_repo():
    return [
        fetch_manufacturers,
        upload_mfrs_to_snowflake.op.configured({
            "ops": {
                "upload_mfrs_to_snowflake": {
                    "config": {
                        'sf_account': SF_ACCOUNT,
                        'sf_warehouse': SF_WAREHOUSE,
                        'sf_database': SF_DATABASE,
                        'sf_schema': 'nhtsa',
                        'sf_role': SF_ROLE,
                        'sf_authenticator': SF_AUTHENTICATOR,
                        'sf_username': SF_USERNAME,
                        'sf_password': SF_PASSWORD,
                        'table_name': 'manufacturers',
                    }
                }
            }
        })
    ]
But, as expected, I got an error, since a configured op isn't an allowed return type for a repository definition:
Bad return value from repository construction function: all elements of list must be of type JobDefinition, GraphDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition, AssetsDefinition, or SourceAsset.Got value of type <class 'dagster.core.definitions.solid_definition.SolidDefinition'> at index 1.
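Independent of how the job is wired up, the nested config dict passed to define_asset_job() in the working repository above can be built once by a small helper, which removes most of the repetition. A minimal sketch in plain Python; the helper name snowflake_upload_config and the choice to read the SF_* values from environment variables of the same names are assumptions, not something the thread specifies:

```python
import os


def snowflake_upload_config(table_name, schema="nhtsa", env=None):
    """Build run config for the upload_mfrs_to_snowflake op.

    Hypothetical helper: credentials are read from environment variables
    named after the SF_* constants in the thread (an assumption).
    """
    env = os.environ if env is None else env
    # Map each op config key to the environment variable that holds its value.
    env_keys = {
        "sf_account": "SF_ACCOUNT",
        "sf_warehouse": "SF_WAREHOUSE",
        "sf_database": "SF_DATABASE",
        "sf_role": "SF_ROLE",
        "sf_authenticator": "SF_AUTHENTICATOR",
        "sf_username": "SF_USERNAME",
        "sf_password": "SF_PASSWORD",
    }
    # A missing variable raises KeyError, so misconfiguration fails loudly.
    op_config = {key: env[var] for key, var in env_keys.items()}
    # Per-table settings stay explicit arguments.
    op_config["sf_schema"] = schema
    op_config["table_name"] = table_name
    return {"ops": {"upload_mfrs_to_snowflake": {"config": op_config}}}
```

With a helper like this, the job definition could shrink to define_asset_job(name="fetch_and_upload_mfrs_job", selection=["fetch_manufacturers", "upload_mfrs_to_snowflake"], config=snowflake_upload_config("manufacturers")). Accepting env as a parameter keeps the helper testable without touching the real environment.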
So I would appreciate it if someone could point me to a less verbose way to do this. Thanks in advance!
Daniel Kim
07/17/2022, 4:27 PM
The name= argument in define_asset_job() refers to just an arbitrary name for the job, not a pre-existing job.
jamie
07/18/2022, 4:46 PM
upload_mfrs_to_snowflake asset. We wouldn't want an upload_mfrs_to_snowflake asset because it represents a task, not the actual data asset. So your code might look like this:
from dagster import asset

@asset(group_name="nhtsa_mfr", io_manager_key="snowflake")
def manufacturers():
    mfrs = ...  # code here to get the manufacturers
    return mfrs
Then, in your repository, you can use the with_resources function to attach the Snowflake IO manager to your asset:
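from dagster import repository, with_resources

# snowflake_io_manager: a configured Snowflake IO manager, e.g. built with build_snowflake_io_manager
@repository
def my_repo():
    # with_resources already returns a list of definitions, so return it directly
    return with_resources(
        [manufacturers],
        resource_defs={'snowflake': snowflake_io_manager},
    )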
Here are some helpful docs:
https://docs.dagster.io/concepts/resources#providing-resources-to-software-defined-assets
https://docs.dagster.io/_apidocs/libraries/dagster-snowflake#dagster_snowflake.build_snowflake_io_manager
Daniel Kim
07/18/2022, 4:51 PM