#ask-community

Oliver

06/30/2022, 6:51 AM
Hey all, I have a repository with the following definition
@repository(
    default_executor_def=ray_executor
)
def production():
    return [define_asset_job(
        "build_and_train", 
        AssetSelection.assets(*all_assets.values()), 
        resource_config
    )] + list(all_assets.values())
I expect it to run using the ray executor, but instead it's using the multiprocess executor. Any ideas what I'm doing wrong?
I saw https://github.com/dagster-io/dagster/pull/8490 and updated, but the behavior is still the same.

chris

06/30/2022, 5:45 PM
this is when running from dagit?

Oliver

06/30/2022, 9:31 PM
yup

chris

06/30/2022, 10:05 PM
hmm when basically copying this on master, it seems to work. Test to observe similar behavior:
from dagster import AssetSelection, asset, define_asset_job, executor, repository


def test_asset_executor():
    @executor
    def ray_executor():
        pass

    @asset
    def the_asset():
        pass

    all_assets = {"asset1": the_asset}

    @repository(default_executor_def=ray_executor)
    def production():
        return [
            define_asset_job("build_and_train", AssetSelection.assets(*all_assets.values()))
        ] + list(all_assets.values())

    assert production.get_job("build_and_train").executor_def == ray_executor
what version of dagster are you running with?
(also, if you attempted to run the last line, production.get_job("build_and_train").executor_def == ray_executor, I would be curious as to the result.)

Oliver

06/30/2022, 10:12 PM
just adding that line, the workspace fails to deploy with invalid config; if I remove the ray_executor config it comes up fine with that line
I'm on 0.15.2
but your example does work .. hmmm (or at least fails as you would expect trying to pass args into the executor)
TypeError: ray_executor() takes 0 positional arguments but 1 was given
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/grpc/impl.py", line 92, in core_execute_run
    yield from execute_run_iterator(
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/core/execution/api.py", line 911, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/utils/__init__.py", line 470, in generate_setup_events
    obj = next(self.generator)
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/core/execution/context_creation_pipeline.py", line 333, in orchestration_context_event_generator
    executor = create_executor(context_creation_data)
  File "/home/ubuntu/miniconda3/envs/sepsis/lib/python3.8/site-packages/dagster/core/execution/context_creation_pipeline.py", line 425, in create_executor
    return creation_fn(init_context)
ok so when the repository is loading it requires the multiprocess/single process config, and when I try to run it, it requires ray config. When launching with ray executor config I get the following when reloading the repo:
dagster.core.errors.DagsterInvalidConfigError: Invalid default_value for Field. Error 1: You can only specify a single field at path root:config. You specified ['code_dir', 'host', 'port', 'requirements_path']. The available fields are ['in_process', 'multiprocess']
and if I comment out that config and try to launch from the job launchpad I get
dagster.core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Received unexpected config entry "multiprocess" at path root:execution:config. Expected: "{ code_dir?: String host?: String port?: Int requirements_path?: String }".
Finally, on the overview page of the job, if I click "materialize all" and then "materialize", the job runs in the multiprocess executor.
Am I doing something wrong by declaring multiple repos in the same file? I currently have the following code, but removing the dev repo fixes the last problem mentioned:
@repository
def dev():
    resource_config = yaml.safe_load(DATA_DIR.joinpath("dev.yaml").read_text())

    resource_defs={
        'aws': aws_profile
    }

    resourced_assets = with_resources(
            all_assets.values(), 
            resource_defs, 
            resource_config
        )
    return [
        *resourced_assets,
        build_and_train_job(resource_config, resourced_assets)
    ]

@repository(
    default_executor_def=ray_executor
)
def production():
    resource_config = yaml.safe_load(DATA_DIR.joinpath("prod.yaml").read_text())
    # raise Exception(resource_defs)

    resource_defs={
        'aws': aws_profile,
    }

    resourced_assets = with_resources(
            all_assets.values(), 
            resource_defs, 
            resource_config
        )
    return [
        *resourced_assets,
        build_and_train_job(resource_config, resourced_assets, ray_executor)
    ]

chris

06/30/2022, 10:41 PM
what command are you using to run dagit? Are you pointing at the python file or some workspace.yaml?

Oliver

06/30/2022, 10:41 PM
workspace.yaml with a module

chris

06/30/2022, 10:50 PM
gotcha, where exactly are you specifying ray executor config? In the launchpad?
And also just to be sure, you're executing on the prod version of build_and_train when this happens?

Oliver

06/30/2022, 10:52 PM
executor config is specified in prod.yaml
yea, in the prod version. I caught on because it's trying to load my dev aws profile when clicking materialize for a single resource in the prod job

chris

06/30/2022, 11:02 PM
Ah okay, I think I know what's going on. We create the job before applying the executor, so the config for the ray executor gets used before that executor is actually applied to the job, which is where your error is coming from. https://github.com/dagster-io/dagster/issues/8698 tracks this; will fix asap
in the meantime, if you remove the executor config from being applied at repo construction time, and then only apply it from the launchpad, it should work
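
The ordering problem chris describes can be pictured with a toy model. This is only a sketch and is not Dagster's implementation: ToyExecutor, build_job_as_observed, and build_job_as_expected are hypothetical names, the host and port values are placeholders, and the allowed keys are copied from the two error messages earlier in the thread.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyExecutor:
    """Hypothetical stand-in for an executor definition with a config schema."""

    name: str
    allowed_keys: frozenset

    def validate(self, config: dict) -> None:
        # Reject any key the executor's schema does not know about.
        unexpected = sorted(set(config) - self.allowed_keys)
        if unexpected:
            raise ValueError(f"{self.name}: unexpected config keys {unexpected}")


# Key names taken from the error messages quoted in the thread.
BUILTIN_DEFAULT = ToyExecutor("in_process/multiprocess", frozenset({"in_process", "multiprocess"}))
RAY = ToyExecutor("ray", frozenset({"code_dir", "host", "port", "requirements_path"}))


def build_job_as_observed(config: dict, repo_executor: ToyExecutor) -> ToyExecutor:
    # The job is created first, so config is checked against the built-in default...
    BUILTIN_DEFAULT.validate(config)
    # ...and only afterwards is the repository's default executor attached.
    return repo_executor


def build_job_as_expected(config: dict, repo_executor: ToyExecutor) -> ToyExecutor:
    # Assumed intended order: attach the repository's executor, then validate against it.
    repo_executor.validate(config)
    return repo_executor
```

With ray-style config such as {"host": "localhost", "port": 10001}, the first function raises because the keys are checked against the wrong schema, while the second accepts them.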

Oliver

06/30/2022, 11:08 PM
Oh? How would I register the executor to be configurable from the launchpad? Doesn't it just use the default executor, i.e. the union of the in-process and multiprocess executors? I have found using ray_executor.configured as a workaround as well

chris

06/30/2022, 11:09 PM
yea, ray_executor.configured will also work. The launchpad uses whatever executor the underlying job is using, so it should be configurable from there under the key execution
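
As a rough illustration of what would go under the execution key, here is a small sketch that builds that section of run config as a Python dict. The nesting (execution, then config) and the four field names come from the "root:execution:config" error message earlier in the thread; the helper name ray_execution_config and the example values are made up for illustration, and the launchpad itself would take the equivalent YAML.

```python
from typing import Optional


def ray_execution_config(
    host: Optional[str] = None,
    port: Optional[int] = None,
    code_dir: Optional[str] = None,
    requirements_path: Optional[str] = None,
) -> dict:
    """Build the `execution` section of run config for the thread's ray_executor.

    All four fields are optional, matching the `?` markers in the error message.
    """
    candidate = {
        "code_dir": code_dir,
        "host": host,
        "port": port,
        "requirements_path": requirements_path,
    }
    # Leave out anything the caller did not set.
    executor_config = {key: value for key, value in candidate.items() if value is not None}
    return {"execution": {"config": executor_config}}


if __name__ == "__main__":
    import json

    # Placeholder host and port, not values from the thread.
    print(json.dumps(ray_execution_config(host="127.0.0.1", port=10001), indent=2))
```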

Oliver

06/30/2022, 11:11 PM
ah and there's no way to set the executor when using define_asset_job?

chris

06/30/2022, 11:12 PM
We're probably going to add that eventually, but not as of now

Oliver

06/30/2022, 11:24 PM
Ok cool, thanks for all the help 🙂
from dagster import AssetSelection, asset, define_asset_job, repository, with_resources, resource


def ray_executor():
    pass


@resource
def test_dev(context):
    raise Exception('dev')

@resource
def test_prod(context):
    raise Exception('prod')

@asset(
    required_resource_keys={'test'}
)
def the_asset():
    pass

all_assets = {"asset1": the_asset}


@repository()
def dev():
    resource_defs = {
        'test': test_dev
    }
    resourced_assets = with_resources(
            all_assets.values(), 
            resource_defs
        )

    return [
        define_asset_job("build_and_train", AssetSelection.assets(*resourced_assets))
    ] + list(resourced_assets)


@repository()
def production():
    resource_defs = {
        'test': test_prod
    }
    resourced_assets = with_resources(
            all_assets.values(), 
            resource_defs
        )

    return [
        define_asset_job("build_and_train", AssetSelection.assets(*resourced_assets))
    ] + list(resourced_assets)
this repros the issue I mentioned where resources aren't being applied as expected when using multiple repos. I can't find any related issues, should I file one?

chris

06/30/2022, 11:28 PM
okay, now to go through the resource issue: what exactly is the issue you're hitting here?

Oliver

06/30/2022, 11:29 PM
if you load up those repos and run the jobs in them, they should both raise exceptions based on the environment they're in, e.g. from
@resource
def test_dev(context):
    raise Exception('dev')

@resource
def test_prod(context):
    raise Exception('prod')
for me, running the dev job resulted in the prod exception

chris

06/30/2022, 11:31 PM
gotcha, let me try and repro
hmm okay when I use that same code I get the expected resource errors (dev for dev, prod for prod)

Oliver

06/30/2022, 11:48 PM
It doesn't seem to be consistent for me which one gets raised. I think I missed a step in the repro though: in the job overview tab, click on the asset and then "materialize selected"

chris

06/30/2022, 11:57 PM
woah ok was able to reproduce using that. Do you want to make an issue? If not I can take that on

Oliver

07/01/2022, 12:02 AM
nice, not just me 😅 I can do it

chris

07/01/2022, 12:05 AM
thanks so much for surfacing 😅 and bearing with us as we work through these kinks

Oliver

07/01/2022, 12:10 AM
no worries 🙂 to be expected when running with experimental features https://github.com/dagster-io/dagster/issues/8700
Do you think this is related? When trying to materialize a specific asset, the underlying asset is returned as configured by the repository, whereas this specific asset should be taking its configuration from the job. By "underlying asset is configured" I mean:
def build_and_train_job(resource_config, resourced_assets, executor=None):

    # return define_asset_job(
    #     "build_and_train", 
    #     AssetSelection.assets(*all_assets.values()), 
    #     resource_config
    # )

    return build_asset_selection_job(
        'build_and_train',
        resourced_assets,
        source_assets=[],
        executor_def=executor,
        config=resource_config,
    )

@repository
def repo():
    resourced_assets = with_resources(
            all_assets.values(), 
            resource_defs, 
            resource_config
        )
    return [
        *resourced_assets,
        build_and_train_job(resource_config, resourced_assets, ray_executor)
    ]
FYI I switched back to 0.15.0 because dagit on 0.15.2 wasn't updating and kept throwing errors about unknown events

chris

07/01/2022, 4:39 AM
I think this is intended behavior. In the global asset graph, we don't use any of your jobs for execution, but instead construct an ad hoc job
What errors were you getting from dagit?

Oliver

07/01/2022, 4:40 AM
Oh, just twigged that I probably had to bump my helm release too.. whoops
"I think this is intended behavior. In the global asset graph, we don't use any of your jobs for execution, but instead construct an ad hoc job"
this is in the job graph though -- and the train step works when pressing materialize selected