Danny
05/15/2020, 5:06 AM
@solid
def my_solid(context, num: int):
    context.log.info(f'num = {num}')
    execute_pipeline_iterator(my_pipeline, instance=DagsterInstance.get(), environment_dict={
        'solids': {
            'my_solid': {
                'inputs': {
                    'num': num + 1
                }
            }
        }
    })

@pipeline
def my_pipeline():
    my_solid()
The result is that a new pipeline run is listed in dagit but is stuck on state Starting...
- am I doing something wrong?
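One API detail that may explain the stuck run (a guess, not a confirmed diagnosis): execute_pipeline_iterator returns a lazy iterator of events, so the child run only progresses as those events are consumed. A sketch of draining it inside the solid, reusing the names and config from the snippet above:

# Sketch: nothing executes until the iterator is consumed.
for event in execute_pipeline_iterator(
    my_pipeline,
    instance=DagsterInstance.get(),
    environment_dict={'solids': {'my_solid': {'inputs': {'num': num + 1}}}},
):
    # event_type_value is a plain string, per the DagsterEvent repr below
    context.log.info(event.event_type_value)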
Dmitry Krylov
05/15/2020, 2:05 PM
Tobias Macey
05/15/2020, 6:28 PM
Tobias Macey
05/15/2020, 6:29 PM
Tobias Macey
05/15/2020, 6:33 PM
Tobias Macey
05/15/2020, 8:31 PM
Tobias Macey
05/15/2020, 8:31 PM
Param "bash_command" is not a str. Got <dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7f7a69f208e0> which is type <class 'dagster.core.definitions.composition.InvokedSolidOutputHandle'>
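That error usually appears when a solid's output handle is passed where a plain Python value is expected at pipeline definition time, e.g. into a solid factory parameter like bash_command. A sketch of the usual fix (a generic example, not the dagster_bash API): accept the value as a solid input so it is resolved at run time:

from dagster import pipeline, solid

@solid
def produce_command(_) -> str:
    return 'echo hello'

@solid
def run_bash(context, bash_command: str):
    # Receiving the command as an *input* defers it to run time, so it
    # can come from another solid's output.
    context.log.info('would run: {}'.format(bash_command))

@pipeline
def my_pipeline():
    # Correct: wire produce_command's output handle into an input.
    # Passing that handle to a factory that wants a plain str at
    # definition time triggers exactly the check error quoted above.
    run_bash(produce_command())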
Ben Sully
05/16/2020, 4:03 AM
...the emr module to see how it serializes and stores DagsterEvents from a pipeline executed remotely (trying to create a similar Databricks integration), but it looks like events aren't rehydrated properly?
>>> event
DagsterEvent(event_type_value='STEP_START', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=None, message='Started execution of step "make_df_solid.compute".')
>>> pickle.loads(pickle.dumps(event))
_DagsterEvent(event_type_value='STEP_START', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=None, message='Started execution of step "make_df_solid.compute".')
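For what it's worth, the two reprs above differ only in the leading underscore: DagsterEvent is conventionally declared as a subclass of a namedtuple named _DagsterEvent, and the round-tripped object prints as the base tuple. A small diagnostic helper (not dagster code) to confirm whether the concrete class really changes across a pickle round-trip:

import pickle

def roundtrips_same_type(obj):
    # Report whether pickling returns the same concrete class or a
    # different one (e.g. a base namedtuple instead of the subclass).
    restored = pickle.loads(pickle.dumps(obj))
    print(type(obj).__name__, '->', type(restored).__name__)
    return type(restored) is type(obj)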
David
05/17/2020, 9:58 AM
...Permissive() is the best solution.
1. Where do you think is the right place for client_type in the environment_dict configuration?
2. How do you suggest handling the imports for all client_types, so that a user who wants to use one type of cluster doesn't need to install all of them? (See the sketch after this list.)
3. What else do we need to handle?
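On question 2, one common pattern (a sketch of a general approach, not dagster's prescribed one) is to defer each client library import until the user actually selects that client_type, and fail with an actionable message if the extra isn't installed:

def get_cluster_client(client_type):
    # Import lazily so only the selected client's package must be
    # installed. 'emr' is a hypothetical client_type value here.
    if client_type == 'emr':
        try:
            import boto3  # only needed for the EMR client
        except ImportError:
            raise ImportError("client_type 'emr' requires boto3 (pip install boto3)")
        return boto3.client('emr')
    raise ValueError('Unsupported client_type: {}'.format(client_type))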
05/18/2020, 10:00 AM
...a spark UDF function on a yarn cluster, but having trouble configuring the spark resource.
Since we are running on anaconda, we are trying to use the instructions from conda to configure spark.
When running outside of dagster we can use the spark.yarn.dist.archives configuration pointing to an hdfs folder that is accessible to all the workers.
We tried to define the spark.submit.pyFiles resource config but we are still getting the error:
Cannot run program "path/to/python/env/bin/python3.7" error=2, No such file or directory.
Additionally we tried spark.driver.extraLibraryPath but are still getting the same error. What would be the best way to try and implement the PYSPARK_PYTHON and archives variables? Is it through parse_spark_configs.py?
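A config sketch that may address this, assuming the nested spark_conf schema that parse_spark_configs.py generates and a hypothetical hdfs path: the "Cannot run program" error is consistent with workers lacking that absolute python path, so the usual yarn approach is to ship the conda env archive via spark.yarn.dist.archives and point PYSPARK_PYTHON at the python inside the unpacked alias:

resources:
  pyspark:
    config:
      spark_conf:
        spark:
          yarn:
            dist:
              # hypothetical path; '#conda_env' is the unpack alias on workers
              archives: "hdfs:///path/to/env.tar.gz#conda_env"
            appMasterEnv:
              PYSPARK_PYTHON: "./conda_env/bin/python3.7"
          executorEnv:
            PYSPARK_PYTHON: "./conda_env/bin/python3.7"

(spark.yarn.appMasterEnv.* and spark.executorEnv.* are standard Spark-on-YARN settings; the resource key and nested layout above are assumptions about the dagster schema.)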
Ben Sully
05/18/2020, 2:41 PM
@resource(
    {
        'databricks_host': Field(
            StringSource,
            is_required=True,
            description='Databricks host, e.g. uksouth.azuredatabricks.com',
        ),
        'databricks_token': Field(
            StringSource, is_required=True, description='Databricks access token',
        ),
    }
)
def databricks_pyspark_step_launcher(context):
    ...
then i use something like this when running it:
resources:
  pyspark_step_launcher:
    config:
      databricks_host: uksouth.azuredatabricks.net
      databricks_token:
        env: DATABRICKS_TOKEN
which launches the job just fine, but when it gets to executing the step remotely the DATABRICKS_TOKEN env var isn't present (and doesn't need to be), so the step fails. is there an idiomatic solution to this? maybe passing credentials in some other way, à la boto?
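One workaround sketch (an assumption about the integration's plumbing, not a documented dagster pattern): resolve the secret on the machine that launches the run, where the env var exists, and forward the literal value in the cluster spec so the remote step never needs the env var itself:

import os

def remote_cluster_env():
    # Resolve the token locally and pass its value to the remote
    # environment; 'spark_env_vars' mirrors the field of that name in the
    # Databricks cluster spec (assumption for this sketch).
    return {'spark_env_vars': {'DATABRICKS_TOKEN': os.environ['DATABRICKS_TOKEN']}}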
05/18/2020, 8:07 PMdagster.yaml
, but i get Undefined field "loggers" at the root. Expected:
...
so i'm not sure where to put thisChris Roth
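A note that may help narrow it down: a loggers section validates in the per-run config (the YAML passed to a run), rather than in the instance-level dagster.yaml, e.g.:

# run config, not dagster.yaml
loggers:
  console:
    config:
      log_level: DEBUG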
Chris Roth
05/19/2020, 1:01 AM
wbonelli
05/19/2020, 1:07 AM
...SystemComputeExecutionContext within a composite solid? E.g., for logging
Chris Roth
05/19/2020, 4:18 AM
Nick
05/19/2020, 12:05 PM
alex torres
05/19/2020, 5:50 PM
alir
05/19/2020, 6:08 PM
wbonelli
05/20/2020, 1:39 AM
...DagsterType) in the pipeline definition, but got a DagsterInvalidDefinitionError saying Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition. What is the composition function in this context?
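The "composition function" is the body of the @pipeline (or @composite_solid) decorated function itself: inside it, solid calls build a dependency graph rather than execute, so arguments must be other solids' outputs or declared inputs, never plain values. A minimal sketch with hypothetical solid names:

from dagster import pipeline, solid

@solid
def emit_number(_) -> int:
    return 1

@solid
def double(_, num: int) -> int:
    return num * 2

@pipeline
def my_pipeline():
    # This body is the composition function: it only wires solids
    # together. double() must receive the output handle from
    # emit_number(); calling double(1) with a raw value raises the
    # DagsterInvalidDefinitionError quoted above.
    double(emit_number())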
Binh Pham
05/20/2020, 7:56 AM
matas
05/20/2020, 10:42 AM
Fran Sanchez
05/20/2020, 12:57 PM
...repository.yaml or dagster.yaml should be built. For example, in the case of dagster.yaml I don't want to hard-code any credentials there; how can I do that? Or what are all the options for repository.yaml?
I hope to learn more about dagster, it seems super cool and I'm excited to start exploring it.
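On the credentials point: instance config fields typed as StringSource can use env: indirection, so secrets come from environment variables instead of being hard-coded. A sketch assuming the postgres-backed run storage (the storage class and env var name are illustrative, not required):

# dagster.yaml: read the secret from the environment
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url:
      env: DAGSTER_PG_URL  # hypothetical env var name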
Gaetan DELBART
05/20/2020, 1:11 PM
...values.yaml, i've got the following:
imagePullSecrets:
  - name: secret-name
When I try to bring up my stack using Helm, the container is in status "CrashLoopBackOff", and if I get the logs of the pods, I see the following:
Error 1: Invalid scalar at path root:image_pull_secrets[0]. Value "{'name': 'secret-name'}" of type "<class 'dict'>" is not valid for expected type "String".
I've tried to dig into the issue and to edit the configmap-instance.yaml file, specifically this part:
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    {{- with .Values.imagePullSecrets }}
    image_pull_secrets:
      {{- toYaml . | nindent 10 }}
    {{- end }}
I've tried to "hardcode" the value of image_pull_secrets
After some tries, I've tried
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    image_pull_secrets:
      - "secret-name"
And got the following issue
dagster.check.CheckError: Member of list mismatches type. Expected <class 'dict'>. Got 'ecr-auth' of type <class 'str'>
Seems like there is a mismatch between the type expected by the config schema and the one expected by the K8sRunLauncher.
Any input on this, or any solutions to solve this issue?
Thanks again for your great work 💪
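Reading the two errors together (just restating what the logs show, not a fix): the instance config schema validates each image_pull_secrets entry as a plain string, while the launcher's runtime check expects Kubernetes-style dicts, so the two shapes below cannot both be satisfied:

# Shape the config schema accepts, per error 1:
image_pull_secrets:
  - "secret-name"

# Shape the launcher's check expects, per error 2:
image_pull_secrets:
  - name: secret-name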
sandy
05/20/2020, 3:36 PM
John Mav
05/20/2020, 5:57 PM
alex torres
05/20/2020, 7:30 PM
alir
05/20/2020, 9:00 PM
Fran Sanchez
05/20/2020, 11:43 PM
...context, or do I need to use *args or **kwargs?
Fran Sanchez
05/21/2020, 5:27 PM
Danny
05/21/2020, 7:05 PM
...execute_pipeline and execute_pipeline_iterator seem to always block, and I tried copying how the CLI does it (https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/cli/pipeline.py#L405-L413) but both my code and the cli are throwing this:
$ DAGSTER_HOME=tmp/dagster dagster pipeline launch my_pipeline -y pipelines/repository.yaml -e pipelines/my_pipeline_config.yaml
Traceback (most recent call last):
File "/home/danny/.pyenv/versions/project-3.7.7/bin/dagster", line 8, in <module>
sys.exit(main())
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/cli/__init__.py", line 38, in main
cli(obj={}) # pylint:disable=E1123
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/core/telemetry.py", line 231, in wrap
result = f(*args, **kwargs)
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/cli/pipeline.py", line 417, in pipeline_launch_command
return instance.launch_run(pipeline_run.run_id)
File "/home/danny/.pyenv/versions/3.7.7/envs/project-3.7.7/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 788, in launch_run
return self._run_launcher.launch_run(self, run)
AttributeError: 'NoneType' object has no attribute 'launch_run'
In case it matters: I haven't configured a dagster.yaml file anywhere. If it's a run_launcher config that I'm missing, what should it be if I'm running my code or the cli on the same machine where dagit is running on localhost?
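For reference, the AttributeError means the instance has no run_launcher configured (self._run_launcher is None). A dagster.yaml sketch that may fit the localhost-dagit setup, assuming this dagster version ships RemoteDagitRunLauncher in dagster_graphql (worth verifying against your version):

# $DAGSTER_HOME/dagster.yaml
run_launcher:
  module: dagster_graphql.launcher
  class: RemoteDagitRunLauncher  # assumption: available in this version
  config:
    address: http://localhost:3000  # where dagit is serving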