**Jacob Marcil** 03/01/2023, 12:49 AM
…for `assets` or for `ops`, but none with both of them together.
If I'm building my DAG using software-defined assets and I don't want to save the result of some of the assets, my thought would be to convert those to an `op`, so no `io_manager` would be configured and that op's output would not get saved to disk. The problem I have is that I'm not able to connect an `op` to my assets the same way I'm linking my `assets` together.
What's the Dagster way of doing this? Should I just create basic functions for those calls?
Take the example below:
```python
from dagster import asset

@asset(required_resource_keys={"snowflake"})
def get_data_from_snowflake(context):
    query = "select * from my_table"
    df = context.resources.snowflake.execute_query(query, use_pandas_result=True)
    # de-duplicate rows; a DataFrame has no .unique(), so use drop_duplicates()
    return df.drop_duplicates().values.tolist()

@asset
def get_all_data(context, get_data_from_snowflake):
    client = MyClient()
    resp = client.get_data(get_data_from_snowflake)
    return resp
```
How can I change the function `get_data_from_snowflake` so its result is not saved to disk, while still keeping the option to use it as the input of `get_all_data`?

**Tim Castillo** 03/01/2023, 3:20 AM
For `get_data_from_snowflake`, I'd recommend modifying your SQL query to do all that compute and storage in Snowflake, e.g.
```sql
create or replace table unique_records as (
    select distinct * from my_table
)
```
side-note: I would suggest de-duping with a `row_number`+`qualify` on a specific primary key to reduce those query costs 😉
Then, don't return anything in your `get_data_from_snowflake` asset. Assets don't need to return data or use I/O managers if you don't need them to.
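A hedged sketch of what that could look like, assuming the same `snowflake` resource as in the example above and folding in the `row_number` + `qualify` suggestion (`my_table`, `unique_records`, and the `id` primary key are placeholders):
```python
from dagster import asset

@asset(required_resource_keys={"snowflake"})
def get_data_from_snowflake(context):
    # do the de-duplication inside Snowflake instead of in pandas;
    # "id" is a hypothetical primary key, adjust to your schema
    query = """
        create or replace table unique_records as (
            select *
            from my_table
            qualify row_number() over (partition by id order by id) = 1
        )
    """
    context.resources.snowflake.execute_query(query)
    # no return value: nothing is handed to an I/O manager,
    # so Dagster writes nothing to disk
```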
Now you can modify your `get_all_data` asset so that it doesn't take the `get_data_from_snowflake` asset as a parameter, but instead lists it in `non_argument_deps` on the `@asset` decorator. This way, if you have a job of these two assets, `get_all_data` won't try materializing before `get_data_from_snowflake` has materialized. That said, if you didn't want the data from `get_data_from_snowflake` saved via an I/O manager, you likely don't want the data from `get_all_data` to persist either, so you might want to consider following the same pattern in that asset too.
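For illustration, a hedged sketch of that wiring, reading from the `unique_records` table assumed above (`MyClient` is from the original example):
```python
from dagster import asset

@asset(non_argument_deps={"get_data_from_snowflake"}, required_resource_keys={"snowflake"})
def get_all_data(context):
    # runs only after get_data_from_snowflake materializes, but no value
    # is passed between the two assets through an I/O manager
    df = context.resources.snowflake.execute_query(
        "select * from unique_records", use_pandas_result=True
    )
    client = MyClient()
    return client.get_data(df)
```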
---
If you want an even more "Dagster way" to do this, I'd recommend using the Snowflake I/O manager and using partitions to reduce the amount of data loaded into memory during asset materializations. You get 3 benefits this way (sketch after the list):
• minimized compute from Snowflake because you're only querying a partition of data at a time
• with reasonable partitioning, you should likely be able to fit a whole partition of data into memory for compute
• the Snowflake I/O manager will read and write data between memory and Snowflake tables for you, so none of your data ever gets stored to the disk of the compute engine
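A hedged sketch of that approach, assuming the `dagster-snowflake-pandas` package (the asset name, config keys, and `partition_expr` column are placeholders and may differ by version):
```python
import pandas as pd
from dagster import Definitions, StaticPartitionsDefinition, asset
from dagster_snowflake_pandas import snowflake_pandas_io_manager

@asset(
    partitions_def=StaticPartitionsDefinition(["GRT", "OTHER"]),
    # tells the I/O manager which column maps partition keys to rows
    metadata={"partition_expr": "TICKER"},
)
def delegator_accounts(context) -> pd.DataFrame:
    ticker = context.partition_key
    return pd.DataFrame({"TICKER": [ticker], "OPERATOR_ADDRESS": ["..."]})

defs = Definitions(
    assets=[delegator_accounts],
    resources={
        "io_manager": snowflake_pandas_io_manager.configured(
            {
                "account": {"env": "SNOWFLAKE_ACCOUNT"},
                "user": {"env": "SNOWFLAKE_USER"},
                "password": {"env": "SNOWFLAKE_PASSWORD"},
                "database": "MY_DB",
                "warehouse": "MY_WH",
            }
        )
    },
)
```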
**Jacob Marcil** 03/01/2023, 2:05 PM
`non_argument_deps` makes Dagster wait for the completion of the parent asset before executing any downstream assets. That's perfect.
But in my use case, `get_data_from_snowflake` will be used as an input to `get_all_data`.
Let's take an example: I query Snowflake to get a list of accounts in `get_data_from_snowflake`, and I'll query that list against another service in `get_all_data`.
If I set up `non_argument_deps`, I won't have the list of accounts in `get_all_data`. And I don't want to save them with an I/O manager, because they are already in Snowflake and this would create duplicates of the same info for nothing.
I know that I could use Dagster partitions, but my keys are `time` and `static_values`, and since the multi-partition key feature is still experimental, I would prefer not to use it.
Basically, I would like to create Python functions that have preconfigured Snowflake queries, so all my pipelines can use them. And since those records already come from my data warehouse, I would like to only load them into memory and not persist them anywhere.
From what I understand (and here is where I'm probably wrong):
• If I use `non_argument_deps` in a downstream asset, the parent will still be materialized (in our example, `get_data_from_snowflake`).
• If I don't return any value, I won't have the required data to execute the downstream asset `get_all_data`.
• If I return a value, it will get persisted somewhere. If I configure an I/O manager using `io_manager_key`, that I/O manager will be chosen; if I don't configure any, the default one will be selected.
• Is there something like `io_manager_key=store_data_in_memory` that would store the data for the current run and drop it when the Dagster container for the run finishes?
TL;DR: I would like to create Python functions that have preconfigured Snowflake queries, are not persisted to disk, but can be used as inputs in other assets.

`fs_io_manager` and `mem_io_manager`: I think this is what I'm looking for.
The `mem_io_manager` seems to be erroring when I use it with my setup, as shown in the image linked to this thread.
Is it not possible to use `mem_io_manager` together with another, persistent io_manager?
Sorry for all of those questions ahaha. Thanks for your help.

I see now that providing a resource under the key `io_manager` overrides the default io_manager, `fs_io_manager`.
That's why nothing was getting saved to disk by default when I wasn't configuring any io_managers in my asset decorator.
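A hedged sketch of that override (the asset is a placeholder; `mem_io_manager` is importable from `dagster`):
```python
from dagster import Definitions, asset, mem_io_manager

@asset
def my_asset():
    return [1, 2, 3]

defs = Definitions(
    assets=[my_asset],
    resources={
        # binding a resource to the reserved key "io_manager" replaces the
        # default fs_io_manager for every asset without an explicit io_manager_key
        "io_manager": mem_io_manager,
    },
)
```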
So with that sorted out, the only question left is: is it possible to swap `fs_io_manager` for `mem_io_manager`, or should I keep `fs_io_manager` since it's the Dagster way?

**sandy** 03/01/2023, 4:49 PM
Using `mem_io_manager` is very reasonable if you're using the `in_process_executor`. If you're using an executor that involves multiple processes, then you need an I/O manager that can put data in a place where other processes can get at it.
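A hedged sketch combining the two (the `mem_io` resource key and asset names are hypothetical):
```python
from dagster import Definitions, asset, fs_io_manager, in_process_executor, mem_io_manager

@asset(io_manager_key="mem_io")
def accounts():
    # kept only in the run process's memory; never written to disk
    return ["addr1", "addr2"]

@asset
def rewards(accounts):
    # uses the default io_manager (fs_io_manager here) for its own output
    return {a: 0 for a in accounts}

defs = Definitions(
    assets=[accounts, rewards],
    resources={"io_manager": fs_io_manager, "mem_io": mem_io_manager},
    # mem_io_manager only works when all steps run in a single process
    executor=in_process_executor,
)
```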
**Jacob Marcil** 03/01/2023, 5:55 PM
Option 1 (plain client class, no Dagster resource)
```python
import os

import snowflake.connector
from dagster import asset

class SnowflakeClient:
    def __init__(self):
        self.client = snowflake.connector.connect(
            account=os.getenv("SNOWFLAKE_ACCOUNT", ""),
            user=os.getenv("SNOWFLAKE_USER", ""),
            password=os.getenv("SNOWFLAKE_PASSWORD", ""),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE", ""),
        )

    def get_delegator_accounts(self, ticker):
        query = f"select distinct operator_address from my_table where ticker = '{ticker}'"
        df = self.client.cursor().execute(query).fetch_pandas_all()
        return df["OPERATOR_ADDRESS"].tolist()

@asset
def get_all_grt_rewards(context) -> dict:
    grt_client = GRTClient()
    snowflake_client = SnowflakeClient()
    result = snowflake_client.get_delegator_accounts("GRT")
    resp = grt_client.get_rewards_for_multiple_addresses(result)
    return resp
```
Option 2 (Create an asset for the query)
This is good, but the problem is that I can’t add any parameter to the function.
```python
@asset(required_resource_keys={"snowflake"})
def get_delegator_accounts_2(context):
    query = "select distinct operator_address from my_table where ticker = 'GRT'"
    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)
    return df_accounts["OPERATOR_ADDRESS"].tolist()

@asset
def get_all_grt_rewards(context, get_delegator_accounts_2) -> dict:
    grt_client = GRTClient()
    resp = grt_client.get_rewards_for_multiple_addresses(get_delegator_accounts_2)
    return resp
```
Option 3 (Not working, since `ticker` can't be provided)
```python
@asset(required_resource_keys={"snowflake"})
def get_delegator_accounts_3(context, ticker):
    query = f"select distinct operator_address from my_table where ticker = '{ticker}'"
    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)
    return df_accounts["OPERATOR_ADDRESS"].tolist()
```
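One hedged workaround (an aside, not suggested in the thread): pass `ticker` through op config instead of a function parameter, using the `config_schema`/`context.op_config` APIs of that era. The asset name below is hypothetical:
```python
from dagster import asset

@asset(required_resource_keys={"snowflake"}, config_schema={"ticker": str})
def get_delegator_accounts_by_config(context):
    # supplied via run config, e.g.
    # {"ops": {"get_delegator_accounts_by_config": {"config": {"ticker": "GRT"}}}}
    ticker = context.op_config["ticker"]
    query = f"select distinct operator_address from my_table where ticker = '{ticker}'"
    df = context.resources.snowflake.execute_query(query, use_pandas_result=True)
    return df["OPERATOR_ADDRESS"].tolist()
```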
Option 4 (This works, but all partitions would be run for all of my downstream assets, and a filter would have to route the results to the right downstream asset)
```python
from dagster import StaticPartitionsDefinition, asset

@asset(
    partitions_def=StaticPartitionsDefinition(["GRT", "OTHER"]),
    required_resource_keys={"snowflake"},
)
def get_delegator_accounts_3(context):
    partition = context.asset_partition_key_for_output()
    query = f"select distinct operator_address from my_table where ticker = '{partition}'"
    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)
    return df_accounts["OPERATOR_ADDRESS"].tolist()
```
Option 5 (This works, but the configuration seems overly complicated just to route the right partition to the right downstream asset)
```python
from dagster import AssetKey, AssetOut, Output, StaticPartitionsDefinition, multi_asset

@multi_asset(
    partitions_def=StaticPartitionsDefinition(["GRT", "OTHER"]),
    required_resource_keys={"snowflake"},
    outs={
        "GRT": AssetOut(
            metadata={"priority": "high"},
            io_manager_key="io_manager",
            key=AssetKey("get_all_grt_rewards"),
        ),
        "OTHER": AssetOut(
            is_required=False,
            metadata={"priority": "low"},
            io_manager_key="io_manager",
            key=AssetKey("other_assets"),
        ),
    },
    can_subset=False,
)
def get_delegator_accounts_3(context):
    partition = context.asset_partition_key_for_output()
    query = f"select distinct operator_address from my_table where ticker = '{partition}'"
    df_accounts = context.resources.snowflake.execute_query(query, use_pandas_result=True)
    # the partition key here matches the name of the configured out
    yield Output(value=df_accounts["OPERATOR_ADDRESS"].tolist(), output_name=partition)
```
I'm sure I'm missing a basic concept that would resolve this, but I can't find which one 😞
**Zach** 03/01/2023, 7:19 PM
```python
import os

import snowflake.connector
from dagster import asset, resource

class SnowflakeClient:
    def __init__(self):
        self.client = snowflake.connector.connect(
            account=os.getenv("SNOWFLAKE_ACCOUNT", ""),
            user=os.getenv("SNOWFLAKE_USER", ""),
            password=os.getenv("SNOWFLAKE_PASSWORD", ""),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE", ""),
        )

    def get_delegator_accounts(self, ticker):
        query = f"select distinct operator_address from my_table where ticker = '{ticker}'"
        df = self.client.cursor().execute(query).fetch_pandas_all()
        return df["OPERATOR_ADDRESS"].tolist()

@resource
def snowflake_client():
    return SnowflakeClient()

@resource
def grt_client():
    return GRTClient()

@asset(required_resource_keys={"snowflake_client", "grt_client"})
def get_all_grt_rewards(context) -> dict:
    grt_client = context.resources.grt_client
    snowflake_client = context.resources.snowflake_client
    result = snowflake_client.get_delegator_accounts("GRT")
    resp = grt_client.get_rewards_for_multiple_addresses(result)
    return resp
```
That seems the cleanest to me. Does that address your comment "...but I'm sad that I can't use the built in function of Dagster to provide the ressource to Snowflake the function"?
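For completeness, a hedged sketch of how those resources might be wired up (assuming a `Definitions`-based project and the names from the block above; keys must match `required_resource_keys`):
```python
from dagster import Definitions

defs = Definitions(
    assets=[get_all_grt_rewards],
    resources={
        "snowflake_client": snowflake_client,
        "grt_client": grt_client,
    },
)
```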
**Jacob Marcil** 03/01/2023, 8:02 PM