Vinnie
02/08/2023, 9:40 AM
import pandas as pd
import pydantic
from dagster import usable_as_dagster_type

@usable_as_dagster_type
class UtilsSnowInput(pydantic.BaseModel):
    class Config:
        arbitrary_types_allowed = True

    dest_namespace: str
    data: pd.DataFrame
# s3_io_manager enhanced:
def handle_output(self, context: OutputContext, obj):
    ...  # s3 load specific logic
    if isinstance(obj, UtilsSnowInput):
        context.log.debug("Attempting snowflake upload")
        parquet_path = self._upload_df(obj)  # loads obj.data as parquet
        yield MetadataEntry(
            "S3 parquet storage path",
            value=MetadataValue.path(f"s3://{self.bucket}/{parquet_path}"),
        )
        # utils_snow is instantiated with a snowflake stage path as an optional parameter, runs a COPY INTO command into dest_namespace
        yield from self.utils_snow.copy_into_landing_area(
            context,
            parquet_path,
        )
# utils_snow.copy_into_landing_area wraps a COPY INTO command, cleans up the landing area, and yields some more metadata such as number of rows
def _get_copy_into_statement(
    self,
    remote_filepath: str,
    dest_namespace: str,
):
    return (
        f"COPY INTO {dest_namespace}(RAW) FROM @{self.stage}\n"
        f"FILES =('{remote_filepath}')\n"
        f"FILE_FORMAT = (type = '{remote_filepath.split('.')[-1]}');"
    )
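As a rough illustration of what the statement builder above produces, here is a standalone sketch. The free-function form, the stage name `my_stage`, the file path, and the table `analytics.landing.events` are all assumptions made up for the example; they are not part of the original code.

```python
def get_copy_into_statement(stage: str, remote_filepath: str, dest_namespace: str) -> str:
    """Build a Snowflake COPY INTO statement for a single staged file."""
    # The file format type is taken from the file extension, e.g. "parquet".
    file_type = remote_filepath.split(".")[-1]
    return (
        f"COPY INTO {dest_namespace}(RAW) FROM @{stage}\n"
        f"FILES =('{remote_filepath}')\n"
        f"FILE_FORMAT = (type = '{file_type}');"
    )


# Hypothetical values, for illustration only.
statement = get_copy_into_statement(
    stage="my_stage",
    remote_filepath="landing/events.parquet",
    dest_namespace="analytics.landing.events",
)
print(statement)
```

Since the values are interpolated straight into the SQL text, this only makes sense when the path and namespace come from trusted code rather than from user input.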
This effectively means users are free to write their logic as they see fit; the only requirement is returning a UtilsSnowInput with the required parameters.
Stephen Bailey
02/08/2023, 12:17 PM
@asset
def my_sagemaker_asset():
    ...
    return MetadataAsset(name="baz", bar="foo")  # just returns metadata about the thing

@asset
def my_snowflake_asset(my_sagemaker_asset):
    return SnowflakeSqlTableAsset(
        database_name=...,
        schema_name=...,
        table_name="sagemaker_results",
        create_sql=f"select * from {my_sagemaker_asset.name}",
    )
The advantage, it seems, is that your users get to push their thinking from within the asset(...) decorator into the asset definition, and there's also a clearer link between "I'm returning a UtilsSnowInput, and I know how Dagster will handle that." Sort of displaces talk about the IO manager. Interesting!
Vinnie
02/08/2023, 3:15 PM
utils package providing high-level abstractions imported into every project. This ensures that code is well tested and enforces a lot of the structure that should be built into the entire platform for maintainability and ease of collaboration later down the line.