https://dagster.io/

geoHeil

03/01/2022, 5:26 PM
I have an asset and IO manager:
import os
import random

import pandas as pd
from pandas import DataFrame
from dagster import (AssetKey, DailyPartitionsDefinition, EventMetadata, IOManager,
                     MetadataEntry, Output, asset, get_dagster_logger)

class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path, index=False)

        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(0.1234, "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        #return AssetKey(file_path)
        return file_path

    def get_output_asset_partitions(self, context):
        return set(context.config["partitions"])

@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-02-01"))
def dummy_asset_partitioned(context) -> DataFrame:
    """Creates a mini dummy asset which is partitioned"""
    partition_key = context.output_asset_partition_key
    get_dagster_logger().info(f"Partitioned asset from: {partition_key}")
    df = pd.DataFrame({'foo':[1,3,3], 'bar':['a', 'b', 'c']})
    df['partition_key'] = partition_key

    rand_metric_dummy_value = random.randrange(0, 101, 2)
    yield Output(df, metadata={
        "path": EventMetadata.path('/path/to/file'),
        "value_counts": 10,
        "random_dummy_metric": rand_metric_dummy_value
    })
How can I use/set the asset_key only once? This is currently very unclear to me.
dagster.core.errors.DagsterInvariantViolationError: Both the OutputDefinition and the IOManager of output "result" on solid "dummy_asset_partitioned" associate it with an asset. Either remove the asset_key parameter on the OutputDefinition or use an IOManager that does not specify an AssetKey in its get_output_asset_key() function.

chris

03/01/2022, 8:09 PM
When you construct an @asset, its output is automatically associated with an AssetKey. The get_output_asset_key method is used to associate an output with an asset key after the fact, but since the output already has an associated asset key, it isn't necessary.
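The rule chris describes can be modelled in a few lines of plain Python. This is a hypothetical toy, not Dagster's implementation: resolve_asset_key and its arguments are made-up names. It only mirrors the invariant stated in the error message in the question: an output may be tied to an asset key by its definition (as @asset does) or by the IO manager's get_output_asset_key(), but not by both.

```python
from typing import Optional, Tuple

# Hypothetical toy model of the invariant from the error message; NOT Dagster code.
# An asset key is modelled here as a tuple of path components.
AssetKeyParts = Tuple[str, ...]


def resolve_asset_key(
    from_definition: Optional[AssetKeyParts],
    from_io_manager: Optional[AssetKeyParts],
) -> Optional[AssetKeyParts]:
    """Return the single source of the asset key, refusing to accept two."""
    if from_definition is not None and from_io_manager is not None:
        raise ValueError(
            "Both the output definition and the IO manager associate "
            "this output with an asset; keep only one of them."
        )
    # Whichever side supplied a key wins; None means "not an asset".
    return from_definition if from_definition is not None else from_io_manager


# An @asset already supplies its own key (by default derived from the function name) ...
asset_key = ("dummy_asset_partitioned",)
# ... so an IO manager used with it should leave get_output_asset_key out.
print(resolve_asset_key(asset_key, None))
```

In the real code this corresponds to deleting get_output_asset_key from the IO manager and letting @asset own the key.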

geoHeil

03/02/2022, 8:44 AM
@chris Understood, thanks.
However, I now have one problem: re-materializing the asset via right-click fails, as it does not trigger a partitioned run.
Only a run from the Launchpad provides the right configuration.
Is it correct to consider this a bug in the Dagit UI for partitioned runs of assets?
Furthermore, this IO manager does not respect the partitions and overwrites the data.
How can it be changed to include the partition as part of the keys/paths? Something like asset/partition=part_key (Hadoop-style) sounds sensible to me, but perhaps you have a better idea.
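The Hadoop/Hive-style layout proposed here can be sketched with the standard library alone. The helpers hive_partition_path and parse_partition_key are hypothetical names, and my_base_dir plus the file name are placeholders; this only illustrates the partition=<key> directory convention, not anything Dagster provides.

```python
import os
from typing import Optional


def hive_partition_path(base_dir: str, asset_name: str, partition_key: str, file_name: str) -> str:
    """Build <base_dir>/<asset_name>/partition=<partition_key>/<file_name>."""
    return os.path.join(base_dir, asset_name, f"partition={partition_key}", file_name)


def parse_partition_key(path: str) -> Optional[str]:
    """Recover the partition key from a path built by hive_partition_path."""
    for part in path.split(os.sep):
        if part.startswith("partition="):
            return part[len("partition="):]
    return None  # the path carries no partition segment


path = hive_partition_path("my_base_dir", "dummy_asset_partitioned", "2022-03-01", "result.csv")
print(path)  # on POSIX: my_base_dir/dummy_asset_partitioned/partition=2022-03-01/result.csv
print(parse_partition_key(path))  # 2022-03-01
```

Putting the key=value pair in the directory name makes each partition self-describing, which is the convention tools like Spark use for partition discovery.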

chris

03/02/2022, 11:03 PM
Similarly to how your @asset defines its own asset key, it also defines its own set of partitions via the partitions_def argument. It doesn't look like this is documented at the moment, so I'm going to file an issue for that.
You don't need to include the partition as part of the asset key / asset path, as the AssetMaterialization events that get created will have the partition as an attribute.
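To picture why the asset key does not need to carry the partition, here is a toy model in plain Python. Materialization is a hypothetical stand-in, not Dagster's AssetMaterialization class; it only mimics the idea described above: one asset key, with the partition recorded as a separate attribute on each event.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Materialization:
    """Hypothetical stand-in for a materialization event (not Dagster's class)."""
    asset_key: Tuple[str, ...]
    partition: str
    run_id: str


events: List[Materialization] = [
    Materialization(("dummy_asset_partitioned",), "2022-03-01", "run-1"),
    Materialization(("dummy_asset_partitioned",), "2022-03-02", "run-2"),
    Materialization(("dummy_asset_partitioned",), "2022-03-01", "run-3"),  # re-run of one day
]

# Latest run per (asset key, partition): the key stays the same,
# and the partition attribute is what tells the materializations apart.
latest: Dict[Tuple[Tuple[str, ...], str], str] = {}
for event in events:
    latest[(event.asset_key, event.partition)] = event.run_id

# Group the known partitions under their single asset key.
by_asset = defaultdict(set)
for asset_key, partition in latest:
    by_asset[asset_key].add(partition)

print(latest)
print(dict(by_asset))
```

This covers event bookkeeping only; the files an IO manager writes still need a distinct location per partition.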
@Dagster Bot issue [docs-content] partitions_def argument on @asset

Dagster Bot

03/02/2022, 11:05 PM

chris

03/02/2022, 11:05 PM
@Dagster Bot [docs-content] migrating from using AssetMaterializations to using software defined assets

Dagster Bot

03/02/2022, 11:05 PM

geoHeil

03/03/2022, 4:33 PM
@chris Great to hear that. However, even after commenting out the get_output_asset_partitions(self, context) method of the IO manager, the same path is still constructed for all partitions:
file_path = os.path.join(self.base_path, context.step_key, context.name)
and thus one partition overwrites another.

sandy

03/03/2022, 4:38 PM
Hi @geoHeil. I would recommend changing this line:
file_path = os.path.join(self.base_path, context.step_key, context.name)
to
file_path = os.path.join(self.base_path, context.step_key, context.name, context.asset_partition_key)
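The effect of this change can be checked without Dagster by building both paths for two partitions. The values of base_path, step_key and name below are made-up placeholders standing in for self.base_path, context.step_key and context.name; the partition keys stand in for context.asset_partition_key.

```python
import os

# Placeholder values standing in for the IO manager context (assumptions).
base_path = "my_base_dir"
step_key = "dummy_asset_partitioned"
name = "result"
partition_keys = ["2022-03-01", "2022-03-02"]

# Original line: the partition key is not part of the path.
old_paths = {pk: os.path.join(base_path, step_key, name) for pk in partition_keys}
# Suggested line: the partition key is appended as the last path segment.
new_paths = {pk: os.path.join(base_path, step_key, name, pk) for pk in partition_keys}

print(len(set(old_paths.values())))  # 1 -> every partition writes to the same file
print(len(set(new_paths.values())))  # 2 -> one file per partition
```

With this ordering, name becomes a directory and the partition key becomes the file name, so the directory base_path/step_key/name has to exist before the first write.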

geoHeil

03/03/2022, 4:54 PM
Works! But in a different order:
file_path = os.path.join(self.base_path, context.step_key, context.asset_partition_key, context.name)
get_dagster_logger().info(f"Suggested Path: {file_path}")
Path(os.path.join(self.base_path, context.step_key, context.asset_partition_key)).mkdir(parents=True, exist_ok=True)
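This final version can be exercised end to end with the standard library. The sketch swaps pandas for the csv module so it runs without extra dependencies, and partition_file_path, write_partition and read_partition are hypothetical names; only the path order (base_path/step_key/partition_key/name) and the mkdir(parents=True, exist_ok=True) call follow the snippet in the thread.

```python
import csv
import os
import tempfile
from pathlib import Path
from typing import Dict, List


def partition_file_path(base_path: str, step_key: str, partition_key: str, name: str) -> str:
    # Same order as in the thread: base / step / partition / output name.
    return os.path.join(base_path, step_key, partition_key, name)


def write_partition(base_path: str, step_key: str, partition_key: str, name: str,
                    rows: List[Dict[str, str]]) -> str:
    file_path = partition_file_path(base_path, step_key, partition_key, name)
    # Create <base>/<step>/<partition>/ first; open() does not create directories.
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    return file_path


def read_partition(base_path: str, step_key: str, partition_key: str, name: str) -> List[Dict[str, str]]:
    with open(partition_file_path(base_path, step_key, partition_key, name), newline="") as f:
        return list(csv.DictReader(f))


with tempfile.TemporaryDirectory() as base:
    for day in ["2022-03-01", "2022-03-02"]:
        write_partition(base, "dummy_asset_partitioned", day, "result",
                        [{"foo": "1", "bar": "a", "partition_key": day}])
    # Each partition kept its own file; nothing was overwritten.
    first = read_partition(base, "dummy_asset_partitioned", "2022-03-01", "result")
    second = read_partition(base, "dummy_asset_partitioned", "2022-03-02", "result")
    print(first[0]["partition_key"], second[0]["partition_key"])
```

Path(file_path).parent is the same directory that the thread's snippet creates explicitly, so either form of the mkdir call works.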

sandy

03/03/2022, 4:54 PM
awesome