https://dagster.io/ logo
#ask-ai
Title
# ask-ai
d

Darryl Ng

02/22/2024, 6:59 AM
@Scout when I load dbt models into dagster using dbt_assets decorator, how can I get the assets that are representation of external sources that cannot be materialized?
s

Scout

02/22/2024, 6:59 AM
When you load dbt models into Dagster using the
dbt_assets
decorator, external sources that cannot be materialized within Dagster can be represented using SourceAssets. According to the documentation, you can define these external sources as SourceAssets, which allows you to add descriptions, generate asset observations, and create Data Version functions for them. Here's an example from the documentation on how to define a dbt source as a SourceAsset:
Copy code
python from dagster import SourceAsset from dagster_dbt import get_asset_key_for_source  my_dbt_source_as_dagster_asset = SourceAsset(     key=get_asset_key_for_source([my_dbt_assets], "test"),     description="my description" )
In this example,
get_asset_key_for_source
is used to get the asset key for the dbt source, and
SourceAsset
is used to define the source as a Dagster asset. This allows you to document the dbt source within Dagster's asset catalog, even though it cannot be materialized by Dagster itself. For more complex scenarios where you want to generate asset observations or trigger runs based on the presence of new data in the source, you can use an observable source asset. The documentation provides guidance on how to use
observable_source_asset
for these purposes. Remember that SourceAssets are a way to represent external data sources within Dagster's asset graph, and they are not materialized by Dagster but can be used to define dependencies and document external data sources within your asset graph.
d

Darryl Ng

02/22/2024, 7:01 AM
@Scout in this case, you are specifying the external source named "test". However, I have hundreds of them and simply want to select them into code. How can I do this easily?
s

Scout

02/22/2024, 7:01 AM
If you have hundreds of external sources in your dbt project and you want to select them into your Dagster code, you can use the
load_assets_from_dbt_manifest
function provided by Dagster's dbt integration. This function allows you to load all dbt models and sources as Dagster assets based on the dbt manifest file. Here's an example of how you might use
load_assets_from_dbt_manifest
to load all dbt sources:
Copy code
python from dagster_dbt import load_assets_from_dbt_manifest  # Assuming you have a dbt manifest.json file dbt_manifest_path = "path/to/your/dbt/manifest.json"  # Load all dbt models and sources as Dagster assets dbt_assets = load_assets_from_dbt_manifest(dbt_manifest_path)
This will create Dagster assets for each dbt model and source defined in your dbt project. If you want to filter and only select specific sources, you can use the
selected_unique_ids
parameter of the
load_assets_from_dbt_manifest
function to specify which sources to include. This parameter accepts a list of unique IDs that correspond to the dbt models or sources you want to select. For example, if you have a list of unique IDs for the sources you want to include, you can do the following:
Copy code
python selected_sources = ["source1_id", "source2_id", "source3_id", ...]  # List of unique IDs for your sources  dbt_assets = load_assets_from_dbt_manifest(     dbt_manifest_path,     selected_unique_ids=selected_sources )
This will load only the dbt sources with the unique IDs specified in the
selected_sources
list. If you have a large number of sources and you want to automate the selection process, you might consider writing a script that generates the list of unique IDs based on some criteria, such as a naming convention or metadata in your dbt project. Please note that the exact function signatures and parameters may vary depending on the version of Dagster and its dbt integration library you are using, so you should refer to the specific documentation for your version to ensure you are using the functions correctly.
d

Darryl Ng

02/22/2024, 7:18 AM
How can I associate an op to the materialization of these source assets that are loaded from dbt manifest?
@Scout How can I associate an op to the materialization of these source assets that are loaded from dbt manifest?
s

Scout

02/22/2024, 7:19 AM
To associate an op with the materialization of source assets that are loaded from a dbt manifest, you can define an op that represents the action you want to perform when materializing the asset. Then, you can use this op within a job that includes the dbt assets. Here's a step-by-step guide on how to do this: 1. Define an op that performs the desired action. This could be anything from running a dbt command to executing custom logic.
Copy code
python from dagster import op  @op def my_custom_materialization_op(context):     # Your custom logic here     <http://context.log.info|context.log.info>("Performing custom materialization logic")
2. Load your dbt assets from the dbt manifest as shown in the previous response.
Copy code
python from dagster_dbt import load_assets_from_dbt_manifest  dbt_manifest_path = "path/to/your/dbt/manifest.json" dbt_assets = load_assets_from_dbt_manifest(dbt_manifest_path)
3. Create a job that includes both the dbt assets and the custom op. You can use the
to_job
method on the assets to create a job, and then add your custom op to the job definition.
Copy code
python from dagster import job  @job def my_dbt_job():     # Include your dbt assets in the job     for asset in dbt_assets:         asset.to_job()      # Include your custom op in the job     my_custom_materialization_op()
4. If you need to create dependencies between your dbt assets and the custom op, you can define these dependencies explicitly in the job using the
after
parameter.
Copy code
python @job def my_dbt_job_with_dependencies():     # Assume `dbt_asset_op` is an op that corresponds to a dbt asset     dbt_asset_op_result = dbt_asset_op()      # Run `my_custom_materialization_op` after `dbt_asset_op`     my_custom_materialization_op.after(dbt_asset_op_result)
5. When you execute the job, Dagster will run the ops in the order defined by the dependencies. The custom materialization logic will be executed as part of the job's execution plan. Please note that the actual implementation details may vary based on your specific use case and the version of Dagster you are using. Always refer to the latest Dagster documentation for the most accurate and up-to-date guidance on working with dbt assets and defining jobs and ops.
d

Darryl Ng

03/08/2024, 3:05 AM
@Scout I have declared a SourceAsset that is given an op that returns AssetObservation, with a metadata key-value when run. However, whenever I run the op, is it possible to get the last observed metadata value in order to compare with a new value obtained?
s

Scout

03/08/2024, 3:05 AM
Yes, it is possible to get the last observed metadata value of a SourceAsset in Dagster. You can do this by querying the event log for the latest asset observation event for the given asset key. The
AssetObservation
event records metadata about assets, and you can access this metadata from the event log. Here's a general approach to accessing the last observed metadata value: 1. Use the
DagsterInstance
to query the event log for the latest
AssetObservation
event for the asset key associated with your SourceAsset. 2. Extract the metadata from the
AssetObservation
event. Here's an example of how you might implement this in an op:
Copy code
python from dagster import op, AssetKey, DagsterInstance, EventRecordsFilter, DagsterEventType  @op(required_resource_keys={"instance"}) def my_op(context):     # Get the Dagster instance from the context     instance = context.resources.instance      # Define the asset key for which you want to retrieve the last observation     asset_key = AssetKey(["my_source_asset"])      # Query the event log for the latest AssetObservation event for the asset key     records = instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_OBSERVATION,             asset_key=asset_key,         ),         ascending=False,  # Get the most recent event first         limit=1,     )      # Check if there is an existing observation event     if records:         last_observation = records[0].event_log_entry.dagster_event.event_specific_data         last_metadata = last_observation.metadata_entries          # Extract the metadata value you're interested in         # Assuming the metadata entry is a dictionary with a key 'my_metadata_key'         last_value = last_metadata.get('my_metadata_key')          # Your logic to compare with the new value         new_value = ...  # Obtain the new value         if new_value != last_value:             # Perform your comparison logic             pass      # If there are no observation events, this might be the first run     else:         # Handle the case where there is no previous observation         pass
In this example,
my_op
is an op that retrieves the last observed metadata value for the asset key
["my_source_asset"]
. It uses the
get_event_records
method on the
DagsterInstance
to query the event log for the latest
AssetObservation
event. The metadata is then extracted from the event and can be used for comparison with a new value. Please note that the actual code may need to be adjusted based on the structure of your metadata entries and the specific details of your implementation. Always refer to the latest Dagster documentation for the most accurate and up-to-date guidance on working with asset observations and the event log.
d

Darryl Ng

03/08/2024, 3:24 AM
@Scout When I use observable_source_asset() decorator to create a SourceAsset, how can I access the previous asset observation within the definition itself?
s

Scout

03/08/2024, 3:24 AM
When using the
observable_source_asset
decorator to create a SourceAsset, you can access the previous asset observation within the definition itself by querying the event log for the latest asset observation event for the asset key associated with your SourceAsset. This is done using the
DagsterInstance
to interact with the event log. Here's a general approach to accessing the last observed metadata value within the
observable_source_asset
definition: 1. Define your
observable_source_asset
with an observation function. 2. Within the observation function, use the
DagsterInstance
to query the event log for the latest
AssetObservation
event for the asset key. 3. Extract the metadata from the
AssetObservation
event. Here's an example of how you might implement this:
Copy code
python
from dagster import observable_source_asset, DagsterInstance, EventRecordsFilter, DagsterEventType

@observable_source_asset
def my_observable_source_asset(context):
    # Get the Dagster instance from the context
    instance = DagsterInstance.get()

    # Define the asset key for which you want to retrieve the last observation
    asset_key = context.asset_key

    # Query the event log for the latest AssetObservation event for the asset key
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=asset_key,
        ),
        ascending=False,  # Get the most recent event first
        limit=1,
    )

    # Check if there is an existing observation event
    if records:
        last_observation = records[0].event_log_entry.dagster_event.event_specific_data
        last_metadata = last_observation.metadata_entries

        # Extract the metadata value you're interested in
        # Assuming the metadata entry is a dictionary with a key 'my_metadata_key'
        last_value = last_metadata.get('my_metadata_key')

        # Your logic to compare with the new value
        new_value = ...  # Obtain the new value
        if new_value != last_value:
            # Perform your comparison logic
            pass

    # If there are no observation events, this might be the first run
    else:
        # Handle the case where there is no previous observation
        pass

    # Return the new observation result, possibly with new metadata
    return ...
In this example,
my_observable_source_asset
is an observable source asset that retrieves the last observed metadata value for its own asset key. It uses the
get_event_records
method on the
DagsterInstance
to query the event log for the latest
AssetObservation
event. The metadata is then extracted from the event and can be used for comparison with a new value. Please note that the actual code may need to be adjusted based
on the structure of your metadata entries and the specific details of your implementation. Always refer to the latest Dagster documentation for the most accurate and up-to-date guidance on working with asset observations and the event log.
d

Darryl Ng

03/08/2024, 3:28 AM
This does not seem to work because I am getting
Op my_observable_source_asset does not have an assets definition
. How can I get the asset name when the source asset itself hasn't been created?
@Scout This does not seem to work because I am getting
Op my_observable_source_asset does not have an assets definition
. How can I get the asset name when the source asset itself hasn't been created?
s

Scout

03/08/2024, 3:28 AM
The error message you're encountering,
Op my_observable_source_asset does not have an assets definition
, suggests that there might be a mismatch between the expectations of the Dagster framework and the way the
observable_source_asset
is being used or defined. When using
observable_source_asset
, it's important to remember that the function decorated with
observable_source_asset
is expected to return an observation result, such as a
DataVersion
,
DataVersionsByPartition
, or an
ObserveResult
. This function is wrapped inside of an op and can be executed as part of a job, with each execution generating an
AssetObservation
event associated with the source asset. However, based on the error message, it seems that Dagster is expecting an asset definition but is instead finding an op definition. This could be due to a misconfiguration or misunderstanding of how
observable_source_asset
should be used. To resolve this issue, ensure that you are using
observable_source_asset
correctly: 1. The function decorated with
observable_source_asset
should not be manually invoked or treated as a regular op. It is a special kind of asset that Dagster will manage. 2. Make sure you are using the latest version of Dagster, as APIs can change, and ensure that you are following the current documentation for
observable_source_asset
. 3. Verify that you are not mixing observable source assets with regular materializable assets in a way that is not supported by Dagster. If you continue to encounter issues, it may be helpful to consult the Dagster community or support channels with specific details about your implementation, as they may be able to provide more targeted assistance. Unfortunately, without more context or a detailed look at your code, it's challenging to provide a precise solution. The documentation does not provide a direct example of accessing the previous asset observation within the
observable_source_asset
definition itself, so if the standard approach to querying the event log is not working, it may be a more complex issue that requires further investigation.
d

Darryl Ng

03/08/2024, 3:37 AM
@Scout How can I get an observable source asset's last AssetObservation result from an OpExecutionContext of a given SourceAsset?
s

Scout

03/08/2024, 3:37 AM
As of my knowledge cutoff date in early 2023, there isn't a direct way to access the last
AssetObservation
result from the
OpExecutionContext
of a given
SourceAsset
within the
observable_source_asset
definition itself. The
OpExecutionContext
is designed to provide context for an op's execution, including logging and resource access, but it does not have built-in methods to query the event log for asset observations. However, you can access the event log to retrieve the last
AssetObservation
for a given
SourceAsset
using the
DagsterInstance
outside of the
OpExecutionContext
. Here's a general approach: 1. Use the
DagsterInstance
to query the event log for the latest
AssetObservation
event for the asset key associated with your
SourceAsset
. 2. Extract the metadata from the
AssetObservation
event. Here's an example of how you might implement this in a sensor or some other context where you have access to the `DagsterInstance`:
Copy code
python from dagster import DagsterInstance, EventRecordsFilter, DagsterEventType, AssetKey  def get_last_asset_observation(instance: DagsterInstance, asset_key: AssetKey):     records = instance.get_event_records(         EventRecordsFilter(             event_type=DagsterEventType.ASSET_OBSERVATION,             asset_key=asset_key,         ),         ascending=False,  # Get the most recent event first         limit=1,     )      if records:         last_observation = records[0].event_log_entry.dagster_event.event_specific_data         return last_observation      return None  # Usage instance = DagsterInstance.get()  # Get the current Dagster instance asset_key = AssetKey(["my_source_asset"]) last_observation = get_last_asset_observation(instance, asset_key)  if last_observation:     # Do something with the last observation     pass
If you need to access the last
AssetObservation
within an op or asset computation, you would typically pass this information as an input to the op or asset, rather than trying to retrieve it from the
OpExecutionContext
. Remember that the ability to query the event log for asset observations is typically used in monitoring and orchestration contexts, such as within sensors or other system-level components, rather than within the computation logic of ops or assets themselves. If there have been updates to the Dagster API after my knowledge cutoff date that provide new capabilities for accessing asset observations directly within the
OpExecutionContext
, you should refer to the latest Dagster documentation or check with the Dagster community for the most up-to-date information.
d

Darryl Ng

03/08/2024, 3:45 AM
@Scout do I return
DataVersion
or
AssetObservation
when using observable_source_asset() decorator
s

Scout

03/08/2024, 3:45 AM
When using the
observable_source_asset
decorator in Dagster, you should return a
DataVersion
or
ObserveResult
object from the observation function. The
DataVersion
object represents a version of the data that the asset is observing, and the
ObserveResult
object can include additional metadata or checks along with the data version. Here's an example from the documentation that shows how to use the
observable_source_asset
decorator with a `DataVersion`:
Copy code
python from dagster import DataVersion, observable_source_asset  @observable_source_asset def my_observable_source_asset():     # Your logic to determine the data version     data_version_str = "some_version_string"     return DataVersion(data_version_str)
In this example, the
my_observable_source_asset
function calculates a data version string and returns it wrapped in a
DataVersion
object. Every time you run the observation function, a new
AssetObservation
will be generated with this data version. If you want to include additional metadata or checks, you can return an
ObserveResult
object instead:
Copy code
python from dagster import ObserveResult, observable_source_asset, MetadataEntry  @observable_source_asset def my_observable_source_asset_with_metadata():     # Your logic to determine the data version and metadata     data_version_str = "some_version_string"     metadata = {"key": "value"}      return ObserveResult(         data_version=DataVersion(data_version_str),         metadata=[MetadataEntry("label", value=metadata)]     )
In this example, the
my_observable_source_asset_with_metadata
function returns an
ObserveResult
object that includes both a
DataVersion
and additional metadata. When the observation function is run, Dagster will generate an
AssetObservation
event for the source asset and tag it with the returned data version. If you include metadata or checks in an
ObserveResult
, those will also be recorded with the
AssetObservation
event. Please refer to the latest Dagster documentation for the most accurate and up-to-date guidance on working with
observable_source_asset
and the types of objects you can return from the observation function.
d

Darryl Ng

03/08/2024, 3:48 AM
@Scout How can I set a SourceAsset's data version in an op?
s

Scout

03/08/2024, 3:49 AM
In Dagster, a
SourceAsset
is a representation of an external data source, and its data version is typically set using an observation function when you define the
SourceAsset
using the
observable_source_asset
decorator. This observation function is responsible for computing and returning a
DataVersion
that represents the version of the data at the time of observation. However, if you want to set or update the data version of a
SourceAsset
within an op, you would generally not do this directly. Instead, you would define an observation function that is responsible for observing the external data source and computing its data version. Here's an example of how you might define an observable source asset with an observation function:
Copy code
python from dagster import observable_source_asset, DataVersion  @observable_source_asset def my_external_data_source(context):     # Logic to compute the data version of the external data source     data_version_str = compute_data_version_somehow()     return DataVersion(data_version_str)
In this example,
compute_data_version_somehow()
represents your custom logic to determine the data version of the external data source. The
DataVersion
object is then returned by the observation function. If you need to trigger the observation of a
SourceAsset
from within an op, you would typically do this by invoking the observation function as part of your op's logic. However, this is not a common pattern, as observations are usually performed outside of the execution of ops, such as through a schedule or sensor that periodically checks the external data source. If you find yourself needing to set or update a `SourceAsset`'s data version from within an op, it may be worth reconsidering your data pipeline's design to ensure that it aligns with Dagster's asset model and best practices. For more specific guidance on how to work with
SourceAsset
observations and data versions in Dagster, please refer to the latest documentation or seek advice from the Dagster community.
d

Darryl Ng

03/08/2024, 5:47 AM
@Scout I want to convert dbt assets (using dbt_asset decorator) with materialized: incremental configs to be a partitioned asset in Dagster. How should I go about it?
s

Scout

03/08/2024, 5:47 AM
As of my knowledge cutoff date, the
dbt_asset
decorator in Dagster is used to load dbt models as Dagster assets. If you have dbt models with
materialized: incremental
configurations and you want to convert them to partitioned assets in Dagster, you would typically need to handle the partitioning logic within your dbt models and then reflect that partitioning in Dagster. Here's a general approach to how you might go about it: 1. Partitioning in dbt: Ensure that your dbt models are set up for incremental loading with the appropriate partitioning logic. This usually involves using dbt's incremental materialization along with a partitioning column in your model SQL. 2. Load dbt models as Dagster assets: Use the
dbt_asset
decorator to load your dbt models as Dagster assets. This will create assets in Dagster that correspond to your dbt models. 3. Define partitions in Dagster: Define partitions in Dagster that correspond to the partitions in your dbt models. This could be based on time (e.g., daily, hourly) or any other logical partitioning scheme that makes sense for your data. 4. Pass partition information to dbt: When materializing your dbt models in Dagster, pass the relevant partition information to dbt as variables. This allows dbt to only process the data for the specified partition. Here's an example of how you might define a partitioned asset in Dagster that corresponds to a dbt model:
Copy code
python
from dagster import Partition, PartitionSetDefinition, repository
from dagster_dbt import dbt_cli_resource, dbt_asset

# Define your partition set
def date_partition(start_date, end_date):
    return PartitionSetDefinition(
        name="date_partition_set",
        pipeline_name="my_dbt_pipeline",
        partition_fn=lambda: [Partition(date) for date in date_range(start_date, end_date)],
        run_config_fn_for_partition=lambda partition: {"solids": {"my_dbt_model": {"config": {"date": partition.value}}}},
    )

# Load your dbt models as Dagster assets
@dbt_asset
def my_dbt_model(context, dbt: DbtCliResource):
    # Pass the partition date to dbt as a variable
    date = context.solid_config["date"]
    dbt.cli(["run", "--models", "my_dbt_model", "--vars", f"{{'date': '{date}'}}"])

# Define your repository
@repository
def my_repo():
    return [my_dbt_model, date_partition.to_definition()]
In this example,
my_dbt_model
is a dbt model that is loaded as a Dagster asset. The
date_partition
function defines a partition set based on a date range, and the
run_config_fn_for_partition
function generates the run configuration for each partition, passing the partition date to the dbt model. Please note that the actual implementation details may vary based on your specific use case and the version of Dagster and dbt you
are using. Always refer to the latest Dagster documentation for the most accurate and up-to-date guidance on working with dbt assets and defining partitioned assets. Additionally, since Dagster and dbt are actively developed, there may be new features or changes after my knowledge cutoff date that could affect how this process is handled.
d

Darryl Ng

03/08/2024, 5:55 AM
@Scout How can I use dagster to determine which dbt model is taking the longest time and processing the most data? Is it possible to collect these information from run_results and perform a filter and sort?
s

Scout

03/08/2024, 5:55 AM
In Dagster, you can use the integration with dbt to collect metadata from dbt's
run_results.json
file, which is generated after a dbt run. This file contains detailed information about each model's execution, including timing and row counts. To determine which dbt model is taking the longest time and processing the most data, you can follow these steps: 1. Run dbt with Dagster: Use Dagster's dbt integration to run your dbt project. This can be done using the
dbt_cli_resource
or by defining assets with the
dbt_asset
decorator. 2. Collect run results: After the dbt run, the
run_results.json
file will be generated. You can write a Dagster asset or op that reads this file and parses the JSON data to extract the relevant information. 3. Analyze the data: Once you have the data from
run_results.json
, you can analyze it to find the models with the longest execution times and the highest row counts. You can perform filtering and sorting based on these metrics. 4. Report the results: You can create a Dagster asset that outputs the results of your analysis, or you can log the information using Dagster's logging system. Here's an example of how you might implement an asset in Dagster to analyze dbt run results: ```python import json from dagster import asset, AssetMaterialization, MetadataValue @asset def analyze_dbt_run_results(context): # Path to the run_results.json file generated by dbt run_results_path = '/path/to/dbt/target/run_results.json' # Read and parse the run_results.json file with open(run_results_path, 'r') as f: run_results = json.load(f) # Extract timing and row count information for each model model_timings = [] for result in run_results['results']: model_name = result['unique_id'] execution_time = result['execution_time'] row_count = result.get('table', {}).get('rows_produced', None) model_timings.append({ 'model': model_name, 'execution_time': execution_time, 'row_count': row_count }) # Sort the models by execution time (descending) sorted_models_by_time = sorted(model_timings, key=lambda x: x['execution_time'], reverse=True) # Sort the models by row count (descending) sorted_models_by_rows = sorted(model_timings, key=lambda x: x['row_count'], reverse=True) # Log the top model by execution time top_model_by_time = sorted_models_by_time[0] context.log.info(f"Longest running model: {top_model_by_time['model']} with execution time {top_model_by_time['execution_time']} seconds") # Log the top model by row count top_model_by_rows = sorted_models_by_rows[0] context.log.info(f"Model with most rows: {top_model_by_rows['model']}
with row count {top_model_by_rows['row_count']}") # Yield materializations with metadata for observability yield AssetMaterialization( asset_key='analyze_dbt_run_results', metadata={ 'longest_running_model': MetadataValue.text(top_model_by_time['model']), 'most_rows_model': MetadataValue.text(top_model_by_rows['model']), } ) ``` In this example, the
analyze_dbt_run_results
asset reads the
run_results.json
file, extracts the timing and row count information for each model, sorts the models by execution time and row count, logs the top models, and yields an
AssetMaterialization
with metadata for observability. Please note that the actual implementation details may vary based on your specific use case and the version of Dagster and dbt you are using. Always refer to the latest Dagster documentation for the most accurate and up-to-date guidance on working with dbt assets and analyzing dbt run results. Additionally, since Dagster and dbt are actively developed, there may be new features or changes after my knowledge cutoff date that could affect how this process is handled.
d

Darryl Ng

03/10/2024, 11:45 PM
@Scout give me an example of converting dbt_asset that are incremental in materialized config, and marking it as partitioned so that the last ingestion time can be passed to dbt as a variable to set the incremental filter when loading from a source data.
s

Scout

03/10/2024, 11:45 PM
As of my knowledge cutoff date, Dagster does not natively support partitioned dbt assets directly through the
dbt_asset
decorator. However, you can work around this limitation by manually orchestrating dbt runs with partition information and passing that information as variables to dbt. Here's a conceptual example of how you might approach this: 1. Define a Dagster asset that represents the partitioned source data. This asset will be responsible for tracking the last ingestion time. 2. Create a Dagster job that runs the dbt model. This job will take the last ingestion time from the source asset and pass it to dbt as a variable. 3. In your dbt model, use the variable to filter the data for incremental processing. Here's a simplified example of how this might look in code:
Copy code
python
from dagster import job, op, AssetKey, asset, AssetMaterialization, MetadataValue
from dagster_dbt import dbt_cli_resource

# Define an asset that represents the partitioned source data
@asset
def source_data_partition(context):
    # Logic to determine the last ingestion time
    last_ingestion_time = get_last_ingestion_time_somehow()
    
    # Yield an asset materialization with the last ingestion time as metadata
    yield AssetMaterialization(
        asset_key=AssetKey(["source_data_partition"]),
        metadata={"last_ingestion_time": MetadataValue.text(last_ingestion_time)}
    )
    
    # Return the last ingestion time so it can be passed to dbt
    return last_ingestion_time

# Define an op that runs the dbt model with the last ingestion time as a variable
@op(required_resource_keys={"dbt"})
def run_dbt_incremental_model(context, last_ingestion_time):
    # Run the dbt model with the last ingestion time as a variable
    context.resources.dbt.run(
        models=["my_incremental_model"],
        vars={"last_ingestion_time": last_ingestion_time}
    )

# Define a job that orchestrates the dbt run
@job(resource_defs={"dbt": dbt_cli_resource.configured({"project_dir": "path/to/dbt/project"})})
def my_dbt_job():
    last_ingestion_time = source_data_partition()
    run_dbt_incremental_model(last_ingestion_time)

# Example function to get the last ingestion time (you would implement this)
def get_last_ingestion_time_somehow():
    # Logic to determine the last ingestion time
    return "2023-01-01T00:00:00"
In this example,
source_data_partition
is an asset that yields the last ingestion time. The
run_dbt_incremental_model
op takes this time and passes it to dbt as a variable. The
my_dbt_job
job orchestrates the execution of these steps. In your dbt model, you would use the
last_ingestion_time
variable to filter the data for incremental processing: ```sql -- my_incremental_model.sql {{ config(materialized='incremental') }} SELECT * FROM {{ sou
rce('my_source', 'my_table') }} {% if is_incremental() %} -- Only process data that is newer than the last ingestion time WHERE updated_at > '{{ var("last_ingestion_time") }}' {% endif %} ``` This is a high-level example, and the actual implementation may vary based on your specific requirements and the capabilities of your data platform. Always refer to the latest Dagster and dbt documentation for the most accurate and up-to-date guidance on integrating these tools. Additionally, since Dagster and dbt are actively developed, there may be new features or changes after my knowledge cutoff date that could affect how this process is handled.
8 Views