Julius
06/08/2023, 10:18 AM
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) disk I/O error
[SQL: SELECT dynamic_partitions.partition
FROM dynamic_partitions
WHERE dynamic_partitions.partition IN (?) AND dynamic_partitions.partitions_def_name = ?]
[parameters: ('OPTOMATE_4507_031 | Australia/Sydney | 2023-03-20 00:00:00 | BACKFILL', 'fivetran_OPTOMATE_4507_031')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 120, in core_execute_run
recon_job.get_definition()
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 257, in get_definition
return self.repository.get_definition().get_maybe_subset_job_def(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 120, in get_definition
return repository_def_from_pointer(self.pointer, self.repository_load_data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 741, in repository_def_from_pointer
target = def_from_pointer(pointer)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 633, in def_from_pointer
target = pointer.load_target()
^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/code_pointer.py", line 224, in load_target
module = load_python_module(self.module, self.working_directory)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/code_pointer.py", line 135, in load_python_module
return importlib.import_module(module_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1206, in _gcd_import
File "<frozen importlib._bootstrap>", line 1178, in _find_and_load
File "<frozen importlib._bootstrap>", line 1149, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 690, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 940, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/mnt/e/CMC_Project/pycharm-devcontainer-experiments/dagster/projects/dbt-vault/dbt_vault/__init__.py", line 19, in <module>
from dbt_vault.jobs import jobs_dict
File "/mnt/e/CMC_Project/pycharm-devcontainer-experiments/dagster/projects/dbt-vault/dbt_vault/jobs/__init__.py", line 5, in <module>
from dbt_vault.partitions import partitions_dict
File "/mnt/e/CMC_Project/pycharm-devcontainer-experiments/dagster/projects/dbt-vault/dbt_vault/partitions/__init__.py", line 42, in <module>
instance.add_dynamic_partitions(f"fivetran_{source}", [partition_key])
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 649, in inner
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/instance/__init__.py", line 1851, in add_dynamic_partitions
return self._event_storage.add_dynamic_partitions(partitions_def_name, partition_keys)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/storage/event_log/sql_event_log.py", line 1827, in add_dynamic_partitions
existing_rows = conn.execute(
^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
The above exception was caused by the following exception:
sqlite3.OperationalError: disk I/O error
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
This doesn't happen when I run a single backfill partition; it only happens when I run more than one partition. The failed backfill also succeeds when I re-execute it.
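Since the failed backfill succeeds on re-execution, the error may be transient, for example contention between several run processes touching the same SQLite file at import time. The thread does not confirm that cause. Under that assumption, a minimal retry-with-backoff sketch could look like this. The helper name `retry_call` and its parameters are hypothetical, not part of Dagster or SQLAlchemy.

```python
import sqlite3
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_call(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (sqlite3.OperationalError,),
    attempts: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Call fn(), retrying with exponential backoff on the given exception types.

    Hypothetical helper: assumes the failure is transient and that calling
    fn() again is safe.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            # Out of retries: surface the original error to the caller.
            if attempt == attempts - 1:
                raise
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ... between tries.
            time.sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")
```

In the traceback above the exception that reaches user code is `sqlalchemy.exc.OperationalError`, so the call in `partitions/__init__.py` would pass that type in `retry_on`, e.g. `retry_call(lambda: instance.add_dynamic_partitions(f"fivetran_{source}", [partition_key]), retry_on=(sqlalchemy.exc.OperationalError,))`. This only works around the symptom; it does not address why several processes hit the same storage at once.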
2. I'm using partition_key_to_vars_fn to translate a given partition_key into a dictionary of vars to be passed into the dbt invocation. I know we can add specific information to the partition_key so that partition_key_to_vars_fn can use it for the dbt vars, but that makes the partition_key really long and hard to read. Can I access some information about the dbt asset, such as key_prefix, from the function I pass as partition_key_to_vars_fn?
For example, I have this dbt asset:
# Create asset using dbt project
dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    select=f"tag:{source}",
    op_name=f"{dbt_ops}",
    partitions_def=partitions_dict[f"fivetran_{source}"],
    key_prefix=source,  # <------ key_prefix using source
    partition_key_to_vars_fn=date_from_partition_key,
    use_build_command=True,
    display_raw_sql=True,
)
and the date_from_partition_key function I'm using looks like this:
def date_from_partition_key(input_str: str) -> Mapping[str, Any]:
    instance = DagsterInstance.get()
    all_partition = instance.get_dynamic_partitions(
        partitions_def_name=f"fivetran_{source}")  # <----------------- source
    index = all_partition.index(input_str)
    # Assumed: `date` and `code` come from splitting the current key;
    # this line was not in the original post.
    _, _, date, code = partition_key_split(input_str)
    if code == "INIT":
        last_incremental_load_end_date = datetime.min.strftime("%Y-%m-%d %H:%M:%S")
        incremental_load_end_date = date
    else:
        _, _, last_incremental_load_end_date, _ = partition_key_split(all_partition[index - 1])
        incremental_load_end_date = date
    return {
        "last_incremental_load_end_date": last_incremental_load_end_date,
        "incremental_load_end_date": incremental_load_end_date  # <----use for vars
    }
As you can see in date_from_partition_key, I need the source variable (which is also the key_prefix of the dbt asset) to look up the specific dynamic partition. I don't want to add the source to the partition_key and make the key longer.
Any solutions?
sandy
06/08/2023, 4:30 PM
Julius
06/09/2023, 1:44 AM
Julius
06/09/2023, 1:54 AM
sandy
06/09/2023, 2:58 PM
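On the question of reaching `source` from `partition_key_to_vars_fn` without lengthening the partition key: one option, not confirmed anywhere in this thread, is to build the function per source with a plain Python closure, so `source` is bound when the assets are defined. The sketch below is standalone; `make_vars_fn`, `split_key`, and the `list_partitions` callable are hypothetical names, and the key layout (four `|`-separated fields with the date third and the code fourth) is inferred from the partition key shown in the error above.

```python
from datetime import datetime
from typing import Any, Callable, List, Mapping, Sequence


def split_key(key: str) -> List[str]:
    """Split a 'a | b | date | code' partition key into its stripped fields."""
    return [part.strip() for part in key.split("|")]


def make_vars_fn(
    source: str,
    list_partitions: Callable[[str], Sequence[str]],
) -> Callable[[str], Mapping[str, Any]]:
    """Return a partition_key -> dbt vars function with `source` bound in."""

    def vars_for_partition(partition_key: str) -> Mapping[str, Any]:
        # `source` comes from the enclosing scope, not from the partition key.
        keys = list(list_partitions(f"fivetran_{source}"))
        index = keys.index(partition_key)
        _, _, date, code = split_key(partition_key)
        if code == "INIT" or index == 0:
            # No earlier partition to read from; index == 0 is an added guard
            # so that index - 1 never wraps around to the last key.
            last_end = datetime.min.strftime("%Y-%m-%d %H:%M:%S")
        else:
            _, _, last_end, _ = split_key(keys[index - 1])
        return {
            "last_incremental_load_end_date": last_end,
            "incremental_load_end_date": date,
        }

    return vars_for_partition
```

With this shape, the asset definition would pass something like `partition_key_to_vars_fn=make_vars_fn(source, lambda name: DagsterInstance.get().get_dynamic_partitions(partitions_def_name=name))`. `functools.partial` over a two-argument function would bind `source` equally well.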