Jack Yin
10/31/2022, 5:19 PM
I could run dagster job backfill from the console every day, but if that's the only way to do it then I guess I can do that.

Robert Wade
10/31/2022, 5:55 PM

Pablo Beltran
10/31/2022, 9:26 PM

James Hale
10/31/2022, 10:36 PM

Anthony Reksoatmodjo
11/01/2022, 12:50 AM
When I call context.get_step_execution_context() on a directly invoked op (with build_op_context), I get this error:
dagster.core.errors.DagsterInvalidPropertyError: The get_step_execution_context methods is not set on the context when a solid is directly invoked.
Is there a "directly-invoked" flag on the context object?
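There is no documented directly-invoked flag that I know of, but one workaround is to probe for the step execution context and catch the error above. A minimal sketch (the import path mirrors the error message; newer releases expose it as dagster._core.errors):

from dagster import build_op_context, op
from dagster.core.errors import DagsterInvalidPropertyError

@op
def my_op(context) -> bool:
    # The step execution context only exists when the op runs inside a
    # job, so this call raises during direct invocation.
    try:
        context.get_step_execution_context()
        return False
    except DagsterInvalidPropertyError:
        return True

# Directly invoked with a built context: prints True.
print(my_op(build_op_context()))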
Anthony Reksoatmodjo
11/01/2022, 12:50 AM
context properties

Chris Roth
11/01/2022, 11:19 AM
I'm using DynamicOutput with:
for val in ['cat', 'dog', 'panda']:
    yield DynamicOutput(
        complex_fn(val),
        mapping_key=val,
    )
Downstream, I'd like to save an output to s3 with the name f"output_{val}" using the mapping_key. Is this possible? If not, what pattern should I be using to access val later on?
What I'm trying to do in this workflow is (a) make pandas plots based on n dataframes (this is the dynamic part) and then (b) save each of those plots to a unique file on s3.
Alternatively, is there a way I could merge the plots back into a list to use with the original list used to break them apart?
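The mapping key is available on the mapped op's context via get_mapping_key(), and .collect() gathers the mapped results back into a list. A minimal sketch of both (complex_fn and upload_to_s3 are hypothetical placeholders for the real plotting and s3 logic):

from dagster import DynamicOut, DynamicOutput, graph, op

def complex_fn(val):
    # placeholder for the real plot-building logic
    return f"plot-for-{val}"

def upload_to_s3(obj, name):
    # placeholder for the real s3 upload
    print(f"would upload {obj!r} as {name}")

@op(out=DynamicOut())
def fan_out():
    for val in ["cat", "dog", "panda"]:
        yield DynamicOutput(complex_fn(val), mapping_key=val)

@op
def save_plot(context, plot):
    # get_mapping_key() returns "cat", "dog", ... on each mapped copy of
    # this step (and None outside a dynamic mapping).
    upload_to_s3(plot, f"output_{context.get_mapping_key()}")
    return plot

@op
def merge(plots: list):
    # plots is the list of all mapped results, gathered back together
    return plots

@graph
def plots_graph():
    saved = fan_out().map(save_plot)
    merge(saved.collect())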
Scott Hood
11/01/2022, 12:41 PM
2022-11-01 12:02:41 +0000 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor slack_on_run_failure : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server
Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 457, in _process_tick_generator
    sensor_debug_crash_flags,
  File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 508, in _evaluate_sensor
    state.instigator_data.cursor if state.instigator_data else None,
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/repository_location.py", line 796, in get_external_sensor_execution_data
    cursor,
  File "/usr/local/lib/python3.7/site-packages/dagster/_api/snapshot_sensor.py", line 61, in sync_get_external_sensor_execution_data_grpc
    cursor=cursor,
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 305, in external_sensor_execution
    sensor_execution_args
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 124, in _streaming_query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1667304153.030278088","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4}"
>
Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 122, in _streaming_query
    yield from response_stream
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _next
    raise self
2022-11-01 12:02:44 +0000 - dagster.daemon.SchedulerDaemon - INFO - Evaluating schedule `hive_metastore_sync_gl_schedule` at 2022-11-01 07:00:00 -0500
We are using the Helm chart to deploy, so I'm curious as to how it could be losing connection. I do see that DEADLINE_EXCEEDED message, which I thought was due to a sensor going over the sensor evaluation time limit, but I'm not sure if that is related. Any help appreciated.

Zachary Bluhm
11/01/2022, 2:25 PM
deps = _get_deps(dbt_nodes, selected_unique_ids, asset_resource_types=["model"])
Which would seem to only load models.

Alex Remedios
11/01/2022, 4:12 PM

Selene Hines
11/01/2022, 4:54 PM

Jayme Edwards
11/01/2022, 6:16 PM

Jayme Edwards
11/01/2022, 6:18 PM

Deo
11/01/2022, 7:15 PM
Is it possible to create a job that has an asset as input? (Currently the job is defined using @graph and .to_job.) How?
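One possible route, sketched under the assumption that the upstream asset is named my_asset: turn the graph into a graph-backed asset whose input is mapped onto that asset key, then build a job that targets it with define_asset_job. The run then loads my_asset's stored value through its IO manager.

from dagster import (
    AssetKey,
    AssetSelection,
    AssetsDefinition,
    define_asset_job,
    graph,
    op,
)

@op
def process(upstream):
    # upstream receives the stored value of the my_asset asset
    return upstream

@graph
def my_graph(upstream):
    return process(upstream)

# Map the graph input onto the existing asset key.
my_graph_asset = AssetsDefinition.from_graph(
    my_graph,
    keys_by_input_name={"upstream": AssetKey("my_asset")},
)

my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_graph_asset))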
Ismael Rodrigues
11/01/2022, 8:16 PM

Pablo Beltran
11/01/2022, 8:59 PM

nickvazz
11/02/2022, 12:48 AM
from dagster import graph, asset

@graph
def trial_graph():
    pass

@asset
def trial_graph_in_asset():
    trial_graph()

dagster._core.errors.DagsterInvariantViolationError: Attempted to call graph 'trial_graph' outside of a composition function. Invoking graphs is only valid in a function decorated with @job or @graph.
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\execution\plan\execute_plan.py", line 225, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 360, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\execution\plan\execute_step.py", line 69, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\execution\plan\compute.py", line 174, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\execution\plan\compute.py", line 142, in _yield_compute_results
for event in iterate_with_context(
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_utils\__init__.py", line 430, in iterate_with_context
next_output = next(iterator)
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\execution\plan\compute_generator.py", line 74, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "C:\tau_dev\apollo\apollo\flux\assets.py", line 86, in trial_graph_in_asset
trial_graph()
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\definitions\node_definition.py", line 199, in __call__
return PendingNodeInvocation(
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\definitions\composition.py", line 377, in __call__
assert_in_composition(node_name, self.node_def)
File "C:\ProgramData\Anaconda3\envs\dagster_env\lib\site-packages\dagster\_core\definitions\composition.py", line 125, in assert_in_composition
raise DagsterInvariantViolationError(
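For reference, the supported pattern is to turn the graph itself into an asset with AssetsDefinition.from_graph rather than calling it inside an @asset function. A minimal sketch (trial_op is an illustrative stand-in, since the graph needs at least one op to return something):

from dagster import AssetsDefinition, graph, op

@op
def trial_op():
    return 1

@graph
def trial_graph():
    return trial_op()

# The graph's output becomes the asset's value; no @asset wrapper needed.
trial_graph_asset = AssetsDefinition.from_graph(trial_graph)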
fahad
11/02/2022, 2:45 AM
@op
def compute(context: OpExecutionContext) -> str:
    return "abcdef"

@op
def more_compute(
    context: OpExecutionContext,
    compute_id: str,
) -> tuple[str, str]:
    if context.run.previous_run_id:
        return compute_id, "hijklm"
    raise ValueError("try again")

@graph
def compute_graph():
    return more_compute(compute())

compute_asset = AssetsDefinition.from_graph(compute_graph)

assets = with_resources(
    load_assets_from_modules(...),  # contains compute_asset
    resource_defs={
        "env": env,
        "s3": s3_resource,
        "io_manager": s3_io_manager,  # PickledObjectS3IOManager
    },
)
However, trying to replay the downstream more_compute op after a failure results in a warning and an error:
WARNING
No previously stored outputs found for source StepOutputHandle(step_key='compute_graph.compute', output_name='result', mapping_key=None). This is either because you are using an IO Manager that does not depend on run ID, or because all the previous runs have skipped the output in conditional execution.
STEP_FAILURE
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
Is this op replay supported in graph-backed assets?

Rainer Pichler
11/02/2022, 1:06 PM

Nihar Doshi
11/02/2022, 1:59 PM

Mykola Palamarchuk
11/02/2022, 2:50 PM
initial_data = initial_op()
data1 = processor1_op(initial_data)
persist1_op(data1)
data2 = processor2_op(initial_data)
persist2_op(data2)
I was expecting that the order of ops would remain the same as in the job definition, but Dagster runs them like this: initial_op -> processor1_op -> processor2_op -> persist1_op -> persist2_op, which is kind of bad: the results of processorX_op will be held in the io_manager (in memory) until the very end of the job execution, but that memory could probably be released if the op execution order were preserved.
Is there any way to set some execution ordering strategies or priorities? I know there are "nothing" dependencies, but this is a slightly different situation, as there is no implicit dependency between those ops.
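One knob that exists for this is the dagster/priority tag on ops: among steps that are ready at the same time, higher-priority steps are launched first (negative values deprioritize). A minimal sketch; whether it fully prevents the memory build-up depends on the executor:

from dagster import op

@op(tags={"dagster/priority": "3"})
def persist1_op(data1):
    # preferred over processor2_op whenever both are ready to execute
    ...

@op(tags={"dagster/priority": "-1"})
def processor2_op(initial_data):
    ...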
Alex
11/02/2022, 3:45 PM

Yang
11/02/2022, 6:44 PM
@graph
def my_graph(fiscal_year):
    pillar = compute_pillar("esg", fiscal_year)
Gabriel Fioravante
11/02/2022, 8:41 PM
Stack Trace:
File "/home/aydev/.local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 299, in dagster_event_sequence_for_step
raise dagster_user_error.user_exception
File "/home/aydev/.local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/home/aydev/.local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 430, in iterate_with_context
next_output = next(iterator)
File "/home/aydev/.local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 557, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/home/aydev/.local/lib/python3.10/site-packages/dagster/_core/storage/fs_io_manager.py", line 176, in handle_output
pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
File "stringsource", line 2, in _catboost._PoolBase.__reduce_cython__
Hello all, getting the above issue on py310 with the latest dagster (1.0.15). Seems like an issue with pickling C code; not sure how to go about that. Any tips? :]
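catboost's Pool wraps C-level state that pickle-based IO managers can't serialize. One workaround (a sketch, with made-up column names) is to pass picklable data such as a DataFrame between ops and build the Pool inside the op that consumes it, so it never crosses an IO-manager boundary:

import catboost
import pandas as pd
from dagster import op

@op
def features() -> pd.DataFrame:
    # return plain, picklable data from ops so fs_io_manager can store it
    return pd.DataFrame({"x": [1, 2, 3], "y": [0, 1, 0]})

@op
def train(features: pd.DataFrame):
    # build the non-picklable Pool where it is used
    pool = catboost.Pool(features[["x"]], label=features["y"])
    ...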
CJ
11/02/2022, 8:57 PM
#############
# Sample:
#############
@resource
def db_connection(init_context):
    conn = psycopg2.connect()
    try:
        yield conn
    finally:
        conn.close()

@op(required_resource_keys={'db_connection'})
def my_op(context):
    conn = context.resources.db_connection
    data = pd.read_sql("select * from foo", conn)
    data2 = pd.read_sql("select * from bar", conn)
    …

@op(required_resource_keys={'db_connection'})
def another_op(context):
    conn = context.resources.db_connection
    data = pd.read_sql("select * from hello_world", conn)
    …
Questions:
• my_op: Will the conn be created once and used twice?
• my_op: Will the conn be created once, used once, and then fail on data2?
• another_op: I’d expect to be able to reuse the resource, but it’s almost like the connection is being closed after my_op is run and can’t be reused.
• General: Could the connection open and close on init? Or will the context manager only hit the finally once it's been used?
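One way to settle the lifecycle questions empirically is to log in the setup and teardown paths. A minimal sketch (my understanding: setup runs once per process that initializes the resource, and the finally block runs at teardown rather than after each op that uses it):

import psycopg2
from dagster import resource

@resource
def db_connection(init_context):
    conn = psycopg2.connect()  # connection parameters assumed
    init_context.log.info("db_connection opened")
    try:
        yield conn
    finally:
        init_context.log.info("db_connection closing")
        conn.close()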
Qwame
11/02/2022, 9:02 PM
{
    "env": "ID_NUMBER"
}
However, it is returned as a string. Is there a way to read this as an integer? My env file has this:
export ID_NUMBER=5
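If I recall correctly, dagster's IntSource config type accepts the {"env": ...} form and coerces the environment value to an int, so the cast doesn't have to live in the op. A minimal sketch:

from dagster import IntSource, op

@op(config_schema={"id_number": IntSource})
def uses_id(context):
    # With run config {"id_number": {"env": "ID_NUMBER"}}, the value is
    # read from the environment and validated as an integer.
    context.log.info(f"id_number is {context.op_config['id_number']}")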
Akira Renbokoji
11/02/2022, 10:19 PM
load_from:
  - grpc_server:
      host: my_service_discovery.name.space
      port: 4266
      location_name: "grpc_server"
Is the above acceptable or do I need to grab the IP address from the Discovery Service then feed it into "workspace.yaml"?
Edit:
Turns out you can just use the namespace as host.
For anyone struggling to access your Dagster site and reload the repo, you have to add your gRPC port and the HTTP port (80) to the security group associated with your Dagster setup.
AWS has a Route Analyzer that you can use to see if the daemon can reach the gRPC server. That helped me the most.

kyle
11/02/2022, 10:34 PM

Alexander Whillas
11/02/2022, 10:47 PM

Prag
11/02/2022, 10:58 PM
ImportError: dlopen [...] lib/python3.10/site-packages/grpc/_cython/cygrpc.cpython-310-darwin.so' (mach-o file, but is an incompatible architecture (have (x86_64), need (arm64e)))