# ask-community
a
Hello! I’m working on a pipeline that works like this (picture attached). The issue I’m facing is that when I try to run my sensor, this error message appears:
Missing required config entry "inputs" at the root. Sample config for missing entry: {'inputs': {'collection_id': '...', 'collections_triggered': [], 'file_prefix': '...', 'filename': '...'}}
My sensor is creating the variables `file_prefix`, `filename` and `collection_id`, and the variable `collections_triggered` is stored by the io manager. The point is I have them defined in the op, and I’ve also defined the resources in the job (io manager). If I follow the instructions of the error and put this ("inputs") on the job, it overwrites the variables I’m giving to the job through the sensor and through the io manager. What would you suggest? My op has these decorators:
@op(
    description="Upload Collections and Delete",
    ins={
        "filename": In(dagster_type=String, description="-"),
        "collection_id": In(dagster_type=String, description="-"),
        "file_prefix": In(dagster_type=String, description="-"),
        "collections_triggered": In(
            input_manager_key="io_manager", dagster_type=List, description="-"
        ),
    },
)
And my job:
@job(
    resource_defs={"io_manager": fs_io_manager},
)
def upload_collections_to_bigquery_job(
    filename, collection_id, file_prefix, collections_triggered
):
    upload_collections_to_bigquery(
        filename, collection_id, file_prefix, collections_triggered
    )
And for you to have context, the Run request of my sensor looks like this:
    else:
        for element in list_of_files_and_ids:
            filename_key = element["filename"]
            filename = element["filename"]
            collection_id = element["collection_id"]
            file_prefix = filename.split("/")[0]
            # Upload files
            yield RunRequest(
                run_key=filename_key,
                run_config={
                    "ops": {
                        "upload_collections_to_bigquery": {
                            "config": {
                                "filename": filename,
                                "collection_id": collection_id,
                                "file_prefix": file_prefix,
                            }
                        },
                    }
                },
            )
Thanks in advance!
s
Hi Adriana - try changing "config" to "inputs" here:
"upload_collections_to_bigquery": {
                            "config": {
                                "filename": filename,
                                "collection_id": collection_id,
                                "file_prefix": file_prefix,
                            }
                        },
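For reference, that snippet with the suggested change applied would look roughly like this (same names as above, only the key renamed):
"upload_collections_to_bigquery": {
    "inputs": {
        "filename": filename,
        "collection_id": collection_id,
        "file_prefix": file_prefix,
    }
},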
a
Still getting the same error: @sandy
s
Ah, I think I see what's going on here. If you change your job to this:
@job(
    resource_defs={"io_manager": fs_io_manager},
)
def upload_collections_to_bigquery_job():
    upload_collections_to_bigquery()
I believe it might fix the issue
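For context (an illustration added here, not something from the thread): when the @job function itself declares parameters, Dagster expects their values under "inputs" at the root of the run config, which is exactly the shape the original error message was asking for. With a parameterless job function, the same values are supplied under each op instead:
# Sketch of the two run_config shapes involved (values are placeholders):

# Parameters on the @job function -> values expected at the root, matching
# "Missing required config entry 'inputs' at the root":
root_level_config = {
    "inputs": {
        "filename": "...",
        "collection_id": "...",
        "file_prefix": "...",
        "collections_triggered": [],
    }
}

# Parameterless @job function -> values supplied per op:
op_level_config = {
    "ops": {
        "upload_collections_to_bigquery": {
            "inputs": {
                "filename": "...",
                "collection_id": "...",
                "file_prefix": "...",
            }
        }
    }
}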
a
@sandy I’ve removed it, but maybe I should change something else? My sensor looks like:
@sensor(
    job=upload_collections_to_bigquery_job, default_status=DefaultSensorStatus.RUNNING
)
def upload_collections_sensor():
    my_logger = logging.getLogger("root")
    <http://my_logger.info|my_logger.info>("Checking for new files...")
    list_of_files_and_ids = list(rainforest_def.get_filenames_in_storage(search_term))
    if not list_of_files_and_ids:
        yield SkipReason(f"No new files found.")
        return

    else:
        for element in list_of_files_and_ids:
            filename_key = element["filename"]
            filename = element["filename"]
            collection_id = element["collection_id"]
            file_prefix = filename.split("/")[0]
            # Upload files
            yield RunRequest(
                run_key=filename_key,
                run_config={
                    "ops": {
                        "upload_collections_to_bigquery": {
                            "inputs": {
                                "filename": filename,
                                "collection_id": collection_id,
                                "file_prefix": file_prefix,
                            }
                        },
                    }
                },
            )
            print(
                f"this is the filename_key {filename_key} of the file that just jumped"
            )


@repository()
def code_repository():
    return [upload_collections_sensor]
Because now the error is:
s
Why do you have an `input_manager_key` defined on one of the `In`s, but not the other ones?
a
Mmmmm where should I include the input_manager_key? These are my ops before the job:
@op(
    description="Upload results to BigQuery and move them to processed folder",
    ins={
        "filename": In(dagster_type=String, description="-"),
        "collection_id": In(dagster_type=String, description="-"),
        "file_prefix": In(dagster_type=String, description="-"),
        "collections_triggered": In(input_manager_key="io_manager", description="-"),
    },
    out={"collection_ids_uploaded": Out(dagster_type=List)},
)
def upload_collections(filename, collection_id, file_prefix, collections_triggered):
    my_logger = logging.getLogger("root")
    collection_ids_uploaded = set()

    print(f"this is the file name omg {filename}")
    print(collection_id)
    print(file_prefix)
    print(f"this is the collections_triggered {collections_triggered}")
    <http://my_logger.info|my_logger.info>(f"{len(filename)} file found.")

    if collection_id in collections_triggered:
        # Upload files
        rainforest_def.upload_files_in_storage(file_prefix, filename)
        # Move files
        rainforest_def.move_files_in_storage(filename)

        # Append collection_id to list of collections to delete later on
        collection_ids_uploaded.add(collection_id)
        print(f"this is the collection_ids_uploaded {collection_id}")

    else:
        <http://my_logger.info|my_logger.info>(
            f"Found a file not matching collection_id criteria. File {filename} with id {collection_id} doesn't belong to this iteration."
        )
        print(
            f"Found a file not matching collection_id criteria. File {filename} with id {collection_id} doesn't belong to this iteration."
        )
        # Move files
        rainforest_def.move_files_in_storage(filename)

        <http://my_logger.info|my_logger.info>(f"Moved unknown file {filename} to /processed/.")
        print(f"Moved unknown file {filename} to /processed/.")

    time.sleep(10)
    print(collection_ids_uploaded)
    return list(collection_ids_uploaded)


@op(
    description="Delete collections",
    ins={
        "collection_ids_uploaded": In(
            dagster_type=List, description="List of collection IDs uploaded"
        )
    },
)
def delete_collections(collection_ids_uploaded):
    my_logger = logging.getLogger("root")
    rainforest_def.delete_collection(collection_ids_uploaded)
    print(f"Deleted {len(collection_ids_uploaded)} collections.")
    return


@op(
    description="Upload Collections and Delete",
    ins={
        "filename": In(dagster_type=String, description="-"),
        "collection_id": In(dagster_type=String, description="-"),
        "file_prefix": In(dagster_type=String, description="-"),
        "collections_triggered": In(
            input_manager_key="io_manager", dagster_type=List, description="-"
        ),
    },
)
def upload_collections_to_bigquery(
    filename, collection_id, file_prefix, collections_triggered
):
    collection_ids_uploaded = upload_collections(
        filename, collection_id, file_prefix, collections_triggered
    )
    print(collection_ids_uploaded)
    delete_collections(collection_ids_uploaded)
s
I was going to recommend that you actually leave out the `input_manager_key`
a
I’m using it because I need the variable `collections_triggered` to make it into the job the sensor triggers, and that variable is generated by another job (see the first picture of what my pipeline looks like, at the beginning of the question 😊)
s
I see. What's causing the last error you saw is that Dagster basically doesn't have enough information to load the input from the other job. It needs to know where to look on the filesystem, but I don't believe that's specified in the code. You might need to write a custom IO manager that encodes this information. FWIW, this is a situation where using software-defined assets would help, because each software-defined asset corresponds to a known location in storage.
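A rough sketch of what a custom IO manager along those lines could look like (the class name, resource key, and file path below are hypothetical, not from the thread; the point is just that the manager itself knows where the other job wrote `collections_triggered`):
import json
from pathlib import Path

from dagster import IOManager, io_manager

# Hypothetical fixed location shared by the job that writes collections_triggered
# and the job that reads it; replace with wherever the first job actually stores it.
COLLECTIONS_TRIGGERED_PATH = Path("/tmp/dagster/collections_triggered.json")


class CollectionsTriggeredIOManager(IOManager):
    def handle_output(self, context, obj):
        # Used by the job that produces collections_triggered.
        COLLECTIONS_TRIGGERED_PATH.parent.mkdir(parents=True, exist_ok=True)
        COLLECTIONS_TRIGGERED_PATH.write_text(json.dumps(obj))

    def load_input(self, context):
        # Used when another job declares In(input_manager_key="collections_io_manager").
        return json.loads(COLLECTIONS_TRIGGERED_PATH.read_text())


@io_manager
def collections_triggered_io_manager(_init_context):
    return CollectionsTriggeredIOManager()
You would then add it to both jobs' resource_defs (e.g. resource_defs={"io_manager": fs_io_manager, "collections_io_manager": collections_triggered_io_manager}) and point the In at input_manager_key="collections_io_manager". The software-defined assets route sandy mentions avoids this hand-written path bookkeeping, since each asset already corresponds to a known storage location.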
a
Okay Sandy, thank you so much for the help! I will try to fix it tomorrow!