Gustavo Carvalho  06/06/2023, 4:29 PM
dagster._check.CheckError: Invariant failed. Description: Parent pipeline snapshot id out of sync with passed parent pipeline snapshot
How can I overcome it?

alex  06/06/2023, 5:14 PM

Gustavo Carvalho  06/06/2023, 5:16 PM

Gustavo Carvalho  06/06/2023, 5:16 PM

Gustavo Carvalho
06/06/2023, 5:18 PM
@asset(
    name="Way2_ETL",
    description=description or "Way2 ETL",
    partitions_def=partitions_def,
    compute_kind="Way2 API",
    key_prefix=key_prefix,
    required_resource_keys={
        way2_api_connector_key,
        sqlalchemy_engine_key,
    },
    # metadata={
    #     "scada_id": scada_id,
    #     "pontoId_mapping": pontoId_mapping,
    #     "grandezas_mapping": grandezas_mapping,
    #     "resource_key_mapping": {
    #         "way2_api_connector_key": way2_api_connector_key,
    #         "sqlalchemy_engine_key": sqlalchemy_engine_key,
    #     },
    # },
)
def way2_etl(context: OpExecutionContext):
    ...
It works with metadata commented out, but stops working when I uncomment it.

alex
06/06/2023, 5:18 PM

Gustavo Carvalho  06/06/2023, 5:19 PM

alex  06/06/2023, 5:19 PM

Gustavo Carvalho  06/06/2023, 5:19 PM

Gustavo Carvalho  06/06/2023, 5:20 PM

Gustavo Carvalho  06/06/2023, 5:22 PM

alex  06/06/2023, 5:22 PM
dagster dev or manually running dagit and the daemon? You could try restarting the daemon.

Gustavo Carvalho  06/06/2023, 5:22 PM

alex  06/06/2023, 5:23 PM

Gustavo Carvalho  06/06/2023, 5:23 PM

Gustavo Carvalho  06/06/2023, 5:23 PM

Gustavo Carvalho  06/06/2023, 5:24 PM

Gustavo Carvalho  06/06/2023, 5:24 PM
dict[int, int] to my metadata

Gustavo Carvalho  06/06/2023, 5:25 PM

Gustavo Carvalho
06/06/2023, 5:27 PM
dagster._check.CheckError: Invariant failed. Description: Parent pipeline snapshot id out of sync with passed parent pipeline snapshot
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/implementation/utils.py", line 126, in _fn
    return fn(*args, **kwargs)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/implementation/utils.py", line 57, in _fn
    result = fn(self, graphene_info, *args, **kwargs)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/schema/roots/mutation.py", line 281, in mutate
    return create_execution_params_and_launch_pipeline_exec(graphene_info, executionParams)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/schema/roots/mutation.py", line 259, in create_execution_params_and_launch_pipeline_exec
    return launch_pipeline_execution(
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 32, in launch_pipeline_execution
    return _launch_pipeline_execution(graphene_info, execution_params)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 66, in _launch_pipeline_execution
    run = do_launch(graphene_info, execution_params, is_reexecuted)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 49, in do_launch
    dagster_run = create_valid_pipeline_run(graphene_info, external_job, execution_params)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster_graphql/implementation/execution/run_lifecycle.py", line 79, in create_valid_pipeline_run
    dagster_run = graphene_info.context.instance.create_run(
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 1395, in create_run
    dagster_run = self._construct_run_with_snapshots(
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 1118, in _construct_run_with_snapshots
    self._ensure_persisted_job_snapshot(job_snapshot, parent_job_snapshot)
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 1165, in _ensure_persisted_job_snapshot
    check.invariant(
  File "/home/gustavo/miniconda3/lib/python3.10/site-packages/dagster/_check/__init__.py", line 1654, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
Full stack trace

alex  06/06/2023, 5:27 PM
dict[int,int]
is it something slightly more complex than that?

Gustavo Carvalho
06/06/2023, 5:27 PM

Gustavo Carvalho  06/06/2023, 5:27 PM
with open(root / "json_definitions/circuits.json") as file:
    circuits = json.load(file)
pontoId_mapping = {circuit["tag"]: circuit["id"] for circuit in circuits}
Here's the code defining it.

Gustavo Carvalho
06/06/2023, 5:28 PM
[
    {
        "id": 100,
        "tag": 5174
    },
    {
        "id": 101,
        "tag": 5173
    },
    {
        "id": 102,
        "tag": 5172
    },
    ...
]
This is circuits.json.
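(For reference, a small self-contained reproduction of the mapping built from that file. The file contents are inlined here instead of being read from disk, so this is an illustrative sketch, not the thread's exact code. Both `id` and `tag` are JSON numbers, so the resulting dict has integer keys, which is what matters for the metadata serialization issue discussed below.)

```python
import json

# Inline stand-in for json_definitions/circuits.json
circuits_json = """
[
    {"id": 100, "tag": 5174},
    {"id": 101, "tag": 5173},
    {"id": 102, "tag": 5172}
]
"""
circuits = json.loads(circuits_json)

# Same comprehension as in the thread: tag -> id
pontoId_mapping = {circuit["tag"]: circuit["id"] for circuit in circuits}

print(pontoId_mapping)  # {5174: 100, 5173: 101, 5172: 102}
print(all(isinstance(k, int) for k in pontoId_mapping))  # True: every key is an int
```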
Gustavo Carvalho  06/06/2023, 5:29 PM

Gustavo Carvalho  06/06/2023, 5:31 PM

Gustavo Carvalho
06/06/2023, 5:33 PM
pontoId_mapping, btw.
Before (the asset could backfill even with the dict[int, int] in the metadata):
pontoId_mapping = {circuit["id"]: circuit["tag"] for circuit in circuits}
After (the asset backfill started giving an error):
pontoId_mapping = {circuit["tag"]: circuit["id"] for circuit in circuits}
alex  06/06/2023, 5:38 PM

Gustavo Carvalho  06/06/2023, 5:38 PM

Gustavo Carvalho  06/06/2023, 5:39 PM
tag and id (edited above accordingly)

alex  06/06/2023, 5:42 PM

Gustavo Carvalho  06/06/2023, 5:43 PM

alex
06/06/2023, 5:44 PM
>>> {4: 'x', '4': 'y'}
{4: 'x', '4': 'y'}
>>> d = {4: 'x', '4': 'y'}
>>> import json
>>> json.dumps(d)
'{"4": "x", "4": "y"}'
>>> json.loads(json.dumps(d))
{'4': 'y'}
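(alex's REPL demo above is the crux: JSON object keys are always strings, so serialization coerces integer keys to strings, and int/str keys that spell the same number collide. A snapshot computed from the in-memory metadata can therefore disagree with the persisted, serialized one. A minimal sketch of the round-trip mismatch, using made-up values in the shape of pontoId_mapping:)

```python
import json

# A metadata mapping with integer keys, like pontoId_mapping
mapping = {5174: 100, 5173: 101}

# JSON has no integer keys: dumps() coerces them to strings,
# so the dict that comes back is not equal to the original
round_tripped = json.loads(json.dumps(mapping))

print(round_tripped)             # {'5174': 100, '5173': 101}
print(mapping == round_tripped)  # False: key types changed under serialization
```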
Gustavo Carvalho  06/06/2023, 5:45 PM

alex  06/06/2023, 5:46 PM

Gustavo Carvalho  06/06/2023, 5:46 PM
id or tag, each field is "all uniques"

Gustavo Carvalho  06/06/2023, 5:47 PM

Gustavo Carvalho  06/06/2023, 5:47 PM

Gustavo Carvalho  06/06/2023, 5:47 PM

alex  06/06/2023, 5:52 PM

Gustavo Carvalho  06/06/2023, 5:52 PM

Gustavo Carvalho  06/06/2023, 5:53 PM

Gustavo Carvalho  06/06/2023, 5:53 PM

alex
06/06/2023, 5:53 PM
dict[str,int]?

Gustavo Carvalho  06/06/2023, 5:54 PM

Gustavo Carvalho  06/06/2023, 5:54 PM
"pontoId_mapping": {str(k): v for k, v in pontoId_mapping.items()},
it works

alex
06/06/2023, 5:54 PM

Gustavo Carvalho  06/06/2023, 6:25 PM

alex  06/06/2023, 7:31 PM

Gustavo Carvalho  06/07/2023, 12:46 AM
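(Editor's note: the fix Gustavo landed on, stringifying keys before placing the mapping in asset metadata, can be generalized into a small helper. The name json_safe_keys is ours, not from the thread; this is a sketch assuming plain scalar keys, not a Dagster API.)

```python
import json

def json_safe_keys(mapping):
    """Copy `mapping` with every key stringified, so the dict survives a
    JSON round trip unchanged (JSON object keys are always strings)."""
    return {str(k): v for k, v in mapping.items()}

pontoId_mapping = {5174: 100, 5173: 101}
safe = json_safe_keys(pontoId_mapping)

# Stable under serialization: no key-type coercion, no silent collisions
assert json.loads(json.dumps(safe)) == safe
print(safe)  # {'5174': 100, '5173': 101}
```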