# integration-snowflake
v
I originally opened a discussion about how other Snowflake users are loading data. After some playing around, and now having onboarded a data scientist onto the platform, I wanted to share the pattern I ultimately adopted. The code below is simplified to keep it high-level.
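import pandas as pd
import pydantic
from dagster import MetadataEntry, MetadataValue, OutputContext, usable_as_dagster_type

@usable_as_dagster_type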
class UtilsSnowInput(pydantic.BaseModel):
    class Config:
        arbitrary_types_allowed = True

    dest_namespace: str
    data: pd.DataFrame

# s3_io_manager enhanced:
def handle_output(self, context: OutputContext, obj):
    ... # s3 load specific logic

    if isinstance(obj, UtilsSnowInput):
        context.log.debug("Attempting snowflake upload")
        parquet_path = self._upload_df(obj) # loads obj.data as parquet
        yield MetadataEntry(
            "S3 parquet storage path",
            value=MetadataValue.path(f"s3://{self.bucket}/{parquet_path}"),
        )
        # utils_snow is instantiated with a snowflake stage path as an optional parameter, runs a COPY INTO command into dest_namespace
        yield from self.utils_snow.copy_into_landing_area(
            context,
            parquet_path,
        )

# utils_snow.copy_into_landing_area wraps a COPY INTO command, cleans up the landing area, and yields some more metadata such as number of rows
def _get_copy_into_statement(
    self,
    remote_filepath: str,
    dest_namespace: str,
):
    return (
        f"COPY INTO {dest_namespace}(RAW) FROM @{self.stage}\n"
        f"FILES =('{remote_filepath}')\n"
        f"FILE_FORMAT = (type = '{remote_filepath.split('.')[-1]}');"
    )
This effectively means users are free to write logic as they see fit; the only requirement is returning a `UtilsSnowInput` with the required parameters.
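To make that contract concrete, here is a minimal stdlib-only sketch, not the actual implementation. `SnowInput`, the free-standing `handle_output`, the `my_stage` stage name, and the `landing/...` path are all hypothetical stand-ins: a dataclass replaces the pydantic model, a list of rows replaces the DataFrame, and the S3 upload is skipped entirely.

```python
from dataclasses import dataclass
from typing import Any, Iterator, List


@dataclass
class SnowInput:
    """Hypothetical stand-in for UtilsSnowInput (rows instead of a DataFrame)."""
    dest_namespace: str
    data: List[dict]


def copy_into_statement(stage: str, remote_filepath: str, dest_namespace: str) -> str:
    # Same shape as the statement above; the file type comes from the extension.
    file_type = remote_filepath.rsplit(".", 1)[-1]
    return (
        f"COPY INTO {dest_namespace}(RAW) FROM @{stage}\n"
        f"FILES = ('{remote_filepath}')\n"
        f"FILE_FORMAT = (type = '{file_type}');"
    )


def handle_output(obj: Any, stage: str = "my_stage") -> Iterator[str]:
    """Yield the SQL that would run; any other object skips the Snowflake step."""
    if isinstance(obj, SnowInput):
        # Hypothetical path; the real manager uploads obj.data to S3 first.
        parquet_path = f"landing/{obj.dest_namespace}.parquet"
        yield copy_into_statement(stage, parquet_path, obj.dest_namespace)


def user_logic() -> SnowInput:
    # Users write whatever logic they like and only need to return a SnowInput.
    rows = [{"id": 1}, {"id": 2}]
    return SnowInput(dest_namespace="analytics.landing.events", data=rows)


statements = list(handle_output(user_logic()))
skipped = list(handle_output({"not": "a SnowInput"}))
```

The point of the sketch is only the dispatch: the return type is what opts a result into the Snowflake load, so user code never touches the stage or the COPY INTO details.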
s
Interesting! I was playing around with a similar idea and wondering if that would be a more ergonomic entrypoint into Assets + IO Managers in general: instead of returning some arbitrary Python object, you return an AssetClass.
@asset
def my_sagemaker_asset():
    ...
    return MetadataAsset(name="baz", bar="foo")  # just returns metadata about the thing

@asset
def my_snowflake_asset(my_sagemaker_asset):  # parameter name matches the upstream asset
    return SnowflakeSqlTableAsset(
        database_name=...,
        schema_name=...,
        table_name="sagemaker_results",
        create_sql=f"select * from {my_sagemaker_asset.name}",
    )
The advantage, it seems, is that your users get to push their thinking from within the `asset(...)` decorator into the asset definition, and there's also a clearer link: "I'm returning a UtilsSnowInput, and I know how Dagster will handle that." It sort of displaces talk about IO Managers. Interesting!
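One way to picture the "return an AssetClass" idea is a handler registry keyed on the returned type. This is only a rough stdlib sketch under my own assumptions: `MetadataAsset` and `SnowflakeSqlTableAsset` are the made-up classes from the snippet above, and `HANDLERS`, `store_metadata`, `store_snowflake_table`, and this `handle_output` are hypothetical names, not Dagster APIs.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class MetadataAsset:
    name: str
    bar: str


@dataclass
class SnowflakeSqlTableAsset:
    database_name: str
    schema_name: str
    table_name: str
    create_sql: str


def store_metadata(asset: MetadataAsset) -> str:
    # Nothing to persist besides the metadata itself.
    return f"recorded metadata for {asset.name}"


def store_snowflake_table(asset: SnowflakeSqlTableAsset) -> str:
    # Builds the SQL a real handler would send to Snowflake.
    target = f"{asset.database_name}.{asset.schema_name}.{asset.table_name}"
    return f"CREATE OR REPLACE TABLE {target} AS {asset.create_sql}"


# The returned class decides how the output is stored.
HANDLERS: Dict[type, Callable[..., str]] = {
    MetadataAsset: store_metadata,
    SnowflakeSqlTableAsset: store_snowflake_table,
}


def handle_output(obj: object) -> str:
    handler = HANDLERS.get(type(obj))
    if handler is None:
        raise TypeError(f"no handler registered for {type(obj).__name__}")
    return handler(obj)


upstream = MetadataAsset(name="baz", bar="foo")
table = SnowflakeSqlTableAsset(
    database_name="db",
    schema_name="public",
    table_name="sagemaker_results",
    create_sql=f"select * from {upstream.name}",
)
meta_result = handle_output(upstream)
table_result = handle_output(table)
```

With a registry like this, supporting a new storage target means adding one class and one handler, and an unregistered return type fails loudly instead of being silently pickled.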
v
Yep, my aim here was to make onboarding into the platform as easy as possible without having to look too much into the intricacies of Dagster. A lot (well, pretty much all) of the logic calling external services with resources or using IO Managers is in a `utils` package providing high-level abstractions imported into every project. This ensures that code is well tested and enforces a lot of the structure that should be built into the entire platform for maintainability and ease of collaboration later down the line.