Daniel Galea
11/10/2022, 1:24 PM
Deo
11/10/2022, 4:53 PM
Shondace
11/10/2022, 6:54 PM
Sensor daemon caught an error for sensor git_comment_sensor : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server
Stack Trace:
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_daemon/sensor.py", line 476, in _process_tick_generator
    yield from _evaluate_sensor(
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_daemon/sensor.py", line 540, in _evaluate_sensor
    sensor_runtime_data = repo_location.get_external_sensor_execution_data(
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_core/host_representation/repository_location.py", line 808, in get_external_sensor_execution_data
    return sync_get_external_sensor_execution_data_grpc(
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_api/snapshot_sensor.py", line 54, in sync_get_external_sensor_execution_data_grpc
    api_client.external_sensor_execution(
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_grpc/client.py", line 327, in external_sensor_execution
    chunks = list(
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_grpc/client.py", line 124, in _streaming_query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1667927058.507662760","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4}"
>
Stack Trace:
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/dagster/_grpc/client.py", line 122, in _streaming_query
    yield from response_stream
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/home/u00ubc1kg5n2YAppSj357/miniconda3/envs/rapids-22.10/lib/python3.9/site-packages/grpc/_channel.py", line 826, in _next
    raise self
Chris Roth
11/10/2022, 8:18 PM
Dusty Shapiro
11/10/2022, 8:46 PM
@op
def upload_docs_to_s3():
    foo

@job
def dbt_cli_docs():
    dbt_docs_generate_op()
    upload_docs_to_s3()
In this case, both ops try to run at the same time.
In any case, is there a simple way to set the relationship between ops in general? Like:
op1 >> op2 >> op3
Ryan Riopelle
11/10/2022, 9:27 PM
Reid Beels
11/10/2022, 9:28 PM
k8s_job_op runs should be available in dagit after a job has completed. I’ve never seen this happen in practice with any of my k8s_job_op usage and I always get the blank loading screen. Is there any specific configuration that’s required to make this work?
Crawford Collins
11/10/2022, 9:36 PM
root:config:base_dir value.
dagster._core.errors.DagsterInvalidConfigError: Error in config for resource io_manager
    Error 1: Value at path root:config:base_dir must not be None. Expected "(String | { env: String })"
  File "/Users/crawford.collins/projects/hello-flow/.venv/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 412, in get_external_execution_plan_snapshot
    create_execution_plan(
  File "/Users/crawford.collins/projects/hello-flow/.venv/lib/python3.9/site-packages/dagster/_core/execution/api.py", line 1058, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
  File "/Users/crawford.collins/projects/hello-flow/.venv/lib/python3.9/site-packages/dagster/_core/system_config/objects.py", line 209, in build
    config_mapped_resource_configs = config_map_resources(resource_defs, resource_configs)
  File "/Users/crawford.collins/projects/hello-flow/.venv/lib/python3.9/site-packages/dagster/_core/system_config/objects.py", line 286, in config_map_resources
    raise DagsterInvalidConfigError(
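The error above says that base_dir for the io_manager resource resolved to None, while a string or an { env: String } reference was expected. A minimal sketch of a run config fragment with that shape, assuming the resource is configured through run config. The helper name io_manager_run_config, the example path, and the variable name DAGSTER_BASE_DIR are hypothetical and do not come from the thread:

```python
from typing import Optional


def io_manager_run_config(base_dir: Optional[str] = None,
                          env_var: Optional[str] = None) -> dict:
    """Build a run config fragment that sets base_dir for io_manager.

    Exactly one of base_dir (a literal path) or env_var (the name of an
    environment variable holding the path) must be given, mirroring the
    expected type "(String | { env: String })" from the error message.
    """
    if (base_dir is None) == (env_var is None):
        raise ValueError("provide exactly one of base_dir or env_var")
    value = base_dir if base_dir is not None else {"env": env_var}
    return {"resources": {"io_manager": {"config": {"base_dir": value}}}}


# Literal path (example value only).
literal_config = io_manager_run_config(base_dir="/tmp/dagster_storage")
# Reference to an environment variable (hypothetical variable name).
env_config = io_manager_run_config(env_var="DAGSTER_BASE_DIR")
```

Refusing to build a config when neither value is given surfaces the missing setting at the call site instead of deep inside config resolution.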
Because the error message cuts off, I can’t figure out where my error is coming from or even begin to fix it.
Peter Skipper
11/10/2022, 11:31 PM
reexecute_pipeline from dagster 1.0.15, which makes me think there is now another way?
Ion Scerbatiuc
11/10/2022, 11:43 PM
dagster + dbt. Following the documentation I'm a bit confused about how to configure the dbt command. I managed to wire up a dbt_cli_resource which reads from the configuration file (which is different by environment). But then I need to call load_assets_from_dbt_project to get the asset for my job, which happens at import time (and not runtime). How do I access the project-dir and profiles-dir settings from the dbt resource at this stage? I'm trying to avoid hardcoding those paths in two places and would prefer to only have them in the config file under the dbt resource.
John Sears
11/11/2022, 12:44 AM
yuhan
11/11/2022, 12:57 AM
Eric
11/11/2022, 2:19 AM
Colo Carlos
11/11/2022, 7:16 AM
fs_file_cache from dagster.core.storage.file_cache, all I can see is that it was removed in 0.15
Averell
11/11/2022, 9:35 AM
You can now launch a run that targets a range of asset partitions, by supplying the "dagster/asset_partition_range_start" and "dagster/asset_partition_range_end" tags.
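As a rough illustration of the quoted note, the two tags could be assembled as below. Only the tag keys come from the message above. The helper name and the date-style partition keys are made up for the example, and how the tags get attached to a run is not shown:

```python
RANGE_START_TAG = "dagster/asset_partition_range_start"
RANGE_END_TAG = "dagster/asset_partition_range_end"


def partition_range_tags(start: str, end: str) -> dict:
    """Return run tags naming the first and last partition key of a range."""
    if not start or not end:
        raise ValueError("both start and end partition keys are required")
    return {RANGE_START_TAG: start, RANGE_END_TAG: end}


# Example partition keys; real keys depend on the partitions definition.
tags = partition_range_tags("2022-11-01", "2022-11-07")
```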
Zachary Bluhm
11/11/2022, 6:36 PM
Airton Neto
11/11/2022, 8:23 PM
Ismael Rodrigues
11/11/2022, 8:46 PM
nickvazz
11/11/2022, 9:07 PM
dagster.dynamic_partitioned_config with dagster.ConfigMapping?
https://docs.dagster.io/_apidocs/partitions#dagster.dynamic_partitioned_config
https://docs.dagster.io/_apidocs/config#dagster.ConfigMapping
nickvazz
11/12/2022, 1:40 AM
Charles
11/12/2022, 11:05 AM
geoHeil
11/12/2022, 1:25 PM
@asset
def upstream_nonpartitioned_1():
    return 1

@asset
def upstream_nonpartitioned_2():
    return 1

@asset
def merged_nonpartitioned(upstream_nonpartitioned_1, upstream_nonpartitioned_2):
    return upstream_nonpartitioned_1 + upstream_nonpartitioned_2

merged_nonpartitioned_job = define_asset_job("non_partitioned", selection="ea/merged_nonpartitioned")

@multi_asset_sensor(
    asset_keys=[AssetKey("ea/upstream_nonpartitioned_1"), AssetKey("ea/upstream_nonpartitioned_2")],
    job=merged_nonpartitioned_job,
    default_status=DefaultSensorStatus.RUNNING,
)
def asset_a_and_b_sensor_with_skip_reason(context):
    asset_events = context.latest_materialization_records_by_key()
    print('***')
    print(asset_events)
    print(asset_events.values())
    print('***')
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()
    elif any(asset_events.values()):
        materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if value
        ]
        not_materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if not value
        ]
        return SkipReason(
            f"Observed materializations for {materialized_asset_key_strs}, "
            f"but not for {not_materialized_asset_key_strs}"
        )
    else:
        return SkipReason(f"NO materializations for {asset_events}, ")

assets_list = load_assets_from_package_module(event_assets, group_name="event_assets", key_prefix="ea")

@repository
def assets_modern_data_stack():
    return [assets_list,
        asset_a_and_b_sensor_with_skip_reason,
    ]
No asset materializations are ever logged in the dagster daemon logs:
{AssetKey(['ea/upstream_nonpartitioned_1']): None, AssetKey(['ea/upstream_nonpartitioned_2']): None}
dict_values([None, None])
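One possible explanation, offered as a guess and not a confirmed diagnosis: the keys logged above have a single path component, 'ea/upstream_nonpartitioned_1', whereas assets loaded with key_prefix="ea" would normally get a two-component key (the prefix, then the asset name). The plain-Python sketch below only illustrates that the two shapes differ. prefixed_key_path is a hypothetical helper, not a Dagster function. If this is the cause, passing the components as a list to the sensor, for example AssetKey(["ea", "upstream_nonpartitioned_1"]), would be the thing to try.

```python
from typing import List, Sequence


def prefixed_key_path(prefix: Sequence[str], name: str) -> List[str]:
    """Path components of an asset key: prefix parts first, then the name."""
    return [*prefix, name]


# Shape expected for an asset named upstream_nonpartitioned_1 with prefix "ea".
prefixed = prefixed_key_path(["ea"], "upstream_nonpartitioned_1")

# Shape seen in the daemon log: one component that contains a slash.
single_component = ["ea/upstream_nonpartitioned_1"]

# The two paths render alike when joined with "/", yet they are different
# lists, so a lookup by one would not find events recorded under the other.
same_when_joined = "/".join(prefixed) == "/".join(single_component)
keys_match = prefixed == single_component
```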
Even after manually materializing each of the upstream assets, the materialization events are not picked up.
Udo Schochtert
11/13/2022, 10:37 AM
Sireesha Kuchimanchi
11/13/2022, 1:16 PM
Sireesha Kuchimanchi
11/13/2022, 1:17 PM
Clovis Warlop
11/13/2022, 6:50 PM
dagster._check.CheckError: Invariant failed. Description: All leaf nodes within graph 'branch_fetch_waterfall' must generate outputs which are mapped to outputs of the graph, and produce assets. The following leaf node(s) are non-asset producing ops: {'google_bigquery_upload_json_7', 'google_bigquery_upload_json_9', 'google_bigquery_upload_json_3', 'google_bigquery_upload_json_8', 'google_bigquery_upload_json_4', 'google_bigquery_upload_json_10', 'google_bigquery_upload_json_2', 'google_bigquery_upload_json_5', 'google_bigquery_upload_json_6', 'google_bigquery_upload_json'}. This behavior is not currently supported because these ops are not required for the creation of the associated asset(s).
Rama
11/13/2022, 8:47 PM
unloadable schedules, I didn't see any problems with the code (repository or schedule), and other jobs in the same repository worked fine. Any idea?
Casper Weiss Bang
11/14/2022, 7:15 AM
Roman Maliushkin
11/14/2022, 9:28 AM
@run_status_sensor. There is a parameter request_jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None. Is it possible to pass two (or more) instances of the same job with different config? How should I configure it?
Config for one job is pretty simple:
"resources": {
    "first_resource": {
        "config": {
            "nice_param": "a"
        }
    }
},
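A sketch of how the single-job config above could be generated for several values, so that the same job can be requested more than once with different settings. The resource and parameter names come from the snippet above. The helper name, the job name my_job, and the second value "b" are assumptions. Whether each config is then supplied as the run config of a separate run request is something to check against the sensor documentation:

```python
def resource_run_config(nice_param: str) -> dict:
    """Run config for one run, varying only first_resource's nice_param."""
    return {
        "resources": {
            "first_resource": {
                "config": {"nice_param": nice_param},
            }
        }
    }


# One (job name, run config) pair per desired run of the same job.
# "my_job" and the value "b" are placeholders for illustration.
requests = [("my_job", resource_run_config(value)) for value in ("a", "b")]
```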
But how can I define the second one?
Casper Weiss Bang
11/14/2022, 10:12 AM