Tara Larrue
02/14/2024, 11:45 AM
Scout
02/14/2024, 11:45 AM
```python
import os

import pandas as pd
from dagster import (
    AssetKey,
    ConfigurableIOManager,
    Definitions,
    InputContext,
    OutputContext,
)


class MyIOManager(ConfigurableIOManager):
    root_path: str

    def _get_path(self, asset_key: AssetKey) -> str:
        # e.g. root_path="/tmp/" and AssetKey(["a", "b"]) -> "/tmp/a/b"
        return self.root_path + "/".join(asset_key.path)

    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        path = self._get_path(context.asset_key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        obj.to_csv(path, index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(self._get_path(context.asset_key))


defs = Definitions(
    assets=...,  # replace ... with your list of assets
    resources={"io_manager": MyIOManager(root_path="/tmp/")},
)
```
In this example, `MyIOManager` is an IOManager that has a `root_path` attribute. This IOManager is then added to the resources under the key `"io_manager"`.
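Note that `_get_path` builds the path by plain string concatenation, so it relies on `root_path` ending in a slash: `"/tmp/"` works, but `"/tmp"` would silently produce paths such as `/tmpmy_asset`. A minimal sketch of a more forgiving variant follows, assuming POSIX-style paths; `get_path` and `concat_path` are hypothetical standalone helpers that take the asset key's path components as a list of strings instead of an `AssetKey`.

```python
import posixpath
from typing import List


def get_path(root_path: str, asset_key_path: List[str]) -> str:
    # posixpath.join inserts "/" only where one is missing, so
    # "/tmp" and "/tmp/" resolve to the same location.
    return posixpath.join(root_path, *asset_key_path)


def concat_path(root_path: str, asset_key_path: List[str]) -> str:
    # The original concatenation, for comparison: needs a trailing slash.
    return root_path + "/".join(asset_key_path)


print(get_path("/tmp", ["my_schema", "my_table"]))     # /tmp/my_schema/my_table
print(concat_path("/tmp", ["my_schema", "my_table"]))  # /tmpmy_schema/my_table
```

`posixpath.join` is used rather than `os.path.join` so the separator is always `/`; inside `MyIOManager` the equivalent call would be `posixpath.join(self.root_path, *asset_key.path)`.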
If you need to access the `root_path` within an asset definition function, you would typically declare the IO manager as a required resource of the asset and read the attribute from the asset's context:
```python
from dagster import asset


@asset(required_resource_keys={"io_manager"})
def my_asset(context):
    # You can access the root_path from the IOManager resource here
    root_path = context.resources.io_manager.root_path
    # Perform your asset computation
```
In this asset definition, the `required_resource_keys` parameter specifies that the asset requires the `"io_manager"` resource, which allows you to access the `root_path` attribute of the IOManager within the asset definition function through the context's resources.
Keep in mind that this is a simplified example, and the actual implementation details may vary depending on your specific use case and the version of Dagster you are using.
Tara Larrue
02/14/2024, 11:50 AM
Scout
02/14/2024, 11:50 AM
1. Implement a custom IOManager with a `load_input` method to fetch data from the database.
2. Register the custom IOManager as a resource in your Dagster repository.
3. Use the `input_manager_key` argument on `AssetIn` to specify that the downstream asset should use this custom IOManager to load its input.
Here's an example that illustrates these steps:
```python
import pandas as pd
import sqlalchemy
from dagster import (
    AssetIn,
    InputContext,
    IOManager,
    IOManagerDefinition,
    OutputContext,
    SourceAsset,
    asset,
    repository,
    with_resources,
)


class PostgresTableIOManager(IOManager):
    def __init__(self, engine):
        self.engine = engine

    def handle_output(self, context: OutputContext, obj):
        # IOManager requires handle_output; this manager only reads tables
        raise NotImplementedError("PostgresTableIOManager is read-only")

    def load_input(self, context: InputContext):
        # Use the upstream asset's key (here "source_asset") as the table name
        table_name = context.asset_key.path[-1]
        query = f"SELECT * FROM {table_name}"
        with self.engine.connect() as conn:
            return pd.read_sql(query, conn)


# Define the resource that provides the IOManager
# (replace the connection string with your own credentials)
postgres_io_manager = IOManagerDefinition.hardcoded_io_manager(
    PostgresTableIOManager(
        engine=sqlalchemy.create_engine("postgresql://username:password@host:5432/dbname")
    )
)

# Define the source asset representing the existing PostgreSQL table
source_asset = SourceAsset(key="source_asset")


# Define the downstream asset, loading its input through the custom IOManager
@asset(ins={"source_asset": AssetIn(input_manager_key="postgres_io_manager")})
def downstream_asset(source_asset: pd.DataFrame):
    # Logic that uses the data from the source_asset
    ...


# Define the repository and include the IOManager resource
@repository
def my_repo():
    return with_resources(
        [source_asset, downstream_asset],
        {"postgres_io_manager": postgres_io_manager},
    )
```
In this example, `PostgresTableIOManager` is a custom IOManager that reads from a PostgreSQL table. Its `load_input` method constructs a SQL query that reads the entire table and uses pandas to load the result into a DataFrame.
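The table-reading logic in `load_input` does not itself depend on Dagster. As a minimal sketch, the same idea is shown against an in-memory SQLite database, swapped in for PostgreSQL only so it runs with the standard library; `load_table` and `quote_identifier` are hypothetical helpers, and the table name is passed as a plain string standing in for the last component of the asset key. Because a table name cannot be bound as a query parameter, the sketch quotes it as an identifier before interpolating it into the SQL.

```python
import sqlite3
from typing import List, Tuple


def quote_identifier(name: str) -> str:
    # Standard SQL identifier quoting: wrap in double quotes, double any inner quotes.
    return '"' + name.replace('"', '""') + '"'


def load_table(conn: sqlite3.Connection, table_name: str) -> List[Tuple]:
    # Stand-in for load_input: read the entire table named after the asset.
    query = f"SELECT * FROM {quote_identifier(table_name)}"
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_asset (id INTEGER, color TEXT)")
conn.executemany("INSERT INTO source_asset VALUES (?, ?)", [(1, "red"), (2, "blue")])
rows = load_table(conn, "source_asset")
print(rows)
```

The same quoting applies to the PostgreSQL version; a schema-qualified name such as `schema.table` would need each part quoted separately.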
The `source_asset` is the asset representing the table in the PostgreSQL database. The `downstream_asset` depends on `source_asset` and specifies, through `AssetIn(input_manager_key=...)`, that its input should be loaded with the `postgres_io_manager`.
Finally, the `my_repo` function defines the repository and attaches the `postgres_io_manager` resource to it.
Please note that this is a simplified example, and you would need to adapt it to fit your specific schema, table names, and authentication details. Additionally, if your downstream asset is partitioned, you would …
Scout
02/14/2024, 11:51 AM
… `load_input` method, potentially by using the `asset_partition_key` property of the `InputContext` to filter the data for the specific partition.
Tara Larrue
02/14/2024, 11:57 AM
… `asset_partition_key`?
Scout
02/14/2024, 11:57 AM
… `asset_partition_key` by using the `keys_by_dimension` property of the `MultiPartitionKey` object, which returns the key for each dimension of the multi-partition as a dictionary. Here's an example of how you might access the partition keys from a multi-partitioned asset:
```python
from dagster import (
    AssetExecutionContext,
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2022-01-01"),
            "color": StaticPartitionsDefinition(["red", "yellow", "blue"]),
        }
    )
)
def multi_partitions_asset(context: AssetExecutionContext):
    # For a multi-partitioned asset, context.partition_key is a MultiPartitionKey
    partition_keys = context.partition_key.keys_by_dimension
    date_key = partition_keys["date"]
    color_key = partition_keys["color"]
    context.log.info(f"Date partition: {date_key}, Color partition: {color_key}")
```
In this example, `multi_partitions_asset` is an asset that is partitioned by both date and color. Inside the asset function, `context.partition_key.keys_by_dimension` is used to access the partition keys for each dimension. The `date_key` and `color_key` variables hold the partition keys for the "date" and "color" dimensions, respectively. This lets you tailor the asset computation to the specific partitions being processed.
Tara Larrue
02/14/2024, 12:47 PM
Scout
02/14/2024, 12:47 PM