#ask-community

Cody Scott

06/27/2023, 6:45 PM
Hi! I'm looking for some feedback on managing partitions in a custom IO manager. I've subclassed ConfigurableIOManager to access SQL Server. What I'm a little stuck on is the best way to approach partitions. Looking at the implementations in snowflake/duckdb, there seem to be calls to dagster._core.storage.db_io_manager to help manage some of the details, like time windows. These look like protected (as much as anything can be in Python) APIs. On the dev side, are we expected to use that module for partitions? Right now I've settled on using the partition_expr format to identify the partition columns, with a dict for multi-partitions:
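from datetime import datetime

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


# --- multi: partition_expr maps each dimension name to its column
@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            'date': DailyPartitionsDefinition(start_date=datetime(2023, 6, 1)),
            'tag': StaticPartitionsDefinition(['value1', 'value2']),
        }
    ),
    io_manager_key='mssql_io_manager',
    key_prefix=['data'],
    metadata={
        'partition_expr': {
            'date': 'data_datetime',
            'tag': 'source',
        }
    },
)
def multi_partitioned_asset():  # placeholder name; the body was not shown
    ...


# --- non multi: partition_expr is the single datetime column
@asset(
    partitions_def=DailyPartitionsDefinition(start_date=datetime(2023, 6, 1)),
    io_manager_key='mssql_io_manager',
    key_prefix=['data'],
    metadata={'partition_expr': 'data_datetime'},
)
def daily_partitioned_asset():  # placeholder name; the body was not shown
    ...


# --- static version: partition_expr is the single static column
@asset(
    partitions_def=StaticPartitionsDefinition(['value1', 'value2']),
    io_manager_key='mssql_io_manager',
    key_prefix=['data'],
    metadata={'partition_expr': 'source'},
)
def static_partitioned_asset():  # placeholder name; the body was not shown
    ...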
Then in my IO manager I access it by checking for a multi-partition key and parsing from there. Seems clunky though, any suggestions on a better structure? I've basically hard-coded it to look for a date key whenever there is a date partition, so I'll always need to add that, which is OK. Open to ideas.
from typing import Optional


def _partition_where_clause(self, context: OutputContext) -> Optional[str]:
    # Unpartitioned assets need no WHERE clause.
    if not context.has_asset_partitions:
        return None

    partition_expr = context.metadata.get('partition_expr')

    clauses = []
    if isinstance(context.partition_key, MultiPartitionKey):
        # partition_expr is a dict of {dimension name: column name}.
        partition_keys = context.asset_partition_key.keys_by_dimension
        for dimension, column in partition_expr.items():
            if dimension == 'date':
                # The time dimension is expected to be named 'date'.
                clauses.append(self._time_window_where_clause(
                    context.asset_partitions_time_window,
                    column
                ))
            else:
                clauses.append(self._static_where_clause(
                    partition_keys[dimension],
                    column
                ))
    else:
        # partition_expr is a single column name.
        try:
            time_window = context.asset_partitions_time_window
            clauses.append(self._time_window_where_clause(time_window, partition_expr))
        except Exception:
            # Not time-partitioned, so treat the key as a static value.
            clauses.append(self._static_where_clause(context.partition_key, partition_expr))

    return 'WHERE ' + ' AND '.join(clauses)
Full code here: https://gist.github.com/namur007/c01ba67835619bc1221bfbe15d20b6e0 Happy to share an implementation afterwards as well for others looking to access SQL Server/pandas.
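For readers following along without the gist, here is a minimal sketch of what the two helpers called in the snippet above might look like. The bodies are assumptions, not the code from the gist: the half-open [start, end) window, the datetime literal format, and taking start/end as plain datetimes (instead of a Dagster time window object) are all illustrative choices. Values are interpolated into the string only to keep the example short; parameterized queries are the safer choice in real code.

```python
from datetime import datetime

# Assumed literal format for SQL Server datetime comparisons.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def time_window_where_clause(start: datetime, end: datetime, column: str) -> str:
    """Build a half-open [start, end) filter on a datetime column."""
    return (
        f"{column} >= '{start.strftime(DATETIME_FORMAT)}' "
        f"AND {column} < '{end.strftime(DATETIME_FORMAT)}'"
    )


def static_where_clause(partition_key: str, column: str) -> str:
    """Build an equality filter, doubling any single quotes in the value."""
    escaped = partition_key.replace("'", "''")
    return f"{column} = '{escaped}'"


# Combine one clause per partition dimension, as the IO manager method does.
clauses = [
    time_window_where_clause(datetime(2023, 6, 1), datetime(2023, 6, 2), "data_datetime"),
    static_where_clause("value1", "source"),
]
where = "WHERE " + " AND ".join(clauses)
print(where)
```

The half-open window means a daily partition never picks up the first row of the following day.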

sean

06/27/2023, 11:30 PM
Hi Cody,
Looking at the implementations in snowflake/duckdb, there seem to be calls to dagster._core.storage.db_io_manager to help manage some of the details, like time windows. These look like protected (as much as anything can be in Python) APIs. On the dev side, are we expected to use that module for partitions?
So, DbIOManager is internal (not part of the public API) and isn't finalized. I would say it can serve as a good source of inspiration for your own design, and you could try copying the pattern of snowflake/duckdb (inheriting from ConfigurableIOManagerFactory and generating instances of DbIOManager), but we can't guarantee it won't break.
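As an illustration of taking inspiration without importing the internal module, one possible design is to normalize the partition_expr metadata into a flat list of dimensions first, then build the WHERE clause by dispatching on the type of each value instead of on a dimension named date. This is a sketch under stated assumptions and says nothing about how DbIOManager actually works: PartitionDimension, normalize, and render_where are hypothetical names, and a time window is modeled as a plain (start, end) tuple so the example runs without Dagster.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Mapping, Sequence, Tuple, Union

# Stand-in for a time window: a half-open (start, end) pair.
Window = Tuple[datetime, datetime]
Value = Union[Window, str]


@dataclass(frozen=True)
class PartitionDimension:
    """One partition column plus the value (or window) selected for it."""
    column: str
    value: Value


def normalize(
    partition_expr: Union[str, Mapping[str, str]],
    values: Union[Value, Mapping[str, Value]],
) -> List[PartitionDimension]:
    """Turn the single-column and dict forms into the same flat list."""
    if isinstance(partition_expr, str):
        return [PartitionDimension(partition_expr, values)]
    return [PartitionDimension(col, values[dim]) for dim, col in partition_expr.items()]


def render_where(dimensions: Sequence[PartitionDimension]) -> str:
    """Render one clause per dimension, chosen by value type, not by name."""
    fmt = "%Y-%m-%d %H:%M:%S"
    clauses = []
    for dim in dimensions:
        if isinstance(dim.value, tuple):
            start, end = dim.value
            clauses.append(
                f"{dim.column} >= '{start.strftime(fmt)}' "
                f"AND {dim.column} < '{end.strftime(fmt)}'"
            )
        else:
            escaped = dim.value.replace("'", "''")
            clauses.append(f"{dim.column} = '{escaped}'")
    return ("WHERE " + " AND ".join(clauses)) if clauses else ""


multi = normalize(
    {"date": "data_datetime", "tag": "source"},
    {"date": (datetime(2023, 6, 1), datetime(2023, 6, 2)), "tag": "value1"},
)
single = normalize("source", "value2")
print(render_where(multi))
print(render_where(single))
```

With this shape, the time dimension no longer has to be called date, and the single-partition and multi-partition cases share one code path.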
Seems clunky though, any suggestions on a better structure?
Unfortunately I can't really offer a better suggestion at present. When we make DbIOManager public we can provide a streamlined API/guide/pattern, but for now what you're doing seems reasonable.

Cody Scott

06/27/2023, 11:33 PM
I can only dream of moving off prem before you finalize so I can sunset this task… if not I look forward to the updates! Cheers!