Guillaume Latour
05/08/2023, 6:59 PM
`Definition(resources={...})`?
```python
import os
import random

import pandas as pd
from dagster import IOManagerDefinition, asset

# hourly_partitions, daily_partitions and PartitionedParquetIOManager are defined elsewhere

# a string key goes in io_manager_key (io_manager_def expects a definition object)
@asset(io_manager_key='parquet_io_manager',
       partitions_def=hourly_partitions)
def my_custom_df(context) -> pd.DataFrame:
    start, end = context.asset_partitions_time_window_for_output()
    # inclusive='left' keeps the end bound out, so adjacent partitions don't share a row
    df = pd.DataFrame({'timestamp': pd.date_range(start, end, freq='5T', inclusive='left')})
    df['count'] = df['timestamp'].map(lambda a: random.randint(1, 1000))
    return df

# @asset(io_manager_def=PartitionedParquetIOManager(base_path=os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'), time_column='timestamp'),
#        partitions_def=daily_partitions)
# @asset(io_manager_def={'resource_fn': PartitionedParquetIOManager,
#                        'config_schema': {'base_path': os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'), 'time_column': 'timestamp'}},
#        partitions_def=daily_partitions)
@asset(io_manager_def=IOManagerDefinition(resource_fn=PartitionedParquetIOManager,
                                          config_schema={'base_path': os.getenv('DAGSTER_ROOT_DATA_FOLDER', '/tmp'), 'time_column': 'timestamp'}),
       partitions_def=daily_partitions)
def another_custom_df(context, my_custom_df: pd.DataFrame) -> pd.DataFrame:
    return my_custom_df.set_index('timestamp').resample('H').agg({'count': 'mean'})
```
I am getting a `DagsterInvalidConfigDefinitionError` because a value cannot be resolved.
Guillaume Latour
05/08/2023, 7:31 PM
… `@io_manager` decorator, then I can provide an object on which I can call the `.configured()` method.
I've added another answer to the Stack Overflow post.
Any comments would be appreciated.
sean
05/08/2023, 7:43 PM
> Now I have an issue if the same asset is used with different partition sizes, because the file names are not necessarily the same when reading and writing the files. E.g. I have some assets with 1h partitions and some with 1d partitions using the same base asset.

Could you elaborate on this? In Dagster an asset can have one and only one partitions definition.
Guillaume Latour
05/08/2023, 8:07 PM
> In Dagster an asset can have one and only one partitions definition.

Imagine you have an asset that has an hourly partition. I am using the start and end of the partition to write a unique file per partition:
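```python
from pathlib import Path

import pandas
from dagster import IOManager, OutputContext


class PartitionedParquetIOManager(IOManager):
    # in my io_manager (__init__ and load_input omitted)
    def handle_output(self, context: OutputContext, obj: pandas.DataFrame):
        path = self._get_path(context)  # self is passed implicitly
        path.parent.mkdir(parents=True, exist_ok=True)
        obj.to_parquet(path=path, index=False)

    def _get_path(self, context) -> Path:
        # <base_path>/<asset key parts>/<start>_<end>.parquet: one file per partition
        path = self._base_path / '/'.join(context.asset_key.path)
        start, end = context.asset_partitions_time_window
        dt_format = "%Y-%m-%d_%H%M%S"
        partition_str = start.strftime(dt_format) + "_" + end.strftime(dt_format)
        return path / f'{partition_str}.parquet'
```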
As long as I am using the same time window, I can read the same file back from `_get_path()`.
Now imagine that we want the values contained in this asset at a different granularity, let's say daily.
Then, in the context of the new asset, when I try to read the files, the time window is different, so the file path generated by `_get_path()` does not exist and I get an error saying exactly that.
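To make the mismatch concrete, here is a small standard-library sketch that reuses the `_get_path()` naming scheme (`partition_file_name` and `DT_FORMAT` are illustrative names, not from the thread): an hourly asset writes 24 files for one day, and the single name a daily window produces matches none of them.

```python
from datetime import datetime, timedelta

DT_FORMAT = "%Y-%m-%d_%H%M%S"


def partition_file_name(start: datetime, end: datetime) -> str:
    # Same naming scheme as _get_path(): "<start>_<end>.parquet"
    return f"{start.strftime(DT_FORMAT)}_{end.strftime(DT_FORMAT)}.parquet"


day_start = datetime(2023, 5, 8)

# Files written by the hourly asset: one per hour of the day
hourly_files = {
    partition_file_name(day_start + timedelta(hours=h), day_start + timedelta(hours=h + 1))
    for h in range(24)
}

# File name the daily asset derives from its own (daily) time window
daily_name = partition_file_name(day_start, day_start + timedelta(days=1))

print(daily_name)                  # 2023-05-08_000000_2023-05-09_000000.parquet
print(daily_name in hourly_files)  # False
```

This is why the reading side has to consider every file whose window overlaps the requested one instead of rebuilding a single name.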
Starting from there, I tried to use the `filters` parameter of `pd.read_parquet`.
Does that make any sense, or should I have done it differently?
sean
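05/08/2023, 9:08 PM
In the case where the partitions definition is not aligned, then one file does not necessarily correspond to the data you want to load. So in that case, `_get_path` doesn't really make sense, you really need to get a set of (path, slice) pairs.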
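The (path, slice) idea can be sketched without Dagster: parse each file's window back out of its name, keep the files that overlap the requested window, and record which sub-range of each file to keep. This assumes the `<start>_<end>.parquet` naming from `_get_path()`; `file_name`, `parse_window` and `paths_and_slices` are hypothetical helpers, and a real `load_input` would still have to read each file and cut it down to its slice.

```python
from datetime import datetime, timedelta
from pathlib import PurePath
from typing import Iterable, List, Tuple

DT_FORMAT = "%Y-%m-%d_%H%M%S"  # 17 characters once formatted
Window = Tuple[datetime, datetime]


def file_name(start: datetime, end: datetime) -> str:
    # Same "<start>_<end>.parquet" scheme as _get_path()
    return f"{start.strftime(DT_FORMAT)}_{end.strftime(DT_FORMAT)}.parquet"


def parse_window(name: str) -> Window:
    # Inverse of file_name(): "2023-05-08_000000_2023-05-09_000000.parquet"
    stem = PurePath(name).stem
    start_str, end_str = stem[:17], stem[18:]
    return datetime.strptime(start_str, DT_FORMAT), datetime.strptime(end_str, DT_FORMAT)


def paths_and_slices(names: Iterable[str], start: datetime, end: datetime) -> List[Tuple[str, Window]]:
    """Return (file, slice) pairs covering the requested window [start, end)."""
    pairs = []
    for name in sorted(names):
        file_start, file_end = parse_window(name)
        # Skip files whose window does not overlap the request
        if file_start >= end or file_end <= start:
            continue
        # Keep only the part of the file's window that lies inside the request
        pairs.append((name, (max(start, file_start), min(end, file_end))))
    return pairs


day = datetime(2023, 5, 8)
hourly_files = [file_name(day + timedelta(hours=h), day + timedelta(hours=h + 1)) for h in range(48)]
daily_files = [file_name(day + timedelta(days=d), day + timedelta(days=d + 1)) for d in range(2)]

# A daily partition reads 24 whole hourly files...
daily_request = paths_and_slices(hourly_files, day, day + timedelta(days=1))
# ...while an hourly partition reads a one-hour slice of a single daily file
hourly_request = paths_and_slices(daily_files, day + timedelta(hours=6), day + timedelta(hours=7))
```

The same function handles both directions, which is the point of returning slices rather than bare paths.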
You can do this with custom logic, an IO manager does not need to implement `_get_path` -- just `load_input` and `handle_output`.
Guillaume Latour
05/08/2023, 9:30 PM
> In the case where the partitions definition is not aligned, then one file does not necessarily correspond to the data you want to load.

Yes, that's it.

> So in that case, `_get_path` doesn't really make sense, you really need to get a set of (path, slice) pairs.
> You can do this with custom logic, an IO manager does not need to implement `_get_path` -- just `load_input` and `handle_output`.

That's what I did, using the `filters` parameter (`pd.read_parquet(folder_path, filters=[('timestamp', '>=', start), ('timestamp', '<', end)])`), `start` and `end` being the partition boundaries.
At this point I wanted to parameterize the column on which the comparison is done.
And I had a tough time understanding how the asset can pass information to the io_manager.
It is still not perfectly clear to me, but I managed to do it in two different ways.
The first is using a `resource_def` and a custom `Config` class.
The second is calling the `@io_manager` decorator to create an object that can be provided to the `@asset` decorator.
I guess that the second method is preferred.
sean
05/08/2023, 10:47 PM
sean
05/08/2023, 10:48 PM
The `@io_manager` decorator pattern is older and will eventually be deprecated/deleted.
Guillaume Latour
05/13/2023, 3:17 PM