David
05/17/2020, 9:58 AM
Permissive() is the best solution.
1. Where do you think is the right place for client_type in the environment_dict configuration?
2. How do you suggest handling the imports for all client_types, so that a user who wants to use one type of cluster does not need to install all of them?
3. What else do we need to handle?
nate
05/17/2020, 3:40 PM
David
05/17/2020, 3:55 PM
nate
05/17/2020, 4:19 PM
David
05/17/2020, 7:38 PM
config={
    'cluster_type': Field(
        String,
        is_required=False,
        default_value='local',
        description='''Type of your cluster to initialize,
        can be one of the following: 'local', 'yarn', 'ssh', 'pbs', 'kube';
        the default is 'local'.''',
    ),
    'cluster_parameters': Field(
        Permissive(),
        is_required=False,
        description='''Dictionary of your cluster parameters,
        e.g. {'timeout': 30, ...}''',
    ),
}
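On the import question (question 2 above), one option is to import each cluster library only when its cluster_type is actually requested. The following is a minimal sketch, not the dagster-dask implementation: CLUSTER_CLASSES and load_cluster_class are hypothetical names, and the mapping from each cluster_type to a module and class is an assumption about which Dask deployment libraries would be used.

```python
import importlib

# Hypothetical registry: cluster_type -> (module path, class name).
# The pairing of each type with a library is an assumption for illustration.
CLUSTER_CLASSES = {
    'local': ('dask.distributed', 'LocalCluster'),
    'ssh': ('dask.distributed', 'SSHCluster'),
    'yarn': ('dask_yarn', 'YarnCluster'),
    'pbs': ('dask_jobqueue', 'PBSCluster'),
    'kube': ('dask_kubernetes', 'KubeCluster'),
}


def load_cluster_class(cluster_type, registry=CLUSTER_CLASSES):
    """Import the module for cluster_type on demand and return its class."""
    if cluster_type not in registry:
        raise ValueError(
            "unknown cluster_type '{}', expected one of {}".format(
                cluster_type, sorted(registry)
            )
        )
    module_name, class_name = registry[cluster_type]
    try:
        # Nothing is imported until a cluster of this type is requested.
        module = importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            "cluster_type '{}' requires the optional package '{}'".format(
                cluster_type, module_name
            )
        ) from exc
    return getattr(module, class_name)
```

With this shape, a user who only runs 'local' never triggers the import of the YARN, PBS, or Kubernetes packages, and a missing package is reported only for the cluster type that needs it.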
• How do you suggest testing all types of clusters?
nate
05/18/2020, 2:46 PM
config = {
    'cluster_type': Field(
        Selector(
            {
                'local': Field(
                    Permissive(), is_required=False, description='local cluster configuration'
                ),
                'yarn': Field(
                    Permissive(), is_required=False, description='YARN cluster configuration'
                ),
                ...
            }
        )
    )
}
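On the consuming side, a selector-style value can be unpacked into its type and options in one step. This is a sketch under the assumption that the resolved value arrives as a dict with exactly one key, the chosen branch; unpack_cluster_selector is a hypothetical helper, not part of dagster.

```python
def unpack_cluster_selector(selector_value):
    """Split a single-key selector dict into (cluster_type, cluster_configuration)."""
    if len(selector_value) != 1:
        raise ValueError(
            'expected exactly one cluster type, got {}'.format(sorted(selector_value))
        )
    # Tuple unpacking pulls out the only (key, value) pair.
    ((cluster_type, cluster_configuration),) = selector_value.items()
    # The chosen branch may carry no options at all.
    return cluster_type, dict(cluster_configuration or {})
```

For example, unpack_cluster_selector({'yarn': {'timeout': 30}}) yields the pair ('yarn', {'timeout': 30}).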
With a Selector, the config system will ensure that only one cluster is specified.
David
05/19/2020, 5:07 PM
dask_executor to Selector, how do you suggest making changes in the DaskConfig class?
I will try to handle the tests after finishing the implementation.
nate
05/20/2020, 2:57 PM
local, yarn, etc.
David
05/20/2020, 5:24 PM
DaskConfig class as follows:
class DaskConfig(
    namedtuple('DaskConfig', 'cluster_type cluster_configuration'),
    ExecutorConfig,
):
    def __new__(
        cls,
        cluster_type,
        cluster_configuration,
    ):
        return super(DaskConfig, cls).__new__(
            cls,
            # Default to 'local' here: a cluster_type property would shadow
            # the namedtuple field of the same name and call itself forever.
            cluster_type=check.opt_str_param(cluster_type, 'cluster_type', default='local'),
            cluster_configuration=check.opt_dict_param(
                cluster_configuration, 'cluster_configuration'
            ),
        )

    @staticmethod
    def get_engine():
        from .engine import DaskEngine

        return DaskEngine

    def build_dict(self, pipeline_name):
        # Copy so the stored configuration is not mutated; use the pipeline
        # name as the only setting when no configuration was given.
        dask_cfg = dict(self.cluster_configuration) or {'name': pipeline_name}
        if self.cluster_type == 'local' and 'address' not in dask_cfg:
            dask_cfg['threads_per_worker'] = 1
        return dask_cfg
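One thing to watch with a namedtuple-based config: a property that reuses a field name replaces the field accessor on the subclass, so a getattr(self, 'cluster_type', ...) inside that property calls the property again. A dagster-free sketch of the pitfall, and of applying the defaults in __new__ instead, follows. ShadowedConfig and DefaultedConfig are hypothetical names used only for illustration.

```python
from collections import namedtuple


class ShadowedConfig(namedtuple('ShadowedConfig', 'cluster_type')):
    # Pitfall: this property replaces the namedtuple field accessor of the
    # same name, so the getattr below re-enters the property.
    @property
    def cluster_type(self):
        return getattr(self, 'cluster_type', 'local')


class DefaultedConfig(namedtuple('DefaultedConfig', 'cluster_type cluster_configuration')):
    # Apply defaults once, at construction time.
    def __new__(cls, cluster_type=None, cluster_configuration=None):
        return super(DefaultedConfig, cls).__new__(
            cls,
            cluster_type=cluster_type or 'local',
            cluster_configuration=dict(cluster_configuration or {}),
        )

    def build_dict(self, pipeline_name):
        # Work on a copy so the stored configuration is never mutated.
        dask_cfg = dict(self.cluster_configuration) or {'name': pipeline_name}
        if self.cluster_type == 'local' and 'address' not in dask_cfg:
            dask_cfg['threads_per_worker'] = 1
        return dask_cfg
```

Reading ShadowedConfig('yarn').cluster_type raises RecursionError, while DefaultedConfig() falls back to 'local' without any property.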
How does it look to you?
nate
05/20/2020, 7:12 PM
David
05/21/2020, 5:26 PM