#ask-community

Daniel Kim

07/16/2022, 5:41 PM
Hello! I am finally getting around to using SDA. I have a pipeline that consists of just 2 assets:
@asset(group_name="nhtsa_mfr")
def fetch_manufacturers():
    blah...blah...
@asset(config_schema=blah...blah, group_name="nhtsa_mfr")
def upload_mfrs_to_snowflake(fetch_manufacturers):
    blah...blah...
The upload_mfrs_to_snowflake asset has configuration. I have defined my @repository, but I feel like it is a long-winded, verbose way and there is probably a less verbose way that I am not aware of.
This is what I did. I created a @job first:
@job
def fetch_and_upload_mfrs_job():
    upload_mfrs_to_snowflake(fetch_manufacturers())
Why did I create a job? I tried a few different things, and this was the only way I could make the pipeline work: because upload_mfrs_to_snowflake has configuration, I couldn't find another way to run or execute it. Then I defined my @repository like so:
@repository
def nhtsa_repo():
    return [
        fetch_manufacturers,
        upload_mfrs_to_snowflake,
        define_asset_job(
            name="fetch_and_upload_mfrs_job",
            selection=["fetch_manufacturers", "upload_mfrs_to_snowflake"],
            config={
                "ops": {
                    "upload_mfrs_to_snowflake": {
                        "config": {
                            'sf_account': SF_ACCOUNT,
                            'sf_warehouse': SF_WAREHOUSE,
                            'sf_database': SF_DATABASE,
                            'sf_schema': 'nhtsa',
                            'sf_role': SF_ROLE,
                            'sf_authenticator': SF_AUTHENTICATOR,
                            'sf_username': SF_USERNAME,
                            'sf_password': SF_PASSWORD,
                            'table_name': 'manufacturers',
                        }
                    }
                }
            }
        )
    ]
So the above works as intended. But I feel like there must be another way where I don't need to define a job first and then use the define_asset_job() function. Furthermore, I also have two artifacts in dagit: the "fetch_and_upload_mfrs_job" job and a "nhtsa_mfr" asset group. I'm not sure why I have both, since it seems redundant, but I guess that is because of my repository definition. I feel like the above is verbose. I was thinking I could instead do something along the lines of:
@repository
def nhtsa_repo():
    return [
        fetch_manufacturers,
        upload_mfrs_to_snowflake.op.configured(
            {
                "ops": {
                    "upload_mfrs_to_snowflake": {
                        "config": {
                            'sf_account': SF_ACCOUNT,
                            'sf_warehouse': SF_WAREHOUSE,
                            'sf_database': SF_DATABASE,
                            'sf_schema': 'nhtsa',
                            'sf_role': SF_ROLE,
                            'sf_authenticator': SF_AUTHENTICATOR,
                            'sf_username': SF_USERNAME,
                            'sf_password': SF_PASSWORD,
                            'table_name': 'manufacturers',
                        }
                    }
                }
            }
        )
    ]
But, as expected, I got an error, because that isn't an allowed return type for a repository definition:
Bad return value from repository construction function: all elements of list must be of type JobDefinition, GraphDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition, AssetsDefinition, or SourceAsset. Got value of type <class 'dagster.core.definitions.solid_definition.SolidDefinition'> at index 1.
So I would appreciate it if someone can point me to a less verbose way to do this. Thanks in advance!
Oooops nevermind! I realized the name= argument in define_asset_job() refers to just an arbitrary name for the job, not a pre-existing job 🤭
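One way to trim the nested config literal shown in the repository above is to build it with a small helper. This is only a sketch in plain Python: build_op_config and snowflake_settings_from_env are hypothetical names, and the SF_-prefixed environment variables are an assumption about where the SF_ACCOUNT-style constants come from. Only the "ops" -> op name -> "config" layout is taken from the message.

```python
import os


def build_op_config(op_name, config_values):
    """Wrap a flat dict in the nested "ops" -> op name -> "config" layout."""
    return {"ops": {op_name: {"config": dict(config_values)}}}


def snowflake_settings_from_env(env=os.environ, prefix="SF_"):
    """Collect variables such as SF_ACCOUNT into keys such as sf_account."""
    return {
        name.lower(): value
        for name, value in env.items()
        if name.startswith(prefix)
    }


# Hypothetical values standing in for the real environment.
example_env = {"SF_ACCOUNT": "my_account", "SF_ROLE": "loader", "HOME": "/root"}

run_config = build_op_config(
    "upload_mfrs_to_snowflake",
    {
        **snowflake_settings_from_env(example_env),
        # Values that were hard-coded in the original config.
        "sf_schema": "nhtsa",
        "table_name": "manufacturers",
    },
)
```

The resulting dictionary has the same shape as the config= argument passed to define_asset_job() above, so it could be dropped in there.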

jamie

07/18/2022, 4:46 PM
Hi @Daniel Kim, glad you got this working! A couple of other things that might be helpful as you iterate: when you work with SDAs, a good way to think about them is as the data asset you're creating, rather than the steps to create it. So in the case of the manufacturers, you want a single asset that represents the manufacturers table, and then you can set the IO manager of the asset to take care of uploading it to Snowflake. This removes the need for an upload_mfrs_to_snowflake asset. We wouldn't want an upload_mfrs_to_snowflake asset because it represents a task, not the actual data asset. So your code might look like this:
@asset(group_name="nhtsa_mfr", io_manager_key="snowflake")
def manufacturers():
    mfrs = ...  # code here to get the manufacturers
    return mfrs
Then, in your repository, you can use the with_resources function to attach the Snowflake IO manager to your asset:
@repository
def my_repo():
    return [
        # with_resources returns a list of definitions, so unpack it into the repository list
        *with_resources(definitions=[manufacturers], resource_defs={'snowflake': snowflake_io_manager})
    ]
Here are some helpful docs: https://docs.dagster.io/concepts/resources#providing-resources-to-software-defined-assets https://docs.dagster.io/_apidocs/libraries/dagster-snowflake#dagster_snowflake.build_snowflake_io_manager
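The split between "the asset computes the data" and "the IO manager stores it" can be illustrated without Dagster at all. The following is a toy sketch in plain Python, not the Dagster API: InMemoryTableStore and materialize_asset are hypothetical, the method names only loosely mirror Dagster's IO manager methods (which take context objects), and the sample row is made up.

```python
class InMemoryTableStore:
    """Toy stand-in for an IO manager: it owns where results are written."""

    def __init__(self):
        self.tables = {}

    def handle_output(self, table_name, rows):
        # A real IO manager would write the rows to Snowflake here.
        self.tables[table_name] = list(rows)

    def load_input(self, table_name):
        # Downstream consumers read the stored table back by name.
        return self.tables[table_name]


def manufacturers():
    """Asset-style function: it only computes and returns the data."""
    # Hypothetical row standing in for the fetched manufacturers.
    return [{"mfr_id": 1, "name": "Example Motors"}]


def materialize_asset(asset_fn, store):
    """Run the function, then store its result under the function's name."""
    store.handle_output(asset_fn.__name__, asset_fn())


store = InMemoryTableStore()
materialize_asset(manufacturers, store)
```

Because the storage logic lives in the store object, the manufacturers function needs no Snowflake config of its own, which is why a separate upload step is unnecessary in this model.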

Daniel Kim

07/18/2022, 4:51 PM
@jamie Thanks Jamie! Gotcha! Yup my example was contrived only to quickly get a feel for SDA. Definitely will not be modeling steps as assets!