Joel Olazagasti
04/25/2023, 6:28 PM
`updated_at` field or similar) from an `observable_source_asset` that has a Snowflake IO manager?

Tim Castillo
04/25/2023, 8:02 PM

Joel Olazagasti
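04/25/2023, 8:22 PM
```python
from dagster import DataVersion, observable_source_asset


@observable_source_asset(
    key_prefix=['database', 'schema'],
    io_manager_key='snowflake_io',
    group_name='group',
)
def observed_source_asset(context):
    # Querying Snowflake through the IO manager's resource key
    result = context.resources.snowflake_io.execute_query(
        'SELECT max(updated_at) as updated_at',
        fetch_results=True,
    )
    # stringify_result: user-defined helper, not shown in the thread
    version = stringify_result(result)
    return DataVersion(version)
```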
But whenever I try to run "Observe sources", I get a `__ASSET_JOB cannot be executed with the provided config` error. Am I accessing the IO manager incorrectly? It's strange because I don't even get any local console output, just the error in the UI.

Tim Castillo
04/25/2023, 8:23 PM
`required_resource_keys` onto your definition?

Joel Olazagasti
04/25/2023, 8:27 PM
`Definitions(... resources={"snowflake_io": snowflake_io} ...)` at the top level of my package. It's the same resource I'm currently using for the working unobserved sources. I don't think this configuration changed, but just in case: I'm pinned to version `1.2.6` right now, pending a bugfix with Airbyte resource declaration.

Tim Castillo
04/25/2023, 8:36 PM
`required_resource_keys` to the `@observable_source_asset` def?
For reference, the example in the doc has:
```python
from dagster import asset


@asset(required_resource_keys={"snowflake"})
def small_petals(context):
    return context.resources.snowflake.execute_query(
        (
            'SELECT * FROM IRIS_DATASET WHERE "petal_length_cm" < 1 AND'
            ' "petal_width_cm" < 1'
        ),
        fetch_results=True,
        use_pandas_result=True,
    )
```
Also, you'd have a separate resource declared for the IO manager and another for raw resource usage (e.g. `snowflake.execute_query`).

Tim Castillo
04/25/2023, 8:37 PM

Joel Olazagasti
04/25/2023, 8:49 PM
`@observable_source_asset` decorator doesn't have a `required_resource_keys` param, as far as I can tell. Should I be defining both an `asset` and an `observable_source_asset`? I did just try explicitly setting the resource via the `resource_defs` parameter, to no avail.

Tim Castillo
04/25/2023, 8:49 PM

Joel Olazagasti
04/25/2023, 8:51 PM

Tim Castillo
04/25/2023, 8:54 PM
`required_resource_keys` as a param until recently:

Joel Olazagasti
04/25/2023, 8:55 PM

Tim Castillo
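04/25/2023, 9:00 PM
```python
from dagster import AssetIn, DataVersion, Definitions, asset, observable_source_asset
from dagster_snowflake import snowflake_resource
from dagster_snowflake_pandas import snowflake_pandas_io_manager


@observable_source_asset(
    required_resource_keys={'snowflake'},
    key_prefix=['database', 'schema'],
    # Downstream assets load this table through the Snowflake IO manager
    io_manager_key='db_io_manager',
)
def observed_source_asset(context):
    # Raw queries go through the plain `snowflake` resource, not the IO manager
    result = context.resources.snowflake.execute_query(
        'select max(updated_at) from table_name',
        fetch_results=True,
    )
    # result is a list of row tuples; [0][0] is the latest timestamp
    return DataVersion(str(result[0][0]))


@asset(
    # Map the parameter to the source asset's full key: database/schema/observed_source_asset
    ins={'observed_source_asset': AssetIn(key_prefix=['database', 'schema'])},
    io_manager_key='db_io_manager',
)
def downstream_asset(observed_source_asset):
    return observed_source_asset.groupby('date').sum()


# Placeholder credentials; fill in real values
SNOWFLAKE_CREDS = {
    'account': 'account',
    # ...plus the rest of your connection config (user, password, etc.)
}

defs = Definitions(
    assets=[observed_source_asset, downstream_asset],
    resources={
        # Vanilla resource for execute_query
        'snowflake': snowflake_resource.configured(SNOWFLAKE_CREDS),
        # Dedicated IO manager that stores/loads pandas DataFrames in Snowflake
        'db_io_manager': snowflake_pandas_io_manager.configured(SNOWFLAKE_CREDS),
    },
)
```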
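The observation function above boils down to turning the rows returned by `execute_query(..., fetch_results=True)` into a version string. A minimal pure-Python sketch of that step, assuming the result is a list of row tuples like `[(value,)]`; the helper name `rows_to_version` and its empty-table fallback are hypothetical, not part of Dagster. Inside the asset you would still return `DataVersion(rows_to_version(result))`.

```python
from datetime import datetime
from typing import Any, Sequence


def rows_to_version(rows: Sequence[Sequence[Any]], empty: str = "empty") -> str:
    """Turn the rows of a `select max(updated_at) ...` query into a version string.

    Hypothetical helper: assumes `rows` is a list of row tuples such as
    [(datetime(2023, 4, 25, 20, 0),)].
    """
    # No rows at all, or max() over an empty table came back as NULL (None).
    if not rows or rows[0][0] is None:
        return empty
    value = rows[0][0]
    # ISO format gives a stable string for datetimes; anything else is str()-ed.
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


print(rows_to_version([(datetime(2023, 4, 25, 20, 0),)]))  # 2023-04-25T20:00:00
print(rows_to_version([(None,)]))  # empty
```

The fallback is a design choice: a bare `str(result[0][0])` on an empty table would yield the version `"None"`, which may or may not be what you want.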
Tim Castillo
04/25/2023, 9:01 PM
`snowflake` and `db_io_manager`. `snowflake` is just a vanilla resource that you can run `execute_query` on, and `db_io_manager` is a dedicated IO manager for Snowflake.
Let me know if this doesn't help, once you're unblocked.

Joel Olazagasti
04/26/2023, 1:39 PM
`1.2.6`?

Tim Castillo
04/26/2023, 1:43 PM

Joel Olazagasti
04/26/2023, 1:52 PM

Joel Olazagasti
05/12/2023, 2:59 PM
`1.3.4`! Thanks Tim
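Because `snowflake` is just a vanilla resource exposing `execute_query`, the observation logic can be checked without a warehouse. A minimal sketch, assuming only that the resource has an `execute_query(sql, fetch_results=...)` method returning row tuples; `QueryResource`, `FakeSnowflake` and `observe_max_updated_at` are hypothetical names. In the real asset you would pass `context.resources.snowflake` and wrap the string in `DataVersion(...)`.

```python
from typing import Any, List, Protocol, Sequence, Tuple


class QueryResource(Protocol):
    """The only part of the Snowflake resource this logic relies on (assumed shape)."""

    def execute_query(self, sql: str, fetch_results: bool = False) -> Any: ...


def observe_max_updated_at(snowflake: QueryResource, table: str) -> str:
    """Return the latest `updated_at` in `table` as a version string (hypothetical)."""
    # `table` is interpolated directly, so it must be a trusted identifier.
    rows = snowflake.execute_query(
        f"select max(updated_at) from {table}", fetch_results=True
    )
    return str(rows[0][0])


class FakeSnowflake:
    """Test double that records every query and returns canned rows."""

    def __init__(self, rows: Sequence[Tuple[Any, ...]]) -> None:
        self.rows = list(rows)
        self.queries: List[str] = []

    def execute_query(self, sql: str, fetch_results: bool = False) -> Any:
        self.queries.append(sql)
        return self.rows if fetch_results else None


fake = FakeSnowflake([("2023-04-25 20:00:00",)])
print(observe_max_updated_at(fake, "table_name"))  # 2023-04-25 20:00:00
print(fake.queries)  # ['select max(updated_at) from table_name']
```

Keeping the query logic behind the plain resource, rather than the IO manager, is what makes this kind of stub-based check possible.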