Huib Keemink
02/08/2022, 8:48 AM
def create_job(source, destination):
    @job
    def f():
        df = load(source)
        write(df, destination)
    return f

def load(source):
    @op
    def f():
        return load_from(source)
    return f()

@repository
def repo():
    return [
        create_job(s, d)
        for s, d in ORIGINS.items()
    ]
Sebastian Napiorkowski
02/08/2022, 10:54 AM
Huib Keemink
02/08/2022, 10:55 AM
Sebastian Napiorkowski
02/08/2022, 10:56 AM
Huib Keemink
02/08/2022, 10:56 AM
Sebastian Napiorkowski
02/08/2022, 10:58 AM
Huib Keemink
02/08/2022, 10:59 AM
Sebastian Napiorkowski
02/08/2022, 11:01 AM
@solid(config_schema={
    "config_type": Field(Enum.from_python_enum(ConfigFactory.TypeEnum)),
    "config_1": ...,
    "config_2": ...,
})
def get_config(context):
    factory = ConfigFactory(**context.solid_config)
    config = factory.create()
    return config
Huib Keemink
02/08/2022, 4:13 PM
[...] to_job call in a loop. This keeps the ops and jobs super clean, and transfers the work to the repository, so it is super simple to write tests for the logic as well 🙂