# integration-airflow
w
Could someone guide me on how to create dynamic pipelines in Dagster, similar to what Airflow allows? I have a JSON file in this format:
[
    {"client_id": 1, "client_name": "Joe"},
    {"client_id": 2, "client_name": "Ruth"}
]
I need to create a pipeline/job for each element in the list, with the same internal operations in the assets, differing only in the id parameter. I searched some forums, but little is said about dynamic jobs in Dagster. 😞 My question is how to create a function in Dagster that does something similar to the generate_compact_dags function defined in dynamic_pipelines.py?
s
we use a lot of asset factories
from dagster import asset

def generate_asset(asset_id: str):

   @asset(name=asset_id)
   def _my_asset():
       # do something
       my_result = asset_id
       return my_result
   
   return _my_asset
then you just loop through and generate them
asset_list = [generate_asset(a) for a in ["a", "b", "c"]]
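
To tie this back to the client list in the question, here is a minimal sketch that derives one asset name per JSON record, which could then be fed to generate_asset. The helper asset_name_for and the client_<id>_<name> naming scheme are made up for illustration. As far as I know, Dagster asset names may only contain letters, digits, and underscores, hence the cleanup step.

```python
import json
import re

# Sample data in the shape given in the question (names quoted so it parses as JSON).
CLIENTS_JSON = """
[
    {"client_id": 1, "client_name": "Joe"},
    {"client_id": 2, "client_name": "Ruth"}
]
"""


def asset_name_for(client: dict) -> str:
    """Hypothetical helper: build a name using only lowercase letters, digits, and underscores."""
    raw = f"client_{client['client_id']}_{client['client_name']}".lower()
    return re.sub(r"[^a-z0-9_]", "_", raw)


clients = json.loads(CLIENTS_JSON)
asset_names = [asset_name_for(c) for c in clients]

# Each name could then go through the factory, e.g.
#   asset_list = [generate_asset(n) for n in asset_names]
print(asset_names)
```

The same loop works if the list is read from a file with json.load instead of an inline string.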
With ops and jobs, you can vary it by configuring the jobs though
from dagster import op, job, OpExecutionContext

@op(config_schema={"my_s3_uri": str})
def do_something_op(context: OpExecutionContext):
    return context.op_config["my_s3_uri"]

@job
def do_something_job():
    do_something_op()

# now, you can configure the job based on different params
job_list = []
for uri in ["s3://a", "s3://b", "s3://c"]:
    cfg = {"ops": {"do_something_op": {"config": {"my_s3_uri": uri}}}}
    _job = do_something_job.configured(cfg)
    job_list.append(_job)
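
For the client list in the question, the per-job config dicts could be built straight from the JSON. This is only a sketch: run_config_for and the client_<id>_job names are hypothetical, and it assumes the op declares a client_id config key (for example config_schema={"client_id": int}) rather than my_s3_uri.

```python
import json

# Sample data in the shape given in the question (names quoted so it parses as JSON).
CLIENTS_JSON = '[{"client_id": 1, "client_name": "Joe"}, {"client_id": 2, "client_name": "Ruth"}]'


def run_config_for(client: dict, op_name: str = "do_something_op") -> dict:
    """Hypothetical helper: run config that passes one client's id to a single op."""
    return {"ops": {op_name: {"config": {"client_id": client["client_id"]}}}}


# One config dict per client, keyed by an illustrative job name.
configs_by_job = {
    f"client_{c['client_id']}_job": run_config_for(c)
    for c in json.loads(CLIENTS_JSON)
}

for job_name, cfg in configs_by_job.items():
    print(job_name, cfg)
```

Each dict can be supplied as run config when launching the job. As far as I know, it can also be bound ahead of time by defining the ops under @graph and calling to_job(name=..., config=...) once per client.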