https://dagster.io/ logo
#ask-community
Title
# ask-community
s

Sourabh Mishra

02/20/2024, 6:44 AM
Hi Team, I want to invoke one job using scheduler and pass config parameters from schedulers so that I can run parallel jobs. For e.g in this code snippet I want to invoke dynamic_job via scheduler. I tried to invoke using scheduler as but not sure what to pass in the ops.
Copy code
@op(out=DynamicOut(str))
def listTestCases(context):    
    for test_case in range(10) :
        testcase = f"testcase_{test_case}"
        yield DynamicOutput(
            value=testcase,
            # create a mapping key from the file name
            mapping_key=testcase,
        )


@op 
def A(context,testcase) : 
   <http://context.log.info|context.log.info>(f"[A] : {testcase}")

@op 
def B(context,testcase) : 
   <http://context.log.info|context.log.info>(f"[B] : {testcase}")

@op 
def C(context,testcase) : 
   <http://context.log.info|context.log.info>(f"[C] : {testcase}")

@job
def dynamic_job():
    listTestCases().map( A -> B -> C )


@schedule(job=dynamic_job, cron_schedule="*/1 * * * *")
def dynamic_job_scheduler(context: ScheduleEvaluationContext):
    for ticker_batch in ["abcd", "efgh", "hif"]():
        yield RunRequest(
            run_key=f"dynamic_job_scheduler_{time.time()}",
            run_config={
                "ops": {
                    "WHICH OPS ?": {
                        "config": {
                            "ticker_list": ticker_batch,
                        },
                    },

                }
            },
            tags={
                "ticker_list": ticker_batch
            }
        )
z

Zach

02/21/2024, 10:18 PM
It looks like you shouldn't have to pass config to any of the ops, because none of them specified a config schema