Michel Rouly
07/12/2021, 12:23 PM
With versioned_filesystem_io_manager, my Solid has a version property defined, but I'm consistently seeing:
dagster.check.ParameterCheckError: Param "context.version" is not a str. Got None which is type <class 'NoneType'>.
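For context, a minimal sketch of the setup being described, assuming 0.12-era experimental APIs (the import path for versioned_filesystem_io_manager and the memoized-run tag are assumptions to check against your release). The error above can arise when an output's version is never computed, e.g. when the run is not flagged as memoized:

from dagster import ModeDefinition, pipeline, solid
from dagster.core.storage.memoizable_io_manager import versioned_filesystem_io_manager

# version must be a string for memoization to resolve context.version
@solid(version="my_solid_v1")
def my_solid(_):
    return 1

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"io_manager": versioned_filesystem_io_manager})],
    tags={"dagster/is_memoized_run": "true"},  # assumption: memoization is opt-in per run
)
def my_pipeline():
    my_solid()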
esztermarton
07/12/2021, 3:04 PM
DEBUG logs available:
2021-07-12 14:02:44 - dagster - DEBUG - multiprocess_pipeline - 845b164c-932e-4c7e-9f6b-85877d5b5f24 - 78555 - dog - ENGINE_EVENT - Starting initialization of resources [io_manager, mlflow].
1. Somehow subscribe to these logs, and if a solid sees a log like this, wait for it to get the respective:
2021-07-12 14:02:45 - dagster - DEBUG - multiprocess_pipeline - 845b164c-932e-4c7e-9f6b-85877d5b5f24 - 78555 - dog - ENGINE_EVENT - Finished initialization of resources [io_manager, mlflow].
2. Is there a better source of events (e.g. ENGINE_EVENT) that can be subscribed to, maybe? To do the above, but with an event rather than looking at logs.
3. Is there a way to restrict parallelisation of resource initialisation, by any chance? Or set some kind of minimum time elapsed between executing 2 solids?
4. Hacky: sleep a random fraction of a second. Hopefully in our case this will usually solve the problem (see the sketch after this list).
5. Ugly: do a second check after creating the run to see if 2 have been created and, if so, delete the one that was created later.
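A sketch of option 4's jitter, as a hypothetical guard at the top of a solid (guarded_solid is illustrative, not from the thread):

import random
import time

from dagster import solid

@solid
def guarded_solid(context):
    # hypothetical workaround: sub-second random jitter makes it less likely
    # that two solids initialize the same resource at exactly the same moment
    time.sleep(random.uniform(0.0, 1.0))
    context.log.info("resource contention window passed")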
wkeifenheim
07/12/2021, 5:56 PM
Q1|Q2|.. of year YYYY
level can be shared at this time.
Noah Sanor
07/12/2021, 7:08 PM
run_dbt_models = dbt_cli_run.configured(default_dbt_config, name="run_dbt_models")
And then in each of my pipelines, "reconfigure" the solids to add additional configs like so:
run_daily_models = configured(run_dbt_models, name="run_daily_models")({"models": ["+tag:daily"]})
(I ask because I am trying and failing to do this currently)
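A sketch of one way to get this effect, assuming the failure comes from re-configuring a solid whose config schema is already fully satisfied: merge the base config with the per-pipeline overrides and call .configured once (make_dbt_solid is a hypothetical helper, not part of dagster):

def make_dbt_solid(name, extra_config):
    # hypothetical helper: apply the base config and the per-pipeline
    # overrides in a single .configured call instead of configuring twice
    merged = {**default_dbt_config, **extra_config}
    return dbt_cli_run.configured(merged, name=name)

run_daily_models = make_dbt_solid("run_daily_models", {"models": ["+tag:daily"]})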
Noah Sanor
07/12/2021, 9:02 PM
".Values.global.postgresqlSecretName should be set to the name of the externally managed Secret."
Is there any more info on using an external secret? We use AWS Secrets Manager for our secrets, so I'd like to leverage that if possible.
Jordan W
07/12/2021, 11:02 PM
async solids? Currently working through this, and there is a DagsterTypeCheckDidNotPass with a type failure along the lines of "expected return_type, got coroutine". Our tests that use execute_solid for our async solids still work, fwiw.
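For reference, a sketch of the direct-invocation pattern in question, assuming 0.12-era APIs (fetch_value is hypothetical). The error above suggests the type check is firing on the coroutine object itself rather than on its awaited result:

import asyncio

from dagster import build_solid_context, solid

@solid
async def fetch_value(context) -> str:
    await asyncio.sleep(0.1)
    return "payload"

# direct invocation of an async solid yields a coroutine; run it to get
# the actual return value before asserting on it
value = asyncio.run(fetch_value(build_solid_context()))
assert value == "payload"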
jeremy
07/13/2021, 3:00 AM
"…workspace.yaml you use to run Dagit to load from that module."
Is this saying that the celery workers should load from the same Python modules that are specified in the workspace.yaml that Dagit uses, or that there is a way to configure a workspace.yaml for a dagster celery worker?
Peter B
07/13/2021, 6:59 AM
llucid-97
07/13/2021, 8:14 AM
Dmitry Mogilevsky
07/13/2021, 9:14 AM
The .map() function isn't called, and neither are any of the subsequent solids in the graph. The graph works properly when not running on the dask executor.
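For reference, a minimal sketch of the dynamic-output graph shape being described, assuming 0.12-era APIs (some earlier releases exposed the dynamic classes under dagster.experimental):

from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid

@solid(output_defs=[DynamicOutputDefinition(int)])
def emit_numbers(_):
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))

@solid
def double(_, n: int) -> int:
    return n * 2

@pipeline
def fan_out_pipeline():
    # each dynamic output fans out into its own invocation of double
    emit_numbers().map(double)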
Stepan Dvoiak
07/13/2021, 11:16 AM
dagster/priority.
So I tried it, and that works only for the InProcessExecutor, and for the MultiprocessExecutor with max_concurrent: 1; with max_concurrent > 1 it does not work, and neither does the CeleryExecutor with the dagster-celery/priority tag.
So the dagster-celery/priority tag, in the case of a CeleryExecutor with only one process (autoscale=1,1), has no effect in that particular example case.
Does anyone know how to achieve the same effect on a CeleryExecutor with one worker: execute all parts of the composite_solid for the first DynamicOutput, and only after that for the second?
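A sketch of the tag placement under discussion; the tag keys dagster/priority and dagster-celery/priority are the documented ones, while the solids and values here are illustrative (higher values are meant to run first):

from dagster import solid

@solid(tags={"dagster/priority": "3"})
def high_priority_step(_):
    # prioritized when in-process/multiprocess executors choose among ready steps
    return 1

@solid(tags={"dagster-celery/priority": "9"})
def high_priority_celery_step(_):
    # celery-based executors read priority from the celery-specific tag instead
    return 1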
Peter B
07/13/2021, 11:22 AM
esztermarton
07/13/2021, 12:19 PM
Darren Haken
07/13/2021, 12:23 PM
@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")
    data_bricks_job_solid()
This code works.
How do I add config to databricks for the job?
I tried:
data_bricks_job_solid(
    {
        "name": "SparkPi Python job",
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        "spark_python_task": {
            "python_file": "dbfs:/docs/pi.py",
            "parameters": ["10"],
        },
    }
)
But I get an exception.
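In Dagster, arguments passed when calling a solid inside a pipeline body are treated as upstream inputs, not config; solid config is supplied through run config instead. A sketch under that assumption, reusing the job spec above (the exact dagster-databricks config schema, including whether the spec nests under a "job" key, is an assumption to check against the docs):

from dagster import execute_pipeline

run_config = {
    "solids": {
        "daily_stock": {
            "config": {
                # assumed key; consult create_databricks_job_solid's config schema
                "job": {
                    "name": "SparkPi Python job",
                    "new_cluster": {
                        "spark_version": "7.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "num_workers": 2,
                    },
                    "spark_python_task": {
                        "python_file": "dbfs:/docs/pi.py",
                        "parameters": ["10"],
                    },
                }
            }
        }
    }
}

execute_pipeline(spark_job, run_config=run_config)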
Jordan W
07/13/2021, 4:54 PM
Daniel Kim
07/14/2021, 12:22 AM
Is there a reason the DAGSTER_HOME environment variable isn't explicitly mentioned in the "Getting Started" or "Tutorial" docs? I understand that there is a warning, but it only appears at run or execution time.
Rob Meng
07/14/2021, 12:52 AM
Donald Leonhard-MacDonald
07/14/2021, 8:30 AM
map() on solids with dynamic outputs seems to collect all results first, and only then apply any downstream solids (preventing infinite event streams). Is this the expected behaviour?
We have a few concerns:
The time to instantiate a workflow for a single-sample inference could be quite slow. (Can we keep workflows alive for multiple runs?)
The workflows are blocking, so we need multiple workflows. (Are celery workers the solution to this?)
Do you have any suggestions or ideas for how we should be thinking about this?
George Pearse
07/14/2021, 9:50 AM
Peter B
07/15/2021, 2:50 AM
szalai1
07/15/2021, 1:50 PM
Adrian Rumpold
07/15/2021, 3:51 PM
2021-07-15 17:46:47 - dagster - DEBUG - parallel_pipeline - f3b33113-c28d-47d5-a5dc-541b60912777 - 159937 - PIPELINE_START - Started execution of pipeline "parallel_pipeline".
2021-07-15 17:46:47 - dagster - ERROR - parallel_pipeline - f3b33113-c28d-47d5-a5dc-541b60912777 - 159937 - PIPELINE_FAILURE - Execution of pipeline "parallel_pipeline" failed. An exception was thrown during execution.
ModuleNotFoundError: No module named 'celery.exceptions'; 'celery' is not a package
Stack Trace:
File "/home/adriano/playground/dagster/dagster-demo/.venv/lib/python3.9/site-packages/dagster/core/execution/api.py", line 756, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "/home/adriano/playground/dagster/dagster-demo/.venv/lib/python3.9/site-packages/dagster_celery/executor.py", line 157, in execute
from .core_execution_loop import core_celery_execution_loop
File "/home/adriano/playground/dagster/dagster-demo/.venv/lib/python3.9/site-packages/dagster_celery/core_execution_loop.py", line 4, in <module>
from celery.exceptions import TaskRevokedError
The dagster-celery and celery (I tried both v4.4.6 and the latest v5.1.2) packages are installed in the virtualenv (I can even set a breakpoint on the offending import statement and import the module in my debug console just fine; the site-packages of the venv is on sys.path). Do you have any ideas why this import might fail, or how I might further debug the issue? Thanks a ton!
Mit Dhami
07/15/2021, 4:47 PM
Martim Passos
07/15/2021, 6:21 PM
There are incompatible versions in the resolved dependencies:
graphql-core<3,>=2.0 (from graphql-ws==0.3.1->dagit==0.12.1->-r /tmp/pipenvvg9eykvbrequirements/pipenv-vug0hh0m-constraints.txt (line 13))
graphql-core<3,>=2.1 (from dagster-graphql==0.12.1->dagit==0.12.1->-r /tmp/pipenvvg9eykvbrequirements/pipenv-vug0hh0m-constraints.txt (line 13))
graphql-core<3,>=2.1 (from flask-graphql==2.0.1->dagit==0.12.1->-r /tmp/pipenvvg9eykvbrequirements/pipenv-vug0hh0m-constraints.txt (line 13))
graphql-core<3,>=2.3 (from graphql-server-core==1.2.0->flask-graphql==2.0.1->dagit==0.12.1->-r /tmp/pipenvvg9eykvbrequirements/pipenv-vug0hh0m-constraints.txt (line 13))
graphql-core<3,>=2.3.2 (from gql==2.0.0->dagster-graphql==0.12.1->dagit==0.12.1->-r /tmp/pipenvvg9eykvbrequirements/pipenv-vug0hh0m-constraints.txt (line 13))
graphql-core<4,>=3.1.2 (from graphene==3.0b7->dagster-graphql==0.12.1->dagit==0.12.1->-r /tmp/pipenvvg9eykvbrequirements/pipenv-vug0hh0m-constraints.txt (line 13))
The same thing happened when Flask published a breaking release about two months ago; could this be the reason again?
szalai1
07/15/2021, 7:55 PM
This engine is only compatible with a CeleryK8sRunLauncher; configure the CeleryK8sRunLauncher on your instance to use it.
my mode def:
return ModeDefinition(
    name="mode",
    resource_defs=resources,
    executor_defs=[celery_k8s_job_executor] if prod else default_executors,
)
What am I doing wrong?
Josh Lloyd
07/15/2021, 9:08 PM
dagster.core.errors.DagsterInvalidInvocationError: Schedule decorated function has context argument, but no context argument was provided when invoking.
Here's the code. I know that validate_run_config is currently experimental, so maybe this is just a bug that needs some work.
@schedule(
    pipeline_name="collective_pipeline",
    cron_schedule="0 7 * * *",
    execution_timezone="UTC",
    mode='prod'
)
def coll_daily_schedule():
    return {}

def test_coll_daily_schedule():
    run_config = coll_daily_schedule()
    assert validate_run_config(collective_pipeline, run_config)
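A sketch of one workaround for the invocation error, assuming 0.12-era APIs: pass an explicit schedule context when invoking the schedule function directly, since the decorator appears to expect one at invocation time.

from dagster import build_schedule_context

def test_coll_daily_schedule():
    # build_schedule_context supplies the context the error above asks for
    context = build_schedule_context()
    run_config = coll_daily_schedule(context)
    assert validate_run_config(collective_pipeline, run_config)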
Neha Bansal
07/15/2021, 10:01 PM
Missing required config entry "solids" at the root. Sample config for missing entry: {'solids': {'fit_predict_inspect': {'inputs': {'use_dates': '<selector>'}}}}
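A sketch of run config that would satisfy the missing entry, assuming use_dates is a simple value loaded via the built-in "value" selector (the actual selector and value depend on the input's dagster type):

run_config = {
    "solids": {
        "fit_predict_inspect": {
            "inputs": {
                # "<selector>" in the error is a placeholder; many built-in
                # types accept their input as {"value": ...}
                "use_dates": {"value": True},
            }
        }
    }
}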
Jessica Franks
07/16/2021, 9:26 AM
dbt deps in dagster?
Suraj Narwade
07/16/2021, 10:01 AM
George Pearse
07/16/2021, 10:36 AM