# ask-community
j
Hi team, 1. I got this error when backfilling partitions. (Image)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) disk I/O error
[SQL: SELECT dynamic_partitions.partition 
FROM dynamic_partitions 
WHERE dynamic_partitions.partition IN (?) AND dynamic_partitions.partitions_def_name = ?]
[parameters: ('OPTOMATE_4507_031 | Australia/Sydney | 2023-03-20 00:00:00 | BACKFILL', 'fivetran_OPTOMATE_4507_031')]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_grpc/impl.py", line 120, in core_execute_run
    recon_job.get_definition()
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 257, in get_definition
    return self.repository.get_definition().get_maybe_subset_job_def(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 120, in get_definition
    return repository_def_from_pointer(self.pointer, self.repository_load_data)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 741, in repository_def_from_pointer
    target = def_from_pointer(pointer)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/definitions/reconstruct.py", line 633, in def_from_pointer
    target = pointer.load_target()
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/code_pointer.py", line 224, in load_target
    module = load_python_module(self.module, self.working_directory)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/code_pointer.py", line 135, in load_python_module
    return importlib.import_module(module_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1206, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1178, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1149, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 690, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/mnt/e/CMC_Project/pycharm-devcontainer-experiments/dagster/projects/dbt-vault/dbt_vault/__init__.py", line 19, in <module>
    from dbt_vault.jobs import jobs_dict
  File "/mnt/e/CMC_Project/pycharm-devcontainer-experiments/dagster/projects/dbt-vault/dbt_vault/jobs/__init__.py", line 5, in <module>
    from dbt_vault.partitions import partitions_dict
  File "/mnt/e/CMC_Project/pycharm-devcontainer-experiments/dagster/projects/dbt-vault/dbt_vault/partitions/__init__.py", line 42, in <module>
    instance.add_dynamic_partitions(f"fivetran_{source}", [partition_key])
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 649, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/instance/__init__.py", line 1851, in add_dynamic_partitions
    return self._event_storage.add_dynamic_partitions(partitions_def_name, partition_keys)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/dagster/_core/storage/event_log/sql_event_log.py", line 1827, in add_dynamic_partitions
    existing_rows = conn.execute(
                    ^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)

The above exception was caused by the following exception:
sqlite3.OperationalError: disk I/O error

  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/julius/.virtualenvs/pycharm-devcontainer-experiments/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
This doesn't happen when I only run 1 backfill partition; it only happens when I run more than 1 partition. It also succeeds when I re-execute the failed backfill. 2. I'm using partition_key_to_vars_fn to translate a given partition_key into a dictionary of vars to be passed into the dbt invocation. I know we can add some specific information to the partition_key so that partition_key_to_vars_fn can use it for dbt vars, but that would make the partition_key really long and hard to read. Can I access some information from the dbt asset, like key_prefix, inside my partition_key_to_vars_fn function? For example, I have this dbt asset:
dbt_assets = load_assets_from_dbt_project(
    # Create asset using dbt project
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    select=f"tag:{source}",
    op_name=f"{dbt_ops}",
    partitions_def=partitions_dict[f"fivetran_{source}"],
    key_prefix=source,  # <------ key_prefix using source
    partition_key_to_vars_fn=date_from_partition_key,
    use_build_command=True,
    display_raw_sql=True,
)
and the date_from_partition_key function I'm using looks like this:
from datetime import datetime
from typing import Any, Mapping

from dagster import DagsterInstance


def date_from_partition_key(input_str: str) -> Mapping[str, Any]:
    instance = DagsterInstance.get()
    all_partition = instance.get_dynamic_partitions(
        partitions_def_name=f"fivetran_{source}")  # <----------------- source

    index = all_partition.index(input_str)

    # partition_key_split is a project helper that splits a key of the form
    # "<table> | <timezone> | <date> | <INIT or BACKFILL>"; `date` and `code`
    # presumably come from it:
    _, _, date, code = partition_key_split(input_str)

    if code == "INIT":
        last_incremental_load_end_date = datetime.min.strftime("%Y-%m-%d %H:%M:%S")
        incremental_load_end_date = date
    else:
        _, _, last_incremental_load_end_date, _ = partition_key_split(all_partition[index - 1])
        incremental_load_end_date = date

    return {
        "last_incremental_load_end_date": last_incremental_load_end_date,
        "incremental_load_end_date": incremental_load_end_date  # <---- used for dbt vars
    }
If you look at date_from_partition_key, I need the source variable (which is also the key_prefix of the dbt asset) to get the specific dynamic partition. I don't want to add the source to the partition_key and make that key long. Any solutions?
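For illustration (a sketch, not an answer from the thread): partition_key_to_vars_fn only needs a callable that maps the partition key string to a vars dict, so one option is to make source an explicit parameter of the translator and bind it with functools.partial where each asset is defined, instead of packing it into the partition key. Based on the definitions above:

import functools
from typing import Any, Mapping

from dagster import DagsterInstance


def date_from_partition_key(source: str, input_str: str) -> Mapping[str, Any]:
    # Same logic as the function above, but `source` is an explicit argument
    # rather than a module-level variable, so one definition serves every source.
    instance = DagsterInstance.get()
    all_partition = instance.get_dynamic_partitions(
        partitions_def_name=f"fivetran_{source}")
    ...  # remainder unchanged from the function above


dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH,
    profiles_dir=DBT_PROFILES,
    select=f"tag:{source}",
    partitions_def=partitions_dict[f"fivetran_{source}"],
    key_prefix=source,
    # partial() fixes `source`, leaving the one-argument callable that
    # partition_key_to_vars_fn expects.
    partition_key_to_vars_fn=functools.partial(date_from_partition_key, source),
    use_build_command=True,
)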
s
Hi Julius - is it possible that you have a large number of dynamic partitions, are running on a machine with low disk space, and are running out of disk space?
j
I checked; my machine is not low on disk space or running out of disk space.
@sandy I reduced the partition count from 191 to 7, and the backfill runs well. Is the failure caused by the number of jobs running at the same time? How can we handle this?
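For reference, a pattern consistent with the traceback: every run re-imports partitions/__init__.py and calls add_dynamic_partitions against the local SQLite event log, and SQLite copes poorly with many concurrent writers, which would explain why the error only appears when several backfill runs launch at once and why a retry succeeds. Two common mitigations are moving the instance storage to Postgres or capping run concurrency with the queued run coordinator in dagster.yaml; a sketch of the latter (the limit is illustrative, and the exact keys should be checked against the deployment docs for your Dagster version):

# dagster.yaml (instance configuration)
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 4  # illustrative value; tune for the machine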