#ask-community

Abhinav Dhulipala

02/07/2024, 8:29 AM
Hey y'all, what's the best way to configure the fan-out of a particular job? I have a graph that discovers files in S3, transforms them, and uploads them to our DB. Currently, I'm using regular jobs like so:
@op
def find_valid_files() -> list[pl.DataFrame]:
    return [df for df in dfs]

@op
def upload_dfs(dfs: list[pl.DataFrame], sqlalchemy_resource: SqlAlchemyResource):
    for df in dfs:
        sqlalchemy_resource.upload(df)

@job
def discover_and_upload():
    upload_dfs(find_valid_files())
Before it's said: we've tried sensors, but found that either our server-side S3 server (not AWS) or Dagster's cursor gets poisoned. Since we store our data in a Hive-partitioned format with date as one of the columns, we've found it more reliable to rely on schedule definitions and scan the directory every couple of minutes.

We've used dynamic graphs and enjoy them quite a bit, but we're having trouble using them for this use case because our DB is small and self-hosted, so we can't afford to flood it with multiple live connections. That's part of the reason we use Dagster as middleware in the first place. We were wondering if there's a way to control the number of ops dispatched in parallel to avoid this, i.e.
@op(out=DynamicOut(pl.DataFrame))
def find_valid_files():
    ...
    yield DynamicOutput(df, mapping_key=s3_key)

@op
def upload_df(df: pl.DataFrame, sqlalchemy_resource: SqlAlchemyResource):
    sqlalchemy_resource.upload(df)

@job
def discover_and_upload():
    find_valid_files().map(upload_df)
This approach offers a bunch of advantages, but the largest is definitely the nice error isolation and UI.

Zach

02/07/2024, 2:46 PM
Have you tried op-based concurrency?
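For anyone finding this later, a sketch of what op-level concurrency limits can look like with the multiprocess executor: tag the upload op (e.g. `@op(tags={"database": "self_hosted"})`) and cap how many ops with that tag run at once via run config. The tag key/value and limits below are made up for illustration:

```yaml
execution:
  config:
    multiprocess:
      # Total ops running at once across the run (illustrative value)
      max_concurrent: 10
      # At most 2 ops tagged database=self_hosted (e.g. upload_df) in parallel,
      # so the dynamic fan-out can't flood the self-hosted DB with connections
      tag_concurrency_limits:
        - key: "database"
          value: "self_hosted"
          limit: 2
```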

Abhinav Dhulipala

02/08/2024, 10:39 PM
This looks great. Seems to be exactly what I'm looking for. Thanks, I didn't know about this. I'll give it a try and get back to you.