David C
12/15/2022, 4:45 PM@repository
def foo_jobs():
all_jobs = []
for customer_id in rpc_client.get_all_customers():
for job_name in rpc_client.get_all_foo_jobs(customer_id):
all_jobs.append(
dagster.JobDefinition( ??? )
)
return all_jobs
Or do I need to write some kind of generate to write out the python code with the annotations?
I am aware of this "assets" concept as well, but I don't think that will work because these jobs have different schedules, and I need direct control of the graph for each job (different FooJobs could have different graphs based on information known only at runtime).def make_expensive_job():
@job
def expensive_job():
for i in range(10000):
return_n.alias(f'return_n_{i}')()
return expensive_job
from https://docs.dagster.io/_apidocs/repositories#dagster.repositoryjamie
12/15/2022, 5:32 PMJobDefinition
but i wouldn’t say it’s something we highly recommend if theres another, easier, method. Youd have to manually set all of the dependencies of the ops that the job contains, which can get pretty complicated. It’s not something we have examples for, because, again, we don’t really recommend it.
I don’t know if i have a great understanding of what you’re use case is, but if your database is storing the full job (ie the object created by the @job decorated function) then you would probably just be able to add those objects directly to your repository since they are already JobDefinitionsDavid C
12/15/2022, 5:39 PMwe don’t really recommend it.Excellent, thanks! I don't have anything from dagster stored in the database. It is a list of objects that are created by customers. For example, imagine a table of configured "exports" with these fields: • customer_id: int • export_schedule: rrule • collection_to_export: string • export_settings: {dict} • export_channel: [ email | adwords | etc ] One of these "exports" can be added at any time by a user, and I need to get that into the scheduling system as soon as practical, but at least within 15 min. I did manage to demonstrate dynamic jobs using a closure (hwere,
random
takes the place of an RPC client):
17 @repository
18 def repo0():
19 jobs = []
20 for customer_id in range(random.randint(1, 10)):
21 @job(name="job{}".format(customer_id))
22 def test_job():
23 pass
24
25 jobs.append(test_job)
26 return jobs
Assuming the configmap (or similar) feature works, that is probably doable.
I still haven't figure out how to get dagster to pick up the changes quickly though. It looks like a person has to go to the UI and press a button.jamie
12/15/2022, 5:41 PMDavid C
12/15/2022, 5:41 PM