# Alexandria Sodemann
# 05/15/2023, 1:25 PM
from dagster import job, DynamicOut,DynamicOutput, op
from dagster_meltano import meltano_command_op, meltano_resource
import os
# One entry per Meltano extract/load pair. Each entry names the tap, the
# target, the catalog file path (relative to the project root) and the
# Dagster-facing name used for that pair.
meltano_info = [
    {
        "tap": "tap-xx",
        "target": "target-xx",
        "catalog_path": "dagster_project/meltano/meltano_catalogs/tap-xx/xxx.json",
        "dagster_name": "xx",
    },
    {
        "tap": "tap-xx",
        "target": "target-xx",
        "catalog_path": "dagster_project/meltano/meltano_catalogs/tap-xx/xxx.json",
        "dagster_name": "xx",
    },
]
@op(out=DynamicOut())
def create_run_commands_op():
    """Emit one Meltano ``elt`` command per entry of ``meltano_info``.

    Every command is yielded as a ``DynamicOutput`` keyed by the entry's
    ``dagster_name`` so that a downstream step can be fanned out over the
    results with ``.map(...)``.

    Yields:
        DynamicOutput: wraps the command string
            ``elt <tap> <target> --catalog <root>/<catalog_path>``, where
            ``<root>`` is the value of ``DAGSTER_PROJECT_ROOT``.

    Raises:
        KeyError: if the ``DAGSTER_PROJECT_ROOT`` environment variable is not
            set, or an entry is missing one of the expected keys.

    Note:
        Dagster requires mapping keys to be unique within one run and to
        consist only of letters, digits and underscores, so every entry needs
        a distinct, identifier-like ``dagster_name``.
    """
    project_root = os.environ["DAGSTER_PROJECT_ROOT"]
    for info in meltano_info:
        catalog_file = f'{project_root}/{info["catalog_path"]}'
        # Only CLI arguments belong in the command. The previous
        # `, dagster_name="..."` suffix was Python keyword-argument text that
        # ended up inside the shell string and corrupted the catalog path.
        command = f'elt {info["tap"]} {info["target"]} --catalog {catalog_file}'
        # mapping_key is an argument of DynamicOutput (it was mistakenly
        # passed to list.append before, which raises TypeError).
        yield DynamicOutput(command, mapping_key=str(info["dagster_name"]))
@job(resource_defs={"meltano": meltano_resource})
def meltano_job():
    """Map ``meltano_command_op`` over the outputs of ``create_run_commands_op``.

    The job binds the ``"meltano"`` resource key to ``meltano_resource``.
    """
    # NOTE(review): in dagster-meltano, `meltano_command_op` appears to be a
    # factory that takes the command string and returns an op, rather than an
    # op itself. If so, it cannot be mapped over dynamic outputs like this and
    # the ops would have to be built at definition time instead. TODO confirm
    # against the installed dagster-meltano version.
    create_run_commands_op().map(meltano_command_op)