# ask-community

Hebo Yang

08/22/2022, 5:28 PM
Happy Monday team! I have a question about asset materialization partitions. I am using `AssetsDefinition.from_graph()` to create assets and `GraphDefinition(output_mappings)` to output an asset from an op of the graph. However, the asset that gets generated doesn't have partition metadata attached. Could someone help take a look please? (Will provide more details in the thread)
🤖 1
I already defined partitions_def in `AssetsDefinition.from_graph()` and `GraphDefinition()`. However, in the asset page the partition isn't attached (see screenshot). I need the partition info from `context.instance.get_event_records()` so that my sensor can figure out which partition the asset materialization was for (and it's nice to have partitions properly displayed in the asset page). When I printed out the event logs, it looks like the AssetMaterialization is somehow missing the partition.
```
event_specific_data=StepMaterializationData(materialization=AssetMaterialization(asset_key=AssetKey(['DATALAKE', 'prod', 'ml', 'datasets', 'canary_complex_pipeline']), description=None, metadata_entries=[ MetadataEntry(label='path', description=None, entry_data=PathMetadataValue(path='/opt/dagster/dagster_home/storage/DATALAKE/prod/ml/datasets/canary_complex_pipeline/2022-08-21'))], partition=None), asset_lineage=[])
```
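For reference, here's a minimal sketch of the kind of sensor lookup being described above, assuming roughly 2022-era Dagster APIs; the asset key matches the event log, and `downstream_job` is just a placeholder for whatever job the sensor should launch:
```python
from dagster import (AssetKey, DagsterEventType, EventRecordsFilter,
                     RunRequest, job, op, sensor)

ASSET_KEY = AssetKey(["DATALAKE", "prod", "ml", "datasets", "canary_complex_pipeline"])


@op
def noop():
    pass


@job
def downstream_job():  # placeholder for the real job the sensor should launch
    noop()


@sensor(job=downstream_job)
def canary_materialization_sensor(context):
    # Pull materialization events for the asset, oldest-first, restricted to
    # records after the stored cursor.
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=ASSET_KEY,
            after_cursor=int(context.cursor) if context.cursor else None,
        ),
        ascending=True,
    )
    for record in records:
        # This is where the partition should show up -- it is None in the
        # event log above, which is the problem being discussed.
        materialization = (
            record.event_log_entry.dagster_event.event_specific_data.materialization
        )
        if materialization.partition is not None:
            yield RunRequest(
                run_key=str(record.storage_id),
                tags={"dagster/partition": materialization.partition},
            )
        context.update_cursor(str(record.storage_id))
```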
Here is the code I am using to create the asset, job, and dummy op. Am I missing something in `get_asset_op`, or maybe I need to pass `partition_mappings` or `metadata_by_output_name` to `from_graph()`?
```python
# Wrap the existing 3-op graph as a graph-backed asset with daily partitions.
asset = AssetsDefinition.from_graph(
    graph,
    keys_by_input_name=self.source_assets,
    keys_by_output_name={self.asset_name: AssetKey(self.asset_key)},
    resource_defs={"active_date": active_date_resouce, "labels": labels},
    partitions_def=daily_partition,
)

# Op-based job built from the same graph.
job = graph.to_job(
    name=self.get_name_for_source(),
    config=self.get_daily_partition_config(),
    # partitions_def=daily_partition,
    resource_defs={"active_date": active_date_resouce, "labels": labels},
    hooks={metrics_failure_hook},
)

def get_asset_op(op_name, ins, out_asset_name, out_asset_key: List[str], partition_def):
    # Declare the op output as an asset output, attaching the partitions definition.
    out = {
        out_asset_name: Out(
            Any,
            asset_key=AssetKey(out_asset_key),
            asset_partitions_def=partition_def
        )
    }

    def asset_func(context, *args, **kwargs):
        context.log.info(f"context: {type(context)} {context.partition_key}.")
        # This Output is what ends up as the AssetMaterialization with partition=None.
        yield Output(
            value=out_asset_name,
            output_name=out_asset_name,
        )

    return OpDefinition(
        name=op_name,
        ins=ins,
        outs=out,
        compute_fn=asset_func,
    )
```
I couldn't use `define_asset_job()` as I am hoping to include other ops inside my job.

sandy

08/22/2022, 6:14 PM
hey @Hebo Yang - I haven't had a chance to look at your question in detail yet, but I'm curious about this part:
> I couldn't use `define_asset_job()` as I am hoping to include other ops inside my job.
mind if I ask what kinds of things those other ops are doing?

Hebo Yang

08/22/2022, 6:40 PM
Hi Sandy! We have an existing graph that contains 3 ops. I am trying to have the graph output an asset, while keeping the existing op structure. On a higher level, we'd like to use assets for dependencies between jobs. We are also trying to use an asset sensor to re-trigger downstream jobs upon upstream asset materialization (even for historical partitions). I have got most of this to work, other than the asset materialization partition.
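For reference, a hedged sketch of that asset-sensor pattern, reusing the placeholder `downstream_job` from the sketch above; it reads the partition off the upstream materialization event and forwards it as the run's partition tag:
```python
from dagster import AssetKey, RunRequest, asset_sensor

UPSTREAM_KEY = AssetKey(["DATALAKE", "prod", "ml", "datasets", "canary_complex_pipeline"])


@asset_sensor(asset_key=UPSTREAM_KEY, job=downstream_job)  # downstream_job is a placeholder
def retrigger_downstream_on_materialization(context, asset_event):
    # asset_event is the EventLogEntry for the upstream materialization; the
    # partition it was materialized for (if any) lives on the event data.
    partition = asset_event.dagster_event.event_specific_data.materialization.partition
    tags = {"dagster/partition": partition} if partition else {}
    yield RunRequest(
        run_key=context.cursor,  # de-duplicates run requests per materialization
        tags=tags,
    )
```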

sandy

08/22/2022, 6:45 PM
you should be able to use `define_asset_job` with graph-backed assets:
```python
asset = AssetsDefinition.from_graph(...)

@repository
def repo():
    return [asset, define_asset_job("name", selection=AssetSelection.assets(asset))]
```
would that work for you?
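Since the graph-backed asset here is partitioned, the asset job can presumably carry the same partitions definition so each launched run targets a specific partition - a small sketch, reusing the `asset` and `daily_partition` names from the snippet earlier in the thread (the job name is hypothetical):
```python
from dagster import AssetSelection, define_asset_job, repository

# `asset` and `daily_partition` are assumed to be the graph-backed asset and
# partitions definition from the earlier snippet.
partitioned_asset_job = define_asset_job(
    name="canary_asset_job",  # hypothetical name
    selection=AssetSelection.assets(asset),
    partitions_def=daily_partition,
)


@repository
def repo():
    return [asset, partitioned_asset_job]
```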

Hebo Yang

08/22/2022, 8:25 PM
How do I include other ops inside `define_asset_job()` please? Do I have to convert all the other ops into assets?

sandy

08/22/2022, 8:26 PM
ah I see - you have other ops that aren't inside the graph that you're calling `AssetsDefinition.from_graph` on? are those ops downstream of your assets? yes, currently you would need to convert them to assets. do they not make sense as assets? or is it just a pain to need to convert them all at once vs. incrementally?

Hebo Yang

08/22/2022, 8:32 PM
Right. Those ops are there to generate the asset. I just think that these ops shouldn't be assets themselves conceptually (and they kind of pollute the asset space). But I guess if this is the only way to get it to work, I could convert them all into assets. I feel like the ability to mix ops with assets inside a graph is currently missing.

sandy

08/22/2022, 8:37 PM
I agree with you that it should be possible to define ops that help generate an asset, but not have those ops be assets themselves. This is the use case that graph-backed assets are designed for: https://docs.dagster.io/concepts/assets/software-defined-assets#graph-backed-assets. Would this work for you? Happy to chat on a call about this if it's easier.
❤️ 1

Fernando Mangussi

09/21/2022, 8:33 PM
@Hebo Yang Here is a snippet that defines partitioned upstream and downstream assets, where the downstream asset is backed by a graph of ops.
```python
from dagster import (AssetsDefinition, asset, define_asset_job,
                     graph, op, repository,
                     TimeWindowPartitionsDefinition,
                     )

# A partition every 10 minutes, starting at 2022-09-21 00:23 UTC.
every_10_min_partition = TimeWindowPartitionsDefinition(
    '2022-09-21-00:23', fmt="%Y-%m-%d-%H:%M",
    timezone='UTC',
    cron_schedule='*/10 * * * *')


# Partitioned upstream asset.
@asset(partitions_def=every_10_min_partition)
def upstream_asset() -> int:
    return 1


# Plain ops -- not assets themselves, just steps inside the graph.
@op
def add_one(value) -> int:
    return value + 1


@op
def middle_op(from_add_one) -> int:
    return from_add_one


@graph
def downstream_asset(upstream_asset) -> int:
    return middle_op(add_one(upstream_asset))


# Wrap the graph as a graph-backed asset, with the same partitions.
downstream_asset = AssetsDefinition.from_graph(
    graph_def=downstream_asset,
    partitions_def=every_10_min_partition
)

# Partitioned asset job that materializes both assets together.
downstream_asset_job = define_asset_job(
    name="downstream_asset_job",
    selection=["downstream_asset", "upstream_asset"],
    partitions_def=every_10_min_partition,
)


@repository
def repo():
    return [
        downstream_asset,
        upstream_asset,
        downstream_asset_job,
    ]
```
👍 1
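As a quick way to exercise the snippet above, the resolved job can be run for a single partition in-process (a hedged sketch; "2022-09-21-00:30" is just one key in the format defined by `fmt` above):
```python
# Resolve the asset job from the repository and execute one partition locally.
result = repo.get_job("downstream_asset_job").execute_in_process(
    partition_key="2022-09-21-00:30",
)
assert result.success
```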