Ben Wilson
12/23/2022, 10:33 PMGabe Schine
12/24/2022, 12:25 AMBen Wilson
12/24/2022, 12:55 AMGabe Schine
12/24/2022, 1:52 AMBen Wilson
12/24/2022, 6:41 PMop()
@op(config_schema={"base_directory": str, "customer": str))
def generate_asset_metadata(context):
customer = context.op_config["customer"]
directory = context.op_config["base_directory"]
customer_directory = f'{directory}/{customer}/files/'
path_exists = os.path.exists(customer_directory)
if not path_exists:
# Create a new directory because it does not exist
os.makedirs(customer_directory)
<http://context.log.info|context.log.info>(f"Created Directory: {customer_directory}")
file_metadata = {
"customer_directory": customer_directory,
"directory": directory
}
context.log_event(
AssetMaterialization(
asset_key="file_metadata", description="Metadata for file copy"
)
)
return file_metadata
@op()
def get_file_count(context, generate_asset_metadata):
source_files = os.listdir(generate_asset_metadata["customer_directory"])
file_count = len(source_files)
<http://context.log.info|context.log.info>(str(file_count))
@job()
def file_count_job():
get_file_count(generate_asset_metadata())
@op()
def copy_files(context):
# I'd like to reference the asset here
file_path = file_metadata["customer_directory"]
copy_path = f'{file_path}/copy'
customer_files = os.listdir(file_path)
for file in customer_files:
# Copy files to new directory
return copy_path
@op()
def validate_file_copy(context, copy_files):
file_path = file_metadata["customer_directory"]
copy_path = copy_files
## Validate files
@job()
def file_count_job():
validate_file_copy(copy_files())
Gabe Schine
12/24/2022, 8:12 PMBen Wilson
12/24/2022, 8:15 PMGabe Schine
12/24/2022, 9:10 PM