Terry Lines
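12/28/2022, 1:12 AM
from dagster import (AssetKey, AssetsDefinition, In, Nothing, OpExecutionContext,
                     Out, SourceAsset, graph, op)

data1 = SourceAsset(key='data1')
data2 = SourceAsset(key='data2')

@op(out=Out(io_manager_key='database_io_manager', asset_key=AssetKey('my_table_name')))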
def infutor_address_tci(tci: str):
    return tci

@op(
    ins={
        'start': In(Nothing),
        'tch': In(),
    },
    out=Out(
        io_manager_key='kinetica_io_manager',
        asset_key=AssetKey(['kinetica', 'address', 'infutor_address']),
        metadata={
            'load_options': {
                'batch_size': '50000',
                # tch only has a subset of the data, hence the out-of-bounds
                # column indexes (901..902, 904)
                'columns_to_load': '29, 8, 10..20,24,901..902,,32,904',
                'file_type': 'csv',
                'text_delimiter': '\t',
                'text_has_header': 'false',
                'text_quote_character': '~',
                'error_handling': 'permissive',
            }
        },
    ),
)
def infutor_address_tch(tch: str):
    return tch
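The `columns_to_load` value mixes single indexes, `a..b` ranges, and an empty entry (`,,`). As a minimal sketch, assuming `..` is an inclusive range and empty entries are ignored (neither has been checked against Kinetica's documentation), the hypothetical helper below expands the spec into the column indexes it would select, which makes the out-of-bounds 901..902 and 904 references easy to see:

```python
def expand_columns_to_load(spec: str) -> list[int]:
    """Expand a columns_to_load string into individual column indexes.

    Hypothetical helper, not part of Dagster or Kinetica. Assumptions:
    - entries are comma-separated and surrounding whitespace is ignored;
    - 'a..b' is an inclusive range of indexes;
    - empty entries (such as the one produced by ',,') are skipped.
    """
    columns: list[int] = []
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue  # skip empty entries like the ',,' in the config
        if ".." in entry:
            start, end = (int(part) for part in entry.split("..", 1))
            columns.extend(range(start, end + 1))  # inclusive range
        else:
            columns.append(int(entry))
    return columns


cols = expand_columns_to_load("29, 8, 10..20,24,901..902,,32,904")
print(cols)
```

Under these assumptions the spec selects 18 indexes, in the order written, with `10..20` contributing eleven of them.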

@op(required_resource_keys={'db'})
def update_infutor_address(context: OpExecutionContext, table):
    # Remove lat/lon that is not at geocode rooftop, line-interpolated,
    # or parcel_centroid level.
    context.resources.db.execute_sql(f"""
        UPDATE {table.name}
        SET
            latitude=NULL,
            longitude=NULL
        where
            (latitude is not null or
             longitude is not null) and
            nvl(geocode not in ('01','02'), true)
    """)
    return None
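The `nvl(..., true)` wrapper matters because `geocode not in ('01','02')` evaluates to NULL when `geocode` is NULL, and a NULL predicate would leave that row's coordinates in place. The sketch below reproduces the same WHERE clause in SQLite via the standard-library `sqlite3` module, swapping in `COALESCE(..., 1)` for the original's `nvl(..., true)`; the table name and rows are made up for illustration:

```python
import sqlite3

# Hypothetical in-memory table standing in for the real address table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE infutor_address (id INTEGER, geocode TEXT, latitude REAL, longitude REAL)"
)
conn.executemany(
    "INSERT INTO infutor_address VALUES (?, ?, ?, ?)",
    [
        (1, "01", 40.1, -75.2),  # code in ('01','02'): coordinates kept
        (2, "02", 40.2, -75.3),  # code in ('01','02'): coordinates kept
        (3, "05", 40.3, -75.4),  # any other code: coordinates cleared
        (4, None, 40.4, -75.5),  # NULL geocode: cleared only because of COALESCE
    ],
)
# Same predicate as the op, with COALESCE standing in for nvl.
conn.execute(
    """
    UPDATE infutor_address
    SET latitude = NULL, longitude = NULL
    WHERE (latitude IS NOT NULL OR longitude IS NOT NULL)
      AND COALESCE(geocode NOT IN ('01', '02'), 1)
    """
)
rows = conn.execute(
    "SELECT id, latitude, longitude FROM infutor_address ORDER BY id"
).fetchall()
print(rows)
```

Dropping the `COALESCE` would leave row 4's coordinates untouched, since its predicate would be NULL rather than true.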

@graph()
def infutor_address(tci: str, tch: str):
    table = infutor_address_tch(tch=tch, start=infutor_address_tci(tci))
    return update_infutor_address(table)
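In the graph, `start=infutor_address_tci(tci)` feeds a `Nothing`-typed input, so it creates an ordering dependency without passing any value into `infutor_address_tch`'s function body. As a conceptual sketch only (it uses Python's `graphlib`, not Dagster's executor), the dependency edges imply this run order:

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map mirroring the graph above: op -> ops it waits on.
deps = {
    "infutor_address_tci": set(),
    # The Nothing-typed `start` input: ordering only, no data passed.
    "infutor_address_tch": {"infutor_address_tci"},
    # The `table` output of infutor_address_tch feeds the update op.
    "update_infutor_address": {"infutor_address_tch"},
}
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Because the edges form a single chain, there is exactly one valid order: load tci, then load tch, then run the UPDATE.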

infutor_address = AssetsDefinition.from_graph(
    infutor_address,
    keys_by_input_name={
        'tci': AssetKey(['s3', 'infutor', 'tci']),
        'tch': AssetKey(['s3', 'infutor', 'tch']),
    },
    key_prefix=['kinetica', 'address'],
)
sandy
12/28/2022, 5:01 PM
Terry Lines
12/28/2022, 5:33 PM
Thanks. If I used the op APIs and ran them via a job, would I get the same benefits (displayed lineage, etc.) in Dagit for the assets materialized in this process?
sandy
12/28/2022, 6:31 PM
Lineage currently only works when you're using software-defined assets.