# ask-community

Terry Lines

12/28/2022, 1:12 AM
Hi! I am trying to define an asset using a graph and I can't get the asset key to work as I'd hoped. The pattern is this:
```python
from dagster import (
    AssetKey, AssetsDefinition, In, Nothing, OpExecutionContext,
    Out, SourceAsset, graph, op,
)

data1 = SourceAsset(key='data1')
data2 = SourceAsset(key='data2')

# First load step: writes to a table and is (intended to be) tagged with its own asset key.
@op(out=Out(io_manager_key='database_io_manager', asset_key=AssetKey('my_table_name')))
def infutor_address_tci(tci: str):
    return tci


# Second load step: appends to the Kinetica table after infutor_address_tci has run.
@op(
    ins={
        'start': In(Nothing),  # ordering-only dependency; no value is passed to the function
        'tch': In(),
    },
    out=Out(
        io_manager_key='kinetica_io_manager',
        asset_key=AssetKey(['kinetica', 'address', 'infutor_address']),
        metadata={
            'load_options': {
                'batch_size': '50000',
                # tch only has a subset of data, hence the links to out of bounds (901....)
                'columns_to_load': '29, 8, 10..20,24,901..902,,32,904',
                'file_type': 'csv',
                'text_delimiter': '\t',
                'text_has_header': 'false',
                'text_quote_character': '~',
                'error_handling': 'permissive',
            }
        },
    ),
)
def infutor_address_tch(tch: str):
    return tch


@op(required_resource_keys={'db'})
def update_infutor_address(context: OpExecutionContext, table):
    # remove lat/lon not at geocode rooftop, line interpolated, or parcel_centroid level.
    context.resources.db.execute_sql(f"""
        UPDATE {table.name}
        SET
            latitude=NULL,
            longitude=NULL
        where
            (latitude is not null or
            longitude is not null) and
            nvl(geocode not in ('01','02'),true)
        """)
    return None


@graph()
def infutor_address(tci: str, tch: str):
    table = infutor_address_tch(tch, start=infutor_address_tci(tci))
    return update_infutor_address(table)


infutor_address = AssetsDefinition.from_graph(
    infutor_address,
    keys_by_input_name={
        'tci': AssetKey(['s3', 'infutor', 'tci']),
        'tch': AssetKey(['s3', 'infutor', 'tch']),
    },
    key_prefix=['kinetica', 'address'],
)
```

sandy

12/28/2022, 5:01 PM
Hi Terry - your use case is a little bit outside the typical use case for software-defined assets, because Dagster expects a 1:1 relationship between op outputs and assets, and it sounds like you need many outputs to map to the same asset. I believe you would need to either:
• Use Dagster's op APIs instead of the software-defined asset APIs
• Have only one op output map to each asset

Terry Lines

12/28/2022, 5:33 PM
Thanks. If I used the op APIs and ran them via a job, would I get the same benefits (displayed lineage, etc.) in Dagit for the assets that were materialized in this process? I have managed to make a solution work by defining the table_name in the output metadata and having my io_manager look for it if an asset_key is not present. It's not elegant, but it allows me to use the same io_manager.
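A minimal sketch of that fallback, written as plain functions that an IO manager's `handle_output` could call. It assumes an output context shaped like Dagster's `OutputContext`, reading `has_asset_key`, `asset_key.path`, and `metadata` (the metadata declared on the `Out`; later Dagster releases may expose this under a different name, so check your version's docs). The helper names, the `table_name` key, and the dot-joined naming scheme are hypothetical choices, not Dagster conventions; `SimpleNamespace` stand-ins replace real contexts in the usage lines.

```python
from types import SimpleNamespace
from typing import Any, Mapping, Optional, Sequence


def resolve_table_name(
    asset_key_path: Optional[Sequence[str]],
    metadata: Optional[Mapping[str, Any]],
) -> str:
    """Return the destination table for an op output (hypothetical helper).

    Prefer the asset key when the output has one; otherwise fall back to a
    hypothetical 'table_name' entry in the output's metadata.
    """
    if asset_key_path:
        # Assumed naming convention: join the key path with dots.
        return ".".join(asset_key_path)
    if metadata and "table_name" in metadata:
        return str(metadata["table_name"])
    raise ValueError("Output has neither an asset key nor a 'table_name' metadata entry")


def table_name_for_output(context) -> str:
    """Adapter for an object shaped like Dagster's OutputContext."""
    path = context.asset_key.path if context.has_asset_key else None
    return resolve_table_name(path, context.metadata)


# Usage with stand-in contexts; a real IOManager would pass its OutputContext instead.
asset_ctx = SimpleNamespace(
    has_asset_key=True,
    asset_key=SimpleNamespace(path=["kinetica", "address", "infutor_address"]),
    metadata={},
)
op_ctx = SimpleNamespace(has_asset_key=False, asset_key=None, metadata={"table_name": "infutor_address"})

print(table_name_for_output(asset_ctx))  # kinetica.address.infutor_address
print(table_name_for_output(op_ctx))  # infutor_address
```

Keeping the lookup in one function means the asset-key path and the metadata path share a single place where the table-naming rule lives.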

sandy

12/28/2022, 6:31 PM
> Thanks. If I used the op APIs and ran via a job would I get the same benefits of displayed lineage etc. in dagit around the assets that were materialized in this process?

Lineage currently only works when you're using software-defined assets.