Laura Moraes
04/27/2021, 10:08 PM
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time
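(A sketch of the shape that commonly triggers this error, since the pipeline code isn't shown here: feeding a mapped dynamic output into a regular multi-input solid without calling .collect() first. The solid names below are hypothetical, and depending on the Dagster version the dynamic APIs may be importable from dagster directly rather than dagster.experimental.)

from dagster import pipeline, solid
# 0.11-era import path; newer releases expose these from dagster directly
from dagster.experimental import DynamicOutput, DynamicOutputDefinition

@solid(output_defs=[DynamicOutputDefinition(int)])
def emit_numbers(_):
    # yield a variable number of dynamic outputs
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))

@solid
def double(_, n: int) -> int:
    return n * 2

@solid
def total(_, numbers: list) -> int:
    return sum(numbers)

@pipeline
def fan_in_pipeline():
    doubled = emit_numbers().map(double)
    # passing `doubled` into `total` without .collect() is the kind of regular
    # fan-in that raises the CheckError; .collect() gathers the dynamic outputs first
    total(doubled.collect())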
Paul Wyatt
04/28/2021, 12:04 AM
jonathan
04/28/2021, 1:16 PM
Bennett Kanuka
04/28/2021, 2:22 PM
Laura Moraes
04/28/2021, 2:45 PM
Steve Pletcher
04/28/2021, 2:46 PM
Ryan Ernst
04/28/2021, 2:50 PM
Agon Shabi
04/28/2021, 3:32 PM
@pipeline
def my_pipeline():
    features = ...
    untrained_models = generate_models()
    trained_models = untrained_models.map(
        train_model  # I want to also pass in "features" here
    )
    best_model = pick_best(trained_models.collect())
    ...
The generate_models solid yields a variable number of untrained model instances, driven by config.
Is there another obvious way to do this that I'm missing, besides feeding the features output through generate_models?
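(Not an official pattern, just a sketch of one way this is often handled: .map accepts any callable evaluated at composition time, so other upstream outputs can be closed over and handed to the mapped solid alongside each dynamic output. compute_features is a hypothetical solid standing in for however features is produced; the other solids are the ones from the snippet above.)

from dagster import pipeline

@pipeline
def my_pipeline():
    features = compute_features()  # hypothetical solid producing the features
    untrained_models = generate_models()
    trained_models = untrained_models.map(
        # each mapped model is trained against the same features output
        lambda model: train_model(model, features)
    )
    best_model = pick_best(trained_models.collect())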
Jonas Mock
04/28/2021, 7:12 PM
Paramjit Singh
04/28/2021, 9:59 PM
Kirk Stennett
04/28/2021, 11:11 PM
dagster.core.errors.DagsterImportError: Encountered ImportError: `cannot import name 'DOCKER_IMAGE_TAG' from 'dagster.core.storage.tags'
Any idea what this comes from? It appears to be around the import: from dagster_celery_k8s import celery_k8s_job_executor
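(One thing worth ruling out, as an assumption rather than a confirmed diagnosis: DOCKER_IMAGE_TAG only exists in sufficiently recent dagster releases, so a dagster-celery-k8s that is newer than the installed dagster core would fail exactly like this. A quick check that the two packages are on the same release:)

import pkg_resources

# the dagster libraries are versioned in lockstep, so these should match exactly
for pkg in ("dagster", "dagster-celery-k8s"):
    print(pkg, pkg_resources.get_distribution(pkg).version)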
Marco
04/29/2021, 9:39 AM
Charles Lariviere
04/29/2021, 2:34 PM
@daily_schedule partition-based schedule? The idea being that I would like to refresh the last N partitions every day.
I saw the partition_days_offset parameter, which sounds like it would result in the expected behaviour if I created N schedules and incremented the partition_days_offset for each, but I'm curious if there's a way to do this directly within a single schedule.
Swadhin Swain
04/29/2021, 2:35 PM
Mehdi Nazari
04/29/2021, 3:50 PM
Kirk Stennett
04/29/2021, 4:06 PM
dagster.core.errors.DagsterUnknownResourceError: Unknown resource `custom_resource`. Specify `custom_resource` as a required resource on the compute / config function that accessed it.
I have the resource decorated in its own file with @resource, and it's loaded in the mode definition as:
resource_defs={ "custom_resource": custom_resource }
Am I missing something obvious with how the resource needs to be loaded?
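(The error message points at the usual cause: whichever solid, or config function, touches context.resources.custom_resource also has to declare it. A minimal sketch with hypothetical solid and pipeline names:)

from dagster import ModeDefinition, pipeline, resource, solid

@resource
def custom_resource(_):
    return "some client"

# without required_resource_keys, accessing the resource raises DagsterUnknownResourceError
@solid(required_resource_keys={"custom_resource"})
def use_custom_resource(context):
    return context.resources.custom_resource

@pipeline(mode_defs=[ModeDefinition(resource_defs={"custom_resource": custom_resource})])
def my_pipeline():
    use_custom_resource()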
Dipit
04/29/2021, 4:35 PM
Steve Pletcher
04/29/2021, 6:23 PM
Marco
04/29/2021, 9:10 PM
Charles Lariviere
04/29/2021, 9:16 PM
0.11.0 to 0.11.6 today, while nothing changed in the code.
AttributeError: 'ConfigEnumValueSnap' object has no attribute 'config_value'
I have an Enum field with `EnumValue`s in the config schema for one of my solids, and I get this as a PythonError when I open the Playground tab without the config. This gets resolved when I do add a valid config for that field. It initially made me think something was wrong in the code, so it took quite a while before I realized it was simply because I was missing the config value in the Playground config.
Arun Kumar
04/29/2021, 10:25 PM
Dipit
04/30/2021, 4:48 AM
Tiri Georgiou
04/30/2021, 1:16 PM
Daniel Michaelis
04/30/2021, 1:35 PM
aws sso login, which creates temporary credentials. I created a custom pyspark_s3_resource
which accesses these credentials via boto3.Session().get_credentials() and adjusts the `pyspark_resource`'s hadoop config, so it can read from S3. However, I am unsure how I can access these temporary credentials from within the Dagster Pod on Kubernetes. It was suggested that I mount the folder with the credentials into the Dagster user code Pod via hostPath, but I'm unsure how to do that and whether it's a valid solution. Any thoughts on that? (I'm only interested in a quick workaround for my local cluster, as the AWS authentication will be solved differently in our production cluster on EKS.)
2. Are there any best practices on how to run Spark jobs efficiently with Dagster? A naive approach would be to save all intermediate results of each solid (especially DataFrames as parquet) on S3, but saving ALL intermediates and starting new Spark sessions in every solid effectively negates the advantages of Spark, i.e. lazy evaluation, caching, etc. This could be avoided by combining several solids into a monolithic solid, but that would contradict the single-responsibility principle (each step only does one thing). Is it possible to share a single Spark session across several consecutive solids within a pipeline, and e.g. pass the results from one solid to another via a custom IOManager that caches the results instead of saving them, or only passes them through without doing anything? (See the sketch after this message.)
3. As my pipeline will contain several steps that don't depend on one another, I would like to run solids in parallel as well. This means running independent Spark jobs in parallel. As I'm not a Spark expert, I don't know the best approach to do so, especially with Dagster and on Kubernetes. Is this something the celery-kubernetes executor can solve? (Is it recommended to combine Spark and Celery at all?)
I know this is a lot at once, but even partial help on any of these questions would be greatly appreciated, as the entire framework is starting to get a bit overwhelming given the increasingly complex infrastructure requirements from our core developer and DevOps team.
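(Regarding question 2 above: a rough sketch, not an established pattern, and only valid when the solids share one process, e.g. with the in-process executor, of an IOManager that hands objects such as cached Spark DataFrames from one solid to the next in memory instead of persisting them.)

from dagster import IOManager, io_manager

class InMemoryPassthroughIOManager(IOManager):
    """Keeps solid outputs in a dict so downstream solids reuse the same objects."""

    def __init__(self):
        self._values = {}

    def handle_output(self, context, obj):
        # store the output under its step/output name instead of writing it anywhere
        self._values[(context.step_key, context.name)] = obj

    def load_input(self, context):
        # hand the upstream object back untouched
        upstream = context.upstream_output
        return self._values[(upstream.step_key, upstream.name)]

@io_manager
def in_memory_passthrough_io_manager(_):
    return InMemoryPassthroughIOManager()

(If I remember right, Dagster also ships a built-in mem_io_manager that behaves roughly like this for single-process runs.)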
Jeff Hulbert
04/30/2021, 2:14 PM
2021-04-30 03:00:17 - SchedulerDaemon - INFO - Evaluating schedule `load_all_tables_schedule` at 2021-04-30 03:00:00+0000
2021-04-30 03:00:18 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:20 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-30 03:00:23 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:28 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:33 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:38 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:43 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:48 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:50 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-30 03:00:53 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:00:58 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:01:03 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:01:08 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:01:13 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:01:18 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:01:20 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-30 03:01:23 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-04-30 03:01:58 - dagster-daemon - ERROR - Thread for SCHEDULER did not shut down gracefully
Traceback (most recent call last):
  File "/usr/local/bin/dagster-daemon", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/site-packages/dagster/daemon/cli/__init__.py", line 126, in main
    cli(obj={})  # pylint:disable=E1123
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dagster/daemon/cli/__init__.py", line 39, in run_command
    controller.check_daemon_loop()
  File "/usr/local/lib/python3.7/site-packages/dagster/daemon/controller.py", line 192, in check_daemon_loop
    self.check_daemon_heartbeats()
  File "/usr/local/lib/python3.7/site-packages/dagster/daemon/controller.py", line 175, in check_daemon_heartbeats
    failed_daemons=failed_daemons
Exception: Stopping dagster-daemon process since the following threads are no longer sending heartbeats: ['SCHEDULER']
Kirk Stennett
04/30/2021, 3:53 PM
envSecrets section. The creds appear in the pod, but not in the run pod that gets launched by Celery. Is my best bet adding it as container config on the pipeline, or am I missing something obvious?
I should add, I'd prefer not to put creds in a pipeline/config YAML if at all possible. I don't want other people who might be running the pipeline to see the credentials for the apps.
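(One option to look at, hedged since I haven't verified it against the celery-k8s executor specifically: dagster-k8s supports per-pipeline Kubernetes config through the dagster-k8s/config tag, so the launched run pods can pull env vars from an existing Kubernetes Secret and no credential values appear in pipeline code or run config. The secret and pipeline names below are hypothetical.)

from dagster import pipeline

@pipeline(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                # hypothetical Secret that already lives in the cluster
                "env_from": [{"secret_ref": {"name": "app-credentials"}}],
            },
        },
    },
)
def my_pipeline():
    ...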
Thomas
04/30/2021, 4:12 PM
Eduardo Santizo
04/30/2021, 7:28 PM
@pipeline(
    mode_defs = [basic_mode]
)
def test_pipeline(context, input1, input2):
    resources.basic_mode.run()
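(In case it helps frame the question: a pipeline body only wires solids together, it takes no context or data inputs, and resources are reached from inside a solid via context.resources. A minimal sketch under the assumption that the goal is to call something provided by basic_mode's resources; all names besides test_pipeline and basic_mode are hypothetical.)

from dagster import ModeDefinition, pipeline, resource, solid

@resource
def basic_client(_):
    return object()  # hypothetical client the mode provides

basic_mode = ModeDefinition(name="basic", resource_defs={"basic_client": basic_client})

@solid(required_resource_keys={"basic_client"})
def use_client(context, input1: str, input2: str):
    # resources are accessed through the solid's context, not in the pipeline body
    context.log.info(f"got {input1}, {input2} via {context.resources.basic_client}")

@pipeline(mode_defs=[basic_mode])
def test_pipeline():
    use_client()  # input1/input2 come from run config or upstream solids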
Paramjit Singh
04/30/2021, 8:58 PM
Aref
04/30/2021, 11:36 PM
from __future__ import annotations

@solid
def dmt_verify_solid(context: Any, data_release_directory: str) -> Output:
    ...
I get:
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidDefinitionError: Invalid type: dagster_type must be DagsterType, a python scalar, or a python type that has been marked usable as a dagster type via @usable_dagster_type or make_python_type_usable_as_dagster_type: got str.
What's the reason? Is there a way to resolve it while keeping the annotations feature?
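(The likely culprit, hedged: from __future__ import annotations turns every annotation into a string, and this Dagster version only resolves real types, so 'str' arrives as the literal string the error complains about. A sketch of one workaround that keeps the future import by declaring the Dagster types on the decorator instead of relying on annotations; the output type is a guess, since the solid body is elided in the original.)

from __future__ import annotations

from typing import Any

from dagster import InputDefinition, Output, OutputDefinition, solid

@solid(
    input_defs=[InputDefinition("data_release_directory", str)],
    output_defs=[OutputDefinition(str)],
)
def dmt_verify_solid(context: Any, data_release_directory):
    # body elided in the original; returning an Output as before
    return Output(data_release_directory)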