d
Hi, @nate Following the issue, I think that using `Permissive()` is the best solution.
1. Where do you think is the right place for `client_type` in the `environment_dict` configuration?
2. How do you suggest handling the imports for all client_types, so that a user who wants to use one type of cluster doesn't need to install all of them?
3. What else do we need to handle?
n
Yes, Permissive is great.
1. I think top level in the dask executor config is fine.
2. I would just do lazy imports within each branch, so that you only import when that cluster is configured.
3. I would add extras to setup.py for each cluster type in extras_require. Also, if you can add some tests for any cluster type you add, that would be great!
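(For illustration, a minimal sketch of the lazy imports and extras nate describes; the function name, branch set, and package-to-extra mapping are assumptions, not the actual dagster-dask code.)
```python
# Sketch: import each dask cluster package only inside the branch that uses it,
# so users only need the dependency for the cluster type they configure.
def create_cluster(cluster_type, cluster_config):
    if cluster_type == 'yarn':
        from dask_yarn import YarnCluster  # needs the 'yarn' extra

        return YarnCluster(**cluster_config)
    elif cluster_type == 'pbs':
        from dask_jobqueue import PBSCluster  # needs the 'pbs' extra

        return PBSCluster(**cluster_config)
    elif cluster_type == 'kube':
        from dask_kubernetes import KubeCluster  # needs the 'kube' extra

        return KubeCluster(**cluster_config)
    else:
        from distributed import LocalCluster  # ships with dask.distributed

        return LocalCluster(**cluster_config)
```
And the matching setup.py extras, so each dependency is opt-in (e.g. `pip install dagster-dask[yarn]`):
```python
extras_require={
    'yarn': ['dask-yarn'],
    'pbs': ['dask-jobqueue'],
    'kube': ['dask-kubernetes'],
},
```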
d
OK, I will start working on it. You can assign this issue to me.
n
Perfect. Thanks for taking this on!
d
• What do you think about changing the `dask_executor` config to something like this:
```python
config = {
    'cluster_type': Field(
        String,
        is_required=False,
        default_value='local',
        description='''Type of cluster to initialize; one of
                    'local', 'yarn', 'ssh', 'pbs', 'kube'.
                    The default is 'local'.''',
    ),
    'cluster_parameters': Field(
        Permissive(),
        is_required=False,
        description='''Dictionary of cluster parameters,
                       e.g. {'timeout': 30, ...}''',
    ),
}
```
• How do you suggest testing all the cluster types?
n
Ah, I was thinking about the config being more like:
```python
config = {
    'cluster_type': Field(
        Selector(
            {
                'local': Field(
                    Permissive(), is_required=False, description='local cluster configuration'
                ),
                'yarn': Field(
                    Permissive(), is_required=False, description='YARN cluster configuration'
                ),
                ...
            }
        )
    )
}
```
This way the config system will ensure that only one cluster is specified.
On testing, I'm not sure; does Dask provide any cluster unit-testing fixtures or other test machinery?
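(Illustrative sketch of that guarantee, assuming the Selector config above; the parameter values are made up.)
```python
# A Selector accepts exactly one of its keys, so config validation enforces
# a single cluster choice:
ok_config = {'cluster_type': {'yarn': {'n_workers': 4}}}
# {'cluster_type': {'yarn': {...}, 'local': {...}}} would fail validation;
# the validated value is always a dict with a single key.
```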
d
After changing the config section in `dask_executor` to `Selector`, how do you suggest making changes to the `DaskConfig` class? I will try to handle the tests after finishing all the implementation.
n
I think you can just toggle on the top-level config key under `cfg['cluster_type']`: a Selector will ensure that the validated config that's produced will be a dict with a single key; in this case that key will be `local`, `yarn`, etc.
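(A hypothetical helper showing that unpacking; not actual dagster-dask code.)
```python
# The validated Selector value is a single-key dict, so one unpack yields
# both the cluster type and its configuration.
def parse_cluster_config(cfg):
    (cluster_type, cluster_configuration), = cfg['cluster_type'].items()
    return cluster_type, cluster_configuration

# e.g. {'cluster_type': {'yarn': {'n_workers': 4}}} -> ('yarn', {'n_workers': 4})
```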
d
Hi, I changed the `DaskConfig` class as follows:
```python
from collections import namedtuple

from dagster import check
# ExecutorConfig is imported from dagster's execution config elsewhere in this module.


class DaskConfig(
    namedtuple('DaskConfig', 'cluster_type cluster_configuration'),
    ExecutorConfig,
):
    def __new__(cls, cluster_type, cluster_configuration):
        return super(DaskConfig, cls).__new__(
            cls,
            # default to a local cluster when no type is provided
            cluster_type=check.opt_str_param(cluster_type, 'cluster_type', default='local'),
            cluster_configuration=check.opt_dict_param(
                cluster_configuration, 'cluster_configuration'
            ),
        )

    @staticmethod
    def get_engine():
        from .engine import DaskEngine

        return DaskEngine

    def build_dict(self, pipeline_name):
        # copy so we don't mutate the user's dict; fall back to naming the
        # cluster after the pipeline when no configuration was given
        dask_cfg = dict(self.cluster_configuration) or {'name': pipeline_name}

        if self.cluster_type == 'local' and 'address' not in dask_cfg:
            dask_cfg['threads_per_worker'] = 1

        return dask_cfg
```
(Note: I dropped the `cluster_type` property from my earlier draft; it shadowed the namedtuple field and its `getattr(self, 'cluster_type', 'local')` would recurse, so the default now lives in `__new__`.)
How does it look to you?
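(A quick illustrative check of the class above; values are made up.)
```python
cfg = DaskConfig(cluster_type='local', cluster_configuration={'n_workers': 2})
assert cfg.build_dict('my_pipeline') == {'n_workers': 2, 'threads_per_worker': 1}

# With no configuration supplied, the pipeline name becomes the cluster name:
cfg = DaskConfig(cluster_type='local', cluster_configuration=None)
assert cfg.build_dict('my_pipeline') == {'name': 'my_pipeline', 'threads_per_worker': 1}
```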
n
Cool, generally looks reasonable. Probably worth getting this into a PR, and we can chime in on specifics there.
d
Thanks, I opened a PR.