# integration-snowflake
j
What would be the best way to extract a data version (ideally generated from an `updated_at` field or similar) from an `observable_source_asset` that has a Snowflake IO manager?
t
Have you thought about connecting to the Snowflake resource and running a query to get it? https://docs.dagster.io/integrations/snowflake/reference#executing-custom-sql-commands-with-the-snowflake-resource
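For illustration, a minimal sketch of the pattern from that docs link, assuming the legacy `snowflake_resource` from `dagster-snowflake`; the asset name, table, and column here are placeholders, not anything from the thread:
```python
from dagster import asset
from dagster_snowflake import snowflake_resource

# Placeholder asset/table names, shown only to illustrate the linked docs pattern:
# request the resource via required_resource_keys, then query it in the function body.
# A "snowflake" resource key must be supplied in Definitions (see the full example later).
@asset(required_resource_keys={"snowflake"})
def latest_update(context):
    rows = context.resources.snowflake.execute_query(
        "SELECT max(updated_at) FROM my_source_table",
        fetch_results=True,
    )
    return rows[0][0]
```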
j
That's what I was attempting. With code something like:
```python
@observable_source_asset(
    key_prefix=['database', 'schema'],
    io_manager_key='snowflake_io',
    group_name='group',
)
def observed_source_asset(context):
    result = context.resources.snowflake_io.execute_query(
        'SELECT max(updated_at) as updated_at',
        fetch_results=True,
    )
    version = stringify_result(result)
    return DataVersion(version)
```
But whenever I try to run "Observe sources" I get a `__ASSET_JOB cannot be executed with the provided config` error. Am I accessing the IO manager in an incorrect fashion? It's strange because I don't even get any local console output, just the error in the UI.
t
Have you added the `required_resource_keys` onto your definition?
j
I have `Definitions(... resources={"snowflake_io": snowflake_io} ...)` at the top level of my package. It's the same resource I'm using for the functioning unobserved sources currently. I don't think this configuration changed, but just in case: I am pinned to version `1.2.6` right now, pending a bugfix with Airbyte resource declaration.
t
Sorry, to clarify: have you added `required_resource_keys` to the `@observable_source_asset` def? For reference, the example in the doc has:
```python
@asset(required_resource_keys={"snowflake"})
def small_petals(context):
    return context.resources.snowflake.execute_query(
        (
            'SELECT * FROM IRIS_DATASET WHERE "petal_length_cm" < 1 AND'
            ' "petal_width_cm" < 1'
        ),
        fetch_results=True,
        use_pandas_result=True,
    )
```
Also, you'd have a separate resource declared for the IO manager and another for raw resource usage (e.g. `snowflake.execute_query`).
Also, wanna send me the link to the GH issue for the Airbyte bug? I can help get an update / move that forward, if you'd like.
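As a sketch of that two-key split (the resource keys, credentials, and the `dagster-snowflake-pandas` IO manager here are illustrative choices, not taken from the thread):
```python
from dagster import Definitions
from dagster_snowflake import snowflake_resource
from dagster_snowflake_pandas import snowflake_pandas_io_manager

# Placeholder credentials shared by both definitions below.
SNOWFLAKE_CONFIG = {"account": "...", "user": "...", "password": "...", "database": "..."}

defs = Definitions(
    assets=[],  # assets omitted in this sketch
    resources={
        # Plain resource: gives asset bodies direct access to execute_query.
        "snowflake": snowflake_resource.configured(SNOWFLAKE_CONFIG),
        # IO manager: handles reading/writing asset values to Snowflake tables.
        "snowflake_io": snowflake_pandas_io_manager.configured(SNOWFLAKE_CONFIG),
    },
)
```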
j
It may be that I'm confused as to the full scope of setting up an observed source asset. I'm following the template laid out here in the docs. The `@observable_source_asset` decorator doesn't have a `required_resource_keys` param, as far as I can tell. Should I be defining an `Asset` and an `observable_source_asset`? I did just try explicitly setting the resource via the `resource_defs` parameter, to no avail.
t
Hi! okay gimme a couple seconds so I can type out a full example
j
Awesome, thanks! And re: the Airbyte bug, it doesn't look like there's an issue open yet; I was piggybacking off of this support thread. I'd be happy to open one, if needed.
t
Aaah, I see part of the confusion, my bad. We didn't add `required_resource_keys` as a param until recently:
j
Ahh, makes sense. No worries, that's not a super urgent issue for us. The manually updated sources we need to observe don't change that often, so we can trigger downstream materializations manually for now. Once the duplicate resource key issue from 1.3 is fixed, that should set me up 👍
t
Here's a small example to help illustrate how it'd work:
```python
from dagster import DataVersion, Definitions, asset, observable_source_asset
from dagster_snowflake import snowflake_resource
from dagster_snowflake_pandas import snowflake_pandas_io_manager


@observable_source_asset(
    required_resource_keys={'snowflake'},
    key_prefix=['database', 'schema'],
    # Giving the source asset the Snowflake IO manager lets downstream assets
    # load the underlying table as a DataFrame.
    io_manager_key='db_io_manager',
)
def observed_source_asset(context):
    result = context.resources.snowflake.execute_query(
        'select max(updated_at) from table_name',
        fetch_results=True,
    )
    return DataVersion(str(result[0][0]))


@asset(
    io_manager_key='db_io_manager'
)
def downstream_asset(observed_source_asset):
    return observed_source_asset.groupby('date').sum()


SNOWFLAKE_CREDS = {
    'account': 'account',
    'etc': 'etc'
}

defs = Definitions(
    assets=[observed_source_asset, downstream_asset],
    resources={
        'snowflake': snowflake_resource.configured(SNOWFLAKE_CREDS),
        'db_io_manager': snowflake_pandas_io_manager.configured(SNOWFLAKE_CREDS)
    },
)
```
Notice that there are two resources: `snowflake` and `db_io_manager`. `snowflake` is just a vanilla resource that you can use to run `execute_query` on, and `db_io_manager` is a dedicated IO manager for Snowflake. Let me know if this doesn't help, or once you're unblocked.
j
Thanks Tim! I'll keep you posted. Is this concept of a vanilla resource vs. a dedicated IO manager something that's new with the concept of Pythonic resources, or is it something I should also be looking out for in the meantime on `1.2.6`?
t
Hahaha, I dubbed them as vanilla yesterday. Vanilla/generic resources pre-date IO managers, and they are usable in 1.2.6. You can think of resources as ways to connect to external services, and IO managers as a special type of resource that figures out how to write/read from the external storage and what type the data should be in-memory for you. I'm going to add the disclaimer that our tutorial doesn't touch resources yet, as I'm in the middle of writing that section, so apologies for that.
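To make that distinction concrete, here's a minimal sketch using the pre-1.3 (non-Pythonic) APIs; the API client and the CSV-based IO manager are invented for illustration, not anything from the thread. A plain resource just hands the asset body an object to call, while an IO manager decides how asset outputs are persisted and how downstream inputs are loaded:
```python
import pandas as pd
from dagster import Definitions, IOManager, asset, io_manager, resource


# A "vanilla" resource: just a handle to an external service.
# (This client is invented for illustration.)
class HypotheticalAPIClient:
    def fetch_table(self) -> pd.DataFrame:
        return pd.DataFrame({"updated_at": ["2023-05-01"]})


@resource
def api_client(_init_context):
    return HypotheticalAPIClient()


# An IO manager: a special resource that persists asset outputs and loads
# downstream inputs. (CSV files on disk stand in for Snowflake here.)
class CsvIOManager(IOManager):
    def handle_output(self, context, obj: pd.DataFrame):
        obj.to_csv(f"/tmp/{context.asset_key.path[-1]}.csv", index=False)

    def load_input(self, context) -> pd.DataFrame:
        return pd.read_csv(f"/tmp/{context.asset_key.path[-1]}.csv")


@io_manager
def csv_io_manager(_init_context):
    return CsvIOManager()


@asset(required_resource_keys={"api"})
def raw_table(context) -> pd.DataFrame:
    # The vanilla resource is used explicitly inside the asset body...
    return context.resources.api.fetch_table()


defs = Definitions(
    assets=[raw_table],
    resources={
        "api": api_client,
        # ...while the IO manager is invoked implicitly to store/load asset values.
        "io_manager": csv_io_manager,
    },
)
```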
j
Understood, I suppose I've just managed this far without the need for a non-IO-manager resource. There's definitely a lot of power & flexibility in Dagster we haven't fully utilized yet.
Just wanted to let you know that this worked perfectly now that I'm on `1.3.4`! Thanks Tim