Lucas Santiago Cardoso
05/25/2023, 6:46 PM
We have a Transient Layer on our S3 lake that receives multiple JSONs per day, and a job that adds some metadata with Pandas and moves them to the next layer, the Raw Layer, while still keeping them as one JSON per file (the same number of JSONs in the Raw Layer as in the Transient Layer).
We want to execute this process once a day (start, load all new JSONs received in the Transient Layer, add metadata to each JSON, and write each JSON to the Raw Layer)...
How could we organize the code?
Just using ops and making a job? Or should we make it an asset?
Should we use an IOManager for loading the data (even considering we have multiple files)? Or a resource to load the data?
owen
05/25/2023, 9:02 PM
you could write a custom IOManager with a load_input function that knows how to read JSONs from S3 and a handle_output function that knows how to write JSONs to S3.
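a rough sketch of what such an IOManager might look like (the bucket name, key layout, and boto3 calls are assumptions for illustration, not something from this thread):

import json
from typing import Any, Dict, List

import boto3
from dagster import InputContext, IOManager, OutputContext, io_manager

BUCKET = "my-data-lake"  # hypothetical bucket name


class S3JsonIOManager(IOManager):
    """Reads and writes one JSON object per file under a prefix derived from the asset key."""

    def __init__(self):
        self._s3 = boto3.client("s3")

    def _prefix(self, asset_key) -> str:
        # e.g. AssetKey(["transient_layer"]) -> "transient_layer"
        return "/".join(asset_key.path)

    def load_input(self, context: InputContext) -> List[Dict[str, Any]]:
        # list every object under the upstream asset's prefix and parse each one as JSON
        prefix = self._prefix(context.asset_key)
        response = self._s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix)
        return [
            json.loads(self._s3.get_object(Bucket=BUCKET, Key=entry["Key"])["Body"].read())
            for entry in response.get("Contents", [])
        ]

    def handle_output(self, context: OutputContext, obj: List[Dict[str, Any]]) -> None:
        # write each JSON object back out as its own file
        prefix = self._prefix(context.asset_key)
        for i, record in enumerate(obj):
            self._s3.put_object(
                Bucket=BUCKET,
                Key=f"{prefix}/{i}.json",
                Body=json.dumps(record).encode("utf-8"),
            )


@io_manager
def s3_json_io_manager(_init_context):
    return S3JsonIOManager()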
then, your assets could look something like:
from typing import Any, Dict, List

from dagster import SourceAsset, asset

transient_layer = SourceAsset(
    "transient_layer",
    io_manager_key="s3_json_io_manager",
)


@asset(io_manager_key="s3_json_io_manager")
def raw_layer(transient_layer: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    processed_jsons = ...  # process the json objects in the transient layer (e.g. add metadata)
    return processed_jsons
the actual types you use here might vary -- it sounds like you might actually want a list of lists of dictionaries (one list of dictionaries per file), but the principle is the same -- you can tuck all the file-handling concerns away in the IOManager and have your python function for raw_layer deal purely with adding metadata.
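for instance, a hypothetical nested-list version of the asset (the "source" field is just a stand-in for whatever metadata you actually add) might look like:

from typing import Any, Dict, List

from dagster import asset


@asset(io_manager_key="s3_json_io_manager")
def raw_layer(transient_layer: List[List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
    # outer list = files, inner list = json objects within one file
    return [
        [{**record, "source": "transient_layer"} for record in file_records]
        for file_records in transient_layer
    ]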
that's not to say there's anything wrong with a resource-based approach, if you find that more natural (this would essentially mean invoking the resource within raw_layer to read each file one by one, then invoking it again to write each file). the main downside is that it becomes slightly harder to unit-test your function, as you'd need to mock out that resource's functionality.
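a minimal sketch of that resource-based alternative (the client class, the transient/raw prefixes, and the metadata field are all hypothetical):

import json
from typing import Any, Dict, List

import boto3
from dagster import asset, resource


class S3JsonClient:
    """thin wrapper around boto3 for reading/writing one JSON object per S3 key."""

    def __init__(self, bucket: str):
        self._bucket = bucket
        self._s3 = boto3.client("s3")

    def list_keys(self, prefix: str) -> List[str]:
        response = self._s3.list_objects_v2(Bucket=self._bucket, Prefix=prefix)
        return [entry["Key"] for entry in response.get("Contents", [])]

    def read_json(self, key: str) -> Dict[str, Any]:
        return json.loads(self._s3.get_object(Bucket=self._bucket, Key=key)["Body"].read())

    def write_json(self, key: str, obj: Dict[str, Any]) -> None:
        self._s3.put_object(Bucket=self._bucket, Key=key, Body=json.dumps(obj).encode("utf-8"))


@resource(config_schema={"bucket": str})
def s3_json_client(init_context):
    return S3JsonClient(init_context.resource_config["bucket"])


@asset(required_resource_keys={"s3_json_client"})
def raw_layer(context) -> None:
    client = context.resources.s3_json_client
    # read each transient file one by one, add metadata, and write it under the raw prefix
    for key in client.list_keys("transient/"):
        record = client.read_json(key)
        record["processed"] = True  # hypothetical metadata
        client.write_json(key.replace("transient/", "raw/", 1), record)

unit-testing this version means mocking out S3JsonClient, whereas the IOManager version lets you call raw_layer directly with plain python data.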