Hi, I'm trying to figure out how to do something l...
# ask-community
j
Hi, I'm trying to figure out how to do something like this using a sensor, but consuming the filename from an asset, rather than an op. How do I pass the filename to asset1? I couldn't find the answer in the docs.
Copy code
@asset
def asset1(context):
    """Placeholder asset that reads the file name from its op config."""
    # The "filename" entry is looked up in the op config attached to the context.
    filename = context.op_config["filename"]
    # Materialize with the filename (left unimplemented in this sketch).

@asset
def asset2(asset1):
    """Materialize the 2nd asset based on ``asset1``.

    Args:
        asset1: Value produced by the upstream ``asset1`` asset.
    """
    # The docstring doubles as the function body: the original had only a
    # comment here, which leaves the ``def`` without any statement and makes
    # the snippet fail with an IndentationError.

# Job over the "*" asset selection (all assets, per Dagster's selection syntax).
asset_job = define_asset_job(name="asset_job", selection="*")


@sensor(job=asset_job)
def materializes_asset_sensor():
    """Sketch of a sensor that requests one run of ``asset_job`` for a file."""
    # NOTE(review): `filename` is not defined anywhere in this snippet, and the
    # "import_file" key does not match any asset shown here -- confirm both.
    yield RunRequest(
         run_key=filename,
                 run_config={
                     "ops": {"import_file": {"config": {"filename": filename}}}
                 },
)
m
How about this?
Copy code
from dagster import (
    asset,
    asset_sensor,
    define_asset_job,
    RunRequest,
    DefaultSensorStatus,
    AssetKey,
    Definitions,
)


@asset
def filenamesource():
    """Provide the file name for downstream assets (hard-coded in this example)."""
    source_info = {"filename": "thefilename.csv"}
    return source_info


@asset
def asset1(filenamesource):
    """Return a placeholder string naming the file given by ``filenamesource``."""
    source_name = filenamesource["filename"]
    # Placeholder for the real load: materialize with the filename.
    return f"stuff you loaded from {source_name}"


@asset
def asset2(asset1):
    """Materialize the 2nd asset by writing ``asset1`` to ``outputdata.txt``.

    Args:
        asset1: Text produced by the upstream ``asset1`` asset.
    """
    # Pin the encoding so the bytes written do not depend on the platform's
    # locale default, which can differ between machines and reject some text.
    with open("outputdata.txt", "w", encoding="utf-8") as outputfile:
        outputfile.write(asset1)


# Job limited to asset1 and asset2.
asset_job = define_asset_job(name="asset_job", selection=[asset1, asset2])


# Asset sensor keyed on the "filenamesource" asset; it targets asset_job and is
# declared with a default status of RUNNING.
@asset_sensor(
    asset_key=AssetKey("filenamesource"),
    job=asset_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def materializes_asset_sensor():
    """Yield a single run request with no run key and no run config."""
    yield RunRequest()


# Collect the assets, the job and the sensor of this example in one Definitions.
defs = Definitions(
    assets=[filenamesource, asset1, asset2],
    jobs=[asset_job],
    sensors=[materializes_asset_sensor],
)
j
Sorry I wasn't very clear. The file that the sensor detects is an external resource, not created by assets 1 or 2.
Basically this pipeline detects a raw file and generates assets from it.
m
But the filename has to be determined programmatically, i.e. not hardcoded?
j
Yes, it's done inside the sensor like this. Apparently the run_config part only works for ops, not assets.
Copy code
def materializes_asset_sensor():
    """Yield one run request for each file found directly in ``MY_DIRECTORY``.

    The file name is used as the run key and is also passed as the
    ``filename`` config value under the ``import_file`` op key.
    """
    for entry_name in os.listdir(MY_DIRECTORY):
        if not os.path.isfile(os.path.join(MY_DIRECTORY, entry_name)):
            # Anything that is not a file (e.g. a sub-directory) is ignored.
            continue
        ops_config = {"import_file": {"config": {"filename": entry_name}}}
        yield RunRequest(run_key=entry_name, run_config={"ops": ops_config})
m
How about this?
Copy code
from dagster import (
    asset,
    define_asset_job,
    RunRequest,
    Definitions,
    Config,
    sensor
)

import os

# Directory whose entries are listed when looking for input files.
MY_DIRECTORY = "./subfolder"

class YourConfg(Config):
    """Config schema carrying a single ``filename`` string."""

    # Name of the file to load.
    filename: str


@asset
def asset1(config: YourConfg):
    """Return a placeholder string naming the file given in ``config``."""
    # Placeholder for the real load: materialize with the configured filename.
    return f"stuff you loaded from {config.filename}"


@asset
def asset2(asset1):
    """Materialize the 2nd asset by writing ``asset1`` to ``outputdata.txt``.

    Args:
        asset1: Text produced by the upstream ``asset1`` asset.
    """
    # Pin the encoding so the bytes written do not depend on the platform's
    # locale default; the text embeds a file name, which may be non-ASCII.
    with open("outputdata.txt", "w", encoding="utf-8") as outputfile:
        outputfile.write(asset1)


# Job over the "*" asset selection (all assets, per Dagster's selection syntax).
asset_job = define_asset_job(name="asset_job", selection="*")


@sensor(job=asset_job)
def materializes_asset_sensor():
    """Yield one run request for each file found directly in ``MY_DIRECTORY``.

    The file name is used as the run key and is passed as the ``filename``
    config value under the ``asset1`` key of the run config.
    """
    for entry_name in os.listdir(MY_DIRECTORY):
        entry_path = os.path.join(MY_DIRECTORY, entry_name)
        if not os.path.isfile(entry_path):
            # Anything that is not a file (e.g. a sub-directory) is ignored.
            continue
        asset1_config = {"config": {"filename": entry_name}}
        yield RunRequest(
            run_key=entry_name,
            run_config={"ops": {"asset1": asset1_config}},
        )

# Collect the assets, the job and the sensor of this example in one Definitions.
defs = Definitions(
    assets=[asset1, asset2],
    jobs=[asset_job],
    sensors=[materializes_asset_sensor],
)
j
Thanks. Let me try.
Wow, it works!!! Thank you very much. I would have never thought of this. 🙂
m
My pleasure. I'm trying to learn Dagster right now, so I figured this would be a good problem to try and figure out.
👍 1