jasono
02/10/2024, 2:51 AM
@asset
def asset1(context):
    """Read the target filename from this asset's run configuration.

    Args:
        context: Execution context; ``context.op_config`` must contain a
            ``"filename"`` entry.

    Raises:
        KeyError: If ``"filename"`` is missing from ``context.op_config``.
    """
    filename = context.op_config["filename"]
    # materialize with the filename
@asset
def asset2(asset1):
    """Placeholder for the second asset, which is derived from ``asset1``.

    Args:
        asset1: Value produced by the upstream ``asset1`` asset.
    """
    # materialize 2nd asset based on asset1
# Job named "asset_job" built from the "*" asset selection.
asset_job = define_asset_job(name="asset_job", selection="*")
@sensor(job=asset_job)
def materializes_asset_sensor():
    """Request a run of ``asset_job``, passing a filename to ``asset1``.

    Yields:
        RunRequest: A request whose ``run_key`` is the filename and whose
        run config hands that filename to ``asset1``.
    """
    # NOTE(review): `filename` is not defined anywhere in this snippet; it
    # must be supplied (e.g. by iterating over a directory) before this
    # sensor can be evaluated.
    yield RunRequest(
        run_key=filename,
        # The asset that reads ``op_config["filename"]`` is ``asset1``, so
        # the config has to be addressed to that op name.
        run_config={
            "ops": {"asset1": {"config": {"filename": filename}}}
        },
    )
Michael Riley
02/10/2024, 5:23 AM
from dagster import (
asset,
asset_sensor,
define_asset_job,
RunRequest,
DefaultSensorStatus,
AssetKey,
Definitions,
)
@asset
def filenamesource():
    """Return the filename that downstream assets should load.

    Returns:
        dict: A mapping with a single ``"filename"`` entry.
    """
    return {"filename": "thefilename.csv"}
@asset
def asset1(filenamesource):
    """Build this asset's value from the filename in ``filenamesource``.

    Args:
        filenamesource: Mapping with a ``"filename"`` entry, as produced by
            the upstream ``filenamesource`` asset.

    Returns:
        str: A placeholder string that embeds the filename.
    """
    filename = filenamesource["filename"]
    # materialize with the filename
    return f"stuff you loaded from {filename}"
@asset
def asset2(asset1):
    """Write the value of ``asset1`` to ``outputdata.txt``.

    Args:
        asset1: Text produced by the upstream ``asset1`` asset.
    """
    # materialize 2nd asset based on asset1
    # An explicit encoding keeps the written bytes independent of the
    # platform's default locale.
    with open("outputdata.txt", "w", encoding="utf-8") as outputfile:
        outputfile.write(asset1)
# Job named "asset_job" that targets asset1 and asset2 only.
asset_job = define_asset_job(name="asset_job", selection=[asset1, asset2])
@asset_sensor(
    job=asset_job,
    asset_key=AssetKey("filenamesource"),
    default_status=DefaultSensorStatus.RUNNING,
)
def materializes_asset_sensor():
    """Request a run of ``asset_job`` for the ``filenamesource`` asset key.

    Yields:
        RunRequest: A request with no run key and no extra run config.
    """
    yield RunRequest()
# Register the assets, the job that targets them, and the sensor.
defs = Definitions(
    assets=[filenamesource, asset1, asset2],
    jobs=[asset_job],
    sensors=[materializes_asset_sensor],
)
jasono
02/10/2024, 5:25 AM
jasono
02/10/2024, 5:27 AM
Michael Riley
02/10/2024, 5:27 AM
jasono
02/10/2024, 5:29 AM
def materializes_asset_sensor():
for filename in os.listdir(MY_DIRECTORY):
filepath = os.path.join(MY_DIRECTORY, filename)
if os.path.isfile(filepath):
yield RunRequest(
run_key=filename,
run_config={
"ops": {"import_file": {"config": {"filename": filename}}}
},
)
Michael Riley
02/10/2024, 5:44 AM
from dagster import (
asset,
define_asset_job,
RunRequest,
Definitions,
Config,
sensor
)
import os
# Directory whose entries are listed when looking for files to import.
MY_DIRECTORY = "./subfolder"
class YourConfg(Config):
    """Run configuration that carries the filename to load."""

    # Name of the file to load.
    filename: str
@asset
def asset1(config: YourConfg):
    """Build this asset's value from the configured filename.

    Args:
        config: Run configuration; ``config.filename`` names the file.

    Returns:
        str: A placeholder string that embeds the filename.
    """
    filename = config.filename
    # materialize with the filename
    return f"stuff you loaded from {filename}"
@asset
def asset2(asset1):
    """Write the value of ``asset1`` to ``outputdata.txt``.

    Args:
        asset1: Text produced by the upstream ``asset1`` asset.
    """
    # materialize 2nd asset based on asset1
    # An explicit encoding keeps the written bytes independent of the
    # platform's default locale.
    with open("outputdata.txt", "w", encoding="utf-8") as outputfile:
        outputfile.write(asset1)
# Job named "asset_job" built from the "*" asset selection.
asset_job = define_asset_job(name="asset_job", selection="*")
@sensor(job=asset_job)
def materializes_asset_sensor():
    """Yield one run request per file found directly in ``MY_DIRECTORY``.

    Yields:
        RunRequest: One request per file, with the filename as ``run_key``
        and passed to ``asset1`` through its run config.
    """
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if not os.path.isfile(filepath):
            # Skip sub-directories and anything else that is not a file.
            continue
        yield RunRequest(
            run_key=filename,
            run_config={"ops": {"asset1": {"config": {"filename": filename}}}},
        )
# Register the assets, the job that targets them, and the sensor.
defs = Definitions(
    assets=[asset1, asset2],
    jobs=[asset_job],
    sensors=[materializes_asset_sensor],
)
jasono
02/10/2024, 5:46 AM
jasono
02/10/2024, 6:04 AM
Michael Riley
02/10/2024, 7:44 AM