Dimka Filippov
01/25/2023, 4:20 PM
asyncio.run(function_configured_as_resource) from the Op?
I'm using an external Python module whose async functions are meant to be executed via asyncio.run(). I imported it, configured it as a Dagster resource, and tried to run it in the Op, but got a 405 Method Not Allowed error from Dagster, not from the API endpoint the module is configured to send requests to. The API URL and some other configuration items are set in environment variables.
yes, the module makes POST requests using an aiohttp session within an awaited function
when I use the module directly without Dagster, it just works (with the same configuration items from environment variables): it uses the manually downloaded data file and makes successful POST requests
I wanted to use Dagster to combine the configured steps: downloading the correct data and running the ingestion process against it
please help me to understand what I'm doing wrong?
Zach
01/25/2023, 5:40 PM
raise Exception(f"ingestion process failed with error {e}") from e
so that you still get the rest of the stack trace from the original exception. It certainly still might be related to Dagster or the environment Dagster is deployed in, but seeing the whole stack trace will make it a lot easier to debug.
Dimka Filippov
01/25/2023, 6:05 PM
Zach
01/25/2023, 6:44 PM
asyncio.run(), looks like it starts in sac_data_ingestion.migration_async.py, which seems to be the library your context.resources.ingestion_client is pointed at. The 405 would indicate that the API you're trying to hit is being called with an HTTP method it doesn't allow. Is it possible some of the environment variable configuration you have set when you're testing outside of Dagster isn't present when you're running inside Dagster? Maybe try hardcoding it inside the op temporarily to make sure that's not the case?
Dimka Filippov
01/25/2023, 8:03 PM
Zach
01/25/2023, 8:08 PM
Dimka Filippov
01/25/2023, 8:11 PM
asyncio.run(context.resources.ingestion_client(
    pq_path=files  # result from Op1
))
Zach
01/25/2023, 8:21 PM
@resource(config_schema={...})
def ingestion_client(init_context):
    url = init_context.resource_config["url"]
    ...
    def resource_factory(pq_path):
        return ContractCustomerPost(url=url, pq_path=pq_path, ...)
    return resource_factory
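As a rough, Dagster-free sketch of the same factory pattern: the resource function reads its config once and returns a closure, and each call to that closure builds a fresh client whose coroutine is then driven by asyncio.run(). Everything below that is not in the snippet above is an assumption for illustration only: the ContractCustomerPost stub and the body of its run() method, the example URL, the file name, and the SimpleNamespace object standing in for Dagster's init_context.

```python
import asyncio
from types import SimpleNamespace


class ContractCustomerPost:
    """Hypothetical stand-in for the real ingestion client (assumption)."""

    def __init__(self, url, pq_path):
        self.url = url
        self.pq_path = pq_path

    async def run(self):
        # The real client would POST via aiohttp here; this stub only yields
        # control once and reports what it would have sent.
        await asyncio.sleep(0)
        return f"POST {self.url} <- {self.pq_path}"


def ingestion_client(init_context):
    # Same shape as the @resource function: read config once, then return
    # a factory that closes over it.
    url = init_context.resource_config["url"]

    def resource_factory(pq_path):
        return ContractCustomerPost(url=url, pq_path=pq_path)

    return resource_factory


# Fake init context standing in for what Dagster would pass to the resource.
init_context = SimpleNamespace(resource_config={"url": "https://example.invalid/api"})
factory = ingestion_client(init_context)

# Calling the factory returns a client; asyncio.run() drives its coroutine.
client = factory("data.parquet")
result = asyncio.run(client.run())
print(result)
```

The point of the closure is that the op only has to supply the per-run argument (the file path), while the URL and other settings stay in the resource config.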
Dimka Filippov
01/25/2023, 8:21 PM
def process_files(context, file):
    context.resources.ingestion_client(file)
    asyncio.run(context.resources.ingestion_client.run())
Zach
01/25/2023, 8:41 PM
def process_files(context, file):
    ingestion_client = context.resources.ingestion_client(file)
    asyncio.run(ingestion_client.run())
because the resource factory is returning the client
Dimka Filippov
01/25/2023, 8:42 PM