#ask-community

Ben Wilson

12/23/2022, 10:33 PM
Hi there...I'm sorry but I can't seem to figure this out. I have an op running in a job that materializes an asset. I have another job that I run subsequently and I want an op in that job to reference the asset I created in the first job. Is there any way to do that or am I just missing something?

Gabe Schine

12/24/2022, 12:25 AM
What do you mean by "reference"?

Ben Wilson

12/24/2022, 12:55 AM
As an example, let's say in job1 I have an op that takes in some config values, builds out some file paths, and stores them in a dictionary. After populating the dictionary, I materialize it as an asset. Later on, in job2, I would like one of the ops to reference the asset and read the file paths.

Gabe Schine

12/24/2022, 1:52 AM
So you want to load the asset contents in job2? Or only metadata?

Ben Wilson

12/24/2022, 6:41 PM
Hi @Gabe Schine thanks for the help here. I'd like to load the contents of the asset. Here is some simplified code that I hope conveys what I'm trying to accomplish
Copy code
import os

from dagster import AssetMaterialization, job, op

@op(config_schema={"base_directory": str, "customer": str})
def generate_asset_metadata(context):
    customer = context.op_config["customer"]
    directory = context.op_config["base_directory"]
    customer_directory = f'{directory}/{customer}/files/'
    path_exists = os.path.exists(customer_directory)
    if not path_exists:
        # Create a new directory because it does not exist
        os.makedirs(customer_directory)
        context.log.info(f"Created Directory: {customer_directory}")
    file_metadata = {
        "customer_directory": customer_directory,
        "directory": directory
    }
    context.log_event(
        AssetMaterialization(
            asset_key="file_metadata", description="Metadata for file copy"
        )
    )
    return file_metadata

@op()
def get_file_count(context, generate_asset_metadata):
    source_files = os.listdir(generate_asset_metadata["customer_directory"])
    file_count = len(source_files)
    context.log.info(str(file_count))

@job()
def file_count_job():
    get_file_count(generate_asset_metadata())
Copy code
@op()
def copy_files(context):
    # I'd like to reference the asset here
    file_path = file_metadata["customer_directory"]
    copy_path = f'{file_path}/copy'
    customer_files = os.listdir(file_path)
    for file in customer_files:
        # Copy files to new directory
        pass
    return copy_path

@op()
def validate_file_copy(context, copy_files):
    file_path = file_metadata["customer_directory"]
    copy_path = copy_files
    ## Validate files 

@job()
def file_count_job():
    validate_file_copy(copy_files())
In the first code block, I generate a dictionary and materialize it as an asset. In the second block, I want to reference that asset to pull in the stored file path

Gabe Schine

12/24/2022, 8:12 PM
Got it. Yeah, I wanted to do something just like this, and apparently it's not possible right now. You can read the metadata from past materialization events, I believe, but you can't get the contents of an asset from a job without re-materializing the asset.
Here's a feature request that is related: https://github.com/dagster-io/dagster/issues/10874

Ben Wilson

12/24/2022, 8:15 PM
Thanks Gabe! Can you point to a sample for how I might read the metadata?