Brian Pohl
03/21/2023, 5:23 PM
the names set in the @op decorator seem to stick and remain different for each op, but the actual op bodies all match. here is my pared-down example.
i am using a class that i built here that copies data from an FTP site to S3, so much of the logic is not visible, but you get the idea. i have other examples as well, as i've run into this issue before.
def attom_ingestion_job():
    for dataset in FTP_S3_OP_CONFIG:  # a variable with all the config options set elsewhere
        @op(name=f'copy_attom_{dataset["key"]}_from_ftp_to_s3')
        def ftp_copy(context):
            attom_ftp_loader = FtpToS3(
                ftp_host = 'attomftp.com',
                ftp_username = os.environ.get('ATTOM_FTP_USERNAME'),
                ftp_password = os.environ.get('ATTOM_FTP_PASSWORD'),
                s3_bucket = 'my-attom-bucket',
                s3_directory = dataset["key"] + '/',
                ftp_file_prefix_suffix_pairs = dataset['ftp_file_prefix_suffix_pairs'],
                append_upload_time_to_s3_name = dataset['append_upload_time_to_s3_name'],
            )
            attom_ftp_loader.download_new_ftp_files(exclude_files_in_s3=True)
        ftp_copy()
there are 4 datasets in FTP_S3_OP_CONFIG, and what i get is 4 ops all with different op names, but they all attempt to copy the same FTP data, which means that they all have the same argument for dataset['ftp_file_prefix_suffix_pairs']. it seems this is because they're all referencing the same dataset, as that was last modified at deployment/job definition time.
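The symptom described here (distinct names, identical bodies) can be reproduced without Dagster. The sketch below is a plain-Python stand-in, not the original job: `DATASETS`, `define_in_loop` and `copy` are hypothetical names. It assumes the cause is Python's late-binding closures, where a nested function looks up the loop variable when it is called rather than when it is defined.

```python
# Plain-Python stand-in for the loop above; no Dagster involved.
DATASETS = [{"key": "a"}, {"key": "b"}, {"key": "c"}]  # hypothetical config


def define_in_loop():
    funcs = {}
    for dataset in DATASETS:
        def copy():
            # `dataset` is looked up when copy() is called, not when it is defined.
            return dataset["key"]

        # The name is computed immediately, like the f-string passed to @op.
        funcs[f"copy_{dataset['key']}"] = copy
    return funcs


funcs = define_in_loop()
names = sorted(funcs)
results = [funcs[name]() for name in names]

print(names)    # ['copy_a', 'copy_b', 'copy_c']
print(results)  # ['c', 'c', 'c']
```

The names differ because they are evaluated on each iteration, while every body reads the single `dataset` variable after the loop has finished.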
how do you get around this? i feel like i'm butting heads with a fundamental design pattern: what is defined at deployment time vs. run time.
i have found two solutions: either don't use a loop at all, and declare all 4 ops like this. this matches the example of op factories in the docs - no loop, just a function that returns an op.
@op()
def op_1(context):
    ftp_copy_builder(FTP_S3_OP_CONFIG['key1'])

@op()
def op_2(context):
    ftp_copy_builder(FTP_S3_OP_CONFIG['key2'])

etc
or to use the op's name as a lookup against the configs, as each op can reference its own name at runtime. the configs can use the op name as a lookup key. this allows me to use a loop, but feels so hacky.
jamie
03/21/2023, 7:15 PM
@op(config_schema={
    "key": str,
    "ftp_file_prefix_suffix_pairs": str,
    "append_upload_time_to_s3_name": str
})
def ftp_copy(context):
    attom_ftp_loader = FtpToS3(
        ftp_host = 'attomftp.com',
        ftp_username = os.environ.get('ATTOM_FTP_USERNAME'),
        ftp_password = os.environ.get('ATTOM_FTP_PASSWORD'),
        s3_bucket = 'my-attom-bucket',
        s3_directory = context.op_config["key"] + '/',
        ftp_file_prefix_suffix_pairs = context.op_config['ftp_file_prefix_suffix_pairs'],
        append_upload_time_to_s3_name = context.op_config['append_upload_time_to_s3_name'],
    )
    attom_ftp_loader.download_new_ftp_files(exclude_files_in_s3=True)
config = {
    "ops": {
        "copy_attom_dataset_1_from_ftp_to_s3": {
            # per-op values are nested under a "config" key in run config
            "config": {
                "key": "dataset_1",
                "ftp_file_prefix_suffix_pairs": ...,
                "append_upload_time_to_s3_name": ...
            }
        },
        "copy_attom_dataset_2_from_ftp_to_s3": {
            "config": {
                "key": "dataset_2",
                "ftp_file_prefix_suffix_pairs": ...,
                "append_upload_time_to_s3_name": ...
            }
        },
        # config for dataset_3 and dataset_4
    }
}
@job(
    config=config
)
def my_job():
    ftp_copy.alias("copy_attom_dataset_1_from_ftp_to_s3")()
    ftp_copy.alias("copy_attom_dataset_2_from_ftp_to_s3")()
    ftp_copy.alias("copy_attom_dataset_3_from_ftp_to_s3")()
    ftp_copy.alias("copy_attom_dataset_4_from_ftp_to_s3")()
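Since the aliased ops and their config entries follow the same naming pattern, the run config could also be generated from a list instead of being written out by hand. This is only a sketch under stated assumptions: `DATASETS`, `op_alias` and `build_run_config` are hypothetical names, the values are placeholders, and it assumes each op's values sit under a "config" key inside the "ops" section of the run config.

```python
# Hypothetical dataset list; the real values would come from FTP_S3_OP_CONFIG.
DATASETS = [
    {
        "key": "dataset_1",
        "ftp_file_prefix_suffix_pairs": "placeholder_1",
        "append_upload_time_to_s3_name": "placeholder_1",
    },
    {
        "key": "dataset_2",
        "ftp_file_prefix_suffix_pairs": "placeholder_2",
        "append_upload_time_to_s3_name": "placeholder_2",
    },
]


def op_alias(dataset):
    """Build the alias used for one dataset's copy of the op."""
    return f"copy_attom_{dataset['key']}_from_ftp_to_s3"


def build_run_config(datasets):
    """Map each alias to its own config entry (assumed run-config shape)."""
    return {"ops": {op_alias(d): {"config": dict(d)} for d in datasets}}


run_config = build_run_config(DATASETS)
print(sorted(run_config["ops"]))

# Inside the job body the same helper could name the aliases, e.g.:
#     for d in DATASETS:
#         ftp_copy.alias(op_alias(d))()
# The alias string is evaluated on each iteration, and the op body reads its
# values from context.op_config at run time, so no loop variable is captured.
```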
from dagster import job, op
MY_CONFIG = [
    {
        "name": "first",
        "a": 1,
        "b": 2
    },
    {
        "name": "second",
        "a": 3,
        "b": 4
    },
    {
        "name": "third",
        "a": 5,
        "b": 6
    }
]
def build_op(vals):
    @op(
        name=f"my_{vals['name']}_op"
    )
    def my_op(context):
        context.log.info(f"my_{vals['name']}_op values:")
        context.log.info(f"a: {vals['a']}")
        context.log.info(f"b: {vals['b']}")
    return my_op
@job
def my_factory_job():
    all_ops = [build_op(vals) for vals in MY_CONFIG]
    for o in all_ops:
        o()
apparently there are some particularities with how python handles loop closures (I don’t really understand the intricacies) but wrapping the op generation in a function and then calling that function in a loop works
@job
def my_factory_job():
    for vals in MY_CONFIG:
        build_op(vals)()
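The "loop closure" behaviour mentioned above is most likely Python's late binding: a nested function reads an enclosing variable when it runs, so a fresh scope per item is what makes the factory work. A minimal plain-Python sketch of two ways to get that per-item binding follows. `VALUES`, `make_reader` and `read` are hypothetical names, and no Dagster is involved.

```python
VALUES = [1, 3, 5]  # hypothetical per-item values


def make_reader(val):
    """Factory: each call creates a new scope with its own `val`."""
    def read():
        return val
    return read


# Option 1: call the factory inside the loop (the pattern used by build_op).
via_factory = [make_reader(v) for v in VALUES]

# Option 2: bind the current value as a default argument, which Python
# evaluates once, at definition time.
via_default = []
for v in VALUES:
    def read(val=v):
        return val
    via_default.append(read)

factory_results = [f() for f in via_factory]
default_results = [f() for f in via_default]

print(factory_results)  # [1, 3, 5]
print(default_results)  # [1, 3, 5]
```

The default-argument form is shown for plain functions only. Dagster interprets op function parameters as inputs, so the factory form is probably the safer fit for ops.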