
Huib Keemink

02/08/2022, 8:48 AM
I’m trying to create a number of jobs dynamically. The jobs themselves are essentially the same (load data from a source, store it in the data lake), but the source tables and destination files differ. When I build this as ops inside a single job they tend to interfere with each other (sometimes data is unavailable for one of the sources), so the job abstraction seems to make the most sense (they are independent, after all). So, my real question is: what’s the best way to structure this kind of pattern? I’m currently using a job factory that returns a job with the source and destination baked in, but this gets abstract and unreadable quite quickly. Is there a nicer way to go about this?
from dagster import job, op, repository

# ORIGINS maps each source table to its destination file, e.g.
# ORIGINS = {"customers": "lake/customers.parquet", ...}

def create_job(source, destination):
	# job names must be unique (and valid identifiers) within the repository
	@job(name=f"sync_{source}")
	def f():
		df = load(source)
		write(df, destination)
	return f

def load(source):
	@op
	def load_op():
		return load_from(source)  # load_from: our project-specific loader
	return load_op()

def write(df, destination):
	@op
	def write_op(df):
		write_to(df, destination)  # write_to: our project-specific writer
	return write_op(df)

@repository
def repo():
	return [
		create_job(s, d)
		for s, d in ORIGINS.items()
	]
as a minimal example of what I’m doing right now (of course there’s aliasing, scheduling, resources, etc. involved)

Sebastian Napiorkowski

02/08/2022, 10:54 AM
I don't think this works. Does it?

Huib Keemink

02/08/2022, 10:55 AM
it does
it’s not very readable, but it works

Sebastian Napiorkowski

02/08/2022, 10:56 AM
I see, it's creating solids dynamically at init time.

Huib Keemink

02/08/2022, 10:56 AM
exactly. is there a better pattern for this?

Sebastian Napiorkowski

02/08/2022, 10:58 AM
I don't know if you have the same problem. I've created a solid that can read different configurations; it's the first one in the pipeline, it initializes specific classes for the specific problems, and the solids further down the DAG/pipeline then use the common interface.

Huib Keemink

02/08/2022, 10:59 AM
do you have a bit of code to show the idea?

Sebastian Napiorkowski

02/08/2022, 11:01 AM
from dagster import solid, Field, Enum

@solid(config_schema={
    "config_type": Field(Enum.from_python_enum(ConfigFactory.TypeEnum)),
    "config_1": ...,  # problem-specific settings
    "config_2": ...,
})
def get_config(context):
    # the factory picks the concrete config class for the given config_type
    factory = ConfigFactory(**context.solid_config)
    config = factory.create()
    return config
later the solids just do things based on the config, say config.get_s3_bucket() or config.get_save_file_destination(); I've made those up, but you get the idea.
This way I can have different pipelines using the same config (the different pipelines are strategies in this case) and I can reuse all solids.
Yeah, it's basically your problem.

Huib Keemink

02/08/2022, 4:13 PM
Thanks! Your comment put me on a tangential path that turned out to work quite well: I’m now setting the source as config in a to_job call in a loop. This keeps the ops and jobs super clean, and transfers the work to the repository, so it is super simple to write tests for the logic as well 🙂