vinesh nukala
03/22/2023, 11:05 PM
from dagster import AssetIn, asset, define_asset_job, repository, with_resources

# pors_io_manager is assumed to be defined elsewhere.

@asset(
    ins={
        "df": AssetIn(
            input_manager_key="pors_io_manager",
            metadata={
                "namespace_name": "name",
                "document_id": 42343,
            },
        )
    }
)
def read_asset(df):
    print(df)


@repository
def repo():
    return [
        *with_resources(
            [read_asset],
            resource_defs={
                "pors_io_manager": pors_io_manager,
            },
        ),
        define_asset_job("temp"),
    ]


if __name__ == "__main__":
    result = repo.get_job("temp").execute_in_process()
But it doesn’t work, yet this page shows that it can be done with ops using the following:
from dagster import In, job, op

# pors_io_manager is assumed to be defined elsewhere.

@op(
    ins={
        "df": In(
            input_manager_key="pors_io_manager",
            metadata={
                "namespace_name": "name",
                "document_id": 42343,
            },
        )
    }
)
def op_2(df):
    print(df)


@job(resource_defs={"pors_io_manager": pors_io_manager})
def a_job():
    op_2()


if __name__ == "__main__":
    result = a_job.execute_in_process()
Is there any way to replicate that functionality using assets?
jamie
03/23/2023, 2:14 PM
source_df = SourceAsset(key=["unique_name_for_the_asset"], io_manager_key="pors_io_manager")

@asset
def read_asset(unique_name_for_the_asset):
    print(unique_name_for_the_asset)
That should get you moving in the right direction. Basically, when unique_name_for_the_asset is loaded as input to read_asset, the pors_io_manager will be invoked to load unique_name_for_the_asset.
https://docs.dagster.io/concepts/assets/software-defined-assets#defining-external-asset-dependencies