Jeremy Hermann
05/29/2023, 12:40 AMdagster.materialize
(or similar) to materialize a set of assets using a specific executor (eg a multiprocess_executor
)?claire
05/30/2023, 6:30 PMdagster.materialize
and execute_in_process
both execute the assets in a single process, and can't be configured to use another executor.
You could instead use execute_job
, which will work with the multiprocess executor.
https://github.com/dagster-io/dagster/blob/48a8c8aac33499a4b7ca03785d3e2b4709dab257/python_modules/dagster/dagster/_core/execution/api.py#L329-L430Jeremy Hermann
06/11/2023, 3:44 PMload_assets_from_modules
, how do i turn those into a job that i can pass these to execute_job
?
i have tried using define_asset_job
but keep getting errors when i try to convert that to a reconstructable job
define_asset_job
gives an UnresolvedAssetJobDefinition
instance, and then i can't figure out how to convert this to a ResolvedJobDefinition
calling unresolved_job_dev.resolve()
results in errors like this...
Traceback (most recent call last):
File "/Users/jeremyhermann/dev/tutorial-project/test_materialize.py", line 35, in <module>
print(job.resolve())
File "/Users/jeremyhermann/miniforge3/delphina-alpha/lib/python3.10/site-packages/dagster/_core/definitions/unresolved_asset_job_definition.py", line 181, in resolve
check.failed(
File "/Users/jeremyhermann/miniforge3/delphina-alpha/lib/python3.10/site-packages/dagster/_check/__init__.py", line 1669, in failed
raise CheckError(f"Failure condition: {desc}")
dagster._check.CheckError: Failure condition: If asset_graph is not provided, must provide both assets and source_assets
Jeremy Hermann
06/18/2023, 12:39 AMAbhishek Agrawal
06/18/2023, 2:49 AMJeremy Hermann
06/18/2023, 3:19 AMexecute_job
If I define the job like this, then i get no errors, but no assets get materialized
@job
def the_job():
...
instance = DagsterInstance.get()
result = execute_job(reconstructable(the_job), instance=instance)
assert result.success
if I define the job like this:
the_job = define_asset_job(name="all_assets_job")
instance = DagsterInstance.get()
result = execute_job(reconstructable(the_job), instance=instance)
assert result.success
then i get errors like this...
dagster._core.errors.DagsterInvariantViolationError: Reconstructable target should be a function or definition produced by a decorated function, got <class 'dagster._core.definitions.unresolved_asset_job_definition.UnresolvedAssetJobDefinition'>.
Abhishek Agrawal
06/18/2023, 9:09 AM