rex
04/14/2021, 11:59 PM
rex
04/15/2021, 12:02 AM
Simon Späti
04/15/2021, 11:07 AM
compute_logs properly:
# compute_logs:
#   module: dagster_aws.s3.compute_log_manager
#   class: S3ComputeLogManager
#   config:
#     bucket:
#       env: AWS_STORAGE_BUCKET_NAME
#     local_dir: "/tmp/cool"
#     prefix: "compute_logs"
#     endpoint_url:
#       env: AWS_S3_ENDPOINT_URL
compute_logs:
  module: dagster.core.storage.noop_compute_log_manager
  class: NoOpComputeLogManager
Now they appear on the console, but no longer in dagit (View Raw Step Output). Is there an easy way to have them on both console and dagit? 😅 Or do we need to write our own ComputeLogManager? Any hint where to do that is much appreciated. cc: @alex. In the docs we missed exactly that part :).
Marco
04/15/2021, 12:10 PM
David Smit
04/15/2021, 1:09 PM
Yugal Sharma
04/15/2021, 1:37 PM
Yugal Sharma
04/15/2021, 1:37 PM
Stéphan Taljaard
04/15/2021, 1:43 PM
Eduardo Santizo
04/15/2021, 4:42 PM
Arun Kumar
04/15/2021, 7:56 PM
configured API based on the data in our DB. I have two questions:
1. Could someone validate if my approach is correct?
2. I am not sure whether this can be improved with Dynamic outputs. Instead of looping over the DB rows within the pipeline definition, I am wondering whether I can have a solid that reads the rows and yields each one as a Dynamic Output, so that Dagster creates a downstream solid instance for each row. Also, can Dynamic Outputs be nested?
def make_event_source_pipeline(source_name):
    pipeline_name = f"{source_name}_pipeline"
    @pipeline(
        name=pipeline_name,
        mode_defs=[
            ModeDefinition(
                name="default",
                resource_defs={"partition_config": partition_config},
            ),
        ]
    )
    def _event_source_pipeline():
        source = compute_source_events()
        # get all analysis and metrics for the source from DB
        # analyses = get_analyses(source)
        # metrics = get_metrics(analysis_name)
        for analysis_name in analyses:
            for metric_name in analyses[analysis_name]:
                metric = metrics[metric_name]
                primary_event_name = metric[0]["event"]
                primary_aggregate = metric[0]["aggregate"]
                primary_process = configured(join_aggregate_analysis_event,
                                             name=f"join_{analysis_name}_{primary_event_name}")(
                    {"analysis_name": analysis_name, "event_name": primary_event_name, "aggregate": primary_aggregate})
                secondary_event_name = metric[1]["event"]
                secondary_aggregate = metric[1]["aggregate"]
                secondary_process = configured(join_aggregate_analysis_event,
                                               name=f"join_{analysis_name}_{secondary_event_name}")(
                    {"analysis_name": analysis_name, "event_name": secondary_event_name,
                     "aggregate": secondary_aggregate})
                analysis_process = configured(analyze_metric,
                                              name=f"analyze_{analysis_name}_{metric_name}")(
                    {"analysis_name": analysis_name, "metric_name": metric_name})
                analysis_process(primary_process(source), secondary_process(source))
    PartitionSetDefinition(
        name=f"{source_name}_partition_set",
        pipeline_name=pipeline_name,
        partition_fn=partitions.date_partition_range(
            start=datetime.datetime(2021, 1, 1),
            delta_range="days",
            inclusive=True,
            fmt="%Y-%m-%d",
        ),
        run_config_fn_for_partition=run_config_for_date_partition,
    )
    return _event_source_pipeline
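On question 2, one possible first step is to flatten the nested analysis/metric loops into a single list of work items, so that a single solid could yield one item per row and the nesting question might not come up at all. The sketch below is plain Python with no Dagster calls. The helper name `flatten_metric_jobs` is hypothetical, the sample data is made up, and the shapes of `analyses` and `metrics` are only inferred from the loop in the question.

```python
from typing import Dict, Iterator, List


def flatten_metric_jobs(
    analyses: Dict[str, List[str]],
    metrics: Dict[str, List[dict]],
) -> Iterator[dict]:
    """Yield one work item per (analysis, metric) pair.

    Assumed shapes (inferred from the loop in the question, not confirmed):
      analyses: {analysis_name: [metric_name, ...]}
      metrics:  {metric_name: [primary, secondary]}, each entry a dict
                with "event" and "aggregate" keys.
    """
    for analysis_name, metric_names in analyses.items():
        for metric_name in metric_names:
            primary, secondary = metrics[metric_name][:2]
            yield {
                # The key includes the metric name, so two metrics that
                # share an event still get distinct keys.
                "key": f"{analysis_name}_{metric_name}",
                "analysis_name": analysis_name,
                "metric_name": metric_name,
                "primary": {
                    "event_name": primary["event"],
                    "aggregate": primary["aggregate"],
                },
                "secondary": {
                    "event_name": secondary["event"],
                    "aggregate": secondary["aggregate"],
                },
            }


# Made-up sample data, for illustration only.
analyses = {"retention": ["d1_return", "d7_return"]}
metrics = {
    "d1_return": [
        {"event": "login", "aggregate": "count"},
        {"event": "signup", "aggregate": "count"},
    ],
    "d7_return": [
        {"event": "login", "aggregate": "count"},
        {"event": "signup", "aggregate": "count"},
    ],
}
jobs = list(flatten_metric_jobs(analyses, metrics))
print([job["key"] for job in jobs])
```

Each item carries everything the join and analyze steps receive as config in the loop above, so the same list could feed either the existing `configured` loop or a solid that yields one output per item.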
Yugal Sharma
04/16/2021, 4:26 AM
paul.q
04/16/2021, 6:49 AM
query PipelineRun {
  pipelineRunOrError(runId: "7b509160-1661-4a2a-bdf7-3f0b22e7fae5") {
    __typename
    ... on PipelineRun {
      runId
      status
      stepStats {
        stepKey
        status
      }
    }
  }
}
Now, to get the error message, I see that the computeLogs member of PipelineRun requires a stepKey. Do I need a second call to get this? Something like:
query PipelineRun {
  pipelineRunOrError(runId: "7b509160-1661-4a2a-bdf7-3f0b22e7fae5") {
    __typename
    ... on PipelineRun {
      computeLogs(stepKey: <step_that_failed>) {
        stderr {
          downloadUrl
        }
      }
    }
  }
}
Maybe I'm missing something? Doesn't seem possible to get what I want in one call.
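If two calls do turn out to be necessary, the second one can at least cover every failed step at once by giving each `computeLogs` field a GraphQL alias. Below is a minimal sketch of that two-step flow in plain Python. The helper names, the sample response, and the step names are made up; the status string `"FAILURE"` is an assumption to check against the schema; sending the request is left out.

```python
import json
from typing import List


def failed_step_keys(response: dict) -> List[str]:
    """Pick the step keys whose status marks a failure.

    Assumes the response mirrors the first query above and that failed
    steps report the status string "FAILURE" (an assumption).
    """
    run = response["data"]["pipelineRunOrError"]
    if run.get("__typename") != "PipelineRun":
        return []
    return [s["stepKey"] for s in run.get("stepStats", []) if s["status"] == "FAILURE"]


def build_logs_query(run_id: str, step_keys: List[str]) -> str:
    """Build one query that asks for the stderr URL of every given step.

    Each computeLogs field gets an alias (step0, step1, ...) so the same
    field can appear several times with different arguments.
    """
    if not step_keys:
        raise ValueError("need at least one step key")
    fields = "\n".join(
        f"      step{i}: computeLogs(stepKey: {json.dumps(key)}) {{ stderr {{ downloadUrl }} }}"
        for i, key in enumerate(step_keys)
    )
    return (
        "query PipelineRunLogs {\n"
        f"  pipelineRunOrError(runId: {json.dumps(run_id)}) {{\n"
        "    __typename\n"
        "    ... on PipelineRun {\n"
        f"{fields}\n"
        "    }\n"
        "  }\n"
        "}"
    )


# Made-up response in the shape of the first query, for illustration only.
first_response = {
    "data": {
        "pipelineRunOrError": {
            "__typename": "PipelineRun",
            "runId": "7b509160-1661-4a2a-bdf7-3f0b22e7fae5",
            "status": "FAILURE",
            "stepStats": [
                {"stepKey": "extract_table", "status": "SUCCESS"},
                {"stepKey": "load_table", "status": "FAILURE"},
            ],
        }
    }
}
keys = failed_step_keys(first_response)
query = build_logs_query("7b509160-1661-4a2a-bdf7-3f0b22e7fae5", keys)
print(query)
```

The printed query has the same shape as the second query above, with one aliased `computeLogs` field per failed step.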
Thanks
Paul
Sri Kadiyala
04/16/2021, 10:48 AM
Value at path root:solids:dev_datasets:config for enum type dataset_prefix must be a string. Attached are screenshots of the code and the value we are inputting. I tried enclosing the input in single and double quotes. I am not sure what is missing here. Thanks
Alexis
04/16/2021, 10:50 AM
Alex Despotakis
04/16/2021, 1:49 PM
alex@PF2C27VY:~/mvp$ dagster-daemon run
2021-04-15 16:10:27 - dagster-daemon - INFO - instance is configured with the following daemons: ['BackfillDaemon', 'SchedulerDaemon', 'SensorDaemon']
2021-04-15 16:10:28 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-04-15 16:10:28 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-15 16:10:29 - SensorDaemon - INFO - Checking for new runs for sensor: Remediation
2021-04-15 16:10:58 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-04-15 16:10:58 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-15 16:11:28 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-04-15 16:11:28 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-15 16:11:58 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-04-15 16:11:58 - BackfillDaemon - INFO - No backfill jobs requested.
2021-04-15 16:12:58 - dagster-daemon - ERROR - Thread for SENSOR did not shut down gracefully
Traceback (most recent call last):
  File "/home/alex/miniconda3/envs/eip/bin/dagster-daemon", line 8, in <module>
    sys.exit(main())
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/dagster/daemon/cli/__init__.py", line 126, in main
    cli(obj={}) # pylint:disable=E1123
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/dagster/daemon/cli/__init__.py", line 39, in run_command
    controller.check_daemon_loop()
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/dagster/daemon/controller.py", line 149, in check_daemon_loop
    self.check_daemon_heartbeats()
  File "/home/alex/miniconda3/envs/eip/lib/python3.8/site-packages/dagster/daemon/controller.py", line 130, in check_daemon_heartbeats
    raise Exception(
Exception: Stopping dagster-daemon process since the following threads are no longer sending heartbeats: ['SENSOR']
(eip) alex@PF2C27VY:~/mvp$ INFO:/home/alex/mvp/client/sensors/remediation.py:results are: []
Steve Pletcher
04/16/2021, 4:13 PM
InputDefinition to use that functionality?
Mark
04/17/2021, 2:30 PM
--watch CLI option but could not find that in the documentation. So if you could give me a hint on how to develop and debug while keeping dagit open with the latest .py files always loaded automatically, that would be great.
Mark
04/17/2021, 2:44 PM
repo.py content after changing the file. I am not sure whether the pipeline container is not updating after file changes, or the dagit container is not updating via gRPC, or both. Should watching files without having to restart/recreate containers usually work in this scenario?
Ronak Jain
04/19/2021, 5:23 AM
When we are doing setup
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "make_people":
  File "/home//.local/lib/python3.6/site-packages/dagster/core/execution/plan/execute_plan.py", line 190, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home//.local/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 313, in core_dagster_event_sequence_for_step
    _step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
  File "/home//.local/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 71, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/home//.local/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 608, in _user_event_sequence_for_step_compute_fn
    gen,
  File "/home//.local/lib/python3.6/site-packages/dagster/utils/__init__.py", line 364, in iterate_with_context
    return
  File "/usr/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home//.local/lib/python3.6/site-packages/dagster/core/errors.py", line 199, in user_code_error_boundary
    ) from e
The above exception was caused by the following exception:
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM
  File "/home//.local/lib/python3.6/site-packages/dagster/core/errors.py", line 187, in user_code_error_boundary
    yield
  File "/home//.local/lib/python3.6/site-packages/dagster/utils/__init__.py", line 362, in iterate_with_context
    next_output = next(iterator)
  File "/home//.local/lib/python3.6/site-packages/dagster/core/execution/plan/compute.py", line 126, in execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "/home//.local/lib/python3.6/site-packages/dagster/core/execution/plan/compute.py", line 92, in _yield_compute_results
    user_event_generator = compute_fn(SolidExecutionContext(compute_context), inputs)
  File "/home//.local/lib/python3.6/site-packages/dagster/core/definitions/decorators/solid.py", line 304, in compute
    result = fn(context, **kwargs)
  File "repo.py", line 30, in make_people
    spark = SparkSession.builder.getOrCreate()
  File "/home//.local/lib/python3.6/site-packages/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/home//.local/lib/python3.6/site-packages/pyspark/context.py", line 384, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/home//.local/lib/python3.6/site-packages/pyspark/context.py", line 147, in __init__
    conf, jsc, profiler_cls)
  File "/home//.local/lib/python3.6/site-packages/pyspark/context.py", line 224, in _do_init
    self._encryption_enabled = self._jvm.PythonUtils.isEncryptionEnabled(self._jsc)
  File "/home//.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1531, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
Ronak Jain
04/19/2021, 1:46 PM
Ronak Jain
04/19/2021, 1:48 PM
Slackbot
04/19/2021, 1:48 PM
Brian Abelson
04/19/2021, 5:10 PM
DagsterApiServer] Pipeline execution process for 5899d526-300a-40c6-839d-b73bedbb3885 unexpectedly exited.
I suspect it might be an OOM error or something like that, because it's part of a large table copy (I run the same pipeline for other tables and those seem to be fine), but I currently have no way to debug this since the error message is opaque. Any advice here?
Vlad Dumitrascu
04/19/2021, 10:41 PM
Mark
04/20/2021, 12:00 AM
Rubén Lopez Lozoya
04/20/2021, 10:19 AM
mrdavidlaing
04/20/2021, 11:03 AM
PandasColumn.datetime_column("at_date", non_nullable=True, is_required=True) and pass it a dataframe with the column of type datetime64[ns, UTC] (from pandas.to_datetime(my_dates, utc=True)), I get the following error:
Warning! Type check failed. Violated "ColumnDTypeInSetConstraint" for column "at_date" - Column dtype must be in the following set {'datetime64[ns]'}.. DTypes received: datetime64[ns, UTC]
Which is pretty much the exact opposite of the constraint I'm trying to place on the column 🙂
Ming Fang
04/20/2021, 6:17 PM
dagster-daemon run resulted in this error
dagster-daemon run
Traceback (most recent call last):
  File "/usr/local/bin/dagster-daemon", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/site-packages/dagster/daemon/cli/__init__.py", line 126, in main
    cli(obj={}) # pylint:disable=E1123
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dagster/daemon/cli/__init__.py", line 32, in run_command
    raise Exception(
Exception: dagster-daemon can't run using an in-memory instance. Make sure the DAGSTER_HOME environment variable has been set correctly and that you have created a dagster.yaml file there.
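The exception text names two preconditions: DAGSTER_HOME must be set, and a dagster.yaml must exist in that directory. A small check along those lines can be run before starting the daemon. This is only a sketch of what the message asks for, not Dagster's own validation; the function name `dagster_home_problems` is hypothetical, and the demo uses a throwaway temp directory with an empty placeholder file.

```python
import os
import tempfile
from pathlib import Path
from typing import List, Mapping


def dagster_home_problems(env: Mapping[str, str]) -> List[str]:
    """List reasons why DAGSTER_HOME would not satisfy the error message."""
    home = env.get("DAGSTER_HOME")
    if not home:
        return ["DAGSTER_HOME is not set"]
    path = Path(home)
    if not path.is_dir():
        return [f"{home} is not a directory"]
    if not (path / "dagster.yaml").is_file():
        return [f"{home} has no dagster.yaml"]
    return []


# Demo in a throwaway directory so nothing real is touched.
with tempfile.TemporaryDirectory() as tmp:
    before = dagster_home_problems({"DAGSTER_HOME": tmp})
    # Empty placeholder file, created only to satisfy the check.
    (Path(tmp) / "dagster.yaml").touch()
    after = dagster_home_problems({"DAGSTER_HOME": tmp})

unset = dagster_home_problems({})
print(before, after, unset)

# Check the real environment of the current process.
print(dagster_home_problems(os.environ))
```

In a container, the variable has to be visible to the process that actually runs `dagster-daemon run`, so it is worth running the check in that same shell or entrypoint.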
Christian Lam
04/20/2021, 6:31 PM
Christian Lam
04/20/2021, 6:31 PM
| 1618942759883 | Serving on http://0.0.0.0:80 in process 27 |
| 1618942759886 | Traceback (most recent call last): |
| 1618942759886 | File "/usr/local/bin/dagit", line 8, in <module> |
| 1618942759886 | sys.exit(main()) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 205, in main |
| 1618942759886 | cli(auto_envvar_prefix="DAGIT") # pylint:disable=E1120 |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__ |
| 1618942759886 | return self.main(*args, **kwargs) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main |
| 1618942759886 | rv = self.invoke(ctx) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke |
| 1618942759886 | return ctx.invoke(self.callback, **ctx.params) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke |
| 1618942759886 | return callback(*args, **kwargs) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 109, in ui |
| 1618942759886 | **kwargs, |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 128, in host_dagit_ui |
| 1618942759886 | host_dagit_ui_with_workspace(instance, workspace, host, port, path_prefix, port_lookup) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 137, in host_dagit_ui_with_workspace |
| 1618942759886 | start_server(instance, host, port, path_prefix, app, port_lookup) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 197, in start_server |
| 1618942759886 | raise os_error |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/dagit/cli.py", line 166, in start_server |
| 1618942759886 | server.serve_forever() |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/baseserver.py", line 398, in serve_forever |
| 1618942759886 | self.start() |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/baseserver.py", line 336, in start |
| 1618942759886 | self.init_socket() |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 1545, in init_socket |
| 1618942759886 | StreamServer.init_socket(self) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/server.py", line 180, in init_socket |
| 1618942759886 | self.socket = self.get_listener(self.address, self.backlog, self.family) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/server.py", line 192, in get_listener |
| 1618942759886 | return _tcp_listener(address, backlog=backlog, reuse_addr=cls.reuse_addr, family=family) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/server.py", line 288, in _tcp_listener |
| 1618942759886 | sock.bind(address) |
| 1618942759886 | File "/usr/local/lib/python3.7/site-packages/gevent/_socketcommon.py", line 563, in bind |
| 1618942759886 | return self._sock.bind(address) |
| 1618942759886 | PermissionError: [Errno 13] Permission denied: ('0.0.0.0', 80) |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
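The last frame of this traceback is `sock.bind` failing on port 80. On Linux, ports below 1024 usually need elevated privileges, so a process running as a non-root user typically cannot bind them. The sketch below reproduces that check with the standard library only; `can_bind` is a hypothetical helper, and the result for port 80 depends on the user and system it runs on.

```python
import socket


def can_bind(host: str, port: int) -> bool:
    """Try to bind a TCP socket and report whether the OS allowed it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            # PermissionError and "address already in use" are both OSError.
            return False
        return True


# Port 0 asks the OS for any free port, so this normally succeeds.
print(can_bind("127.0.0.1", 0))
# Port 80 typically fails for a non-root user on Linux.
print(can_bind("0.0.0.0", 80))
```

One common way around this is to serve dagit on a port above 1023 and map port 80 to it at the container or load balancer level.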