
Jeremy

12/12/2022, 7:07 PM
I have a job where, every hour, I want to call an API and write the output to a table in Snowflake. I don't think I need an asset, and I can't run a backfill: the API only returns the current value (“value now”). What's the best practice for this kind of job? Is there a simple example somewhere?
Is this a use case for a custom IO manager?

nickvazz

12/12/2022, 7:42 PM
It sounds like this would work as an asset with a schedule, without requiring a custom IO manager: https://docs.dagster.io/integrations/snowflake

Jeremy

12/12/2022, 7:43 PM
An asset with a schedule, and no partitions? By leaving the partitions off, it won’t offer backfills?

nickvazz

12/12/2022, 7:45 PM
You said you don't need backfills? And only the most recent run is valid?

Jeremy

12/12/2022, 7:45 PM
i want to retain the history of all previous runs
so i run regularly and generate a historical time series

nickvazz

12/12/2022, 7:53 PM
Ahh okay, so you can't re-create old data but you still want to remember what it was. Seems like you could then use a partitioned asset like this: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#defining-partitioned-assets with an hourly partition definition, define a partitioned asset job: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#partitioned-asset-jobs and then build the schedule from the partitioned job: https://docs.dagster.io/_apidocs/schedules-sensors#dagster.build_schedule_from_partitioned_job. I am not sure how you could turn off the ability to backfill, though, and it sounds like a backfill would corrupt historical data.
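One possible guard for the backfill concern above, as a minimal sketch rather than a built-in Dagster feature. It assumes the partition keys use Dagster's default hourly format `%Y-%m-%d-%H:%M` and that the scheduled run targets the hour that just ended. The helper name `is_latest_hourly_partition` is hypothetical. Inside a partitioned asset the key would come from the context (for example `context.partition_key`), and the asset could skip the API call when the helper returns False.

```python
from datetime import datetime, timedelta

# Assumption: matches Dagster's default hourly partition key format.
HOURLY_KEY_FORMAT = "%Y-%m-%d-%H:%M"


def is_latest_hourly_partition(partition_key: str, now: datetime) -> bool:
    """Return True only if partition_key names the most recently completed hour."""
    window_start = datetime.strptime(partition_key, HOURLY_KEY_FORMAT)
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    return window_start == current_hour - timedelta(hours=1)


# Naive datetimes are used here for simplicity; both values are assumed to be UTC.
now = datetime(2022, 12, 12, 20, 5)
latest = is_latest_hourly_partition("2022-12-12-19:00", now)  # hour that just ended
stale = is_latest_hourly_partition("2022-12-11-19:00", now)   # day-old partition
print(latest, stale)
```

A run for any older partition (such as one launched by a backfill) would fail the check, so the asset body can return early instead of writing a "value now" reading under a past timestamp.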

Jeremy

12/12/2022, 7:55 PM
this is exactly the problem I’m trying to solve: "I am not sure how you could turn off the ability to backfill since it sounds like it would corrupt historical data"

nickvazz

12/12/2022, 8:00 PM
You might be able to check the run's tags at runtime to see whether it is a backfill (backfill runs are tagged automatically), and then skip your operations and/or send a notification somewhere that something was trying to backfill?
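A minimal sketch of that tag check, written as plain functions over a tags mapping so it runs without Dagster installed. The tag key `dagster/backfill` is assumed to be the one Dagster attaches to backfill runs, and the helper names `is_backfill_run` and `fetch_if_not_backfill` are hypothetical. Inside an op or asset, the mapping would come from the run object on the context (something like `context.run.tags`; the exact attribute depends on the Dagster version).

```python
from typing import Callable, Mapping, Optional, TypeVar

T = TypeVar("T")

# Assumption: Dagster adds this tag to every run launched as part of a backfill.
BACKFILL_TAG = "dagster/backfill"


def is_backfill_run(run_tags: Mapping[str, str]) -> bool:
    """Return True if the run's tags mark it as part of a backfill."""
    return BACKFILL_TAG in run_tags


def fetch_if_not_backfill(
    run_tags: Mapping[str, str], fetch: Callable[[], T]
) -> Optional[T]:
    """Call fetch() for ordinary runs; skip it and return None for backfill runs."""
    if is_backfill_run(run_tags):
        return None
    return fetch()


# Hypothetical usage: the lambda stands in for the real API call.
print(fetch_if_not_backfill({}, lambda: {"value": 42}))
print(fetch_if_not_backfill({BACKFILL_TAG: "some-backfill-id"}, lambda: {"value": 42}))
```

Returning None keeps the skip explicit; raising an exception or sending a notification at that point would be equally reasonable, depending on how loudly a stray backfill should fail.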

Jeremy

12/12/2022, 8:00 PM
this is a pretty basic use case. there should be a standard answer.
like wire up an io_manager to the op.
or something simple.

sandy

12/12/2022, 10:16 PM
"i want to retain the history of all previous runs"
does this mean that you want to append to the Snowflake table each time this runs, or do you want to overwrite it?

Jeremy

12/12/2022, 10:41 PM
append only. yes

sandy

12/12/2022, 10:49 PM
got it - we don't currently have support for appending with the Snowflake IO manager. if you'd be up for filing an issue, we might be able to add support for it in the future. in the meantime, the easiest thing is likely to just roll your own code without trying to use the Snowflake IO manager. something like:
from dagster import asset
from snowflake import connector  # pylint: disable=no-name-in-module
from dagster_snowflake.resources import SnowflakeConnection
from snowflake.connector.pandas_tools import pd_writer


@asset
def my_table(context) -> None:
    dataframe = ...

    connector.paramstyle = "pyformat"
    # get_connection is a context manager in dagster-snowflake (as of this
    # thread), so enter it with `with` to get the connection and clean it up.
    with SnowflakeConnection(
        dict(
            schema=...,
            connector="sqlalchemy",
            database=...,
        ),
        context.log,
    ).get_connection(raw_conn=False) as con:
        # if_exists="append" adds rows to the table instead of replacing it.
        dataframe.to_sql(
            "my_table",
            con=con.engine,
            if_exists="append",
            index=False,
            method=pd_writer,
        )
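To illustrate the append-only pattern on its own, here is a minimal sketch that uses the standard library's `sqlite3` as a stand-in for Snowflake, so it runs anywhere. The table name `api_readings`, its columns, and the helper `append_reading` are hypothetical. The point is that each run inserts one timestamped row and never rewrites earlier ones, which is what builds the historical time series.

```python
import sqlite3
from datetime import datetime, timezone


def append_reading(con: sqlite3.Connection, value: float, fetched_at: datetime) -> None:
    """Insert one timestamped reading; existing rows are never modified."""
    con.execute(
        "CREATE TABLE IF NOT EXISTS api_readings "
        "(fetched_at TEXT NOT NULL, value REAL NOT NULL)"
    )
    con.execute(
        "INSERT INTO api_readings (fetched_at, value) VALUES (?, ?)",
        (fetched_at.isoformat(), value),
    )
    con.commit()


# In-memory database as a stand-in for the Snowflake table.
con = sqlite3.connect(":memory:")
# Two simulated hourly runs; the values stand in for API responses.
append_reading(con, 1.5, datetime(2022, 12, 12, 19, 0, tzinfo=timezone.utc))
append_reading(con, 2.5, datetime(2022, 12, 12, 20, 0, tzinfo=timezone.utc))

rows = con.execute(
    "SELECT fetched_at, value FROM api_readings ORDER BY fetched_at"
).fetchall()
print(rows)
```

Storing the fetch time in the row itself means the history stays meaningful even if a run is delayed or retried, since the timestamp records when the value was actually observed.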

Jeremy

12/12/2022, 11:04 PM
ok. thanks