# ask-community
h
I’m trying to create a number of jobs dynamically. The jobs themselves are essentially the same (load data from source, store in datalake), but the source tables and destination files differ. When I build this using ops they tend to interfere with each other (sometimes data is unavailable for one of the sources), so it seems the job abstraction makes the most sense (they are independent, after all). So, my real question is, what’s the best way to structure this kind of pattern? I’m currently using a job factory that returns an op with the source and destination set, but this gets abstract and unreadable quite quickly. Is there a nicer way to go about this?
Copy code
from dagster import job, op, repository

def create_job(source, destination):
    # jobs (and the ops inside them) need unique names, so the source and
    # destination are baked into the names here; this assumes they are
    # valid Dagster name fragments
    @job(name=f"etl_{source}_to_{destination}")
    def f():
        df = load(source)
        write(df, destination)
    return f

def load(source):
    @op(name=f"load_{source}")
    def f():
        return load_from(source)
    return f()

def write(df, destination):
    @op(name=f"write_{destination}")
    def f(df_in):
        write_to(df_in, destination)  # write_to: hypothetical sink, analogous to load_from
    return f(df)

@repository
def repo():
    return [
        create_job(s, d)
        for s, d in ORIGINS.items()
    ]
as a minimal example of what I’m doing right now (of course there’s aliasing, scheduling, resources, etc. involved)
s
I don't think this works. Does it?
h
it does
it’s not very readable, but it works
s
I see, it's creating solids dynamically at init time.
h
exactly. is there a better pattern for this?
s
I don't know if you have the same problem, but I've created a solid that can read different configurations. It's the first one in the pipeline: it initializes specific classes for the specific problems, and the solids further down the DAG/pipeline then use the common interface.
h
do you have a bit of code to show the idea?
s
Copy code
from dagster import solid, Field, Enum

@solid(config_schema={
    # the factory's own type enum is exposed as run config
    "config_type": Field(Enum.from_python_enum(ConfigFactory.TypeEnum)),
    "config_1": ...,
    "config_2": ...,
})
def get_config(context):
    # build the concrete config object for whichever type was selected
    factory = ConfigFactory(**context.solid_config)
    config = factory.create()
    return config
Later the solids just do things based on the config, say config.get_s3_bucket() or config.get_save_file_destination(). I've made those up, but you get the idea.
This way I can have different pipelines using the same config (the different pipelines are, in this case, strategies), and I can reuse all the solids.
Yeah, it's basically your problem.
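To make that concrete, here's a minimal sketch of a downstream solid that relies only on that common interface (get_save_file_destination is the made-up method from above, and the pandas DataFrame input is an assumption):
Copy code
from dagster import solid

@solid
def save_result(context, config, df):
    # only the shared interface is used, so this solid works with
    # any concrete config class the factory produces
    destination = config.get_save_file_destination()
    df.to_parquet(destination)  # assumes df is a pandas DataFrame
    context.log.info(f"saved result to {destination}")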
h
Thanks! Your comment put me on a tangential path that turned out to work quite well: I'm now setting the source as config in a to_job call in a loop. This keeps the ops and jobs super clean and moves the work into the repository, so it's also super simple to write tests for the logic 🙂
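For readers landing here later, a minimal sketch of what that could look like (load_from, write_to, and ORIGINS are the placeholder names from the examples above; the exact config shape is an assumption):
Copy code
from dagster import graph, op, repository

@op(config_schema={"source": str})
def load(context):
    return load_from(context.op_config["source"])

@op(config_schema={"destination": str})
def write(context, df):
    write_to(df, context.op_config["destination"])

@graph
def etl():
    write(load())

@repository
def repo():
    # one job per source/destination pair, differing only in run config;
    # assumes the sources make valid Dagster name fragments
    return [
        etl.to_job(
            name=f"etl_{s}",
            config={
                "ops": {
                    "load": {"config": {"source": s}},
                    "write": {"config": {"destination": d}},
                }
            },
        )
        for s, d in ORIGINS.items()
    ]
This also makes the testing h mentions straightforward: any generated job can be run with .execute_in_process(), and individual ops can be invoked directly with a context from build_op_context.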