Wellington Braga
02/20/2023, 8:00 PM
[
  {"client_id": 1, "client_name": "Joe"},
  {"client_id": 2, "client_name": "Ruth"}
]
I need to create a pipeline/job for each element in the list, with the same internal operations in the assets, differing only in the id parameter.
I searched some forums, but there is little said about dynamic jobs in Dagster. 😞
My question is how to create a function in Dagster that does something similar to the generate_compact_dags function defined in dynamic_pipelines.py.
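The body of generate_compact_dags is not shown in the thread, so the following is only a plain-Python sketch of the general factory idea such a function presumably relies on: one builder function called once per element of the list, so each generated function captures its own client_id. The names `clients`, `make_client_task`, `tasks` and `naive` are hypothetical and not part of Dagster.

```python
from typing import Any, Callable, Dict, List

# Hypothetical input, mirroring the client list from the question.
clients: List[Dict[str, Any]] = [
    {"client_id": 1, "client_name": "Joe"},
    {"client_id": 2, "client_name": "Ruth"},
]


def make_client_task(client_id: int) -> Callable[[], str]:
    """Build one task function bound to a single client_id.

    Because client_id is a parameter of this factory, every returned
    function closes over its own value, not over a shared loop variable.
    """

    def task() -> str:
        # The shared "internal operations" would go here; only the id differs.
        return f"processed client {client_id}"

    # Give each generated function a distinct name, as a scheduler would need.
    task.__name__ = f"client_{client_id}_task"
    return task


tasks = [make_client_task(c["client_id"]) for c in clients]

for t in tasks:
    print(t.__name__, "->", t())

# For contrast: defining the function directly inside a loop binds late,
# so every lambda sees the final value of the loop variable.
naive = [lambda: f"processed client {cid}" for cid in (1, 2)]
print([f() for f in naive])  # both report client 2
```

The contrast at the end is the reason answers in this thread wrap the decorated function inside a helper such as generate_asset instead of decorating it directly in a loop.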
Stephen Bailey
02/21/2023, 8:19 PM
from dagster import asset

def generate_asset(asset_id: str):
    # name=asset_id gives each generated asset its own key
    @asset(name=asset_id)
    def _my_asset():
        # do something
        my_result = asset_id
        return my_result
    return _my_asset

Then you just loop through and generate them:
asset_list = [generate_asset(a) for a in ["a", "b", "c"]]
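To drive the factory from the client list in the question, each client needs a unique name that Dagster will accept. This sketch assumes Dagster only allows letters, digits and underscores in definition names; the helper `asset_name_for` and the `client_<id>_<name>` naming scheme are hypothetical choices, not part of Dagster.

```python
import re
from typing import Any, Dict, List

clients: List[Dict[str, Any]] = [
    {"client_id": 1, "client_name": "Joe"},
    {"client_id": 2, "client_name": "Ruth"},
]

# Assumption: Dagster accepts only letters, digits and underscores in names.
VALID_NAME = re.compile(r"^[A-Za-z0-9_]+$")


def asset_name_for(client: Dict[str, Any]) -> str:
    """Hypothetical helper: derive a unique, valid asset name for one client."""
    # Replace anything outside [A-Za-z0-9_] in the display name with "_".
    slug = re.sub(r"[^A-Za-z0-9_]", "_", str(client["client_name"])).lower()
    # Prefixing with the id keeps names unique even if two clients share a name.
    return f"client_{client['client_id']}_{slug}"


names = [asset_name_for(c) for c in clients]
assert all(VALID_NAME.match(n) for n in names)
print(names)  # ['client_1_joe', 'client_2_ruth']
```

The resulting names can then be fed to the factory, e.g. `[generate_asset(n) for n in names]`.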
Stephen Bailey
02/21/2023, 8:22 PM
from dagster import graph, op, OpExecutionContext

# config_schema maps each field name to its type
@op(config_schema={"my_s3_uri": str})
def do_something_op(context: OpExecutionContext):
    return context.op_config["my_s3_uri"]

@graph
def do_something_graph():
    do_something_op()

# now, you can build one configured job per parameter from the same graph
job_list = []
for i, uri in enumerate(["s3://a", "s3://b", "s3://c"]):
    cfg = {"ops": {"do_something_op": {"config": {"my_s3_uri": uri}}}}
    # job names must be unique and use only letters, digits and underscores
    _job = do_something_graph.to_job(name=f"do_something_job_{i}", config=cfg)
    job_list.append(_job)
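The same configured-job pattern can be driven by the client list from the question. This plain-Python sketch only builds the per-client job names and run-config dicts, following the `ops -> op name -> config -> field` nesting used above. The op name `do_something_op` comes from the example; the config field `client_id` (which the op's config_schema would then have to declare) and the `client_job_specs` helper are hypothetical.

```python
from typing import Any, Dict, List, Tuple

clients: List[Dict[str, Any]] = [
    {"client_id": 1, "client_name": "Joe"},
    {"client_id": 2, "client_name": "Ruth"},
]


def client_job_specs(
    records: List[Dict[str, Any]],
) -> List[Tuple[str, Dict[str, Any]]]:
    """Return one (job_name, run_config) pair per client record.

    "client_id" is a hypothetical config field on do_something_op.
    """
    specs = []
    for record in records:
        cid = record["client_id"]
        # Include the id in the name so every generated job is unique.
        name = f"do_something_job_client_{cid}"
        cfg = {"ops": {"do_something_op": {"config": {"client_id": cid}}}}
        specs.append((name, cfg))
    return specs


for name, cfg in client_job_specs(clients):
    print(name, cfg)
```

Each (name, config) pair could then be used to build one job from a shared graph, for instance with Dagster's `GraphDefinition.to_job(name=..., config=...)`.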