Ivan Tsarev
11/29/2023, 1:23 PM
Davi
11/29/2023, 4:12 PM
Exception: Unexpected keys in model class V1Container: {'pod_spec_config'}
File "/usr/local/lib/python3.10/site-packages/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py", line 333, in _dequeue_run
instance.run_launcher.launch_run(LaunchRunContext(dagster_run=run, workspace=workspace))
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/launcher.py", line 282, in launch_run
self._launch_k8s_job_with_args(job_name, args, run)
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/launcher.py", line 204, in _launch_k8s_job_with_args
container_context = self.get_container_context_for_run(run)
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/launcher.py", line 199, in get_container_context_for_run
return K8sContainerContext.create_for_run(dagster_run, self, include_run_tags=True)
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/container_context.py", line 192, in create_for_run
user_defined_k8s_config = get_user_defined_k8s_config(dagster_run.tags)
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/job.py", line 203, in get_user_defined_k8s_config
return UserDefinedDagsterK8sConfig(
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/job.py", line 105, in __new__
container_config = k8s_snake_case_dict(kubernetes.client.V1Container, container_config)
File "/usr/local/lib/python3.10/site-packages/dagster_k8s/models.py", line 116, in k8s_snake_case_dict
raise Exception(f"Unexpected keys in model class {model_class.__name__}: {invalid_keys}")
I am using Dagster, installed with Poetry, with the following versions:
[tool.poetry.dependencies]
python = "^3.9"
dagster = "1.4.7"
dagster-webserver = "1.4.7"
dagster-graphql = "1.4.7"
dagster-postgres = "0.20.7"
dagster-k8s = "0.20.7"
dagster-msteams = "0.20.7"
dagster-gcp = "0.20.7"
And here is my tags configuration:
def get_k8s_config(cpu_min, cpu_max, memory_min, memory_max):
    return {
        "dagster/max_retries": 3,
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "limits": {"cpu": cpu_max, "memory": memory_max},
                    "requests": {"cpu": cpu_min, "memory": memory_min},
                }
            },
            "pod_spec_config": {
                "nodeSelector": {...},
                "tolerations": [
                    {
                        "effect": "NoSchedule",
                        "key": "workload-type",
                        "operator": "Equal",
                        "value": "batch",
                    }
                ],
            },
        },
    }

@job(
    resource_defs={
        "msteams": msteams_resource.configured(
            {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}
        )
    },
    tags=get_k8s_config(
        cpu_min="2500m",
        cpu_max="5000m",
        memory_min="30000Mi",
        memory_max="30000Mi",
    ),
)
def job__process():
    ...
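The traceback shows `pod_spec_config` reaching `V1Container` validation, which means dagster-k8s saw it as a key of `container_config` rather than as its sibling. A small stdlib sketch of that structural rule (the allowed-key set is an assumption based on `UserDefinedDagsterK8sConfig` in dagster-k8s 0.20.x; verify against your installed version):

```python
# Hypothetical sanity check for the "dagster-k8s/config" run tag structure.
# ALLOWED_TOP_LEVEL is an assumption taken from dagster-k8s 0.20.x sources.
ALLOWED_TOP_LEVEL = {
    "container_config",
    "pod_spec_config",
    "pod_template_spec_metadata",
    "job_config",
    "job_metadata",
    "job_spec_config",
}

def check_k8s_tag(config: dict) -> None:
    unexpected = set(config) - ALLOWED_TOP_LEVEL
    if unexpected:
        raise ValueError(f"unexpected top-level keys: {unexpected}")
    # If pod_spec_config ends up nested inside container_config, dagster-k8s
    # hands it to kubernetes.client.V1Container and fails exactly as above.
    if "pod_spec_config" in config.get("container_config", {}):
        raise ValueError("pod_spec_config is nested inside container_config")

check_k8s_tag({
    "container_config": {"resources": {"limits": {"cpu": "5000m"}}},
    "pod_spec_config": {"node_selector": {"workload-type": "batch"}},
})  # no exception: structure matches the snippet above
```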
nickvazz
11/29/2023, 5:58 PM
Alex Chisholm
11/29/2023, 7:34 PM
Alex Chisholm
11/29/2023, 7:40 PM
Anton Podviaznikov
11/29/2023, 7:45 PM
I'm getting `No module named 'dagster_postgres'` inside the dagster-run-...-.. pod. Does anyone know why that is, and where I need to add this module?
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/dagster/_serdes/config_class.py", line 91, in rehydrate
module = importlib.import_module(self.module_name)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'dagster_postgres'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/dagster", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/site-packages/dagster/_cli/__init__.py", line 48, in main
cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1157, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1078, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1434, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 783, in invoke
return __callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster/_cli/api.py", line 64, in execute_run_command
with get_instance_for_cli(instance_ref=args.instance_ref) as instance:
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.8/site-packages/dagster/_cli/utils.py", line 104, in get_instance_for_cli
with DagsterInstance.from_ref(instance_ref) as instance:
File "/usr/local/lib/python3.8/site-packages/dagster/_core/instance/__init__.py", line 600, in from_ref
unified_storage = instance_ref.storage
File "/usr/local/lib/python3.8/site-packages/dagster/_core/instance/ref.py", line 496, in storage
return self.storage_data.rehydrate(as_type=DagsterStorage) if self.storage_data else None
File "/usr/local/lib/python3.8/site-packages/dagster/_serdes/config_class.py", line 120, in rehydrate
return klass.from_config_value(self, check.not_none(result.value))
File "/usr/local/lib/python3.8/site-packages/dagster/_core/storage/legacy_storage.py", line 122, in from_config_value
run_storage = ConfigurableClassData(
File "/usr/local/lib/python3.8/site-packages/dagster/_serdes/config_class.py", line 93, in rehydrate
check.failed(
File "/usr/local/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1590, in failed
raise CheckError(f"Failure condition: {desc}")
dagster._check.CheckError: Failure condition: Couldn't import module dagster_postgres.run_storage when attempting to load the configurable class dagster_postgres.run_storage.PostgresRunStorage
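For what it's worth, the run pod rehydrates the storage classes named in `dagster.yaml`, so `dagster-postgres` must be installed in the user-code image that the run pod uses, not only in the webserver/daemon images. A stdlib sketch (hypothetical helper) for checking what is importable inside a given image:

```python
import importlib.util

def missing_modules(names):
    """Return the given module names that cannot be imported in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Run inside the user-code image; an empty list means the storage classes
# referenced by dagster.yaml (e.g. dagster_postgres.run_storage) can load.
print(missing_modules(["dagster", "dagster_postgres"]))
```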
nickvazz
11/29/2023, 8:12 PM
Drew Broadley
11/30/2023, 12:49 AM
Multiple asset definitions found
One or more of the selected assets are defined in multiple code locations. Rename these assets to avoid collisions and then try again.
zendesk_reports
zendesk_reports_comments_fields_taxonomy_retention@pipelines.i_zendesk_r_reports_comments_fields_o_sql_f_hourly
aggregated@pipelines.i_various_r_aggregated_o_sql_f_hourly
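This usually means the same asset key is defined in more than one code location; renaming one of them or adding a `key_prefix` resolves it. A plain-Python sketch of the rule being enforced (the location names are from the error above; the asset lists are assumed for illustration):

```python
from collections import Counter

# Assumed reconstruction: both jobs define an asset keyed "zendesk_reports".
locations = {
    "pipelines.i_zendesk_r_reports_comments_fields_o_sql_f_hourly": ["zendesk_reports"],
    "pipelines.i_various_r_aggregated_o_sql_f_hourly": ["zendesk_reports"],
}

def colliding_keys(locs: dict) -> set:
    """Asset keys defined in more than one code location."""
    counts = Counter(key for keys in locs.values() for key in keys)
    return {key for key, n in counts.items() if n > 1}

print(colliding_keys(locations))  # {'zendesk_reports'}
```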
Anant Mishra
11/30/2023, 1:17 AM
Thomas
11/30/2023, 10:50 AM
Ben Jeffrey
11/30/2023, 2:22 PM
Hernan
11/30/2023, 3:26 PM
lautaro kogan
11/30/2023, 3:46 PM
`if not os.path.isabs(path)`
Jay Spiers
11/30/2023, 6:24 PM
dagster._core.errors.DagsterUserCodeUnreachableError: User code server request timed out due to taking longer than 60 seconds to complete.
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_core\workspace\context.py", line 608, in _load_location
origin.reload_location(self.instance) if reload else origin.create_location()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_core\host_representation\origin.py", line 361, in reload_location
return GrpcServerCodeLocation(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_core\host_representation\code_location.py", line 629, in __init__
self._external_repositories_data = sync_get_streaming_external_repositories_data_grpc(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_api\snapshot_repository.py", line 25, in sync_get_streaming_external_repositories_data_grpc
external_repository_chunks = list(
^^^^^
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_grpc\client.py", line 346, in streaming_external_repository
for res in self._streaming_query(
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_grpc\client.py", line 184, in _streaming_query
self._raise_grpc_exception(
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_grpc\client.py", line 135, in _raise_grpc_exception
raise DagsterUserCodeUnreachableError(
The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "UNKNOWN:Error received from peer ipv4:127.0.0.1:4266 {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2023-11-30T18:13:54.0310382+00:00"}"
>
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_grpc\client.py", line 180, in _streaming_query
yield from self._get_streaming_response(
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\dagster\_grpc\client.py", line 169, in _get_streaming_response
yield from getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\grpc\_channel.py", line 541, in __next__
return self._next()
^^^^^^^^^^^^
File "C:\ProgramData\miniconda3\envs\dagster\Lib\site-packages\grpc\_channel.py", line 967, in _next
raise self
reza razavipour
11/30/2023, 7:22 PM
from dagster import AssetSelection, define_asset_job
selection = AssetSelection.keys("create_weekly_folder")
# selection = AssetSelection.keys("report_validator").upstream()
create_auction_root_folder = define_asset_job(
    name="create_auction_root_folder",
    selection=selection,
)
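(For reference, one way to keep a job like this from having overlapping runs, without needing `context` in code, is a run-queue concurrency limit. This is a sketch assuming the `QueuedRunCoordinator`; the tag key and value are conventions, not requirements, and the same tag would need to be set via `tags=` on `define_asset_job`:)

```yaml
# dagster.yaml (instance config): allow at most one queued/running run
# carrying this tag at a time. Assumes the job is defined with
# tags={"dagster/concurrency_key": "create_auction_root_folder"}.
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "dagster/concurrency_key"
        value: "create_auction_root_folder"
        limit: 1
```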
How do I guard against multiple runs of this job when I don't have access to `context: AssetExecutionContext`?
Jordan Fox
11/30/2023, 8:20 PM
R Lucas
11/30/2023, 10:05 PM
`TimeWindowPartitionMapping` with a `LastPartitionMapping`.
My concern about this approach is generating intermediate data only as a source for the `LastPartitionMapping` target asset.
I would like to know the opinion of Dagster experts on which option would be best:
• A partition mapping class for such a use case is expected soon
• Using an intermediate asset with these 2 existing PartitionMapping classes would be the best approach
• Implementing my own custom PartitionMapping class
Yuan Cheng
11/30/2023, 11:21 PM
Adam
12/01/2023, 3:07 AM
Bolin Zhu
12/01/2023, 6:47 AM
`slack_on_run_failure_sensor` is defined for each of the code locations using `dagster-slack`.
I seem to be getting duplicated alerts when a job from code-location-a fails: I received alerts from both slack_on_run_failure sensors (code-location-a and code-location-b). May I ask how I can eliminate this?
Lukas Haberzettl
12/01/2023, 11:00 AM
Casper Weiss Bang
12/01/2023, 12:57 PM
class DatabricksIOManager(ConfigurableIOManager):
    client: UnityCatalogClient
which I wanted to test by using `mock_resource`:
manager = DatabricksIOManager(client=ResourceDefinition.mock_resource())
However, I get pydantic validation errors:
Input should be a valid dictionary or instance of UnityCatalogClient [type=model_type, input_value=<dagster._core.definition...bject at 0x7f2b437d3250>, input_type=ResourceDefinition]
For further information visit https://errors.pydantic.dev/2.5/v/model_type
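`ConfigurableIOManager` fields are validated by pydantic, so `client` needs a value that passes an instance check against `UnityCatalogClient`; a `ResourceDefinition` won't. One option is a `MagicMock` built with `spec=`, which passes `isinstance` checks (a sketch; the class below is a stand-in for the real `UnityCatalogClient`):

```python
from unittest.mock import MagicMock

class UnityCatalogClient:
    """Stand-in for the real client class (assumption, for illustration)."""
    def list_tables(self):  # hypothetical method
        raise NotImplementedError

# spec= makes the mock report itself as a UnityCatalogClient instance,
# so isinstance-based pydantic validation accepts it.
mock_client = MagicMock(spec=UnityCatalogClient)

print(isinstance(mock_client, UnityCatalogClient))  # True
```

Whether dagster's pydantic model config accepts the mock may still depend on the dagster/pydantic versions in use, so testing against your pinned versions is the safer check.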
Bolin Zhu
12/01/2023, 2:20 PM
Gary McClure
12/01/2023, 2:22 PM
Jonathan Williams
12/01/2023, 5:52 PM
Jordan Fox
12/01/2023, 9:37 PM
Amit Taller
12/01/2023, 9:55 PM
In a `run_status_sensor` I'm getting an error (not on every run) on this command:
materializations_planned = context.instance.get_records_for_run(
    run_id=context.dagster_run.run_id, of_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED
).records
Any ideas why?
Thanks!!
Roy Ammeraal
12/01/2023, 10:37 PM
`touch /tmp/helloworld`
on a remote host.
Zach
12/01/2023, 11:31 PM
loggers:
  console:
    config:
      log_level: WARNING
but it didn't produce any noticeable difference in the logs emitted to the UI. I've also looked at some of the docs here, but those seem focused on configuring logs globally across a deployment. I need to be able to change the log level for a specific job and leave other jobs at the default. Any suggestions?
Bowen Zhang
12/02/2023, 3:25 PM
`dagster.yaml`
for different code locations (with different Docker images)?