hi everyone, do we have somewhere the examples/doc...
# ask-community
d
hi everyone, do we have somewhere the examples/docs on how to execute
asyncio.run(function_configured_as_resource)
from the Op? I'm using an external Python module written with async functions to be executed via asyncio.run(), imported it, configured it as a Dagster resource, and tried to run it in the Op, but got the 405 Method not allowed error from Dagster, not from the API endpoint to which the module configured to make requests... the URL of API and some other configuration items are set in the environment variables yes, the module makes POST requests using aiohttp session within an awaited function when I'm trying to use the module directly without Dagster, it just works (with the same configuration items from environment variables), uses the manually downloaded data file, and makes successful POST requests... Dagster I wanted to use to combine configured steps of correct data downloading and running ingestion process against it please help me to understand, what I'm doing wrong???
z
It looks to me from the stack trace that the error is coming from your asyncio.run function call, but you're squashing most of the contextual information when you catch the exception and re-raise your own. I'd suggest doing
raise Exception(f"ingestion process failed with error {e}") from e
so that you still get the rest of the stack trace from the original exception. It certainly still might be related to Dagster or the environment Dagster is deployed in, but seeing the whole stack trace will make it a lot easier to debug
d
i understand, and will try to extend the error message with exception as you offered, but because of the module already has its own logger (for responses from API) which starts with log file creation but left empty without any record, I still assume the error 405 I'm getting from Dagster
ok, this time the log is much deeper, but I still don't understand what is wrong
z
yeah I don't either, but it looks like the error is coming from the method called by
asyncio.run()
, looks like it starts in
sac_data_ingestion.migration_async.py
, which seems to be the library your
context.resources.ingestion_client
is pointed at. the 405 would indicate that the API your trying to hit is using the wrong HTTP method - is it possible some of the environment variable configuration you have set when you're testing outside of Dagster isn't present when you're running inside Dagster? Maybe try hardcoding it inside the op temporarily to make sure that's not the case?
d
to make sure that I'm using the same values from the same environment variables... 1. successful execution directly with the same envars, and with non-empty requests_log 2. the resource definition (before Ops declaration) 3. the failed execution within Dagster, with empty requests log (no requests really done, checked with support mates on the side of service), and demonstrated config section for the resource
oh, I see now... 🤣 how good is it to place a post here before seeing my own mistake 🤦‍♂️ the value for the pq_path must be not from envars, but from result of previous Op
dagster spin 1
z
happens all the time, seems like you're on the right path though!
d
I think yes... but just a little confused... how to pass the result of Op #1 (string) to the value of one of the config items of the resource before it's usage in Op #2?
I can make the assignment like
Copy code
asyncio.run(context.resources.ingestion_client(
    pq_path = files #result from Op1
))
z
so you can't pass an output from an op directly to a resource. what you can do instead is make a resource which returns an initialization function that you can pass the output into at runtime. it's a little tricky in your case because you have config you're providing in the resource definition, and additional config that's generated at runtime and needs to be added in your second op. so you need to partially initialize ContractCustomerPost, return it, then finish initializing it in your op. you might be able to do it with a closure, something like
Copy code
@resource(config_schema={...})
def ingestion_client(init_context):
    url = init_context.resource_config["url"]
    ...
    def resource_factory(pq_path):
        return ContractCustomerPost(url=url, pq_path=pq_path, ...)
    return resource_factory
d
ah no, I can't, the assignment of the argument must be done on the resource declaration, when the client object is initialized
interesting
I understood the trick of passing the function to finish the initialization in the Op, but thinking about how it must be realized... in the Op, will I do the following?
Copy code
def process_files(context, file):
    context.resources.ingestion_client(file)
    asyncio.run(context.resources.ingestion_client.run())
z
close, I think it'd be more like
Copy code
def process_files(context, file):
    ingestion_client = context.resources.ingestion_client(file)
    asyncio.run(ingestion_client.run())
because the resource factory is returning the client
d
ah yeah, the wrapping 😃
thank you very much @Zach the trick is good, and it works 😃
🦜 1