Nicholas Cohn-Martin
07/12/2022, 6:14 PMTypeError: run_request_for_partition() got an unexpected keyword argument 'run_config'
Yang
07/12/2022, 6:40 PMyuhan
07/12/2022, 7:43 PMyuhan
07/12/2022, 7:43 PMdef partition_fn(partition_key: str):
return {"ops": {"my_op": {"config": {"partition": partition_key}}}} # here
@static_partitioned_config(partition_keys=["a", "b", "c", "d"])
def my_partitioned_config(partition_key: str):
return partition_fn(partition_key)
@op
def my_op(context):
print(context.op_config["partition"])
@job(config=my_partitioned_config)
def my_job():
my_op()
@schedule(cron_schedule="* * * * *", job=my_job, execution_timezone="US/Pacific")
def my_schedule(_context):
yield my_job.run_request_for_partition(partition_key="a", run_key=None)
@repository
def my_repo():
return [my_job, my_schedule]
sandy
07/12/2022, 8:14 PMYang
07/12/2022, 8:15 PM"ops": {
"unzip_and_write_to_gcs_date": {
"config": {
"zipfilepath": full_file_name ,
"xmlfolder": run_key}}}
sandy
07/12/2022, 8:23 PMYang
07/12/2022, 8:24 PMsandy
07/12/2022, 8:24 PMYang
07/12/2022, 8:24 PMsandy
07/12/2022, 8:26 PM2022-12-12
, the file name might be something like "zipfiles/20221212"Yang
07/12/2022, 8:31 PMYang
07/12/2022, 9:09 PMYang
07/12/2022, 9:10 PMYang
07/12/2022, 9:10 PMdaily_funds_job = define_asset_job(
"daily_funds_job",
selection=AssetSelection.assets(fund_holdings_asset),
partitions_def=daily_partitions_def)
Yang
07/12/2022, 9:43 PMYang
07/12/2022, 9:48 PMrun_config
sandy
07/12/2022, 10:05 PMYang
07/12/2022, 10:11 PMsandy
07/12/2022, 10:28 PMYang
07/12/2022, 10:37 PM