Fernando Cagua
02/26/2024, 11:15 PM

Scout
02/26/2024, 11:15 PM
When using `UPathIOManager`, you can subclass `UPathIOManager` and override the `dump_to_path` and `load_from_path` methods to incorporate the partition key into the file path. Here's an example based on the documentation:

```python
import pandas as pd
from upath import UPath

from dagster import InputContext, OutputContext, UPathIOManager


class MyPartitionedParquetIOManager(UPathIOManager):
    extension: str = ".parquet"

    def _get_partitioned_path(self, context, path: UPath) -> UPath:
        if context.has_asset_partitions:
            partition_key = context.asset_partition_key
            # Assuming partition_key is in the format 'YYYY-MM-DD'
            year, month, day = partition_key.split("-")
            # Construct the new path with partition directories
            return path / year / month / f"{partition_key}{self.extension}"
        # If there's no partition, use the path as-is
        return path

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath):
        partitioned_path = self._get_partitioned_path(context, path)
        # Ensure the partition directories exist before writing
        partitioned_path.parent.mkdir(parents=True, exist_ok=True)
        with partitioned_path.open("wb") as file:
            obj.to_parquet(file)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        partitioned_path = self._get_partitioned_path(context, path)
        with partitioned_path.open("rb") as file:
            return pd.read_parquet(file)
```
In this example, the `_get_partitioned_path` method modifies the path based on the partition key. If the context has asset partitions, it splits the partition key into year, month, and day components and constructs a new path that includes these components. The `dump_to_path` and `load_from_path` methods use this new path to write and read the parquet files, respectively. Please note that this is a simplified example, and you may need to adjust the path construction logic to fit your specific use case and ensure that the directories exist before writing to them.
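As a rough usage sketch building on the class above, here is how such an IO manager could be wired up with a daily-partitioned asset. The asset name, base path, and partitions definition are hypothetical, not part of the original answer:

```python
import pandas as pd
from upath import UPath

from dagster import DailyPartitionsDefinition, Definitions, asset, io_manager


@io_manager
def my_partitioned_parquet_io_manager(init_context):
    # base_path is where the year/month/<key>.parquet files will land
    return MyPartitionedParquetIOManager(base_path=UPath("/tmp/warehouse"))


@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_orders(context) -> pd.DataFrame:
    # Hypothetical asset producing one DataFrame per daily partition
    return pd.DataFrame({"partition": [context.partition_key]})


defs = Definitions(
    assets=[daily_orders],
    resources={"io_manager": my_partitioned_parquet_io_manager},
)
```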

Fernando Cagua
02/26/2024, 11:19 PM

Scout
02/26/2024, 11:20 PM
You can use the `add_output_metadata` method within the `handle_output` method of your custom IO manager class. This method allows you to attach arbitrary metadata to the output of an asset when it is being handled by the IO manager. The metadata you add will be attached to the materialization event for that asset and will be displayed on the Asset Details page in the Dagit UI. Here's an example of how you might implement this in a custom IO manager:

```python
from dagster import IOManager, InputContext, OutputContext


class DataframeTableIOManagerWithMetadata(IOManager):
    def handle_output(self, context: OutputContext, obj):
        table_name = context.name
        # Perform your logic to write the dataframe to a table
        write_dataframe_to_table(name=table_name, dataframe=obj)
        # Add metadata entries
        context.add_output_metadata({
            "num_rows": len(obj),
            "table_name": table_name,
        })

    def load_input(self, context: InputContext):
        # Perform your logic to load the dataframe from a table
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)
```
In this example, the `handle_output` method writes a dataframe to a table and then adds metadata about the number of rows and the table name to the output. This metadata will be recorded and displayed in Dagit. Please note that `write_dataframe_to_table` and `read_dataframe_from_table` are placeholders for your actual logic to write and read dataframes to and from a table or other storage system. Remember to configure your pipeline to use this custom IO manager for the relevant assets or outputs. You can do this by setting the `io_manager` key in the resource configuration when defining your pipeline or job.
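As a side note, `add_output_metadata` also accepts Dagster's `MetadataValue` wrappers for typed entries that render with type-aware formatting in the UI. A small sketch (the helper name and preview content are made up):

```python
from dagster import MetadataValue, OutputContext


def record_dataframe_metadata(context: OutputContext, df) -> None:
    # MetadataValue wrappers get type-aware rendering in Dagit
    context.add_output_metadata({
        "num_rows": MetadataValue.int(len(df)),
        # Markdown preview of the first few rows (requires `tabulate`)
        "preview": MetadataValue.md(df.head().to_markdown()),
    })
```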

Fernando Cagua
02/27/2024, 1:26 AM

Scout
02/27/2024, 1:26 AM
Output metadata is not directly accessible in the `load_input` method. The `OutputContext` object available in the `handle_output` method allows you to add metadata using `add_output_metadata`, which is then associated with the materialization event for that asset. However, this metadata is not directly accessible in the `InputContext` during the loading phase.
If you need to access output metadata within an IO manager, you would typically need to use a different approach, such as:
1. Storing the metadata in a place where it can be retrieved later, such as a database or a metadata store, and then reading it from there in the `load_input` method (see the sketch after this list).
2. Using the Dagster event log to query for materialization events and extract the metadata from there. This would involve using Dagster's instance to query the event log for the specific run and asset key.
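For the first approach, here is a rough sketch of a side store, using `sqlite3` purely as a stand-in for whatever database or metadata store you actually use; the table and function names are hypothetical. `handle_output` would call `save_metadata` alongside `add_output_metadata`, and `load_input` would call `load_metadata`:

```python
import json
import sqlite3


def save_metadata(asset_key: str, metadata: dict, db_path: str = "metadata.db") -> None:
    # Hypothetical side store: one row per asset key, JSON-encoded metadata
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS asset_metadata"
            " (asset_key TEXT PRIMARY KEY, metadata TEXT)"
        )
        conn.execute(
            "INSERT OR REPLACE INTO asset_metadata VALUES (?, ?)",
            (asset_key, json.dumps(metadata)),
        )


def load_metadata(asset_key: str, db_path: str = "metadata.db"):
    # Returns the stored metadata dict, or None if nothing was recorded
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT metadata FROM asset_metadata WHERE asset_key = ?", (asset_key,)
        ).fetchone()
    return json.loads(row[0]) if row else None
```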
Here's a conceptual example of how you might use the event log to access metadata:
```python
from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    InputContext,
    IOManager,
)


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        # Output handling logic omitted for brevity
        ...

    def load_input(self, context: InputContext):
        # Assuming you want to access metadata for a specific asset
        asset_key = AssetKey(["my_asset"])
        # Query the event log for materialization events for the asset
        event_records = context.instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=asset_key,
            ),
            ascending=False,  # Get the most recent event first
            limit=1,  # We only want the most recent materialization
        )
        if event_records:
            # Extract metadata from the most recent materialization event
            materialization = (
                event_records[0].event_log_entry.dagster_event.event_specific_data.materialization
            )
            metadata = materialization.metadata
            # Now you can use the metadata as needed
            # ...
        # Perform your logic to load the input
        # ...
```
In this example, `get_event_records` is used to retrieve the most recent materialization event for a specific asset, and the metadata is then extracted from that event. This is a more advanced use case and may not be suitable for all scenarios, especially if you have a high volume of events or need to access metadata frequently.
Please note that the code above is a conceptual example and may require adjustments to fit your specific use case. Additionally, the actual implementation details may vary depending on the version of Dagster you are using and any changes to the API that may have occurred since.
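If you only need the most recent materialization, recent Dagster releases also expose a shortcut on the instance. This is a hedged sketch, so verify the method against your installed version:

```python
from dagster import AssetKey, InputContext


def latest_metadata_for(context: InputContext, asset_name: str):
    # Assumption: DagsterInstance.get_latest_materialization_event is
    # available in your Dagster version; it returns the newest
    # materialization event log entry for the asset, or None if the
    # asset was never materialized.
    event = context.instance.get_latest_materialization_event(AssetKey([asset_name]))
    if event is None:
        return None
    return event.dagster_event.event_specific_data.materialization.metadata
```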

Fernando Cagua
02/27/2024, 1:32 AM

Scout
02/27/2024, 1:32 AM
You can use the `resource_config` field on the context object to access the configuration values. While you cannot directly modify the IO manager's configuration at runtime from within an asset or op, you can pass parameters to the IO manager by including them in the configuration when you define your pipeline or job. Then, within the IO manager's methods, you can access these parameters through the context object. Here's a conceptual example of how you might set up an IO manager to accept dynamic parameters via its configuration:

```python
from dagster import IOManager, InputContext, OutputContext, io_manager


class MyDynamicIOManager(IOManager):
    def __init__(self, config):
        self.config = config

    def handle_output(self, context: OutputContext, obj):
        # Use the dynamic parameter from the config
        dynamic_param = self.config.get("dynamic_param")
        # Perform the output handling logic using the dynamic parameter
        # ...

    def load_input(self, context: InputContext):
        # Use the dynamic parameter from the config
        dynamic_param = self.config.get("dynamic_param")
        # Perform the input loading logic using the dynamic parameter
        # ...


@io_manager(config_schema={"dynamic_param": str})
def my_dynamic_io_manager(init_context):
    return MyDynamicIOManager(config=init_context.resource_config)


# In your job definition, you can then configure the IO manager
# (my_graph is assumed to be a graph defined elsewhere):
my_job = my_graph.to_job(
    resource_defs={
        "io_manager": my_dynamic_io_manager.configured({"dynamic_param": "my_value"})
    }
)
```
In this example, `my_dynamic_io_manager` is an IO manager that accepts a configuration with a `dynamic_param`. When you define your job, you use the `configured` method to provide the dynamic parameter's value. If you need to pass parameters that are only known at runtime from within an asset or op, you would typically need to include that information as part of the data being passed between assets or ops, or store it in a place where the IO manager can retrieve it, such as a database or a metadata store. As of my knowledge cutoff date, Dagster does not support dynamically altering the configuration of an IO manager from within an asset or op at runtime. If you need to pass different parameters for different runs, you would need to configure your job or pipeline with the appropriate parameters before each run.
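On that last point, one way to vary the parameter per run is to bind the unconfigured `my_dynamic_io_manager` from above as the resource and pass the value through run config at launch time. A minimal sketch (the op and job names are illustrative):

```python
from dagster import job, op


@op
def emit_value():
    return 1


# Bind the unconfigured IO manager so its config can be set per run
@job(resource_defs={"io_manager": my_dynamic_io_manager})
def my_configurable_job():
    emit_value()


# Each run can supply a different dynamic_param via run config
result = my_configurable_job.execute_in_process(
    run_config={
        "resources": {
            "io_manager": {"config": {"dynamic_param": "value_for_this_run"}}
        }
    }
)
```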