geoHeil
03/01/2022, 5:26 PM
class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path, index=False)
        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(0.1234, "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        #return AssetKey(file_path)
        return file_path

    def get_output_asset_partitions(self, context):
        return set(context.config["partitions"])

@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-02-01"))
def dummy_asset_partitioned(context) -> DataFrame:
    """Creates a mini dummy asset which is partitioned"""
    partition_key = context.output_asset_partition_key
    get_dagster_logger().info(f"Partitioned asset from: {partition_key}")
    df = pd.DataFrame({'foo': [1, 3, 3], 'bar': ['a', 'b', 'c']})
    df['partition_key'] = partition_key
    rand_metric_dummy_value = random.randrange(0, 101, 2)
    yield Output(df, metadata={
        "path": EventMetadata.path('/path/to/file'),
        "value_counts": 10,
        "random_dummy_metric": rand_metric_dummy_value
    })

How can I use/set the asset_key only once? This is currently very unclear to me.
dagster.core.errors.DagsterInvariantViolationError: Both the OutputDefinition and the IOManager of output "result" on solid "dummy_asset_partitioned" associate it with an asset. Either remove the asset_key parameter on the OutputDefinition or use an IOManager that does not specify an AssetKey in its get_output_asset_key() function.
chris
03/01/2022, 8:09 PM
With @asset, the output is automatically associated with an AssetKey. get_output_asset_key is used to associate an output with an asset key after the fact, but since we already have an associated asset key, it isn't necessary.
geoHeil
03/02/2022, 8:44 AM
chris
03/02/2022, 11:03 PM
Just as @asset defines its own asset key, it also defines its own set of partitions via the partitions_def argument. It doesn't look like this is documented at the moment, so I'm going to file an issue for that.
Dagster Bot
03/02/2022, 11:05 PM
chris
03/02/2022, 11:05 PM
Dagster Bot
03/02/2022, 11:05 PM
geoHeil
03/03/2022, 4:33 PM
def get_output_asset_partitions(self, context):
part of the IO manager - the SAME path is still constructed for all partitions:
file_path = os.path.join(self.base_path, context.step_key, context.name)
and thus one partition overwrites another one.
sandy
03/03/2022, 4:38 PM
Change
file_path = os.path.join(self.base_path, context.step_key, context.name)
to
file_path = os.path.join(self.base_path, context.step_key, context.name, context.asset_partition_key)
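
The effect of the extra path component can be checked without Dagster. This is a minimal sketch, assuming a hypothetical `FakeContext` stand-in for the IO manager's context object; its attribute names mirror the ones used in this thread, while the helper functions, `"my_base_dir"` as the base path, and the sample partition keys are illustrative assumptions.

```python
import os
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for the context object; not a Dagster class."""
    step_key: str
    name: str
    asset_partition_key: str


def path_without_partition(base_path, context):
    # Original layout: identical for every partition of the same output.
    return os.path.join(base_path, context.step_key, context.name)


def path_with_partition(base_path, context):
    # Suggested layout: the partition key makes each path unique.
    return os.path.join(
        base_path, context.step_key, context.name, context.asset_partition_key
    )


day1 = FakeContext("dummy_asset_partitioned", "result", "2020-02-01")
day2 = FakeContext("dummy_asset_partitioned", "result", "2020-02-02")

# Two different partitions resolve to the same file in the original layout...
collides = path_without_partition("my_base_dir", day1) == path_without_partition("my_base_dir", day2)
# ...and to different files once the partition key is part of the path.
distinct = path_with_partition("my_base_dir", day1) != path_with_partition("my_base_dir", day2)
```

With the original layout, `collides` is true, which is the overwrite described above; with the partition key included, each partition gets its own location.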
geoHeil
03/03/2022, 4:54 PM
file_path = os.path.join(self.base_path, context.step_key, context.asset_partition_key, context.name)
get_dagster_logger().info(f"Suggested Path: {file_path}")
Path(os.path.join(self.base_path, context.step_key, context.asset_partition_key)).mkdir(parents=True, exist_ok=True)
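
The directory-creation step can be tried in isolation. This is a minimal sketch, assuming a temporary directory in place of `self.base_path` and the standard-library `csv` module in place of pandas; `partition_path`, `write_partition`, and `read_partition` are hypothetical helpers, not Dagster APIs.

```python
import csv
import tempfile
from pathlib import Path


def partition_path(base_path, step_key, partition_key, name):
    # Same component order as in the message above: step / partition / output name.
    return Path(base_path) / step_key / partition_key / name


def write_partition(base_path, step_key, partition_key, name, rows):
    file_path = partition_path(base_path, step_key, partition_key, name)
    # Create the per-partition directory before writing; safe to call repeatedly.
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    return file_path


def read_partition(file_path):
    with open(file_path, newline="") as f:
        return list(csv.reader(f))


with tempfile.TemporaryDirectory() as base:
    p1 = write_partition(base, "dummy_asset_partitioned", "2020-02-01", "result",
                         [["foo", "bar"], ["1", "a"]])
    p2 = write_partition(base, "dummy_asset_partitioned", "2020-02-02", "result",
                         [["foo", "bar"], ["3", "b"]])
    # Each partition is read back from its own file, so neither overwrote the other.
    first = read_partition(p1)
    second = read_partition(p2)
```

Without the `mkdir(parents=True, exist_ok=True)` call, the write would fail for any partition whose directory does not exist yet.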
sandy
03/03/2022, 4:54 PM