Cody Scott
06/27/2023, 6:45 PM
I'm building a ConfigurableIOManager to access SQL Server. What I'm a little stuck on is the best way to approach partitions.
Looking at the implementations in snowflake/duckdb there seems to be calls to dagster._core.storage.db_io_manager to help manage some of the details like timewindows. these look like protected (as much as you can) in python apis. For the dev side, are we expected to utilize that for partitions?
Right now I've settled on using the partition_expr metadata key to identify the partition columns: a plain string (the column name) for a single partition dimension, and a dict mapping dimension name to column name for multipartitions.
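from datetime import datetime

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

# Function names and bodies below are placeholders; the original message showed only the decorators.
@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            'date': DailyPartitionsDefinition(start_date=datetime(2023, 6, 1)),
            'tag': StaticPartitionsDefinition([
                "value1",
                "value2"
            ])
        }
    ),
    io_manager_key='mssql_io_manager',
    key_prefix=['data'],
    metadata={
        # dimension name -> column name
        'partition_expr': {
            'date': 'data_datetime',
            'tag': 'source'
        }
    }
)
def multi_partitioned_table(): ...

#--- non multi
@asset(
    partitions_def=DailyPartitionsDefinition(start_date=datetime(2023, 6, 1)),
    io_manager_key='mssql_io_manager',
    key_prefix=['data'],
    metadata={
        'partition_expr': 'data_datetime'
    }
)
def daily_partitioned_table(): ...

#--static version
@asset(
    partitions_def=StaticPartitionsDefinition([
        "value1",
        "value2"
    ]),
    io_manager_key='mssql_io_manager',
    key_prefix=['data'],
    metadata={
        'partition_expr': 'source'
    }
)
def static_partitioned_table(): ...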
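Because partition_expr is a plain string for single-dimension assets and a dict for multipartitions, one option is to normalize both shapes into a dict before building any SQL. The following is only a minimal sketch: normalize_partition_expr and the "default" dimension label are hypothetical names chosen for illustration, not part of Dagster or of the original code.

```python
from typing import Dict, Mapping, Sequence, Union


def normalize_partition_expr(
    expr: Union[str, Mapping[str, str]],
    dimension_names: Sequence[str],
) -> Dict[str, str]:
    """Return a {dimension name: column name} dict for either metadata shape.

    Hypothetical helper: `dimension_names` is whatever the caller uses to
    label its partition dimensions (a single made-up label for
    single-dimension assets).
    """
    if isinstance(expr, str):
        # A bare column name only makes sense with exactly one dimension.
        if len(dimension_names) != 1:
            raise ValueError(
                "a string partition_expr needs exactly one partition dimension"
            )
        return {dimension_names[0]: expr}

    # Dict form: every dimension must have a column assigned.
    missing = [name for name in dimension_names if name not in expr]
    if missing:
        raise ValueError(f"partition_expr is missing dimensions: {missing}")
    return {name: expr[name] for name in dimension_names}
```

With this in place the rest of the IO manager can always iterate over a dict, and a missing dimension fails early with a clear error instead of surfacing later as a KeyError.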
Then in my IO manager I access it by checking for a multi, then trying to parse from there. Seems clunky though, any suggestions on a better structure? I've basically hard-coded it to look for a dimension named date when there is a date partition, so I'll always need to use that name, which is ok. Open to ideas.
def _partition_where_clause(self, context: OutputContext) -> Optional[str]:  # needs: from typing import Optional
    # Unpartitioned assets get no WHERE clause.
    if not context.has_asset_partitions:
        return None
    partition_expr = context.metadata.get('partition_expr')
    clauses = []
    if isinstance(context.partition_key, MultiPartitionKey):
        # partition_expr is a dict of {dimension name: column name}.
        for dimension in partition_expr:
            if dimension == 'date':
                # date stuff
                ...
                tw = self._time_window_where_clause(
                    context.asset_partitions_time_window,
                    partition_expr[dimension]
                )
                clauses.append(tw)
            else:
                partition_keys = context.asset_partition_key.keys_by_dimension
                clauses.append(self._static_where_clause(
                    partition_keys[dimension],
                    partition_expr[dimension]
                ))
    else:
        # partition_expr is a single column name. Try a time window first and
        # fall back to a static key if the partitions are not time-based.
        try:
            time_window = context.asset_partitions_time_window
            clauses.append(self._time_window_where_clause(time_window, partition_expr))
        except Exception:
            clauses.append(self._static_where_clause(context.partition_key, partition_expr))
    joined_data = " AND ".join(clauses)
    final_where = f'WHERE {joined_data}'
    return final_where
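The method above calls two helpers whose bodies are not shown in the message. As a rough illustration of what such helpers could look like, here is a standalone sketch that returns parameterized clauses instead of formatting values into the SQL string. All function names are hypothetical, the `?` placeholders assume a qmark-style driver such as pyodbc, and the time window is treated as half-open (start inclusive, end exclusive). Column names are still interpolated, so they are assumed to come from trusted asset metadata.

```python
from datetime import datetime
from typing import List, Sequence, Tuple

# A clause is the SQL text plus the parameter values for its placeholders.
Clause = Tuple[str, List[object]]


def time_window_clause(column: str, start: datetime, end: datetime) -> Clause:
    """Match rows in the half-open window [start, end)."""
    return f"{column} >= ? AND {column} < ?", [start, end]


def static_clause(column: str, key: str) -> Clause:
    """Match rows whose column equals a static partition key."""
    return f"{column} = ?", [key]


def combine_clauses(clauses: Sequence[Clause]) -> Clause:
    """AND the clauses together into one WHERE fragment with ordered params."""
    if not clauses:
        return "", []
    sql = "WHERE " + " AND ".join(f"({text})" for text, _ in clauses)
    params = [value for _, values in clauses for value in values]
    return sql, params


if __name__ == "__main__":
    where, params = combine_clauses([
        time_window_clause("data_datetime", datetime(2023, 6, 1), datetime(2023, 6, 2)),
        static_clause("source", "value1"),
    ])
    print(where)
    print(params)
```

The resulting pair could then be passed to something like cursor.execute(f"DELETE FROM some_table {where}", params), where some_table is a placeholder. Letting the driver bind the values avoids quoting and date-format issues in the partition keys.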
Full code here: https://gist.github.com/namur007/c01ba67835619bc1221bfbe15d20b6e0
Happy to share an implementation afterwards as well for others looking to access sql server/pandas
sean
06/27/2023, 11:30 PM
> Looking at the implementations in snowflake/duckdb there seems to be calls to dagster._core.storage.db_io_manager to help manage some of the details like timewindows. these look like protected (as much as you can) in python apis. For the dev side, are we expected to utilize that for partitions?
So, DbIOManager is internal (not part of the public API) and isn’t finalized, so I would say it can serve as a good source of inspiration for your own design, and you could try copying the pattern of snowflake/duckdb (inheriting from ConfigurableIOManagerFactory and generating instances of DbIOManager), but we can’t guarantee it won’t break.
> Seems clunky though, any suggestions on a better structure?
Unfortunately I can’t really offer a better suggestion at present. When we make DbIOManager public we can provide a streamlined API/guide/pattern, but for now what you’re doing seems reasonable.
Cody Scott
06/27/2023, 11:33 PM