Qwame
04/04/2023, 2:35 PM

zafar mahmood
04/06/2023, 9:03 AM

Nicolas Parot Alvarez
04/06/2023, 10:44 AM
… `load_assets_from_dbt_manifest` with the `select` argument?
We're trying to move away from `load_assets_from_dbt_project` because of its loading times, but the transition is not easy.
I'd like to see whether there are working examples somewhere before I can tell if the problems are in my head or in the library.

geoHeil
04/06/2023, 12:24 PM

Edson Henrique
04/07/2023, 9:22 PM

Nicolas Luchetti
04/10/2023, 2:39 PM

Chris Dong
04/10/2023, 8:12 PM

martin o leary
04/11/2023, 11:10 AM
… `sources.yml` file in my dbt project and are named "my_sources", for example. Do I need to do anything extra when loading the project to get the column descriptions (and the source table description) to show up in dagit?

Chris Nogradi
04/11/2023, 10:26 PM

Riccardo Tesselli
04/12/2023, 2:16 PM

claire
04/12/2023, 9:53 PM

geoHeil
04/13/2023, 9:51 AM

Rasmus Bonnevie
04/13/2023, 1:49 PM
… `schema.yml` (so I have two sets of tables with identical names, in two different schemas), and now I'm getting errors of this type:
KeyError: 'schema1_schema2_tablename'
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 130, in core_execute_run
    yield from execute_run_iterator(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_core/execution/api.py", line 1096, in __iter__
    yield from self.execution_context_manager.prepare_context()
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 536, in generate_setup_events
    obj = next(self.generator)
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_core/execution/context_creation_pipeline.py", line 324, in orchestration_context_event_generator
    context_creation_data = create_context_creation_data(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_core/execution/context_creation_pipeline.py", line 142, in create_context_creation_data
    resource_keys_to_init=get_required_resource_keys_to_init(
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_core/execution/resources_init.py", line 381, in get_required_resource_keys_to_init
    get_required_resource_keys_for_step(pipeline_def, step, execution_plan)
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_core/execution/resources_init.py", line 415, in get_required_resource_keys_for_step
    input_def = solid_def.input_def_named(step_input.name)
  File "/home/rasmus/.cache/pypoetry/virtualenvs/annotator-WMT6nzGq-py3.8/lib/python3.8/site-packages/dagster/_core/definitions/node_definition.py", line 144, in input_def_named
    return self._input_dict[name]
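Since the two schemas contain tables with identical names, a first diagnostic step is to list exactly which table names collide. A minimal sketch, assuming the duplicated tables are declared as dbt sources (the start of the message is lost, so this is a guess): it groups the `sources` entries of dbt's `target/manifest.json` by table `name` and reports names declared under more than one `schema`. The field names come from dbt's manifest; the helper names are hypothetical, and this only diagnoses the collision, it does not fix the KeyError.

```python
from __future__ import annotations

import json
from collections import defaultdict
from pathlib import Path


def load_manifest(path: str) -> dict:
    """Read dbt's manifest.json (written to the project's target/ folder)."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def shared_source_table_names(manifest: dict) -> dict[str, list[str]]:
    """Map each source table name declared in more than one schema
    to the sorted list of those schemas (hypothetical helper)."""
    schemas_by_name: dict[str, set[str]] = defaultdict(set)
    for node in manifest.get("sources", {}).values():
        # Every source entry in manifest.json carries its table name and schema
        schemas_by_name[node["name"]].add(node["schema"])
    # Keep only the names that appear in two or more schemas
    return {
        name: sorted(schemas)
        for name, schemas in schemas_by_name.items()
        if len(schemas) > 1
    }
```

For example, `shared_source_table_names(load_manifest("target/manifest.json"))` would return `{"orders": ["schema1", "schema2"]}` if an `orders` table were declared under both schemas (`orders` is a placeholder name).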
Something in the node naming scheme appears to blend the schemas inadvertently?

Raphael Pacheco
04/13/2023, 3:26 PM
import json

from dagster import (
    asset,
    Definitions,
    load_assets_from_package_module
)
from dagster_dbt import load_assets_from_dbt_manifest

from dagster_airbyte_example.assets import airbyte
from .config.constants import DOMAIN, BI_OWNER

airbyte_assets = load_assets_from_package_module(
    airbyte,
    key_prefix="best_target"
)

manifest_file = "/home/raphael-pacheco/workspace/dagster-airbyte-example/best_target/target/manifest.json"
with open(manifest_file, encoding='utf-8') as f:
    manifest = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(
    manifest,
    key_prefix="best_target",
)

defs = Definitions(
    assets=[*airbyte_assets] + dbt_assets,
)

What am I doing wrong?
Thanks in advance

Ethan Leifer
04/13/2023, 10:28 PM

Dennis Gera
04/14/2023, 7:41 PM
… `load_assets_from_dbt_manifest` already separate my dbt models with custom schemas into separate assets?

Dennis Gera
04/17/2023, 12:23 PM
… `define` an asset as a subset of a dbt asset loaded from the manifest?

Pablo Beltran
04/17/2023, 5:56 PM

Dennis Gera
04/17/2023, 8:56 PM
… `load_from_manifest` function to load my dbt assets. However, I don't commit dbt's `target` folder to my GitHub repo. How can I use S3 to load and update my manifest.json file through Dagster's dbt runs?

Aki Iwa
04/18/2023, 6:24 AM
models:
  modelA: # project name
    modelA:
      .....
    modelB:
      +schema: B
      .....
The assets are automatically loaded into the Data Lineage as follows:
• data asset
  model/modelA/model_a_table
  model/modelB/model_b_table
I am having trouble linking the modelB side of the Data Lineage to its materializations, even though I run dbt daily and emit the materializations at the same time, as follows:
dbt_output = context.resources.dbt.run()
for materialization in generate_materializations(dbt_output):
    yield materialization
This is because the nodes in `dbt_output` come out as `model.modelA.model_a_table` and `model.project.model_b_table`, so model_b_table is materialized as `model.modelA.model_b_table`.
Is there a good way to tie the materializations from the daily dbt run to the Data Lineage?

Danny Steffy
04/19/2023, 2:27 PM

Nicolas Luchetti
04/19/2023, 4:25 PM
TypeError: '>' not supported between instances of 'str' and 'int'
Raphael da Silva Pacheco
04/20/2023, 2:14 PM
… `load_assets_from_dbt_project` function, my lineage does not create a dependency between the two flows, much less define an execution order.
So I tried creating ops and jobs to force the dependencies. I was able to set the dependencies as expected, but I ended up losing the dbt lineage visibility.
How can I keep both?
Below are the code and images.
Thank you very much!
Code without defined dependencies:
# dagster-dbt-example/__init__.py
from dagster import (
    asset,
    job,
    Definitions
)
from dagster_dbt import load_assets_from_dbt_project, dbt_cli_resource

from .config.constants import DBT_PROJECT_DIR, DBT_PROFILE_DIR
from .assets.gcs import Extract
from .config.schemas import resources

# `resource` is used below, so select it from the config here,
# the same way the repository version of this file does
resource = resources["pessoa_indicadores_veiculares"]


@asset(group_name="bronze", compute_kind="python")
def bigquery_table_from_gcs_uri():
    """Function to send data of GCS files to BigQuery"""
    return Extract(resource=resource).gcs_to_bq_from_uri()


dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILE_DIR, key_prefix=["best_target"]
)

defs = Definitions(
    assets=[bigquery_table_from_gcs_uri] + dbt_assets,
    resources={
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_DIR,
                "profiles_dir": DBT_PROFILE_DIR
            }
        )
    },
)
And this is the example with ops and jobs, defining the dependencies:
# dagster-dbt-example/dbt/__init__.py
from dagster_dbt import dbt_run_op
from dagster import asset, job

from ...ops import get_resource
from ...assets.gcs import Extract
from ...config.schemas import resources


@asset(group_name="bronze", compute_kind="python")
def bigquery_table_from_gcs_uri():
    """Function to send data of GCS files to BigQuery from gs uri"""
    for resource in resources:
        print("RESOURCE ->", resource)
        Extract(resource=resources[resource]).gcs_to_bq_from_uri()


@job(name="transformations", resource_defs=get_resource())
def dbt_pipeline():
    dbt_run_op()


@asset(
    metadata={"owner": "bi.dev@neoway.com.br", "domain": "neoway", "priority": 1},
    group_name="silver",
    compute_kind="dbt"
)
def run_dbt_instance(bigquery_table_from_gcs_uri):
    """Function that defines asset dependencies"""
    dbt_pipeline.execute_in_process()
# dagster-dbt-example/__init__.py
from dagster import (
    repository,
    define_asset_job,
    ScheduleDefinition,
    load_assets_from_package_module,
    with_resources,
    asset,
    job
)
from dagster_dbt import dbt_run_op

from .ops import get_resource
from .assets import dbt
from .config.constants import DBT_PROJECT_DIR, DBT_PROFILE_DIR
from .assets.gcs import Extract
from .config.schemas import resources

resource = resources["pessoa_indicadores_veiculares"]


@asset(group_name="bronze", compute_kind="python")
def bigquery_table_from_gcs_uri():
    """Function to send data of GCS files to BigQuery"""
    return Extract(resource=resource).gcs_to_bq_from_uri()


@job(name="transformations", resource_defs=get_resource())
def dbt_pipeline():
    dbt_run_op()


@repository
def repository():
    return [
        with_resources(
            definitions=load_assets_from_package_module(dbt),
            resource_defs=get_resource()
        ),
        ScheduleDefinition(
            job=define_asset_job("all_assets", selection=["*"]),
            cron_schedule="@daily"
        )
    ]
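To get the dependency without falling back to ops, a common approach is to give the Python asset the same asset key that dagster-dbt assigns to the dbt source for the table it loads, so that the dbt models reading that source appear downstream of it in the asset graph. As far as I know, dagster-dbt of this era keys a dbt source as `[source_name, table_name]`, with `source_key_prefix` prepended when it is set; treat that naming rule as an assumption and compare it with the source keys dagit actually shows. The stdlib sketch below lists those assumed keys from `manifest.json`; the helper names are hypothetical.

```python
from __future__ import annotations

import json
from pathlib import Path


def load_manifest(path: str) -> dict:
    """Read dbt's manifest.json (written to the project's target/ folder)."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def source_asset_keys(
    manifest: dict, source_key_prefix: list[str] | None = None
) -> list[list[str]]:
    """List the key path assumed for each dbt source: [*prefix, source_name, table_name].

    ASSUMPTION: mirrors the default dagster-dbt naming for sources described
    in the text; verify against the keys displayed in dagit.
    """
    prefix = list(source_key_prefix or [])
    return sorted(
        prefix + [node["source_name"], node["name"]]
        for node in manifest.get("sources", {}).values()
    )
```

If, say, `["gcs", "table_x"]` shows up for the table that `bigquery_table_from_gcs_uri` writes, the Python asset could be declared with a matching key, e.g. `@asset(key_prefix=["gcs"], name="table_x", group_name="bronze", compute_kind="python")`; `gcs` and `table_x` are placeholder names.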
Again, thanks to anyone who can help me ;)

Marco Porracin
04/21/2023, 5:56 PM
… `load_assets_from_dbt_manifest` to load everything, since I have my dbt project in another repository.

Joel Olazagasti
04/25/2023, 5:12 PM
… `DataVersion` based on a query to the table it's generated from? For example, getting the number of rows.

Guy McCombe
04/27/2023, 3:02 PM
dagster._core.errors.DagsterImportError: Encountered ImportError: `No module named 'dbt.main'` while importing module foo.bar.
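The missing `dbt.main` module lines up with the dbt-core 1.5 release, which replaced the old `dbt.main` entry point with a new Click-based CLI; the incompatibility with `dagster-dbt` is confirmed in the reply that follows. To check whether a given environment is affected, a small stdlib sketch can read the installed dbt-core version and compare it with the `1.5` threshold from the suggested pin. The function names are hypothetical.

```python
from __future__ import annotations

import re
from importlib.metadata import PackageNotFoundError, version


def is_dbt_core_1_5_or_newer(dbt_version: str) -> bool:
    """True if a dbt-core version string is 1.5 or later (pre-releases included)."""
    match = re.match(r"(\d+)\.(\d+)", dbt_version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {dbt_version!r}")
    major, minor = int(match.group(1)), int(match.group(2))
    return (major, minor) >= (1, 5)


def installed_dbt_core_version() -> str | None:
    """Installed dbt-core version, or None if dbt-core is not installed."""
    try:
        return version("dbt-core")
    except PackageNotFoundError:
        return None
```

Usage: `v = installed_dbt_core_version()`, and if `v` is set and `is_dbt_core_1_5_or_newer(v)` is true, the environment needs the `dbt-core<1.5` requirement until the integration fix ships.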
rex
04/27/2023, 3:21 PM
… `dagster-dbt` is incompatible with the newly released dbt 1.5. We are shipping a fix to resolve this. In the meantime, if you are encountering an error with the integration, we suggest that you pin `dbt-core<1.5`!

Charlie Bini
04/27/2023, 8:31 PM
… `load_assets_from_dbt_manifest`?

BK
04/28/2023, 12:51 PM

Raphael da Silva Pacheco
04/28/2023, 5:27 PM