# ask-community
j
Hey, this seems like a basic question, but I can't find the info anywhere 😞. I've seen a lot of docs for `assets` and a lot for `ops`, but none with both of them together. If I'm building my DAG using software-defined assets and I don't want to save the result of some of the assets, my thought was to convert them to an `op`, so no `io_manager` would be configured and that `op`'s result wouldn't get saved to disk. The problem is that I'm not able to wire `ops` into my graph the same way I'm linking my `assets`. What's the Dagster Way of doing this? Should I just create plain functions for those calls? Take the example below:
```python
@asset(required_resource_keys={"snowflake"})
def get_data_from_snowflake(context):
    query = "select * from my_table"

    df = context.resources.snowflake.execute_query(query, use_pandas_result=True)

    return df.drop_duplicates().values.tolist()


@asset
def get_all_data(context, get_data_from_snowflake):
    client = MyClient()

    resp = client.get_data(get_data_from_snowflake)

    return resp
```
How can I change the function `get_data_from_snowflake` so its result isn't saved to disk, while still keeping the option to use it as the input of `get_all_data`?
🤖 1
t
Hey there! It sounds like you have a pipeline of assets, and one of those assets shouldn't persist its result to its I/O manager? I'll start by providing a solution to the problem you outline in your code, and let me know if you'd like answers to any of the tangential questions you've also asked. The tl;dr is that this page should have the "Dagster Way" practices you're looking for: https://docs.dagster.io/tutorial/assets/non-argument-deps In your `get_data_from_snowflake`, I'd recommend modifying your SQL query so that all of that compute and storage happens in Snowflake, e.g.
```sql
create or replace table unique_records as (
   select distinct * from my_table
)
```
Side note: I would suggest de-duping with a `row_number` + `qualify` on a specific primary key to reduce those query costs 😉 Then, don't return anything from your `get_data_from_snowflake` asset. Assets don't need to return data or use I/O managers if you don't need them to. Now you can modify your `get_all_data` asset so that it doesn't take the `get_data_from_snowflake` asset as a parameter, but instead lists it in `non_argument_deps` on the `@asset` decorator. This way, if you have a job of these two assets, `get_all_data` won't try to materialize before `get_data_from_snowflake` has materialized (there's a rough sketch of this at the end of this message). That said, if you don't want the data from `get_data_from_snowflake` saved via an I/O manager, you likely don't want the data from `get_all_data` to persist either, so you might want to follow the same pattern in that asset too.
---
If you want an even more "Dagster way" to do this, I'd recommend using the Snowflake I/O manager together with partitions to reduce the amount of data loaded into memory while assets run. You get three benefits this way:
• minimized Snowflake compute, because you're only querying one partition of data at a time
• with reasonable partitioning, a whole partition of data should fit into memory for compute
• the Snowflake I/O manager reads and writes data between memory and Snowflake tables for you, so none of your data ever gets stored on the disk of the compute engine
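(For concreteness, here's a rough, hypothetical sketch of that `non_argument_deps` pattern, reusing the asset names from this thread; the query text and the `...` body are placeholders, not a definitive implementation.)
```python
from dagster import asset


@asset(required_resource_keys={"snowflake"})
def get_data_from_snowflake(context):
    # Do the de-duplication and storage entirely in Snowflake; returning
    # nothing means there is no output for an I/O manager to persist.
    context.resources.snowflake.execute_query(
        "create or replace table unique_records as (select distinct * from my_table)"
    )


@asset(non_argument_deps={"get_data_from_snowflake"})
def get_all_data(context):
    # Runs only after get_data_from_snowflake has materialized, but does
    # not receive its output as an argument.
    ...
```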
j
Thank you for taking the time to explain all of this. May I ask some other questions? I think I wasn't clear enough about what I want to achieve, sorry about that. `non_argument_deps` makes Dagster wait for the parent asset to complete before executing any downstream assets. That's perfect. But in my use case, my `get_data_from_snowflake` will be used as an input to my `get_all_data`. Let's take an example: I query Snowflake to get a list of accounts in `get_data_from_snowflake`, and I query another service with that list in `get_all_data`. If I set up a `non_argument_deps`, I won't have the list of accounts in `get_all_data`. And I don't want to save them with an I/O manager, because they are already in Snowflake and that would just create duplicates of the same info. I know I could use Dagster partitions, but my keys are `time` and `static_values`, and since the multi-partition-key feature is still experimental, I would prefer not to use it. Basically, I would like to create Python functions with preconfigured Snowflake queries, so all my pipelines can use them. And since those records already come from my data warehouse, I would like to only load them into memory and not persist them anywhere. From what I understand (and here is where I'm probably wrong):
• If I use `non_argument_deps` in a downstream asset, the parent (`get_data_from_snowflake` in our example) will still be materialized.
• If I don't return any value, I won't have the data required to execute the downstream asset `get_all_data`.
• If I return a value, it will get persisted somewhere. If I configure an I/O manager using `io_manager_key`, that I/O manager will be chosen; if I don't configure any, the default one will be selected.
• Is there something like `io_manager_key=store_data_in_memory` that stores the data for the current run and drops it when the Dagster container for the run finishes?
TL;DR: I would like to create Python functions with preconfigured Snowflake queries whose results are not persisted to disk, but can still be used as inputs to other assets.
Ok, so it appears there are two built-in io_managers for this: `fs_io_manager` and `mem_io_manager`. I think `mem_io_manager` is what I'm looking for, but it seems to error with my setup, as shown in the image linked to this thread. Is it not possible to use `mem_io_manager` together with another, persistent io_manager? Sorry for all of these questions ahaha. Thanks for your help.
Ok, update on this. It turns out that when you create a resource called `io_manager`, it overrides the default io_manager (`fs_io_manager`). That's why nothing was getting saved to disk when I wasn't configuring any io_manager in my asset decorators. With that sorted out, the only question left is: should I switch from `fs_io_manager` to `mem_io_manager`, or is keeping `fs_io_manager` the Dagster way?
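(For reference, a minimal sketch of combining the two, assuming a recent Dagster version with the `Definitions` API and the built-in `fs_io_manager` / `mem_io_manager`; the asset bodies are placeholders.)
```python
from dagster import Definitions, asset, fs_io_manager, mem_io_manager


@asset(io_manager_key="in_memory_io")
def get_data_from_snowflake():
    # Held in memory for the duration of the run instead of written to disk.
    return ["account_1", "account_2"]  # placeholder data


@asset
def get_all_data(get_data_from_snowflake):
    # Uses the default "io_manager" resource (fs_io_manager here).
    return get_data_from_snowflake


defs = Definitions(
    assets=[get_data_from_snowflake, get_all_data],
    resources={
        "io_manager": fs_io_manager,     # default for assets without an io_manager_key
        "in_memory_io": mem_io_manager,  # only used by get_data_from_snowflake
    },
)
```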
s
Using `mem_io_manager` is very reasonable if you're using the in_process_executor. If you're using an executor that involves multiple processes, then you need an I/O manager that can put data in a place where the other processes can get at it.
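(As a hedged illustration of that point: a job defined with `in_process_executor` runs every step in a single process, so outputs handed off by `mem_io_manager` stay reachable across steps. The job name and selection below are made up for the example.)
```python
from dagster import define_asset_job, in_process_executor

# All selected assets run in one process, so in-memory outputs survive
# between steps of the same run.
in_process_asset_job = define_asset_job(
    name="in_process_asset_job",
    selection="*",
    executor_def=in_process_executor,
)
```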
👍 1
j
Ok so, hopefully, this will be my last message on this topic. Really sorry to bother you both again. My goal is to create a function that contains an SQL query so I can use it from different assets with some minor filtering. Which option should I take? I'm really trying to find a "good" solution, but none of the options I've found seem optimal. Here are the options I could take, but they all seem terrible.
Option 1 (create a Snowflake class and function, and pass a variable directly): This seems like the best option of all, but I'm sad that I can't use Dagster's built-in resource system to provide the Snowflake connection to the function.
```python
import os

import snowflake.connector
from dagster import asset


class SnowflakeClient:
    def __init__(self):
        self.client = snowflake.connector.connect(
            account=os.getenv("SNOWFLAKE_ACCOUNT", ""),
            user=os.getenv("SNOWFLAKE_USER", ""),
            password=os.getenv("SNOWFLAKE_PASSWORD", ""),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE", ""),
        )

    def get_delegator_accounts(self, ticker):
        query = f"select distinct operator_address from mt_table where ticker = '{ticker}'"
        df = self.client.cursor().execute(query).fetch_pandas_all()

        return df["OPERATOR_ADDRESS"].tolist()


@asset
def get_all_grt_rewards(context) -> dict:
    grt_client = GRTClient()
    snowflake_client = SnowflakeClient()
    result = snowflake_client.get_delegator_accounts("GRT")

    resp = grt_client.get_rewards_for_multiple_addresses(result)

    return resp
```
Option 2 (create an asset for the query): This is good, but the problem is that I can't pass any parameters to the function.
```python
@asset(
    required_resource_keys={"snowflake"},
)
def get_delegator_accounts_2(context):
    query = "select distinct operator_address from my_table where ticker = 'GRT'"

    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)

    return df_accounts["OPERATOR_ADDRESS"].tolist()


@asset
def get_all_grt_rewards(context, get_delegator_accounts_2) -> dict:
    grt_client = GRTClient()

    resp = grt_client.get_rewards_for_multiple_addresses(get_delegator_accounts_2)

    return resp
```
Option 3 (not working, since `ticker` can't be provided):
```python
@asset(
    required_resource_keys={"snowflake"},
)
def get_delegator_accounts_3(context, ticker):
    query = f"select distinct operator_address from my_table where ticker = '{ticker}'"

    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)

    return df_accounts["OPERATOR_ADDRESS"].tolist()
```
Option 4 (This works, but every partition would run for all of my downstream assets, and I would still need filtering to decide which downstream asset receives which results):
```python
@asset(
    partitions_def=StaticPartitionsDefinition(["GRT", "OTHER"]),
    required_resource_keys={"snowflake"},
)
def get_delegator_accounts_3(context):
    partition = context.asset_partition_key_for_output()
    query = f"select distinct operator_address from my_table where ticker = '{partition}'"

    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)

    return df_accounts["OPERATOR_ADDRESS"].tolist()
```
Option 5 (This works, but the configuration seems overly complicated just to route the right partition to the right downstream asset):
```python
@multi_asset(
    partitions_def=StaticPartitionsDefinition(["GRT", "OTHER"]),
    required_resource_keys={"snowflake"},
    outs={
        "GRT": AssetOut(
            metadata={
                "priority": "high",
            },
            io_manager_key="io_manager",
            key=AssetKey("get_all_grt_rewards"),
        ),
        "OTHER": AssetOut(
            is_required=False,
            metadata={
                "priority": "low",
            },
            io_manager_key="io_manager",
            key=AssetKey("other_assets"),
        ),
    },
    can_subset=False,
)
def get_delegator_accounts_3(context):
    partition = context.asset_partition_key_for_output()
    query = f"select distinct operator_address from my_table where ticker = '{partition}'"

    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)

    # the partition key here matches the name of the configured out
    yield Output(value=df_accounts["OPERATOR_ADDRESS"].tolist(), output_name=partition)
```
I'm sure I'm missing a basic concept that would resolve this, but I can't find which one 😞
z
is there a reason you can't do
```python
import os

import snowflake.connector
from dagster import asset, resource


class SnowflakeClient:
    def __init__(self):
        self.client = snowflake.connector.connect(
            account=os.getenv("SNOWFLAKE_ACCOUNT", ""),
            user=os.getenv("SNOWFLAKE_USER", ""),
            password=os.getenv("SNOWFLAKE_PASSWORD", ""),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE", ""),
        )

    def get_delegator_accounts(self, ticker):
        query = f"select distinct operator_address from mt_table where ticker = '{ticker}'"
        df = self.client.cursor().execute(query).fetch_pandas_all()

        return df["OPERATOR_ADDRESS"].tolist()


@resource
def snowflake_client():
    return SnowflakeClient()


@resource
def grt_client():
    return GRTClient()


@asset(required_resource_keys={"snowflake_client", "grt_client"})
def get_all_grt_rewards(context) -> dict:
    grt_client = context.resources.grt_client
    snowflake_client = context.resources.snowflake_client
    result = snowflake_client.get_delegator_accounts("GRT")

    resp = grt_client.get_rewards_for_multiple_addresses(result)

    return resp
```
That seems the cleanest to me. Does that address your comment "...but I'm sad that I can't use Dagster's built-in resource system to provide the Snowflake connection to the function"?
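(For completeness, a hedged sketch of how those resources might be bound, assuming the `Definitions` API; the names follow the example above and aren't from the original thread.)
```python
from dagster import Definitions

# The resource keys match the required_resource_keys on get_all_grt_rewards.
defs = Definitions(
    assets=[get_all_grt_rewards],
    resources={
        "snowflake_client": snowflake_client,
        "grt_client": grt_client,
    },
)
```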
j
Oh right, I think this would work yeah. That looks great. Yahoo. 🙂 Thank you @Zach