#ask-community

Brian Pohl

03/21/2023, 5:23 PM
i am struggling with the op factory pattern. i guess it doesn't work great with loops. it looks like the problem is that the loop runs at job deployment time, so when the ops access variables set in the outer context of the loop, they all use the values from the final iteration. values that i put into the `@op` decorator seem to stick and remain different for each op, but the actual op bodies all match. here is my pared-down example. i am using a class that i built that copies data from an FTP site to S3, so much of the logic is not visible, but you get the idea. i have other examples as well, as i've run into this issue before.
def attom_ingestion_job():
    for dataset in FTP_S3_OP_CONFIG:  # a variable with all the config options set elsewhere

        @op(name=f'copy_attom_{dataset["key"]}_from_ftp_to_s3')
        def ftp_copy(context):

            attom_ftp_loader = FtpToS3 (
                ftp_host = 'attomftp.com',
                ftp_username = os.environ.get('ATTOM_FTP_USERNAME'),
                ftp_password = os.environ.get('ATTOM_FTP_PASSWORD'),
                s3_bucket = 'my-attom-bucket',
                s3_directory = dataset["key"] + '/',
                ftp_file_prefix_suffix_pairs = dataset['ftp_file_prefix_suffix_pairs'],
                append_upload_time_to_s3_name = dataset['append_upload_time_to_s3_name'],
            )
            attom_ftp_loader.download_new_ftp_files(exclude_files_in_s3=True)

        ftp_copy()
there are 4 datasets in `FTP_S3_OP_CONFIG`, and what i get is 4 ops all with different op names, but they all attempt to copy the same FTP data, which means that they all have the same argument for `dataset['ftp_file_prefix_suffix_pairs']`. it seems this is because they're all referencing the same `dataset`, as that was last modified at deployment/job definition time. how do you get around this? i feel like i'm butting heads with a fundamental design pattern: what is defined at deployment time vs. run time. i have found two solutions: either don't use a loop at all, and declare all 4 ops like this. this matches the example of op factories in the docs - no loop, just a function that returns an op.
@op()
def op_1(context):
    ftp_copy_builder(FTP_S3_OP_CONFIG['key1'])

@op()
def op_2(context):
    ftp_copy_builder(FTP_S3_OP_CONFIG['key2'])

# etc.
or to use the op's name as a lookup against the configs, as each op can reference its own name at runtime. the configs can use the op name as a lookup key. this allows me to use a loop, but feels so hacky.

jamie

03/21/2023, 7:15 PM
hey @Brian Pohl - i'm checking to see if that kind of pattern works. Typically we use op config for this kind of thing. So you might do something like this
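import os

from dagster import job, op

# FtpToS3 is the poster's own class and is not shown in this thread
@op(config_schema={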
    "key": str,
    "ftp_file_prefix_suffix_pairs": str,
    "append_upload_time_to_s3_name": str
})
def ftp_copy(context):

    attom_ftp_loader = FtpToS3 (
        ftp_host = 'attomftp.com',
        ftp_username = os.environ.get('ATTOM_FTP_USERNAME'),
        ftp_password = os.environ.get('ATTOM_FTP_PASSWORD'),
        s3_bucket = 'my-attom-bucket',
        s3_directory = context.op_config["key"] + '/',
        ftp_file_prefix_suffix_pairs = context.op_config['ftp_file_prefix_suffix_pairs'],
        append_upload_time_to_s3_name = context.op_config['append_upload_time_to_s3_name'],
    )
    attom_ftp_loader.download_new_ftp_files(exclude_files_in_s3=True)


config = {
    "ops": {
        # in run config, each op's values are nested under a "config" key
        "copy_attom_dataset_1_from_ftp_to_s3": {
            "config": {
                "key": "dataset_1",
                "ftp_file_prefix_suffix_pairs": ...,
                "append_upload_time_to_s3_name": ...
            }
        },
        "copy_attom_dataset_2_from_ftp_to_s3": {
            "config": {
                "key": "dataset_2",
                "ftp_file_prefix_suffix_pairs": ...,
                "append_upload_time_to_s3_name": ...
            }
        },
        # config for dataset_3 and dataset_4
    }
}

@job(
    config=config
)
def my_job():
    ftp_copy.alias("copy_attom_dataset_1_from_ftp_to_s3")()
    ftp_copy.alias("copy_attom_dataset_2_from_ftp_to_s3")()
    ftp_copy.alias("copy_attom_dataset_3_from_ftp_to_s3")()
    ftp_copy.alias("copy_attom_dataset_4_from_ftp_to_s3")()
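If hand-writing one config entry per dataset gets tedious, the run config can probably be generated from the same list that drives the job. This is only a sketch in plain Python: `DATASETS`, `op_alias`, and `build_run_config` are hypothetical names, the values are placeholders for the real `FTP_S3_OP_CONFIG` entries (which are not shown in this thread), and it assumes op config sits under a `"config"` key for each op.

```python
# Hypothetical stand-in for FTP_S3_OP_CONFIG; the real values are not shown in the thread.
DATASETS = [
    {
        "key": "dataset_1",
        "ftp_file_prefix_suffix_pairs": "placeholder_1",
        "append_upload_time_to_s3_name": "placeholder",
    },
    {
        "key": "dataset_2",
        "ftp_file_prefix_suffix_pairs": "placeholder_2",
        "append_upload_time_to_s3_name": "placeholder",
    },
]


def op_alias(dataset):
    """Build the per-dataset op name, following the naming used in the thread."""
    return f"copy_attom_{dataset['key']}_from_ftp_to_s3"


def build_run_config(datasets):
    """Return a run config dict with one entry per aliased op."""
    return {
        "ops": {
            # copy each dataset dict so later edits to DATASETS don't leak in
            op_alias(dataset): {"config": dict(dataset)}
            for dataset in datasets
        }
    }


run_config = build_run_config(DATASETS)

# Inside the @job body, the same helper could name the aliases, e.g.:
#     for dataset in DATASETS:
#         ftp_copy.alias(op_alias(dataset))()
```

The alias names and the config keys come from the same helper, so the two cannot drift apart when a dataset is added.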
I spent a bit more time on this, and this also works
from dagster import job, op

MY_CONFIG=[
    {
        "name": "first",
        "a": 1,
        "b": 2
    },
    {
        "name": "second",
        "a": 3,
        "b": 4
    },
    {
        "name": "third",
        "a": 5,
        "b": 6
    }
]


def build_op(vals):
    @op(
        name=f"my_{vals['name']}_op"
    )
    def my_op(context):
        context.log.info(f"my_{vals['name']}_op values:")
        context.log.info(f"a: {vals['a']}")
        context.log.info(f"b: {vals['b']}")

    return my_op

@job
def my_factory_job():
    all_ops = [build_op(vals) for vals in MY_CONFIG]
    for o in all_ops:
        o()
apparently there are some particularities with how python handles loop closures (I don’t really understand the intricacies) but wrapping the op generation in a function and then calling that function in a loop works
you could also write the job like this if you prefer
@job
def my_factory_job():
    for vals in MY_CONFIG:
        build_op(vals)()
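For anyone wondering about the loop closure behaviour mentioned above, it can be reproduced without Dagster. Python closures look up outer variables when the inner function is called, not when it is defined, so functions defined in a loop all see the loop variable's final value. A factory function gives each inner function its own scope. A minimal sketch, with hypothetical names (`DATASETS`, `build_body`) standing in for the real config and op factory:

```python
# Hypothetical stand-in for the dataset config used in the thread.
DATASETS = [{"key": "a"}, {"key": "b"}, {"key": "c"}]


def make_late_bound():
    """Define the functions directly in the loop (the pattern that misbehaved)."""
    funcs = []
    for dataset in DATASETS:
        def body():
            # `dataset` is looked up when body() runs, after the loop has finished
            return dataset["key"]
        funcs.append(body)
    return funcs


def build_body(dataset):
    """Factory: each call creates a new scope holding its own `dataset`."""
    def body():
        return dataset["key"]
    return body


def make_factory_bound():
    """Call the factory once per dataset, as build_op is called in the thread."""
    return [build_body(dataset) for dataset in DATASETS]


late = [f() for f in make_late_bound()]        # ['c', 'c', 'c']
factory = [f() for f in make_factory_bound()]  # ['a', 'b', 'c']
```

That matches what was seen in the original job: the op names were computed immediately inside the loop and so differed, while the op bodies read `dataset` later and all got the last one.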