Devaraj Nadiger
08/23/2021, 5:01 PM
Vlad Dumitrascu
08/23/2021, 5:10 PM
output_defs=[
    OutputDefinition(name="result_dict", dagster_type=dict,
                     description="A dictionary of results."),
    OutputDefinition(name="entry_list", dagster_type=list,
                     description='''A list of dictionaries with the details of the update.'''),
]
...and then you yield outputs at the end of the function:
yield Output(result_dict, output_name="result_dict")
yield Output(entry_list, output_name="entry_list")
...you can, obviously, name them whatever you want, and they can be any of the types that an input can be.
In the yield Output(), the first positional argument is the data from your function; the second is the output name you defined at the top.
Also, you can output a bunch of stuff and don't have to use all of it. I often make general-purpose solids that output the same data in various ways and formats, and then only connect the outputs I need for that pipeline. That makes them more reusable.
For completeness:
You also have to import the Output type:
from dagster import Output
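Putting those pieces together, a minimal sketch of a two-output solid might look like this (the solid name and sample values are placeholders):
from dagster import Output, OutputDefinition, solid

@solid(
    output_defs=[
        OutputDefinition(name="result_dict", dagster_type=dict,
                         description="A dictionary of results."),
        OutputDefinition(name="entry_list", dagster_type=list,
                         description="A list of dictionaries with the details of the update."),
    ]
)
def build_results(context):
    # Placeholder values; in practice these come from the solid's real work.
    result_dict = {"status": "ok"}
    entry_list = [{"id": 1}, {"id": 2}]
    yield Output(result_dict, output_name="result_dict")
    yield Output(entry_list, output_name="entry_list")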
Xu Zhang
08/23/2021, 5:36 PM
asyncio solids to run inside a pipeline: Dagster can now honor those solids; however, when they get executed, they run inside their own event loop, which means the benefits of asyncio are not realized.
I was advised to implement my own multi-threading executor months ago, but I didn't have a chance to work on it. I just want to check whether we now have support for asyncio solids, or a multi-threading executor in place.
Gabriel Milan
08/23/2021, 11:05 PM
William Reed
08/24/2021, 3:44 AM
tags in graphs in the config..? With pipelines, it was simply the tags kwarg, but now that isn't recognized when using the @graph decorator.
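For reference, a minimal sketch of attaching tags when converting a graph to a job, assuming the to_job API accepts a tags argument as in later releases (the op, graph, and tag names are illustrative):
from dagster import graph, op

@op
def do_work(context):
    context.log.info("working")

@graph
def my_graph():
    do_work()

# Tags are attached when the graph is turned into a job, not on @graph itself.
my_job = my_graph.to_job(tags={"team": "data-eng"})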
George Pearse
08/24/2021, 8:05 AM
dagster.check.CheckError: Failure condition: unexpected error in composition descent during plan building
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/grpc/impl.py", line 342, in get_external_execution_plan_snapshot
create_execution_plan(
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/core/execution/api.py", line 720, in create_execution_plan
return ExecutionPlan.build(
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 877, in build
return plan_builder.build()
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 169, in build
self._build_from_sorted_solids(
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 234, in _build_from_sorted_solids
step_input_source = get_step_input_source(
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/core/execution/plan/plan.py", line 496, in get_step_input_source
check.failed("unexpected error in composition descent during plan building")
File "/opt/dagster/pipelines/venv/lib/python3.9/site-packages/dagster/check/__init__.py", line 119, in failed
raise CheckError("Failure condition: {desc}".format(desc=desc))
Drew Sonne
08/24/2021, 8:16 AM
EventMetadata.asset(...) to link them, but it doesn't seem to work, as the asset from previous solids isn't available for subsequent solid executions. Am I correct in saying the asset is only available for referencing once the pipeline execution has finished?
metadata={
"source_asset": EventMetadata.asset(AssetKey(["foo-bar","hello-world"])),
...
}
gives
dagster.core.errors.DagsterInvalidEventMetadata: Could not resolve the metadata value for "source_asset" to a known type. Consider wrapping the value with the appropriate EventMetadata type.
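For comparison, a hedged sketch of how that metadata entry is typically attached on a Dagster version where EventMetadata.asset is available; the asset keys mirror the snippet above, and the solid name is illustrative:
from dagster import AssetKey, AssetMaterialization, EventMetadata, Output, solid

@solid
def emit_with_source(context):
    # Attach the asset reference as metadata on a materialization event.
    yield AssetMaterialization(
        asset_key=AssetKey(["hello-world"]),
        metadata={
            "source_asset": EventMetadata.asset(AssetKey(["foo-bar", "hello-world"])),
        },
    )
    # Yield the default output so the solid still completes normally.
    yield Output(None)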
Arni Westh
08/24/2021, 1:02 PM
dagster-dagit_1 | Traceback (most recent call last):
dagster-dagit_1 | File "/usr/local/bin/dagit", line 5, in <module>
dagster-dagit_1 | from dagit.cli import main
dagster-dagit_1 | File "/usr/local/lib/python3.9/site-packages/dagit/cli.py", line 18, in <module>
dagster-dagit_1 | from .app import create_app_from_workspace
dagster-dagit_1 | File "/usr/local/lib/python3.9/site-packages/dagit/app.py", line 28, in <module>
dagster-dagit_1 | from .subscription_server import DagsterSubscriptionServer
dagster-dagit_1 | File "/usr/local/lib/python3.9/site-packages/dagit/subscription_server.py", line 4, in <module>
dagster-dagit_1 | from graphql_ws.gevent import GeventSubscriptionServer, SubscriptionObserver
dagster-dagit_1 | ImportError: cannot import name 'SubscriptionObserver' from 'graphql_ws.gevent' (/usr/local/lib/python3.9/site-packages/graphql_ws/gevent.py)
I had this running until very recently and I can't really figure out what might cause this.
George Pearse
08/24/2021, 1:38 PM
Fredrik Bengtsson
08/24/2021, 2:03 PM
Emin AYAR
08/24/2021, 3:49 PM
in_process_executor, since I don't want parallelization within the composite solids (each step should wait for the previous one to finish).
I pass a boolean or a single integer from one composite solid to another. They save their calculations to a DB, and the next steps read from the DB whenever they need some data.
My problem is that after finishing a composite solid, the process doesn't clean up memory completely.
The process created to execute tasks is at ~55% memory when it starts executing the second composite solid.
I tried to re-create the memory issue in a toy pipeline, but there the garbage collector works as expected and clears the memory before starting to execute the next step.
Any ideas what could cause this memory issue?
Is there a way to tell Dagster to dump the whole memory inside the executor process?
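Not a root-cause fix, but one way to get visibility into what is holding memory is to snapshot allocations from inside a solid with the standard-library tracemalloc module; a minimal sketch (the solid name and the top-10 cutoff are arbitrary):
import tracemalloc
from dagster import solid

tracemalloc.start()

@solid
def report_memory(context):
    # Log the biggest allocation sites seen so far in this process.
    snapshot = tracemalloc.take_snapshot()
    for stat in snapshot.statistics("lineno")[:10]:
        context.log.info(str(stat))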
William Reed
08/24/2021, 5:51 PM
solid_selection with the new jobs abstraction?
Xu Zhang
08/24/2021, 9:14 PM
from dagster import graph, op

@op
def get_name():
    return "world"

@op
def hello(context, name):
    context.log.info(f"Hello, {name}!")

@graph
def hello_graph():
    hello(get_name())
Xu Zhang
08/24/2021, 9:14 PM
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_82f3fc6dd18e594d9d7c5c7b64fe2277aebabcaa7b576c8e392997d01e339026/site-packages/dagster/core/workspace/autodiscovery.py", line 36, in loadable_targets_from_loaded_module
'No pipelines or repositories found in "{}".'.format(module.__name__)
dagster.core.errors.DagsterInvariantViolationError: No pipelines or repositories found in "meeseeksbackend.dagster.demo".
Xu Zhang
08/24/2021, 9:15 PM
solid and pipeline
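For context, autodiscovery at this point looks for pipelines or repositories in the module, so a bare @graph is not picked up on its own; a hedged sketch of one way to expose it, assuming your version supports converting graphs to jobs, is to register a job in a repository (the repository name is illustrative):
from dagster import graph, op, repository

@op
def get_name():
    return "world"

@op
def hello(context, name):
    context.log.info(f"Hello, {name}!")

@graph
def hello_graph():
    hello(get_name())

@repository
def demo_repository():
    # The job built from the graph is what the gRPC server / Dagit can load.
    return [hello_graph.to_job()]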
Xu Zhang
08/24/2021, 9:16 PM
# Describes where the user code lives so the gRPC server can load it.
loadable_target_origin = LoadableTargetOrigin(
    executable_path=sys.executable,
    working_directory=config.WORKING_DIRECTORY,
    module_name=config.PYTHON_MODULE,
    python_file=config.PYTHON_FILE,
    package_name=config.PYTHON_PACKAGE,
)
# Starts the Dagster gRPC server that serves the loaded definitions.
server = DagsterGrpcServer(
    host=config.HOST,
    port=config.PORT,
    max_workers=config.MAX_WORKER,
    loadable_target_origin=loadable_target_origin,
    heartbeat=config.HEARTBEAT,
    heartbeat_timeout=config.HEARTBEAT_TIMEOUT,
    lazy_load_user_code=config.LAZY_LOAD_USER_CODE,
    fixed_server_id=fixed_server_id,
)
server.serve()
Arturs Stramkals
08/24/2021, 9:58 PM
Output? I need to apply slightly diverging logic to multiple dynamically generated sets, which means that out of A, B, C, I may have collected_A missing on run #3 and collected_C missing on run #6. I've tried using the default_value parameter of InputDefinition on my branch-merge solid, to no avail.
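One pattern that may apply here, sketched under the assumption that the goal is to skip a branch when its data is absent, is to mark the outputs as not required and only yield the ones that exist; downstream solids wired to an un-yielded output are skipped (all names here are illustrative):
from dagster import Output, OutputDefinition, solid

@solid(
    output_defs=[
        OutputDefinition(name="collected_a", is_required=False),
        OutputDefinition(name="collected_c", is_required=False),
    ]
)
def split_sets(context, items: list):
    a = [i for i in items if str(i).startswith("A")]
    c = [i for i in items if str(i).startswith("C")]
    # Only yield outputs that actually have data; the rest are simply not emitted.
    if a:
        yield Output(a, output_name="collected_a")
    if c:
        yield Output(c, output_name="collected_c")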
Xu Zhang
08/25/2021, 12:24 AM
@op and @graph, my gRPC server in the staging box no longer works:
OSError: [Errno 30] Read-only file system: '/export/content/lid/apps/dagster-web'
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_015263caabc6969a26157a73f5dcbb71f6e53ea18d1e4d986685ee6b374c9871/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_015263caabc6969a26157a73f5dcbb71f6e53ea18d1e4d986685ee6b374c9871/site-packages/dagster/core/execution/plan/execute_step.py", line 326, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_015263caabc6969a26157a73f5dcbb71f6e53ea18d1e4d986685ee6b374c9871/site-packages/dagster/core/execution/plan/execute_step.py", line 380, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_015263caabc6969a26157a73f5dcbb71f6e53ea18d1e4d986685ee6b374c9871/site-packages/dagster/core/execution/plan/execute_step.py", line 490, in _store_output
handle_output_res = output_manager.handle_output(output_context, output.value)
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_015263caabc6969a26157a73f5dcbb71f6e53ea18d1e4d986685ee6b374c9871/site-packages/dagster/core/storage/fs_io_manager.py", line 119, in handle_output
mkdir_p(os.path.dirname(filepath))
File "/export/content/lid/apps/meeseeks-backend-dagster-grpc-server/i001/libexec/meeseeks-backend_015263caabc6969a26157a73f5dcbb71f6e53ea18d1e4d986685ee6b374c9871/site-packages/dagster/utils/__init__.py", line 150, in mkdir_p
os.makedirs(path)
File "/export/apps/python/3.7/lib/python3.7/os.py", line 213, in makedirs
makedirs(head, exist_ok=exist_ok)
File "/export/apps/python/3.7/lib/python3.7/os.py", line 213, in makedirs
makedirs(head, exist_ok=exist_ok)
File "/export/apps/python/3.7/lib/python3.7/os.py", line 213, in makedirs
makedirs(head, exist_ok=exist_ok)
[Previous line repeated 4 more times]
File "/export/apps/python/3.7/lib/python3.7/os.py", line 223, in makedirs
mkdir(name, mode)
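For reference, that traceback comes from the default filesystem IO manager trying to create its output directory; a hedged sketch, assuming the fix is simply pointing it at a writable location, is to configure fs_io_manager's base_dir when building the job (the path is a placeholder):
from dagster import fs_io_manager, graph, op

@op
def get_name():
    return "world"

@op
def hello(context, name):
    context.log.info(f"Hello, {name}!")

@graph
def hello_graph():
    hello(get_name())

# Store op outputs under a writable directory instead of the default location.
hello_job = hello_graph.to_job(
    resource_defs={
        "io_manager": fs_io_manager.configured({"base_dir": "/tmp/dagster-storage"})
    }
)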
Xu Zhang
08/25/2021, 12:24 AM
@solid and @graph work in the production box.
Xu Zhang
08/25/2021, 12:25 AM
from dagster import graph, op

@op
def get_name():
    return "world"

@op
def hello(context, name):
    context.log.info(f"Hello, {name}!")

@graph
def hello_graph():
    hello(get_name())
Xu Zhang
08/25/2021, 12:26 AM
from dagster import solid, pipeline

@solid
def get_name():
    return "world"

@solid
def hello(context, name):
    context.log.info(f"Hello, {name}!")

@pipeline
def hello_pipeline():
    hello(get_name())
Xu Zhang
08/25/2021, 12:27 AM
NoOpComputeLogManager
Abednego Santoso
08/25/2021, 5:27 AM
@pipeline
def pipeline_a():
    solid_b(solid_a())
    solid_c(solid_a())
make solid_a run twice?
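For comparison, calling solid_a() twice in a composition creates two separate invocations (and two executions); a minimal sketch of running it once and fanning the result out to both consumers (the solid bodies are placeholders):
from dagster import pipeline, solid

@solid
def solid_a():
    return 1

@solid
def solid_b(context, value):
    context.log.info(f"b got {value}")

@solid
def solid_c(context, value):
    context.log.info(f"c got {value}")

@pipeline
def pipeline_a():
    # solid_a is invoked once, so it executes once; both consumers reuse its output.
    a = solid_a()
    solid_b(a)
    solid_c(a)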
Sara
08/25/2021, 8:37 AM
Francois-DE
08/25/2021, 8:46 AM
Levan
08/25/2021, 10:39 AM
Nothing dependency as below:
@op(input_defs=[InputDefinition("start", Nothing)])
def install_deps(context, package: str = ""):
    ...
Now I'm trying to invoke it for testing as below:
def test_install_deps():
    context = build_solid_context()
    result = install_deps(context, package="test")
But it fails with dagster.core.errors.DagsterInvalidInvocationError: No value provided for required input "start".
If I pass start as:
@op
def dummy_op():
    return
install_deps(context, start=dummy_op(), package="test")
I get: TypeError: install_deps() got an unexpected keyword argument 'start'
How else should I pass the input start?
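One workaround worth sketching, under the assumption that a graph-level test is acceptable and that your Dagster version exposes execute_in_process on graphs, is to satisfy the Nothing input by wiring a Nothing-typed upstream op inside a small test graph (dummy_op and the graph name are illustrative):
from dagster import InputDefinition, Nothing, OutputDefinition, graph, op

@op(input_defs=[InputDefinition("start", Nothing)])
def install_deps(context, package: str = ""):
    context.log.info(f"installing {package}")

@op(output_defs=[OutputDefinition(Nothing)])
def dummy_op():
    pass

@graph
def install_deps_graph():
    # The Nothing dependency is expressed by wiring, not by passing a value.
    install_deps(start=dummy_op())

def test_install_deps():
    assert install_deps_graph.execute_in_process().success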
Fredrik Bengtsson
08/25/2021, 2:24 PM
@graph
def g1():
    x = op1()
    y = op2(x)

@graph(input_defs=[InputDefinition("start", Nothing)])
def g2():
    op3()

@graph
def combined():
    g1_res = g1()
    g2(g1_res)
However, this resulted in a DagsterInvalidDefinitionError:
@graph 'g2' decorated function does not have parameter(s) 'start' which are in solid's input_defs. Solid functions should only have keyword arguments that match input names and, if system information is required, a first parameter named 'context'.
Is there some other way to do this, is it not supported, or is it just a bug in the experimental API?
I'm currently running dagster 0.12.6.
Jonathan Almanza Camacho
08/25/2021, 3:15 PM
Brian Seo
08/25/2021, 5:36 PM
For tagConcurrencyLimits, is there support for multiple rules on the same key?
tagConcurrencyLimits:
  - key: job_tag
    limit: 10
  - key: job_tag
    value: extra_limited
    limit: 5
Gabriel Milan
08/25/2021, 5:49 PM