Adriana Jiménez Ambel
06/08/2023, 5:33 PM
Missing required config entry "inputs" at the root. Sample config for missing entry: {'inputs': {'collection_id': '...', 'collections_triggered': [], 'file_prefix': '...', 'filename': '...'}}
My sensor creates the variables `file_prefix`, `filename` and `collection_id`, and the variable `collections_triggered` is stored in the IO manager.
The point is, I have them defined in the op, and I have also defined the resources (the IO manager) in the job.
If I follow the instructions in the error and add the `inputs` config to the job decorator, it overwrites the variables I'm passing to the job through the sensor and through the IO manager.
What would you suggest?
My op has this decorator:
@op(
description="Upload Collections and Delete",
ins={
"filename": In(dagster_type=String, description="-"),
"collection_id": In(dagster_type=String, description="-"),
"file_prefix": In(dagster_type=String, description="-"),
"collections_triggered": In(
input_manager_key="io_manager", dagster_type=List, description="-"
),
},
)
And my job:
# NOTE(review): the four parameters turn this job function's arguments into
# job-level (root) inputs. That matches the error quoted at the top of this
# thread, which asks for "inputs" at the root with exactly these four names.
# The reply further down drops the parameters, and the sensor then passes the
# values as op "inputs" instead of op "config".
@job(
    resource_defs={"io_manager": fs_io_manager},
)
def upload_collections_to_bigquery_job(
    filename, collection_id, file_prefix, collections_triggered
):
    upload_collections_to_bigquery(
        filename, collection_id, file_prefix, collections_triggered
    )
For context, the `RunRequest` in my sensor looks like this:
else:
for element in list_of_files_and_ids:
filename_key = element["filename"]
filename = element["filename"]
collection_id = element["collection_id"]
file_prefix = filename.split("/")[0]
# Upload files
yield RunRequest(
run_key=filename_key,
run_config={
"ops": {
"upload_collections_to_bigquery": {
"config": {
"filename": filename,
"collection_id": collection_id,
"file_prefix": file_prefix,
}
},
}
},
)
Thanks in advance!
sandy
06/08/2023, 11:37 PM
"upload_collections_to_bigquery": {
"config": {
"filename": filename,
"collection_id": collection_id,
"file_prefix": file_prefix,
}
},
Adriana Jiménez Ambel
06/09/2023, 7:24 AM
sandy
06/12/2023, 3:55 PM
# Suggested fix: the job function takes no parameters, so the op's inputs
# should no longer be demanded as root-level "inputs" in the run config.
@job(
    resource_defs={"io_manager": fs_io_manager},
)
def upload_collections_to_bigquery_job():
    upload_collections_to_bigquery()
I believe it might fix the issue.
Adriana Jiménez Ambel
06/12/2023, 3:58 PM@sensor(
job=upload_collections_to_bigquery_job, default_status=DefaultSensorStatus.RUNNING
)
def upload_collections_sensor():
my_logger = logging.getLogger("root")
<http://my_logger.info|my_logger.info>("Checking for new files...")
list_of_files_and_ids = list(rainforest_def.get_filenames_in_storage(search_term))
if not list_of_files_and_ids:
yield SkipReason(f"No new files found.")
return
else:
for element in list_of_files_and_ids:
filename_key = element["filename"]
filename = element["filename"]
collection_id = element["collection_id"]
file_prefix = filename.split("/")[0]
# Upload files
yield RunRequest(
run_key=filename_key,
run_config={
"ops": {
"upload_collections_to_bigquery": {
"inputs": {
"filename": filename,
"collection_id": collection_id,
"file_prefix": file_prefix,
}
},
}
},
)
print(
f"this is the filename_key {filename_key} of the file that just jumped"
)
@repository()
def code_repository():
    # Only the sensor is listed in this repository's definitions.
    definitions = [upload_collections_sensor]
    return definitions
Adriana Jiménez Ambel
06/12/2023, 3:58 PM
sandy
06/12/2023, 3:59 PM
[…] `input_manager_key` defined on one of the `In`s, but not the other ones?
Adriana Jiménez Ambel
06/12/2023, 4:01 PM@op(
description="Upload results to BigQuery and move them to processed folder",
ins={
"filename": In(dagster_type=String, description="-"),
"collection_id": In(dagster_type=String, description="-"),
"file_prefix": In(dagster_type=String, description="-"),
"collections_triggered": In(input_manager_key="io_manager", description="-"),
},
out={"collection_ids_uploaded": Out(dagster_type=List)},
)
def upload_collections(filename, collection_id, file_prefix, collections_triggered):
collection_ids_uploaded = set()
print(f"this is the file name omg {filename}")
print(collection_id)
print(file_prefix)
print(f"this is the collections_triggered {collections_triggered}")
<http://my_logger.info|my_logger.info>(f"{len(filename)} file found.")
if collection_id in collections_triggered:
# Upload files
rainforest_def.upload_files_in_storage(file_prefix, filename)
# Move files
rainforest_def.move_files_in_storage(filename)
# Append collection_id to list of collections to delete later on
collection_ids_uploaded.add(collection_id)
print(f"this is the collection_ids_uploaded {collection_id}")
else:
<http://my_logger.info|my_logger.info>(
f"Found a file not matching collection_id criteria. File {filename} with id {collection_id} doesn't belong to this iteration."
)
print(
f"Found a file not matching collection_id criteria. File {filename} with id {collection_id} doesn't belong to this iteration."
)
# Move files
rainforest_def.move_files_in_storage(filename)
<http://my_logger.info|my_logger.info>(f"Moved unknown file {filename} to /processed/.")
print(f"Moved unknown file {filename} to /processed/.")
time.sleep(10)
print(collection_ids_uploaded)
return list(collection_ids_uploaded)
@op(
    description="Delete collections",
    ins={
        "collection_ids_uploaded": In(
            dagster_type=List, description="List of collection IDs uploaded"
        )
    },
)
def delete_collections(collection_ids_uploaded):
    """Pass the uploaded collection ids to ``rainforest_def.delete_collection``.

    The whole list goes to the helper in a single call, and the number of ids is
    printed afterwards. Returns None.
    """
    rainforest_def.delete_collection(collection_ids_uploaded)
    print(f"Deleted {len(collection_ids_uploaded)} collections.")
@op(
    description="Upload Collections and Delete",
    ins={
        "filename": In(dagster_type=String, description="-"),
        "collection_id": In(dagster_type=String, description="-"),
        "file_prefix": In(dagster_type=String, description="-"),
        "collections_triggered": In(
            input_manager_key="io_manager", dagster_type=List, description="-"
        ),
    },
)
def upload_collections_to_bigquery(
    filename, collection_id, file_prefix, collections_triggered
):
    """Run the upload step for one file, then delete what it uploaded.

    ``upload_collections`` and ``delete_collections`` are called directly from
    inside this op's body. The ids returned by the first are printed and then
    handed to the second.
    """
    uploaded_ids = upload_collections(
        filename, collection_id, file_prefix, collections_triggered
    )
    print(uploaded_ids)
    delete_collections(uploaded_ids)
sandy
06/12/2023, 4:03 PM
`input_manager_key`
Adriana Jiménez Ambel
06/12/2023, 4:04 PM
sandy
06/12/2023, 4:19 PM
Adriana Jiménez Ambel
06/12/2023, 7:43 PM