# ask-community
g
Hey guys, I am still struggling with this one. I've created a Stack Overflow question here. Now, to solve the issue I found with the proposed answer, I want to give the asset a specific io_manager. Is it possible to achieve this without registering it in Definitions(resources={...})?
Copy code
import os
import random

import pandas as pd
from dagster import IOManagerDefinition, asset

# hourly_partitions, daily_partitions and PartitionedParquetIOManager
# are defined elsewhere in my project

@asset(io_manager_key='parquet_io_manager',  # key registered via Definitions(resources={...})
       partitions_def=hourly_partitions)
def my_custom_df(context) -> pd.DataFrame:
    start, end = context.asset_partitions_time_window_for_output()

    df = pd.DataFrame({'timestamp': pd.date_range(start, end, freq='5T')})
    df['count'] = df['timestamp'].map(lambda a: random.randint(1, 1000))
    return df


# Failed attempt: io_manager_def expects an IOManagerDefinition, not an instance
# @asset(io_manager_def=PartitionedParquetIOManager(base_path=os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'), time_column='timestamp'),
#        partitions_def=daily_partitions)

# Failed attempt: io_manager_def does not accept a plain dict
# @asset(io_manager_def={'resource_fn': PartitionedParquetIOManager,
#                        'config_schema': {'base_path': os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'), 'time_column': 'timestamp'}},
#        partitions_def=daily_partitions)

# Raises DagsterInvalidConfigDefinitionError: config_schema expects a schema
# (fields/types), not concrete values
@asset(io_manager_def=IOManagerDefinition(resource_fn=PartitionedParquetIOManager,
                                          config_schema={'base_path': os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'), 'time_column': 'timestamp'}),
       partitions_def=daily_partitions)
def another_custom_df(context, my_custom_df: pd.DataFrame) -> pd.DataFrame:
    return my_custom_df.set_index('timestamp').resample('H').agg({'count': 'mean'})
I am getting a DagsterInvalidConfigDefinitionError because a value cannot be resolved.
OK, I think I've figured it out. I need to use the @io_manager decorator; that gives me an object whose .configured() method I can call, as sketched below. I've added another answer to the Stack Overflow post. Any comment would be appreciated.
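Roughly, a minimal sketch of what I mean, assuming PartitionedParquetIOManager is my class from above (the config fields are my own, not Dagster built-ins):
Copy code
import os

import pandas as pd
from dagster import asset, io_manager

@io_manager(config_schema={'base_path': str, 'time_column': str})
def partitioned_parquet_io_manager(init_context):
    # Build my custom IO manager from the resource config
    return PartitionedParquetIOManager(
        base_path=init_context.resource_config['base_path'],
        time_column=init_context.resource_config['time_column'],
    )

# .configured() binds concrete values to the config schema, yielding a
# definition that can be passed straight to io_manager_def -- no
# Definitions(resources={...}) registration needed
@asset(io_manager_def=partitioned_parquet_io_manager.configured(
           {'base_path': os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'),
            'time_column': 'timestamp'}),
       partitions_def=daily_partitions)
def another_custom_df(context, my_custom_df: pd.DataFrame) -> pd.DataFrame:
    return my_custom_df.set_index('timestamp').resample('H').agg({'count': 'mean'})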
s
Hi Guillaume, Trying to wrap my head around what you’re trying to do here and make sure you’re using the latest APIs. Confused by this part of your SO post:
Now I have an issue if the same asset is used with different partition sizes, because the file names are not necessarily the same during the reading and the writing of the files. e.g. I have an hourly-partitioned asset and a daily-partitioned asset using the same base asset.
Could you elaborate on this? In Dagster an asset can have one and only one partitions definition.
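For concreteness, hourly_partitions and daily_partitions in your snippets would each be a separate definition, something like this (start dates made up):
Copy code
from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition

# Two distinct partitions definitions -- each asset gets exactly one
hourly_partitions = HourlyPartitionsDefinition(start_date="2023-01-01-00:00")
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")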
g
Firstly, thank you for looking at my problem. The way you are asking for more information makes me believe that I am trying to force a square peg through a round hole... I am using dagster 1.3.2.
In Dagster an asset can have one and only one partitions definition.
Imagine you have an asset that has an hourly partition. I am using the start & end of the partition to write a unique file per partition:
Copy code
# in my io_manager
import pandas
from pathlib import Path
from dagster import OutputContext

def handle_output(self, context: OutputContext, obj: pandas.DataFrame):
    path = self._get_path(context)  # self is passed implicitly
    obj.to_parquet(path=path, index=False)

def _get_path(self, context) -> Path:
    # One unique file per partition, named after its time window
    path = self._base_path / '/'.join(context.asset_key.path)
    start, end = context.asset_partitions_time_window
    dt_format = "%Y-%m-%d_%H%M%S"
    partition_str = start.strftime(dt_format) + "_" + end.strftime(dt_format)
    return path / f'{partition_str}.parquet'
As long as I am using the same time window, I can read the same file from _get_path(). Then imagine that we want the values contained in this asset at a different granularity, let's say daily. When I am in the context of the new asset and I try to read the files, I have a different time window, so the path generated by _get_path() does not exist, and I get an error saying exactly that. Starting from there, I've tried to use the filters parameter of pd.read_parquet. Does that make any sense, or should I have done it differently there too?
s
I see-- so I don't exactly understand what "filtering" has to do with the problem, but IIUC your IO manager needs, given the start and end of the time window, to figure out some set of files to write/read specific slices to/from. In the case where your partitions definition is aligned with the scheme that you are using to write files, that's straightforward-- you just resolve the start/end to a path and read/write the whole file. In the case where the partitions definition is not aligned, one file does not necessarily correspond to the data you want to load. So in that case, _get_path doesn't really make sense; you really need to get a set of (path, slice) pairs. You can do this with custom logic-- an IO manager does not need to implement _get_path, just load_input and handle_output.
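For example, a rough sketch of a load_input along those lines, based on your hourly file layout above (hourly_paths_in_window is a hypothetical helper you'd write yourself, not a Dagster API):
Copy code
import pandas as pd
from dagster import InputContext

def load_input(self, context: InputContext) -> pd.DataFrame:
    # The *downstream* asset's window, e.g. a full day
    start, end = context.asset_partitions_time_window

    # Hypothetical helper: list the upstream hourly files whose windows
    # overlap [start, end)
    paths = self._hourly_paths_in_window(context.asset_key, start, end)

    # Concatenate the per-hour files, then trim to the exact slice requested
    df = pd.concat([pd.read_parquet(p) for p in paths], ignore_index=True)
    return df[(df[self._time_column] >= start) & (df[self._time_column] < end)]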
g
In the case where the partitions definition is not aligned, one file does not necessarily correspond to the data you want to load. So in that case, _get_path doesn't really make sense; you really need to get a set of (path, slice) pairs.
Yes, that's it.
You can do this with custom logic-- an IO manager does not need to implement _get_path, just load_input and handle_output.
That's what I did, using the filters parameter (pd.read_parquet(folder_path, filters=[('timestamp', '>=', start), ('timestamp', '<', end)])), start and end being the partition thresholds. At that point I wanted to parameterize the column on which the comparison is done, and I had a tough time understanding how to communicate from the asset to the io_manager. It is still not perfectly clear to me, but I managed to do it in two different ways: the first using a resource_def and a custom Config class, the second by calling the @io_manager decorator to create an object that can be provided to the @asset decorator. I guess the second method is preferred.
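Concretely, a sketch of that read path with the column made configurable (self._time_column bound from the io_manager config via .configured(), as in my earlier sketch; with the pyarrow engine, pd.read_parquet pushes these predicates down when reading a directory of parquet files):
Copy code
import pandas as pd
from dagster import InputContext

def load_input(self, context: InputContext) -> pd.DataFrame:
    start, end = context.asset_partitions_time_window
    folder_path = self._base_path / '/'.join(context.asset_key.path)
    # The comparison column comes from the io_manager config rather than
    # being hard-coded, so the same manager works for any time column
    return pd.read_parquet(
        folder_path,
        filters=[(self._time_column, '>=', start), (self._time_column, '<', end)],
    )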
s
The @io_manager decorator pattern is older and will eventually be deprecated/deleted.
g
Thanks, I'll look into that to avoid using deprecated features.