Arun Kumar
08/25/2021, 6:47 PM
queuedRunCoordinator:
  enabled: true
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "user-code-1"
        limit: 15
      - key: "user-code-1"
        value: "usecase-1"
        limit: 10
      - key: "user-code-1"
        value: "usecase-2"
        limit: 5
      - key: "user-code-2"
        limit: 15
      - key: "user-code-2"
        value: "usecase-1"
        limit: 10
      - key: "databricks"
        limit: 10
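(For context, a minimal sketch of how these limits apply, assuming 0.12-era APIs: pipeline tags become run tags, which the queued run coordinator counts against tag_concurrency_limits. Names here are illustrative.)

from dagster import pipeline, solid

@solid
def do_work(_):
    pass

# Runs of this pipeline carry the tag below, so at most 10 of them run
# concurrently under the "user-code-1"/"usecase-1" limit, and they also
# count against the any-value "user-code-1" limit of 15.
@pipeline(tags={"user-code-1": "usecase-1"})
def usecase_1_pipeline():
    do_work()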
William Reed
08/25/2021, 6:57 PM
dagster new-project: jinja templates?
Carlos Sanoja
08/25/2021, 8:15 PM
AttributeError: 'SlackResource' object has no attribute 'chat_postMessage'
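(A minimal sketch of how this call is usually made, assuming dagster_slack's slack_resource, which provides a slack_sdk WebClient exposing chat_postMessage; the AttributeError suggests a custom SlackResource wrapper is being hit instead. Channel and token are illustrative.)

from dagster import ModeDefinition, execute_pipeline, pipeline, solid
from dagster_slack import slack_resource

@solid(required_resource_keys={"slack"})
def post_message(context):
    # slack_resource yields a slack_sdk WebClient, which has chat_postMessage
    context.resources.slack.chat_postMessage(channel="#general", text="hello from dagster")

@pipeline(mode_defs=[ModeDefinition(resource_defs={"slack": slack_resource})])
def slack_pipeline():
    post_message()

execute_pipeline(
    slack_pipeline,
    run_config={"resources": {"slack": {"config": {"token": "xoxb-..."}}}},
)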
Daniel Kim
08/26/2021, 2:13 AM
@op(
    config_schema={
        "plant_code": str,
    },
    required_resource_keys={"values"},
)
def get_shipping_receiving_data(context) -> Nothing:
    plant_code = context.op_config["plant_code"]
    begin_date = context.resources.values["start_date_yyyymmdd"]
    end_date = context.resources.values["end_date_yyyymmdd"]
    ...
But not sure how to go about it with the latest dagster API.
I'm thinking I need to do something along the lines of:
from dagster import build_op_context

def test_get_shipping_receiving_data():
    context = build_op_context(
        resources={
            "values": {
                "config": {
                    "start_date_yyyymmdd": "20210801",
                    "end_date_yyyymmdd": "20210803"
                }
            }
        },
        config={"plant_code": "some_plant_code"}
    )
    result = get_shipping_receiving_data(context)
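(A minimal sketch of how this test might look, assuming the values resource is consumed as a plain dict: build_op_context accepts resource values directly, without the nested "config" key, and the op's config is passed separately; the exact kwarg name (config vs. op_config) depends on the dagster version.)

from dagster import build_op_context

def test_get_shipping_receiving_data():
    context = build_op_context(
        # resource values are passed directly, not under a "config" key
        resources={
            "values": {
                "start_date_yyyymmdd": "20210801",
                "end_date_yyyymmdd": "20210803",
            }
        },
        op_config={"plant_code": "some_plant_code"},
    )
    get_shipping_receiving_data(context)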
Sandeep Aggarwal
08/26/2021, 3:55 AM
Using execute_in_process to run the graph, and looking for a way to extract information (error message/exception) in case the pipeline fails.
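(A minimal sketch of pulling failure details off the in-process result, assuming a graph named my_graph; raise_on_error=False keeps the failure on the result object instead of raising.)

result = my_graph.execute_in_process(raise_on_error=False)
if not result.success:
    for event in result.all_events:
        if event.is_step_failure:
            # step_failure_data.error carries the message and stack trace
            print(event.step_failure_data.error.message)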
mrdavidlaing
08/26/2021, 10:41 AM
Saurav Mittal
08/26/2021, 12:26 PM
build_reconstructable_pipeline
Use case: Create runtime pipelines (i.e., set solid_defs and dependencies in PipelineDefinition at runtime) with solids processed using a multiprocess executor. We have integrated with Spark to process data.
I looked up the dagster docs and tried to code it up, but I am facing an error with it. I have attached the relevant code snippet and the error in this thread.
For `build_reconstructable_pipeline`:
1. What should I do to resolve this error and use it for my use-case?
2. Could you please provide example code for this?
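(A minimal sketch of the build_reconstructable_pipeline pattern, assuming a module-level factory that child processes can re-import by path; module, function, and pipeline names are illustrative.)

# my_pipelines.py
from dagster import PipelineDefinition, solid

def make_pipeline(name):
    @solid
    def do_work(_):
        return 1

    return PipelineDefinition(name=name, solid_defs=[do_work])

# run.py
from dagster import DagsterInstance, build_reconstructable_pipeline, execute_pipeline

recon_pipeline = build_reconstructable_pipeline(
    "my_pipelines",    # module containing the factory
    "make_pipeline",   # factory function name
    reconstructable_args=("runtime_pipeline",),
)

execute_pipeline(
    recon_pipeline,
    run_config={"execution": {"multiprocess": {}}},
    instance=DagsterInstance.get(),  # multiprocess execution needs a persistent instance
)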
Matthias Queitsch
08/26/2021, 2:28 PM
Carlos Sanoja
08/26/2021, 2:28 PM
def file_handle_to_s3(context, file_handle):
But I am unable to find what a file_handle is or how to create this object instance from a CSV file. Could anyone help me please? @chris @max
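(A minimal sketch of two ways to get a file_handle for a local CSV, assuming the built-in local file manager; paths are illustrative.)

from dagster import LocalFileHandle, solid

# Option 1: wrap an existing local path directly
handle = LocalFileHandle("/path/to/data.csv")

# Option 2: let a file_manager resource (e.g. local_file_manager, wired into
# the mode's resource_defs) own the bytes and hand back a handle
@solid(required_resource_keys={"file_manager"})
def csv_to_handle(context):
    with open("/path/to/data.csv", "rb") as f:
        return context.resources.file_manager.write_data(f.read())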
Sandeep Aggarwal
08/26/2021, 3:03 PM
Can a dagster.op be executed even if some of the input dependencies are not resolved?
I tried marking the input as Optional and is_required=False on the parent output.
Currently the step is getting skipped saying: Skipping step query_datapoints due to skipped dependencies: ['relative_to_absolute.split_requested_period_meta'].
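(For reproduction, a minimal sketch of the pattern being described, assuming 0.12-era APIs; names are illustrative. When the optional upstream output is not yielded, the engine skips the downstream step even though the input is typed Optional.)

from typing import Optional
from dagster import InputDefinition, Output, OutputDefinition, solid

@solid(
    config_schema={"emit": bool},
    output_defs=[OutputDefinition(str, name="meta", is_required=False)],
)
def relative_to_absolute(context):
    if context.solid_config["emit"]:
        yield Output("value", output_name="meta")

@solid(input_defs=[InputDefinition("meta", Optional[str])])
def query_datapoints(_, meta):
    ...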
Navneet Sajwan
08/26/2021, 5:15 PM
Xu Zhang
08/26/2021, 6:50 PM
Han You
08/27/2021, 5:25 AM
dansasbu
08/27/2021, 2:44 PM
Xiaosu
08/27/2021, 3:10 PM
Carlos Sanoja
08/27/2021, 6:21 PM
William Reed
08/27/2021, 6:29 PM
kiam (the pods are annotated with the proper IAM role). Is there something going on with the way Dagster executes runs that is causing this? Hope this is interesting -- thanks.
Hebo Yang
08/27/2021, 9:37 PM
Cameron Gallivan
08/27/2021, 10:11 PM
I'm using build_solid_context with direct invocation instead of execute_solid, but I'm running into some inconsistent behavior with configuring the logger.
With execute_solid I was able to pass this config to suppress DEBUG-level output to the console:
{'loggers': {'console': {'config': {'log_level': 'INFO'}}}}
But when I directly invoke the solid with
context = build_solid_context(config={'loggers': {'console': {'config': {'log_level': 'INFO'}}}})
some_solid(context)
the DEBUG level is still being output to the console. Is there any way to suppress that?
Cameron Gallivan
08/27/2021, 11:20 PM
I have start_after defined in the input_defs, but calling the solid with start_after=None gives:
TypeError: cohort_inputs() got an unexpected keyword argument 'start_after'
But when the start_after arg isn't included:
dagster.core.errors.DagsterInvalidInvocationError: No value provided for required input "start_after".
Hebo Yang
08/27/2021, 11:39 PM
Xu Zhang
08/28/2021, 3:37 PM
Greg
08/28/2021, 7:02 PM
Operation name: JobMetadataQuery
Message: (psycopg2.OperationalError) server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
[SQL: SELECT event_logs.id, event_logs.event
FROM event_logs
WHERE event_logs.run_id = %(run_id_1)s ORDER BY event_logs.id ASC
LIMIT ALL OFFSET %(param_1)s]
[parameters: {'run_id_1': '2a23d5cb-681b-4cc8-b493-5d68b75a3687', 'param_1': 0}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
Path: ["pipelineRunsOrError","results",0,"assets"]
Locations: [{"line":60,"column":3}]
Stack Trace:
File "/usr/local/lib/python3.7/site-packages/graphql/execution/executor.py", line 452, in resolve_or_error
return executor.execute(resolve_fn, source, info, **args)
File "/usr/local/lib/python3.7/site-packages/graphql/execution/executors/sync.py", line 16, in execute
return fn(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/schema/pipelines/pipeline.py", line 270, in resolve_assets
return get_assets_for_run_id(graphene_info, self.run_id)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/fetch_assets.py", line 59, in get_assets_for_run_id
records = graphene_info.context.instance.all_logs(run_id)
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 1013, in all_logs
return self._event_storage.get_logs_for_run(run_id, of_type=of_type)
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 234, in get_logs_for_run
events_by_id = self.get_logs_for_run_by_log_id(run_id, cursor, of_type)
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 201, in get_logs_for_run_by_log_id
results = conn.execute(query).fetchall()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1263, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 324, in _execute_on_connection
self, multiparams, params, execution_options
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1462, in _execute_clauseelement
cache_hit=cache_hit,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1815, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1996, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1772, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
When I try to run a pipeline, it does start, but it updates the job status very slowly (sometimes hours after the end of the run), if not at all. I guess it is somewhat related to the above error and postgresql. Any help would be much appreciated!
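(If the Postgres server or a proxy is dropping idle connections, one hedged option is forwarding TCP keepalive parameters to psycopg2 through the storage config in dagster.yaml; the params block is an assumption about this deployment and the values are illustrative.)

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      hostname: <hostname>
      username: <username>
      password: <password>
      db_name: <db_name>
      params:  # appended to the connection string and passed through to psycopg2
        keepalives: 1
        keepalives_idle: 60
        keepalives_interval: 10
        keepalives_count: 5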
Pradithya Aria Pura
08/30/2021, 7:24 AM
Fredrik Bengtsson
08/30/2021, 11:35 AM
Matthias Queitsch
08/30/2021, 3:28 PM
Hebo Yang
08/30/2021, 6:42 PM
sourabh upadhye
08/31/2021, 9:40 AM
Paolo Deregibus
08/31/2021, 10:43 AM
Lucas Ávila
08/31/2021, 12:35 PM