Oliver
06/30/2022, 6:51 AM
```
@repository(
    default_executor_def=ray_executor
)
def production():
    return [define_asset_job(
        "build_and_train",
        AssetSelection.assets(*all_assets.values()),
        resource_config
    )] + list(all_assets.values())
```
I expect it to run using the Ray executor, but instead it's using the multiprocess executor. Any ideas what I'm doing wrong?
chris
06/30/2022, 5:45 PM
Oliver
06/30/2022, 9:31 PM
chris
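06/30/2022, 10:05 PM
```
from dagster import AssetSelection, asset, define_asset_job, executor, repository


def test_asset_executor():
    # Stub executor: Dagster calls the creation function with an init_context,
    # so this is only suitable for checking which executor a job is bound to.
    @executor
    def ray_executor():
        pass

    @asset
    def the_asset():
        pass

    all_assets = {"asset1": the_asset}

    @repository(default_executor_def=ray_executor)
    def production():
        return [
            define_asset_job("build_and_train", AssetSelection.assets(*all_assets.values()))
        ] + list(all_assets.values())

    # The asset job should inherit the repository's default executor.
    assert production.get_job("build_and_train").executor_def == ray_executor
```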
What version of Dagster are you running? If you evaluate `production.get_job("build_and_train").executor_def == ray_executor`, I'd be curious about the result.
Oliver
06/30/2022, 10:12 PM
```
TypeError: ray_executor() takes 0 positional arguments but 1 was given
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/grpc/impl.py", line 92, in core_execute_run
    yield from execute_run_iterator(
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/core/execution/api.py", line 911, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/utils/__init__.py", line 470, in generate_setup_events
    obj = next(self.generator)
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/core/execution/context_creation_pipeline.py", line 333, in orchestration_context_event_generator
    executor = create_executor(context_creation_data)
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/core/execution/context_creation_pipeline.py", line 425, in create_executor
    return creation_fn(init_context)
```
```
dagster.core.errors.DagsterInvalidConfigError: Invalid default_value for Field. Error 1: You can only specify a single field at path root:config. You specified ['code_dir', 'host', 'port', 'requirements_path']. The available fields are ['in_process', 'multiprocess']
```
And if I comment out that config and try to launch from the job launchpad, I get:
```
dagster.core.errors.DagsterInvalidConfigError: Error in config for job
Error 1: Received unexpected config entry "multiprocess" at path root:execution:config. Expected: "{ code_dir?: String host?: String port?: Int requirements_path?: String }".
```
Finally, on the job's overview page, if I click `Materialize all` and then `Materialize`, the job runs in the multiprocess executor.
```
@repository
def dev():
    resource_config = yaml.safe_load(DATA_DIR.joinpath("dev.yaml").read_text())
    resource_defs = {
        'aws': aws_profile
    }
    resourced_assets = with_resources(
        all_assets.values(),
        resource_defs,
        resource_config
    )
    return [
        *resourced_assets,
        build_and_train_job(resource_config, resourced_assets)
    ]


@repository(
    default_executor_def=ray_executor
)
def production():
    resource_config = yaml.safe_load(DATA_DIR.joinpath("prod.yaml").read_text())
    # raise Exception(resource_defs)
    resource_defs = {
        'aws': aws_profile,
    }
    resourced_assets = with_resources(
        all_assets.values(),
        resource_defs,
        resource_config
    )
    return [
        *resourced_assets,
        build_and_train_job(resource_config, resourced_assets, ray_executor)
    ]
```
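The first error quoted earlier ("You can only specify a single field ... The available fields are ['in_process', 'multiprocess']") comes from a selector-style schema: whichever executor that code path ended up with (judging by the field names, Dagster's default in-process/multiprocess executor) accepts exactly one of `in_process` or `multiprocess`, so handing it the four Ray fields fails. The stdlib-only toy check below mimics that rule; it is an illustration, not Dagster's actual config validator, and the Ray values are hypothetical placeholders.

```python
# Toy model of a "selector" config schema: at most one key, from a fixed set.
# It only mimics the shape of Dagster's error message; it is not Dagster code.
DEFAULT_EXECUTOR_OPTIONS = {"in_process", "multiprocess"}


def check_selector(config: dict, options: set) -> list:
    """Return a list of error strings (empty if `config` is acceptable)."""
    if len(config) > 1:
        return [
            "You can only specify a single field. "
            f"You specified {sorted(config)}. "
            f"The available fields are {sorted(options)}"
        ]
    unknown = [key for key in config if key not in options]
    return [f"Received unexpected config entry {key!r}" for key in unknown]


# Hypothetical Ray executor settings (placeholder values).
ray_config = {
    "code_dir": ".",
    "host": "127.0.0.1",
    "port": 10001,
    "requirements_path": "requirements.txt",
}

print(check_selector(ray_config, DEFAULT_EXECUTOR_OPTIONS))
print(check_selector({"multiprocess": {}}, DEFAULT_EXECUTOR_OPTIONS))
```

Read together with the launchpad error, which expects the Ray fields and rejects `multiprocess`, this suggests the two code paths were validating the same job against different executors.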
chris
06/30/2022, 10:41 PM
`workspace.yaml`?
Oliver
06/30/2022, 10:41 PM
chris
06/30/2022, 10:50 PM
Are you running the `prod` version of `build_and_train` when this happens?
Oliver
06/30/2022, 10:52 PM
`prod.yaml`
Yeah, in the prod version. I caught on because it's trying to load my dev AWS profile when I click Materialize for a single resource in the prod job.
chris
06/30/2022, 11:02 PM
Oliver
06/30/2022, 11:08 PM
`ray_executor.configured` as a workaround as well
chris
06/30/2022, 11:09 PM
`ray_executor.configured` will also work. The launchpad uses whatever executor the underlying job is using, so it should be configurable from there under the `execution` key.
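Following that pointer, the Ray settings go under `execution: config:` in the launchpad (the path `root:execution:config` in the error above). The sketch below builds that section as a Python dict, checking names and types against the schema quoted in the launchpad error (`{ code_dir?: String host?: String port?: Int requirements_path?: String }`), and prints it as JSON, which YAML parsers also accept, so it should paste into the launchpad editor. The helper `ray_execution_run_config` and the host/port values are hypothetical.

```python
import json

# Optional fields and types taken from the launchpad error message:
# { code_dir?: String host?: String port?: Int requirements_path?: String }
RAY_EXECUTOR_FIELDS = {
    "code_dir": str,
    "host": str,
    "port": int,
    "requirements_path": str,
}


def ray_execution_run_config(**fields) -> dict:
    """Hypothetical helper: build the `execution` section of a run config."""
    for name, value in fields.items():
        if name not in RAY_EXECUTOR_FIELDS:
            raise KeyError(f"unexpected config entry {name!r}")
        expected = RAY_EXECUTOR_FIELDS[name]
        # type() rather than isinstance() so that True is not accepted as an Int
        if type(value) is not expected:
            raise TypeError(f"{name} must be {expected.__name__}")
    return {"execution": {"config": dict(fields)}}


# Placeholder connection details for illustration only.
run_config = ray_execution_run_config(host="127.0.0.1", port=10001)
print(json.dumps(run_config, indent=2))
```

Baking the same values into the executor with `ray_executor.configured(...)` instead should mean they no longer have to be supplied at launch time.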
Oliver
06/30/2022, 11:11 PM
`define_asset_job`
chris
06/30/2022, 11:12 PM
Oliver
06/30/2022, 11:24 PM
```
from dagster import AssetSelection, asset, define_asset_job, repository, with_resources, resource


# Unused stub, left over from the executor experiments.
def ray_executor():
    pass


@resource
def test_dev(context):
    raise Exception('dev')


@resource
def test_prod(context):
    raise Exception('prod')


@asset(
    required_resource_keys={'test'}
)
def the_asset():
    pass


all_assets = {"asset1": the_asset}


# Each repository binds the same asset to a different 'test' resource.
@repository()
def dev():
    resource_defs = {
        'test': test_dev
    }
    resourced_assets = with_resources(
        all_assets.values(),
        resource_defs
    )
    return [
        define_asset_job("build_and_train", AssetSelection.assets(*resourced_assets))
    ] + list(resourced_assets)


@repository()
def production():
    resource_defs = {
        'test': test_prod
    }
    resourced_assets = with_resources(
        all_assets.values(),
        resource_defs
    )
    return [
        define_asset_job("build_and_train", AssetSelection.assets(*resourced_assets))
    ] + list(resourced_assets)
```
This reproduces the issue I mentioned, where resources aren't applied as expected when using multiple repos. I can't find any related issues; should I file one?
chris
06/30/2022, 11:28 PM
Oliver
06/30/2022, 11:29 PM
```
@resource
def test_dev(context):
    raise Exception('dev')


@resource
def test_prod(context):
    raise Exception('prod')
```
For me, running the dev job resulted in the prod exception.
chris
06/30/2022, 11:31 PM
Oliver
06/30/2022, 11:48 PM
chris
06/30/2022, 11:57 PM
Oliver
07/01/2022, 12:02 AM
chris
07/01/2022, 12:05 AM
Oliver
07/01/2022, 12:10 AM
```
def build_and_train_job(resource_config, resourced_assets, executor=None):
    # return define_asset_job(
    #     "build_and_train",
    #     AssetSelection.assets(*all_assets.values()),
    #     resource_config
    # )
    return build_asset_selection_job(
        'build_and_train',
        resourced_assets,
        source_assets=[],
        executor_def=executor,
        config=resource_config,
    )


@repository
def repo():
    resourced_assets = with_resources(
        all_assets.values(),
        resource_defs,
        resource_config
    )
    return [
        *resourced_assets,
        build_and_train_job(resource_config, resourced_assets, ray_executor)
    ]
```
0.15.0, because on 0.15.2 Dagit wasn't updating and kept throwing errors about unknown events.
chris
07/01/2022, 4:39 AM
Oliver
07/01/2022, 4:40 AM
> I think this is intended behavior. In the global asset graph, we don't use any of your jobs for execution, but instead construct an ad hoc job.

This is in the job graph, though; the train step works when pressing Materialize selected.