Qwame
12/22/2021, 10:20 PM
dagster.core.executor.child_process_executor.ChildProcessCrashException
File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 163, in execute
event_or_none = next(step_iter)
File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 268, in execute_step_out_of_process
for ret in execute_child_process_command(command):
File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\child_process_executor.py", line 157, in execute_child_process_command
raise ChildProcessCrashException(exit_code=process.exitcode)
I don't quite understand what happened. Any help?
Rasheed Elsaleh
12/22/2021, 11:00 PM
paul.q
12/23/2021, 12:28 AM
`PartitionSetDefinition`, `create_schedule_definition` and `pipeline`. We used to have a `PartitionSetDefinition` with a `partition_fn` that created a list of `Partition` for a date range, but only Mon-Fri. We were able to use this to create different `PartitionSet` objects for different pipelines as well as create a schedule using `create_schedule_definition` on a `PartitionSet` object.
Now, I have a `Job` for my `Graph` and its config is coming from a `daily_partitioned_config`, which doesn't exclude weekends as I would like. Should I be messing with `dynamic_partitioned_config` to achieve this? I also use `build_schedule_from_partitioned_job` to have a schedule for the job. I was able to get the partitions looking the way I wanted using `dynamic_partitioned_config`, but `build_schedule_from_partitioned_job` returned this error, where `get_effective_dates1` returns a set of date strings of the form `%Y-%m-%d`:
dagster.check.CheckError: Object DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) is not a TimeWindowPartitionsDefinition. Got DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) with type <class 'dagster.core.definitions.partition.DynamicPartitionsDefinition'>.
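For reference, a weekday-only key generator like the `get_effective_dates1` function described in this message could be sketched in plain Python as follows. The name `weekday_partition_keys` and its signature are hypothetical, and the sketch only covers generating the keys. It does not resolve the schedule error, which per the message above expects a `TimeWindowPartitionsDefinition`.

```python
from datetime import date, timedelta
from typing import List


def weekday_partition_keys(start: date, end: date) -> List[str]:
    """Return `%Y-%m-%d` keys for every Mon-Fri date from start to end, inclusive.

    Hypothetical helper for illustration; not part of Dagster.
    """
    keys = []
    current = start
    while current <= end:
        # date.weekday() is 0 for Monday through 6 for Sunday.
        if current.weekday() < 5:
            keys.append(current.strftime("%Y-%m-%d"))
        current += timedelta(days=1)
    return keys


example_keys = weekday_partition_keys(date(2021, 12, 20), date(2021, 12, 27))
print(example_keys)
```

The weekend of 2021-12-25 and 2021-12-26 is skipped in the example range.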
Thanks
Paul
Bryan Chavez
12/23/2021, 1:45 AM
sample_graph = sample_graph.to_job(
    executor_def=in_process_executor,
    resource_defs=resource_defs_map,
    config=graph_configs,
)
Nitin Madhavan
12/23/2021, 4:59 AM
Rahul Sharma
12/23/2021, 9:06 AM
Florian Giroud
12/23/2021, 10:54 AM
Jazzy
12/23/2021, 5:59 PM
CONTINENTS = ["Africa", "Antarctica", "Asia", "Europe", "North America", "Oceania", "South America"]

@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
    return {"config": {"continent_name": partition_key}}

@op(config_schema={"continent_name": str})
def continent_op(context):
    context.log.info(context.op_config["continent_name"])

normal_config = {"ops": {
    "validate_date_input_variables": {"config": {"date_begin": '2021-12-01', "date_end": '2021-12-23'}},
    "set_order_type": {"config": {"order_type": 'NORMAL'}},
    "continent_op": continent_config
    }
}
For which I get the following error:
Error 1: Value at path root:continent_op must be dict. Expected: "{ config: { continent_name: String } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }".
Is there a different way that I should be doing this? I think it might relate to a previous thread which led me to multi-dimension partitions...
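The error says the value at `root:continent_op` must be a dict, while `normal_config` places the decorated `continent_config` object there. One possible direction, sketched here in plain Python, is to build the whole run config from the partition key in a single function. The function name `run_config_for_continent` is hypothetical, and whether this is the function you would then decorate with `@static_partitioned_config` is an assumption that is not verified in this thread.

```python
CONTINENTS = ["Africa", "Antarctica", "Asia", "Europe", "North America", "Oceania", "South America"]


def run_config_for_continent(partition_key: str) -> dict:
    """Return the full run config for one partition key (hypothetical helper)."""
    return {
        "ops": {
            # Fixed values copied from the normal_config in the message above.
            "validate_date_input_variables": {
                "config": {"date_begin": "2021-12-01", "date_end": "2021-12-23"}
            },
            "set_order_type": {"config": {"order_type": "NORMAL"}},
            # Only this entry varies with the partition key, and it is a plain dict.
            "continent_op": {"config": {"continent_name": partition_key}},
        }
    }


asia_config = run_config_for_continent("Asia")
print(asia_config["ops"]["continent_op"])
```

Every value under `ops` is then an ordinary dict, which is the shape the error message asks for.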
The end goal is to have many different jobs that run hourly for a few different combinations of parameters.
Quy
12/23/2021, 6:36 PM
dagster.core.errors.PartitionExecutionError
I'm not sure I have a proper config, because the error doesn't give more details. Could anyone help me?
dagster.core.errors.PartitionExecutionError: Error occurred during the evaluation of the `run_config_for_partition` function for partition set download_firebase_data_local_partition_set
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 292, in get_partition_config
return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 192, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
TypeError: daily_download_config() takes 1 positional argument but 2 were given
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 185, in user_code_error_boundary
yield
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 291, in get_partition_config
run_config = partition_set_def.run_config_for_partition(partition)
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/partition.py", line 441, in run_config_for_partition
return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))
File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/time_window_partitions.py", line 192, in <lambda>
run_config_for_partition_fn=lambda partition: fn(
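The `TypeError` above says `daily_download_config()` was called with two positional arguments but accepts one. A minimal sketch of a two-argument version follows, assuming the two arguments are the start and end datetimes of the partition's time window. The op name `download_firebase_data` and the `date` config key are hypothetical placeholders.

```python
from datetime import datetime


def daily_download_config(start: datetime, end: datetime) -> dict:
    """Build the run config for one daily partition.

    `start` and `end` are assumed to bound the partition's time window;
    `end` is accepted so the call with two arguments succeeds, even though
    this sketch only uses `start`.
    """
    return {
        "ops": {
            # Hypothetical op name and config key, for illustration only.
            "download_firebase_data": {
                "config": {"date": start.strftime("%Y-%m-%d")}
            }
        }
    }


example_config = daily_download_config(datetime(2021, 12, 23), datetime(2021, 12, 24))
print(example_config)
```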
https://stackoverflow.com/questions/70465752/what-is-proper-partition-configs-for-dagster-job
schrockn
12/23/2021, 11:38 PM
Christian Bay
12/24/2021, 6:47 AM
Martin Carlsson
12/24/2021, 9:36 AM
George Pearse
12/24/2021, 10:32 AM
张强
12/25/2021, 8:08 AM
geoHeil
12/25/2021, 9:11 AM
@asset(out=Out(MyCustomTypedDataFrame))
How can I specify a custom type definition not only for op outputs but also for an asset?
Jahid Hasan
12/26/2021, 2:53 AM
Or Asher
12/26/2021, 12:14 PM
rhl
12/26/2021, 2:36 PM
Manny Schneck
12/26/2021, 11:10 PM
daniel blinick
12/27/2021, 10:21 AM
`build_op_context` to create the context for the op, and the created context has a `run_id` set to "ephemeral". The problem is that a resource I'm trying to use in the test also makes use of the `run_id`, but the `run_id` in the resource context is `None`. This seems to be caused by the fact that it tries to grab the `run_id` from the pipeline run object, which, in this case, does not exist. Is there a way this can be fixed so that the `run_id` 'ephemeral' is propagated throughout, including the resources?
Thanks!
geoHeil
12/27/2021, 12:09 PM
Bryan Chavez
12/27/2021, 2:19 PM
Traceback (most recent call last):
File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/usr/local/lib/python3.8/site-packages/dagster/__main__.py", line 3, in <module>
main()
File "/usr/local/lib/python3.8/site-packages/dagster/cli/__init__.py", line 50, in main
cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster/cli/api.py", line 48, in execute_run_command
DagsterInstance.from_ref(args.instance_ref)
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 423, in from_ref
run_launcher=instance_ref.run_launcher,
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 264, in run_launcher
return self.run_launcher_data.rehydrate() if self.run_launcher_data else None
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 56, in rehydrate
module = importlib.import_module(self.module_name)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 848, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.8/site-packages/dagster_docker/__init__.py", line 4, in <module>
from .docker_run_launcher import DockerRunLauncher
File "/usr/local/lib/python3.8/site-packages/dagster_docker/docker_run_launcher.py", line 5, in <module>
from dagster.core.launcher.base import (
ImportError: cannot import name 'ResumeRunContext' from 'dagster.core.launcher.base' (/usr/local/lib/python3.8/site-packages/dagster/core/launcher/base.py)
run_launcher:
  # module: dagster.core.launcher
  # class: DefaultRunLauncher
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
    network: docker_khde_network
    container_kwargs:
      volumes:
        - repository.py:/opt/dagster/app/kh_dagster/
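An `ImportError` for a name that one package expects from another is often a sign that the two packages are installed at different versions. Assuming that is what happened here (the thread does not confirm it), and assuming `dagster` and `dagster-docker` are meant to be installed at the same version, a quick check inside the container could look like this sketch. The helper names `installed_versions` and `versions_match` are hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def versions_match(versions: Dict[str, Optional[str]]) -> bool:
    """True when all installed packages report the same version string."""
    present = {v for v in versions.values() if v is not None}
    return len(present) <= 1


report = installed_versions(["dagster", "dagster-docker"])
print(report, "match" if versions_match(report) else "MISMATCH")
```

This relies on `importlib.metadata`, which is in the standard library from Python 3.8, the version shown in the traceback paths.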
Daniel Suissa
12/27/2021, 2:25 PM
pydevd_pycharm.settrace
Edit: for some reason settrace jams the job/graph execution, but when I put it inside an op, it works fine and I'm able to attach the code to the PyCharm debugger. It's a bit annoying that I have to declare the debugger in every op (although I can wrap a decorator around @op). I guess the problem arises when the code provider parses the Python code to create the dependency wiring...
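The decorator idea mentioned above could be sketched as follows. This is a generic Python pattern that has not been tested with Dagster, and `attach_once` is a hypothetical name. The `attach` argument stands in for a function that calls `pydevd_pycharm.settrace` with your own debugger arguments, and it runs at most once per process.

```python
import functools
from typing import Any, Callable

# Module-level flag: one attach per Python process.
_debugger_attached = False


def attach_once(attach: Callable[[], None]) -> Callable:
    """Wrap a function so `attach()` runs before its first call in this process."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            global _debugger_attached
            if not _debugger_attached:
                attach()
                _debugger_attached = True
            return fn(*args, **kwargs)

        return wrapper

    return decorator


# Usage with a stand-in attach function that just records the call.
attach_calls = []


@attach_once(lambda: attach_calls.append("attached"))
def add_one(x: int) -> int:
    return x + 1


first_result = add_one(1)
second_result = add_one(2)
print(first_result, second_result, attach_calls)
```

If each op runs in its own process, the attach function would still run once in each of those processes.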
If anyone knows a way to run settrace once for the whole job, I'd appreciate a quick description of what you did.
Qumber Ali
12/27/2021, 2:34 PM
张强
12/28/2021, 5:41 AM
Cezary Pukownik
12/28/2021, 8:35 AM
`dbt_resource` in job config, e.g. in Dagit?
In the documentation I've only seen in-code configuration using the `dbt_cli_resource.configured` method, but this requires setting some config inside the code.
I want to set `project_dir` and `dbt_executable` outside of code, to have the option to switch between dev and prod envs without changing code.
Harris Hoke
12/28/2021, 2:03 PM
`--import-mode=importlib`, but nothing has worked so far:
(dagster37) ➜ dagster git:(master) ✗ pwd
/Users/harris.hoke/personal_projects/dagster/dagster
(dagster37) ➜ dagster git:(master) ✗ git pull
Already up to date.
(dagster37) ➜ dagster git:(master) ✗ python3 --version
Python 3.7.4
(dagster37) ➜ dagster git:(master) ✗ python3 -m pytest python_modules/dagster/dagster_tests
ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
python_modules/dagster/dagster_tests/conftest.py:14: in <module>
from dagster_test.dagster_core_docker_buildkite import (
E ModuleNotFoundError: No module named 'dagster_test'
(dagster37) ➜ dagster git:(master) ✗ python3 -m pytest --import-mode=importlib python_modules/dagster/dagster_tests
ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
python_modules/dagster/dagster_tests/conftest.py:10: in <module>
from dagster.core.errors import DagsterUserCodeUnreachableError
E ImportError: cannot import name 'DagsterUserCodeUnreachableError' from 'dagster.core.errors' (/Users/harris.hoke/.pyenv/versions/dagster37/lib/python3.7/site-packages/dagster/core/errors.py)
(dagster37) ➜ dagster git:(master) ✗ ls python_modules/dagster/dagster_tests
__init__.py cli_tests daemon_tests execution_tests workspace.yaml
__pycache__ conftest.py docker-compose.yml general_tests
api_tests core_tests environments scheduler_tests
(dagster37) ➜ dagster git:(master) ✗ rm python_modules/dagster/dagster_tests/__init__.py
(dagster37) ➜ dagster git:(master) ✗ python3 -m pytest --import-mode=importlib python_modules/dagster/dagster_tests
ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
python_modules/dagster/dagster_tests/conftest.py:10: in <module>
from dagster.core.errors import DagsterUserCodeUnreachableError
E ImportError: cannot import name 'DagsterUserCodeUnreachableError' from 'dagster.core.errors' (/Users/harris.hoke/.pyenv/versions/dagster37/lib/python3.7/site-packages/dagster/core/errors.py)
(dagster37) ➜ dagster git:(master) ✗ python3 -m pytest python_modules/dagster/dagster_tests
ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
python_modules/dagster/dagster_tests/conftest.py:10: in <module>
from dagster.core.errors import DagsterUserCodeUnreachableError
E ImportError: cannot import name 'DagsterUserCodeUnreachableError' from 'dagster.core.errors' (/Users/harris.hoke/.pyenv/versions/dagster37/lib/python3.7/site-packages/dagster/core/errors.py)
(dagster37) ➜ dagster git:(master) ✗ pip uninstall dagster
[...]
Successfully uninstalled dagster-0.13.12
(dagster37) ➜ dagster git:(master) ✗ python3 -m pytest python_modules/dagster/dagster_tests
ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
python_modules/dagster/dagster_tests/conftest.py:9: in <module>
from dagster import check, seven
E ModuleNotFoundError: No module named 'dagster'
(dagster37) ➜ dagster git:(master) ✗ python3 -m pytest --import-mode=importlib python_modules/dagster/dagster_tests
ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
python_modules/dagster/dagster_tests/conftest.py:9: in <module>
from dagster import check, seven
E ModuleNotFoundError: No module named 'dagster'
Chris Retford
12/28/2021, 9:31 PM
2021-12-28 14:30:51 - BackfillDaemon - ERROR - Backfill failed for vrixduce: grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception iterating responses: Object of type type is not JSON serializable"
debug_error_string = "{"created":"@1640727051.106427000","description":"Error received from peer unix:/var/folders/95/hh0q96hs3dddfgp8ty7x8fvd4p202x/T/tmpovml57vv","file":"src/core/lib/surface/call.cc","file_line":1075,"grpc_message":"Exception iterating responses: Object of type type is not JSON serializable","grpc_status":2}"
>
Stack Trace:
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/daemon/backfill.py", line 91, in execute_backfill_iteration
for _run_id in submit_backfill_runs(
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/core/execution/backfill.py", line 147, in submit_backfill_runs
result = repo_location.get_external_partition_set_execution_param_data(
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/core/host_representation/repository_location.py", line 742, in get_external_partition_set_execution_param_data
return sync_get_external_partition_set_execution_param_data_grpc(
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/api/snapshot_partition.py", line 110, in sync_get_external_partition_set_execution_param_data_grpc
api_client.external_partition_set_execution_params(
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/grpc/client.py", line 206, in external_partition_set_execution_params
chunks = list(
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/grpc/client.py", line 118, in _streaming_query
yield from response_stream
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
return self._next()
File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
raise self
Manny Schneck
12/28/2021, 9:42 PM
`python_modules/dagster/dagster_tests/general_tests/grpc_tests` and they think it might be DNS, you can try `export GRPC_DNS_RESOLVER=native; pytest ....`, and if that fixes it, then you know that it's DNS.
Arun Kumar
12/29/2021, 12:48 AM
`LaunchPipelineExecution` GraphQL API from our Kotlin backend using a Kotlin client, and I am passing the `runConfigData` as a valid JSON string. I'm seeing the following error on Dagster 0.12.12. Any thoughts on how I can fix this?
PythonError(message=dagster.check.ParameterCheckError: Param "run_config" is not one of ['dict', 'frozendict']. Got '{"ops": {"load_analysis": {"config": {"analysis_id": b3e61740-ed8d-482b-943d-8ce354b11632}}}}' which is type <class 'str'>.
daniel
12/29/2021, 2:04 AM
Arun Kumar
12/29/2021, 2:18 AM
daniel
12/29/2021, 2:31 AM
>>> my_dict = {"foo": "bar"}
>>> json.dumps(my_dict)
'{"foo": "bar"}'
>>> json.dumps(json.dumps(my_dict))
'"{\\"foo\\": \\"bar\\"}"'
If you were passing the second string in (instead of the first), the server would JSON-decode it once, but instead of being a dict as expected, it would now be a (still JSON-encoded) string.
Arun Kumar
12/29/2021, 3:37 AM
{
  "variables": {
    "params": {
      "mode": "default",
      "runConfigData": "{\"ops\": {\"load_analysis\": {\"config\": {\"analysis_id\": \"b3e61740-ed8d-482b-943d-8ce354b11632\"}}}}",
      "selector": {
        "pipelineName": "analyses_exposures_loader",
        "repositoryLocationName": "metrics-repo",
        "repositoryName": "metrics-repo"
      }
    }
  },
.
>>> json.loads("{\"ops\": {\"load_analysis\": {\"config\": {\"analysis_id\": \"b3e61740-ed8d-482b-943d-8ce354b11632\"}}}}")
{'ops': {'load_analysis': {'config': {'analysis_id': 'b3e61740-ed8d-482b-943d-8ce354b11632'}}}}
>>>
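The double-encoding scenario discussed in this thread can be reproduced and detected with the standard `json` module. This is a minimal sketch; `decode_run_config` is a hypothetical helper and is not how the Dagster server handles the value.

```python
import json


def decode_run_config(raw: str) -> dict:
    """Decode a JSON run config once and insist that the result is a dict."""
    value = json.loads(raw)
    if isinstance(value, str):
        # One decode produced another string: the input was JSON-encoded twice.
        raise ValueError("run config is still a string after one decode (double-encoded?)")
    if not isinstance(value, dict):
        raise TypeError(f"expected a JSON object, got {type(value).__name__}")
    return value


run_config = {
    "ops": {
        "load_analysis": {
            "config": {"analysis_id": "b3e61740-ed8d-482b-943d-8ce354b11632"}
        }
    }
}
encoded_once = json.dumps(run_config)
encoded_twice = json.dumps(encoded_once)

print(decode_run_config(encoded_once) == run_config)
try:
    decode_run_config(encoded_twice)
except ValueError as exc:
    print("rejected:", exc)
```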
Keshav
12/29/2021, 4:27 AM
daniel
12/29/2021, 4:33 AM
Arun Kumar
12/29/2021, 4:59 AM
`RunConfigData` is typed as a `scalar` in GraphQL, the client on our side assumes it to be a String by default. Even if the double serialization is not happening, I assume the run config will be sent as a string to the server, and I'm not sure I understand where the str-to-dict conversion will happen. Am I missing something?
Keshav
12/29/2021, 5:58 AM
client = DagsterGraphQLClient("url", port)
query_string = YOUR_QUERY_STRING
variable_dict = YOUR_VARIABLES_DICT
response = client._execute(query_string, variables=variable_dict)
If you are using something similar to requests, then:
query_string = YOUR_QUERY_STRING_INCLUDING_YOUR_VARIABLES
response = requests.post("url", json={"query": query_string})
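Building on the suggestion above, the request body can be assembled so that the run config stays a nested object and the whole payload is serialized exactly once. This sketch uses only the standard library and does not send anything. `build_graphql_body` is a hypothetical helper, the variable layout is copied from the payload posted earlier in the thread, and it assumes the server accepts a JSON object for `runConfigData`.

```python
import json


def build_graphql_body(query: str, run_config: dict) -> str:
    """Serialize a GraphQL request body with run_config kept as a nested object."""
    variables = {
        "params": {
            "mode": "default",
            # A dict, not a pre-serialized string, so it is encoded only once below.
            "runConfigData": run_config,
            "selector": {
                "pipelineName": "analyses_exposures_loader",
                "repositoryLocationName": "metrics-repo",
                "repositoryName": "metrics-repo",
            },
        }
    }
    return json.dumps({"query": query, "variables": variables})


body = build_graphql_body(
    "YOUR_QUERY_STRING",  # placeholder, as in the message above
    {"ops": {"load_analysis": {"config": {"analysis_id": "b3e61740-ed8d-482b-943d-8ce354b11632"}}}},
)
decoded = json.loads(body)
print(type(decoded["variables"]["params"]["runConfigData"]).__name__)
```

After a single decode on the receiving side, `runConfigData` is already a dict, so no second decode step is needed.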
daniel
12/29/2021, 6:01 AM
Arun Kumar
12/29/2021, 9:39 AM
daniel
12/30/2021, 3:43 AM
Arun Kumar
12/30/2021, 4:10 AM
`graphql-java-extended-scalars`, as it seems like it requires a change in the GraphQL schema itself (correct me if I am wrong). Even with that, I might still have to create objects on my side to set the run config scalar, which would still require similar code.
daniel
01/03/2022, 11:19 PM