# ask-community
j
I have a job where every hour I want to call an API and then write the output to a table in Snowflake. I don't think I need an asset, and I can't run a backfill, since the API is only "value now". What's the best practice for this kind of job? Is there a simple example somewhere?
Is this a use case for a custom IO manager?
n
It sounds like this would fit being an asset with a schedule, without requiring a custom IO manager: https://docs.dagster.io/integrations/snowflake
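For example, a minimal sketch of that setup (the asset name, cron string, and connection config here are placeholders; `snowflake_pandas_io_manager` from `dagster_snowflake_pandas` writes a returned DataFrame to a table named after the asset):
```python
import pandas as pd
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job
from dagster_snowflake_pandas import snowflake_pandas_io_manager


@asset
def api_snapshot() -> pd.DataFrame:
    # Call the API here; the IO manager stores the returned DataFrame
    # in a table named after the asset.
    data = ...  # placeholder for the API response
    return pd.DataFrame(data)


api_snapshot_job = define_asset_job("api_snapshot_job", selection=[api_snapshot])

defs = Definitions(
    assets=[api_snapshot],
    schedules=[
        # Run at the top of every hour.
        ScheduleDefinition(job=api_snapshot_job, cron_schedule="0 * * * *")
    ],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": {"env": "SNOWFLAKE_ACCOUNT"},
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "MY_DATABASE",  # placeholder
                "schema": "MY_SCHEMA",  # placeholder
            }
        )
    },
)
```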
j
An asset with a schedule, and no partitions? By leaving the partitions off, it won't offer backfills?
n
You said you don't need backfills? And only the most recent run is valid?
j
I want to retain the history of all previous runs,
so I run regularly and generate a historical time series
n
Ahh okay, so you can't re-create old data, but you still want to remember what it was. Seems like you could then use a partitioned asset (https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#defining-partitioned-assets) with an hourly partition definition, define a partitioned asset job (https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#partitioned-asset-jobs), and then build the schedule from the partitioned job (https://docs.dagster.io/_apidocs/schedules-sensors#dagster.build_schedule_from_partitioned_job). I am not sure how you could turn off the ability to backfill, since it sounds like it would corrupt historical data.
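Roughly, with a placeholder asset name and start date, that combination could look like:
```python
from dagster import (
    HourlyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# Placeholder start date: the first hour you want a partition for.
hourly_partitions = HourlyPartitionsDefinition(start_date="2023-01-01-00:00")


@asset(partitions_def=hourly_partitions)
def api_snapshot(context):
    # context.partition_key identifies the hour this run materializes.
    context.log.info(f"materializing partition {context.partition_key}")
    ...


api_snapshot_job = define_asset_job(
    "api_snapshot_job",
    selection=[api_snapshot],
    partitions_def=hourly_partitions,
)

# Runs once an hour, each run targeting the partition that just completed.
api_snapshot_schedule = build_schedule_from_partitioned_job(api_snapshot_job)
```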
j
This is exactly the problem I'm trying to solve:
> I am not sure how you could turn off the ability to backfill, since it sounds like it would corrupt historical data
n
You might be able to check the tags of the run at runtime to see if it is a backfill (Dagster auto-tags backfill runs), and then skip your operations and/or send a notification somewhere that something was trying to backfill?
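A minimal sketch of that guard, assuming the `dagster/backfill` tag that Dagster adds to runs launched by a backfill:
```python
from dagster import asset


@asset
def api_snapshot(context):
    # Runs launched by a backfill carry the dagster/backfill tag, so we can
    # refuse to re-call the API and leave the historical data alone.
    if "dagster/backfill" in context.run.tags:
        context.log.warning("refusing to run as part of a backfill")
        return
    ...  # normal "value now" API call and write go here
```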
j
This is a pretty basic use case; there should be a standard answer.
Like wire up an io_manager to the op.
Or something simple.
s
> I want to retain the history of all previous runs
Does this mean that you want to append to the Snowflake table each time this runs, or do you want to overwrite it?
j
Append only, yes.
s
Got it. We don't currently have support for appending with the Snowflake IO manager; if you'd be up for filing an issue, we might be able to add support for it in the future. In the meantime, the easiest thing is likely to roll your own code without the Snowflake IO manager. Something like:
```python
from dagster import asset
from snowflake import connector  # pylint: disable=no-name-in-module
from dagster_snowflake.resources import SnowflakeConnection
from snowflake.connector.pandas_tools import pd_writer


@asset
def my_table(context) -> None:
    dataframe = ...  # call the API and build a pandas DataFrame here

    connector.paramstyle = "pyformat"
    snowflake = SnowflakeConnection(
        dict(
            schema=...,
            connector="sqlalchemy",
            database=...,
        ),
        context.log,
    )

    # get_connection is a context manager; raw_conn=False yields a
    # SQLAlchemy connection whose engine pandas can write through.
    with snowflake.get_connection(raw_conn=False) as con:
        dataframe.to_sql(
            "my_table",
            con=con.engine,
            if_exists="append",  # append to the table instead of replacing it
            index=False,
            method=pd_writer,
        )
```
j
Ok, thanks.