# ask-ai
a
I've defined dbt assets using the dagster_dbt.dbt_assets decorator. I use it multiple times, one to select full-refresh models and another for incremental. The incremental models are partitioned as outlined in the documentation. The issue I'm having is I have defined a job that consists of both partitioned and non-partitioned assets (underlying dbt models). However, I can't seem to run it without error. Is this even possible to mix partitioned and non-partitioned assets in the same job?
s
Based on the documentation, it is indeed possible to work with both partitioned and non-partitioned dbt assets within Dagster. The documentation describes a dbt project with both partitioned and unpartitioned assets. The suggested approach is to create two separate `@dbt_assets` definitions, one with a `partitions_def` for the partitioned assets and one without it for the unpartitioned assets. You can then use dbt tags to mark which dbt models are partitioned and assign them to the respective `@dbt_assets` definition via the `select` and `exclude` parameters. Here's a snippet from the documentation that outlines this approach:

> I worked around this by having two @dbt_asset definitions, one with a partitions_def and one without. I then used dbt tags to mark which dbt models are partitioned daily (partitioned_daily). Then I could assign those models to the partitioned @dbt_asset using the select="tag:partitioned_daily" parameter. The unpartitioned @dbt_asset excludes these models with exclude="tag:partitioned_daily"

This ensures that partitioned Dagster assets map to partitioned dbt assets, and unpartitioned Dagster assets map to unpartitioned dbt assets. If you are encountering errors, review the error messages and confirm that your job definition separates the partitioned and non-partitioned assets as described. If the errors persist, the specific error messages and job configuration will need a closer look to diagnose the issue further.
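A minimal sketch of that two-definition layout (illustrative names, assuming a `partitioned_daily` tag on the incremental models and a `dbt_manifest_path` variable pointing at the project's manifest) might look like:

```python
from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=dbt_manifest_path, exclude="tag:partitioned_daily")
def unpartitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:partitioned_daily",
    partitions_def=DailyPartitionsDefinition(start_date="2024-02-20"),
)
def partitioned_daily_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The partition's time window is available on the context and can be passed
    # to dbt (e.g. as --vars) to constrain incremental models.
    yield from dbt.cli(["build"], context=context).stream()
```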
a
@Scout I'm not sure that answers the question. I have defined the assets like this:
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=DagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails


@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:partitioned,tag:daily",
    partitions_def=DailyPartitionsDefinition(start_date="2024-02-20"),
    dagster_dbt_translator=DagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def daily_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    start, end = context.partition_time_window

    dbt_vars = {
        "min_date": start.isoformat(),
        "max_date": end.isoformat(),
    }

    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
I have then defined the job like this:
```python
manifest = json.loads(dbt_manifest_path.read_text())
daily_dbt_asset_selection = DbtManifestAssetSelection.build(
    manifest=manifest,
    select="tag:daily",
)
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
    # partitions_def=DailyPartitionsDefinition("2024-01-01"),
)
daily_dbt_job_schedule = ScheduleDefinition(
    job=daily_dbt_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)
```
Yet, when this job runs, it errors:
non_partitioned_dbt_assets - Dependencies for step non_partitioned_dbt_assets failed: ['daily_partitioned_dbt_assets']. Not executing.
s
The error message you're encountering, `non_partitioned_dbt_assets - Dependencies for step non_partitioned_dbt_assets failed: ['daily_partitioned_dbt_assets']. Not executing.`, points to a dependency issue within your job. Specifically, the `non_partitioned_dbt_assets` step cannot execute because it depends on the `daily_partitioned_dbt_assets` step, which failed or did not execute. In Dagster, assets can depend on other assets, and if an upstream asset fails or is not executed, downstream assets will not run. This ensures data consistency and prevents partial updates when the entire pipeline is treated as a unit of work.

Given that you have partitioned and non-partitioned assets, it's important to ensure that the dependencies between them are correctly defined. If `non_partitioned_dbt_assets` is intended to run independently of `daily_partitioned_dbt_assets`, make sure there are no implicit dependencies between them unless that is your intention. A few steps to troubleshoot this issue:

1. Check Asset Dependencies: Verify that `non_partitioned_dbt_assets` does not unintentionally depend on `daily_partitioned_dbt_assets`. If there is a dependency, `daily_partitioned_dbt_assets` must execute successfully before `non_partitioned_dbt_assets` can run.
2. Review Asset Selection: Look at `daily_dbt_asset_selection` and ensure it selects the correct assets for execution. If the selection inadvertently includes assets that should not be part of the job, it can lead to dependency issues.
3. Examine Partition Definitions: Since you have partitioned assets, ensure the partition definitions are correctly set up and that the job runs with the correct partition context. A mismatch in the partition context can cause the partitioned assets to not execute as expected.
4. Check Asset Materialization: Ensure that the assets materialize correctly during execution. If an asset fails to materialize, downstream assets will not execute.
5. Review Job and Schedule Definitions: Double-check the `daily_dbt_job` and `daily_dbt_job_schedule` definitions to ensure they are set up correctly and that the schedule triggers the job as expected.
6. Inspect Logs and Error Messages: Look at the full logs and any error messages associated with `daily_partitioned_dbt_assets` to understand why it might be failing or not executing.
7. Isolate the Issue: Try running the partitioned and non-partitioned assets separately to determine whether they execute successfully on their own. This helps isolate whether the issue is with a specific asset or with the interaction between them.

If the issue persists after these steps, more detailed logs or error messages will be needed for further diagnosis. There may also be a more specific issue with the Dagster setup or the dbt project configuration that requires a deeper investigation.
a
@Scout You are correct, the upstream partitioned asset fails, so the downstream non-partitioned asset errors. The error shown from the partitioned asset is `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range`. I can only assume this is because no partitions_def is passed to the `define_asset_job` function. However, if I do pass a definition, I get an error from the non-partitioned assets saying they aren't partitioned.
s
The error message `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range` indicates a mismatch between the expected partition information and what is provided during execution of the partitioned asset. This error typically arises when a partitioned asset is expected to run with a specific partition key or range, but none is provided. On the other hand, if you pass a `partitions_def` to the `define_asset_job` function and receive an error from the non-partitioned assets stating that they aren't partitioned, the job is being configured in a way that expects all included assets to be partitioned. To resolve this, consider the following approaches:

1. Separate Jobs: Create separate jobs for partitioned and non-partitioned assets. This way, you can run each job with the appropriate partition context (or lack thereof) without causing conflicts between partitioned and non-partitioned assets.
2. Partition Mapping: If you need to have both partitioned and non-partitioned assets in the same job, you may need a `PartitionMapping` that correctly maps the dependencies between them. For example, `AllPartitionMapping` can be used when a non-partitioned asset depends on all partitions of an upstream partitioned asset.
3. Dynamic Partitioning: If the partitioned and non-partitioned assets must be in the same job and run together, consider dynamic partitioning, which allows more flexible partitioning schemes that can accommodate varying partition setups.
4. Job Configuration: Ensure that the job configuration correctly reflects the partitioning scheme of the assets. If only some assets are partitioned, the job should be configured to handle this hybrid setup without imposing partition requirements on non-partitioned assets.
5. Debugging: Run the partitioned assets with explicit partition keys to ensure they work as expected. Then try running the non-partitioned assets separately. This can help you identify where the configuration is going wrong when combining them.
6. Documentation and Community: Check the Dagster documentation for examples of jobs that include both partitioned and non-partitioned assets. You may also reach out to the Dagster community for advice, as other users may have encountered similar scenarios.

Given the complexity of the issue, it may require a detailed review of your Dagster and dbt configurations. If the problem persists, consider providing more context or seeking direct support from the Dagster community or maintainers.
a
@Scout I don't want to separate the jobs. This is running a series of dbt models. It seems that trying to take advantage of Dagster's partition paradigm nullifies some of the benefits of dbt. Could you please, using the code example I gave earlier, show me how partition mapping could be used to resolve this?
s
In the context of Dagster, partition mappings define the relationship between the partitions of different assets. However, when dealing with dbt models, the partitioning logic is often handled within dbt itself, particularly with incremental models. Dagster's partitioning system is designed to work with Dagster assets, and while it can be used to orchestrate dbt runs, the partitioning logic within dbt may not always align neatly with Dagster's expectations for partitioned assets.

Given the constraints you've mentioned and the code you've provided, note that the `define_asset_job` function is designed to work with assets that have a uniform partitioning scheme. When you have a mix of partitioned and non-partitioned assets, the job needs to be configured to handle this mix correctly. Unfortunately, the documentation does not provide a direct example of using partition mappings to resolve the specific issue you're facing with a mix of partitioned and non-partitioned dbt models within the same Dagster job; partition mappings are more straightforward when all assets within a job are partitioned in a compatible way. Since the documentation does not offer a clear solution for this scenario, I would recommend the following general debugging steps:

1. Review dbt and Dagster Configurations: Ensure that your dbt models are correctly configured for incremental runs and that the partitioning logic within dbt aligns with the partitioning scheme you're trying to implement in Dagster.
2. Test Partitioned Runs Separately: Run your partitioned dbt models separately with explicit partition keys to confirm that the partitioning logic within dbt works as expected.
3. Test Non-Partitioned Runs Separately: Similarly, run your non-partitioned dbt models separately to confirm that they execute without requiring partition information.
4. Explore Dagster's Partitioning Documentation: Review Dagster's documentation on partitioning to understand the available partition mappings and how they might apply to your use case.
5. Seek Community Input: Since the documentation does not provide a specific example for your case, consider reaching out to the Dagster community; other users or the maintainers may have experience with similar scenarios and can offer guidance.
6. Consider Custom Logic: If Dagster's built-in partitioning system does not support your use case, you may need to implement custom logic to orchestrate the execution of your dbt models, potentially bypassing Dagster's partitioning system for this particular job.

If you continue to face challenges with this setup, it may be worth sharing more details about your configuration and the errors you're encountering with the Dagster community or maintainers, who may be able to provide more targeted advice or suggest alternative approaches.
a
@Scout Is there a way we can override the `DagsterDbtTranslator.get_partition_mapping` method to try and solve this problem? For example, just consider an upstream asset that is partitioned and a downstream that is not. There seem to be a few implementations of a `PartitionMapping` in the code base.
s
Yes, you can override the `DagsterDbtTranslator.get_partition_mapping` method to customize the partition mapping for dbt assets in Dagster. This is useful when you have a mix of partitioned and non-partitioned assets and need to define how partitions should relate to each other. The documentation mentions several implementations of `PartitionMapping` that you can use depending on your use case:

- `TimeWindowPartitionMapping`: The default mapping between two `TimeWindowPartitionsDefinitions`. It maps a partition in the downstream partitions definition to all partitions in the upstream asset whose time windows overlap it.
- `IdentityPartitionMapping`: Expects that the upstream and downstream assets are partitioned in the same way, and maps partitions in the downstream asset to the same partition in the upstream asset.
- `AllPartitionMapping`: Maps every partition in the downstream asset to every partition in the upstream asset. This is commonly used when the downstream asset is not partitioned and the entire downstream asset depends on all partitions of the upstream asset.
- `LastPartitionMapping`: Maps all dependencies to the last partition in the upstream asset.

For your specific case, where the upstream asset is partitioned and the downstream asset is not, you might consider `AllPartitionMapping`. This mapping indicates that the non-partitioned downstream asset depends on all partitions of the upstream partitioned asset. Here's an example of how you might override the `get_partition_mapping` method in a custom `DagsterDbtTranslator`:

```python
from typing import Any, Mapping, Optional

from dagster import AllPartitionMapping, PartitionMapping
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_partition_mapping(
        cls,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionMapping]:
        # dbt_resource_props and dbt_parent_resource_props contain information
        # about the dbt models and their dependencies, so you can implement
        # custom logic here to return the appropriate PartitionMapping.
        # For example, if the downstream asset is not partitioned and depends
        # on an upstream partitioned asset, you might return an AllPartitionMapping.
        # This simple example always returns an AllPartitionMapping, but you can
        # include logic to make that decision based on the properties.
        return AllPartitionMapping()

# Then, when defining your dbt assets, you would use your custom translator:
@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
    # ... other parameters ...
)
def my_dbt_assets():
    # ... your dbt asset definitions ...
    ...
```

Please note that the actual implementation of `get_partition_mapping` may need to be more complex, depending on the relationships between your dbt models and how you want to handle partition dependencies. Custom partition mappings and translators should be carefully tested to ensure they behave as expected in your Dagster jobs. If you encounter any issues or need further guidance, the Dagster community and maintainers can be a valuable resource.
a
@Scout We seem to have it covered when the downstream is not partitioned and the upstream is. AllPartitionMapping lets us say the downstream depends on all partitions of the upstream (fan-in). It isn't obvious how we handle the reverse case, i.e. upstream is not partitioned and downstream is. Also, it is worth noting that for handling these partition mappings, we have 4 cases:
• Upstream partitioned, downstream not
• Downstream partitioned, upstream not
• Both upstream and downstream partitioned
• Neither upstream nor downstream partitioned
s
In Dagster, the relationship between partitioned and non-partitioned assets can be managed using partition mappings. The documentation provides guidance for each scenario:

1. Upstream partitioned, downstream not: As you mentioned, `AllPartitionMapping` is used to indicate that the downstream asset depends on all partitions of the upstream asset. This is a fan-in scenario where the downstream asset is not partitioned.
2. Downstream partitioned, upstream not: For this scenario, the documentation does not provide a specific partition mapping. However, the default behavior in Dagster when a partition mapping is not specified is that each partition of the downstream asset depends on the unpartitioned upstream asset. This is a fan-out scenario.
3. Both upstream and downstream partitioned: If both assets are partitioned, you can use `IdentityPartitionMapping` if they are partitioned in the same way, or `TimeWindowPartitionMapping` to map partitions based on overlapping time windows. The specific mapping to use depends on how the partitions in the upstream and downstream assets relate to each other.
4. Neither upstream nor downstream partitioned: If neither asset is partitioned, there is no need for a partition mapping; the assets simply depend on each other without any partition context.

For the case where the upstream is not partitioned and the downstream is, you typically don't need to do anything special: each partition of the downstream asset will depend on the unpartitioned upstream asset by default. If you need to customize this behavior, you would likely need to implement custom logic within your asset definitions to handle the dependency relationship.

While the documentation covers the common scenarios, some cases require custom solutions. If the provided partition mappings do not cover your use case, or if you need to implement a more complex dependency relationship, you may need to reach out to the Dagster community for support or implement custom logic within your asset definitions. Remember to test any custom partition mappings or logic thoroughly to ensure that they work as expected within your Dagster jobs.
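To make the second case concrete, here is a minimal sketch (with hypothetical asset names) of an unpartitioned upstream feeding a daily-partitioned downstream; the dependency works with no explicit partition mapping, and each daily partition simply reads the latest materialization of the upstream:

```python
from dagster import DailyPartitionsDefinition, asset

@asset
def unpartitioned_upstream():
    # A plain, unpartitioned asset.
    return "reference_data"

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-02-20"))
def partitioned_downstream(context, unpartitioned_upstream):
    # Declaring the upstream as an input creates the dependency; no
    # PartitionMapping is needed for this direction.
    return f"{unpartitioned_upstream}_for_{context.partition_key}"
```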
a
@Scout Ok, consider I have implemented this and have created an asset selection of dbt models that contains both partitioned and non-partitioned assets. How would you define a job? Do we need to include a partition_def in it?
s
When defining a job that includes both partitioned and non-partitioned dbt models, you need to consider how the job will be executed, especially in terms of partitioning. The job definition should align with the partitioning scheme of the assets it is intended to materialize. According to the documentation, when defining an asset job using `define_asset_job`, you can specify a `selection` parameter to determine which assets the job will materialize. However, the `partitions_def` parameter is optional. If you include a `partitions_def` in the job definition, it implies that the job is intended to materialize partitioned assets, and you would typically run the job for specific partitions. Here's how you might define a job for a mix of partitioned and non-partitioned dbt models:
```python
from dagster import define_asset_job, AssetSelection

# Define an asset selection that includes both partitioned and non-partitioned dbt models
asset_selection = AssetSelection.assets(
    # ... include your dbt assets here ...
)

# Define the job without specifying a partitions_def
dbt_job = define_asset_job(
    name="dbt_job",
    selection=asset_selection,
)

# If you have partitioned assets, you would typically run the job for specific partitions
# using the run configuration or through schedules or sensors that provide the partition key.
```
In this job definition, we do not specify a `partitions_def` because the job includes both partitioned and non-partitioned assets. When you run the job, Dagster will materialize all selected assets. For partitioned assets, you would specify which partition(s) to materialize in the run configuration or through schedules or sensors that provide the partition key.

If you have a schedule or sensor that triggers this job, you would handle the partitioning logic there. For example, a schedule could be set up to run the job for the latest partition of the partitioned assets, while still materializing the non-partitioned assets. Keep in mind that the job's behavior regarding partitioning should be consistent with the partitioning logic defined in your dbt models. If your dbt models are set up to handle incremental runs based on partitioning, ensure that the job's run configuration aligns with those expectations. If you encounter any issues or have specific requirements that are not covered by the standard job definition, you may need to seek further guidance from the Dagster community or explore custom solutions.
a
@Scout This doesn't seem desirable. dbt has a definite lineage and order to materialising models. You seem to be suggesting I have two jobs, one that executes the non-partitioned assets and another that executes the partitioned assets. Perhaps we could ignore dbt for now and create the "hello world" of assets and partitioned assets. Then show me how I could run them in a single job.
s
Certainly! Let's create a simple example with both partitioned and non-partitioned assets and then define a single job that can run them together. First, we'll define a non-partitioned asset:
```python
from dagster import asset

@asset
def non_partitioned_asset():
    return "non_partitioned_data"
```
Next, we'll define a partitioned asset. For simplicity, let's assume daily partitions:
```python
from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def partitioned_asset(context):
    # The context provides the partition key, which you can use to customize behavior
    partition_key = context.partition_key
    return f"partitioned_data_for_{partition_key}"
```
Now, we'll define a job that includes both assets. Since the job includes both partitioned and non-partitioned assets, we won't specify a `partitions_def` in the job definition. Instead, we'll use the `AssetSelection` to include both assets:
```python
from dagster import define_asset_job, AssetSelection

# Define an asset selection that includes both the partitioned and non-partitioned assets
asset_selection = AssetSelection.assets(
    non_partitioned_asset,
    partitioned_asset,
)

# Define the job without specifying a partitions_def
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection=asset_selection,
)
```
When you run `my_asset_job`, Dagster will materialize the non-partitioned asset without any partition context. For the partitioned asset, you will need to specify which partition you want to materialize when you trigger the job run. This can be done through the Dagit UI, the Dagster GraphQL API, or programmatically using the `execute_in_process` method with the appropriate partition key. Here's an example of how you might run the job for a specific partition programmatically:
```python
partition_key = "2023-01-01"
my_asset_job.execute_in_process(partition_key=partition_key)
```
In this example, the non-partitioned asset will run as usual, and the partitioned asset will run for the specified partition key. Please note that this is a simplified example to illustrate the concept. In a real-world scenario, you would need to handle the partitioning logic according to your specific data and processing requirements. If you have complex dependencies or need to handle different partitioning schemes, you may need to implement custom logic or use more advanced features of Dagster's partitioning system.
a
@Scout Ok, so going back to dbt, I implemented this:
```python
class CustomDagsterDbtTranslator(DagsterDbtTranslator):    
    def get_partition_mapping(cls, dbt_resource_props, dbt_parent_resource_props) -> Optional[PartitionMapping]:
        downstream_incremental = "partitioned" in dbt_resource_props.get("config").get("tags", [])
        upstream_incremental = "partitioned" in dbt_parent_resource_props.get("config").get("tags", [])
        # print(json.dumps(dbt_parent_resource_props.get("config")))
        
        print(f"downstream: {dbt_resource_props.get('name')} upstream: {dbt_parent_resource_props.get('name')}")
        if downstream_incremental and upstream_incremental:
            # downstream IS partitioned, upstream IS partitioned
            print("both")
            return IdentityPartitionMapping()
        elif downstream_incremental and not upstream_incremental:
            # downstream IS partitioned, upstream NOT partitioned
            print("downstream partitioned")
            return None
            # return AllPartitionMapping()  # not sure on this
        elif not downstream_incremental and upstream_incremental:
            # downstream NOT partitioned, upstream IS partitioned
            print("upstream partitioned")
            return AllPartitionMapping()
            # return AllPartitionMapping()  # not sure on this
        
        print("neither")
            
        return None  # neither partitioned
```
Here I am using a tag I set on the dbt model to decide whether to map the dbt model to a partitioned asset or not. However, this still does not fix anything. It is worth noting that if I manually run this job, the UI asks me to select some partitions, and beyond that it runs absolutely fine.
s
The custom `DagsterDbtTranslator` you've implemented is a step in the right direction for handling the relationship between partitioned and non-partitioned dbt models in Dagster. The `get_partition_mapping` method is used to determine the partition mapping between dbt assets based on their tags.

From your description, the job runs fine when you manually select partitions in the UI, which indicates that partition selection works as expected when explicitly provided. The issue is therefore likely related to how the job is triggered automatically (e.g., via a schedule or sensor) without manual partition selection. A few things to consider:

1. Automatic Partition Selection: When triggering the job automatically, you need to ensure that the correct partition information is passed to the job. This can be done through a schedule or sensor that computes the appropriate partition keys based on the current time or other logic.
2. Job Definition: Ensure that the job is defined in a way that allows both partitioned and non-partitioned assets to be materialized. Since you have a mix of partitioned and non-partitioned dbt models, the job should not have a `partitions_def` specified, as this would imply that all assets in the job are expected to be partitioned.
3. Partition Mapping: The `get_partition_mapping` method should return the correct `PartitionMapping` based on the tags. You're returning `None` when the downstream is partitioned and the upstream is not, which means the default partition mapping will be used. The default behavior is typically to map partition keys to the same partition keys in upstream assets, which may not be what you want in this case. You might need to return a specific `PartitionMapping` that reflects the desired behavior.
4. Testing: Test the job execution with different scenarios (e.g., different partition keys, different combinations of partitioned and non-partitioned models) to ensure that the partition mappings work as expected.
5. Logging and Debugging: Use logging to help debug the partition mappings. You've already added print statements in your `get_partition_mapping` method, which can be helpful. Ensure that the logs give you the correct information about the relationships between assets.
6. Dagster Community: If the issue persists, consider reaching out to the Dagster community for support. Other users or the maintainers may have encountered similar scenarios and can offer guidance or solutions.

Remember that partition mappings define dependencies between assets. If the automatic triggering of the job is not working as expected, it's likely an issue with how the partitions are determined or passed to the job when it is not run manually. You may need to adjust your schedule or sensor logic to provide the correct partition information when the job is triggered.
a
@Scout Ok, forgetting the class I implemented for now, how can I implement the first two points you made? To be clear, without the class I implemented, and defining a job without a `partitions_def` (since there is a mixture of partitioned and unpartitioned assets within it), you get the following error: `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range`. This makes sense when you consider that when manually running the job, you are prompted to pick the partitions for which you wish to run it. It works fine in that scenario.
s
The error you're encountering, `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range`, suggests that Dagster expects partition information for the job because it contains partitioned assets, even though the job itself was not defined with a `partitions_def`. To address this, you can implement the following:

1. Automatic Partition Selection: Create a schedule or sensor that provides the necessary partition information when triggering the job. This can be done by computing the partition keys based on the current time or other logic and passing them to the job when it runs. Here's an example of how you might define a schedule that computes the partition key for the current day and triggers the job with that partition key:
```python
from dagster import ScheduleDefinition, daily_schedule
from datetime import datetime

@daily_schedule(
    pipeline_name='my_dbt_pipeline',
    start_date=datetime(2023, 1, 1),
    execution_timezone='UTC'
)
def my_daily_schedule(date):
    # Compute the partition key for the current day
    partition_key = date.strftime('%Y-%m-%d')
    # Return the run configuration with the partition key
    return {
        'solids': {
            'my_partitioned_solid': {
                'config': {
                    'partition_key': partition_key
                }
            }
        }
    }

# Assuming 'my_dbt_pipeline' is the name of your pipeline that includes both partitioned and non-partitioned assets
```
2. Job Definition Without `partitions_def`: Define the job without a `partitions_def` and ensure that the schedule or sensor provides the partition key when triggering the job. This way, the job itself remains agnostic to partitioning, but the schedule or sensor handles the partition logic.
```python
from dagster import define_asset_job, AssetSelection

# Define an asset selection that includes both the partitioned and non-partitioned assets
asset_selection = AssetSelection.assets(
    # ... include your dbt assets here ...
)

# Define the job without specifying a partitions_def
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection=asset_selection,
)

# The schedule defined above will trigger this job with the appropriate partition key
```
By using a schedule or sensor to provide the partition key, you can ensure that the partitioned assets within the job receive the necessary partition information for execution, while the non-partitioned assets simply run as usual. Please note that the examples provided are for illustrative purposes and may need to be adjusted to fit your specific use case and Dagster version. If you're using a newer version of Dagster, the way you define schedules or sensors might differ, and you should refer to the latest documentation for the correct syntax and best practices.
a
@Scout I'm not really following this. Firstly, your answer seems out of date: use of `solids` has been phased out of Dagster for a long time. In addition, can you explain what you are trying to do? Is it:
• Create a job without a partition_def, which contains both partitioned and unpartitioned assets
• Schedule something that computes the current partition and then triggers the job with the relevant partition?
s
Apologies for any confusion. Let's clarify the approach with the updated concepts in Dagster. The goal is to create a job that includes both partitioned and unpartitioned assets and then use a schedule to trigger this job, providing the necessary partition information for the partitioned assets. Here's how you can achieve this:

1. Define a Job Without `partitions_def`: Create a job that includes both partitioned and unpartitioned assets without specifying a `partitions_def`. This job will be able to run both types of assets.
```python
from dagster import define_asset_job, AssetSelection

# Define an asset selection that includes both the partitioned and non-partitioned assets
asset_selection = AssetSelection.assets(
    # ... include your dbt assets here ...
)

# Define the job without specifying a partitions_def
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection=asset_selection,
)
```
2. Create a Schedule to Trigger the Job: Define a schedule that computes the current partition key based on the current time or other logic. This schedule will trigger the job and provide the partition key for the partitioned assets.
```python
from dagster import ScheduleDefinition, daily_schedule
from datetime import datetime

@daily_schedule(
    job=my_asset_job,
    start_date=datetime(2023, 1, 1),
    execution_timezone='UTC'
)
def my_daily_schedule(context):
    # Compute the partition key for the current day
    partition_key = context.scheduled_execution_time.strftime('%Y-%m-%d')
    # Return the run configuration with the partition key
    return {
        'ops': {
            'my_partitioned_op': {
                'config': {
                    'partition_key': partition_key
                }
            }
        }
    }
```
In this example, `my_daily_schedule` is a schedule that triggers `my_asset_job` daily. The `context.scheduled_execution_time` provides the time at which the schedule is set to run, which you can use to compute the partition key. The run configuration returned by the schedule should specify the partition key for the partitioned assets.

Please note that the `ops` key in the run configuration should be replaced with the appropriate key for your partitioned assets. If your partitioned assets are defined using the `@asset` decorator, you may not need to specify the `ops` key at all, and the partition key would be provided directly in the run configuration. This approach allows you to have a single job that handles both partitioned and unpartitioned assets, with a schedule that provides the necessary partition information for the partitioned assets at runtime. The non-partitioned assets will run as usual without any partition context. Make sure to adjust the code to match the actual names of your assets and the structure of your project. If you have any further questions or need additional clarification, please let me know.
a
@Scout Ok, this is making more sense; however, I'm not sure what should be returned. We can calculate the daily partition key from the schedule context. However, given the underlying asset is generated from the dbt_assets decorator, what should be in the response? Where can I find this out?
s
When using the `@dbt_assets` decorator to define dbt models as Dagster assets, and you have a mix of partitioned and non-partitioned assets, you typically don't need to specify the partition key in the run configuration for non-partitioned assets; Dagster will handle the execution of non-partitioned assets without any partition context. For partitioned assets, you need to provide the partition key when triggering the job so that Dagster knows which partition of the asset to materialize. The schedule's responsibility is to compute the partition key and pass it to the job when it triggers a run.

The documentation provides guidance on how to access partition information within an asset when it's being materialized. For example, you can use `context.partition_key` within the asset function to get the partition key for the current run. Here's an example of how you might define a schedule that triggers a job with partitioned dbt assets:
```python
from dagster import ScheduleDefinition, daily_schedule
from datetime import datetime

@daily_schedule(
    job=my_asset_job,
    start_date=datetime(2023, 1, 1),
    execution_timezone='UTC'
)
def my_daily_schedule(context):
    # Compute the partition key for the current day
    partition_key = context.scheduled_execution_time.strftime('%Y-%m-%d')
    # Return the run configuration with the partition key
    return {
        'resources': {
            'dbt': {
                'config': {
                    'vars': {
                        'partition_key': partition_key,
                    }
                }
            }
        }
    }
```
In this example, the schedule computes the partition key and includes it in the run configuration under the `vars` key, which is passed to the dbt resource. The dbt resource can then use this variable to run dbt models for the specified partition.

Please note that the exact structure of the run configuration may vary depending on how your dbt resource and assets are set up. You should refer to the documentation for the `dbt_assets` decorator and the dbt resource you are using to ensure that you are providing the partition key in the format expected by your dbt models. If you need to pass the partition key to dbt in a different way, such as through command-line arguments or environment variables, you would adjust the run configuration accordingly. Remember to test the schedule to ensure that it correctly triggers the job with the appropriate partition key and that your dbt models are receiving and using the partition key as expected. If you encounter any issues, you may need to adjust the run configuration or consult the documentation for more details on how to configure dbt assets with partitioning in Dagster.
a
@Scout This isn't true; we are discussing this precisely because the current dbt implementation doesn't handle a mix of partitioned and unpartitioned assets. As I stated prior, when we run a job with both kinds of assets in it manually, we have to specify the partitions and it works fine. When we define a job without a partition definition, it fails. We shouldn't want to add a partition definition because not all assets in the job are partitioned. Your solution was to schedule a function which calculates the current partition, i.e. today, and then triggers the underlying job with the correct partition key being passed. I am using the schedule decorator too, not the old daily_schedule one. Knowing this now, how do I write the schedule-decorated function to both get the correct partition key and then pass the information to the job?
s
I apologize for the confusion. Let's address the issue with the correct approach using the `@schedule` decorator, which is the updated way to define schedules in Dagster. When you have a job that includes both partitioned and unpartitioned dbt assets, and you want to trigger this job with a schedule, you need to compute the partition key for the partitioned assets and pass it to the job at runtime. Here's how you can define a schedule using the `@schedule` decorator that computes the partition key and triggers the job with that key:
```python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight
    job=my_asset_job,
    execution_timezone='UTC'
)
def my_daily_schedule(context):
    # Compute the partition key for the current day
    partition_key = datetime.now().strftime('%Y-%m-%d')

    # Construct the run configuration with the partition key
    run_config = {
        'resources': {
            'dbt': {
                'config': {
                    'vars': {
                        'partition_key': partition_key,
                    }
                }
            }
        }
    }

    # Return a RunRequest to trigger the job with the computed partition key
    return RunRequest(run_config=run_config)
```
In this example, the `my_daily_schedule` function is decorated with `@schedule` and is set to run daily at midnight UTC. The function computes the partition key for the current day and constructs a `run_config` dictionary that includes the partition key under the `vars` key, which is how dbt accepts runtime variables. The `RunRequest` object is returned with the `run_config` to trigger the job with the necessary partition information for the partitioned assets. The unpartitioned assets will run as usual without any partition context.

Please adjust the `run_config` as necessary to match the configuration expected by your dbt assets and resources. The structure of the `run_config` may vary depending on your specific dbt setup. This approach allows you to trigger a job that includes both partitioned and unpartitioned dbt assets with the correct partition information for the partitioned assets. Make sure to test the schedule to ensure that it triggers the job correctly and that the partitioned dbt models are executed with the appropriate partition key.
a
@Scout This is the bit that doesn't make sense:
```python
run_config = {
        'resources': {
            'dbt': {
                'config': {
                    'vars': {
                        'partition_key': partition_key,
                    }
                }
            }
        }
    }
```
That to me looks like you are setting a var in dbt, i.e. a model is parameterised. This makes sense, somewhat, but the issue we are having is Dagster needing a partition key, not dbt?
s
I apologize for the confusion. You are correct that the `run_config` snippet I provided is for setting a variable in dbt, which would be used if your dbt models are parameterized to accept a `partition_key` variable. However, the issue you're facing is that Dagster needs a partition key for the partitioned assets when the job is triggered. To address this, you need to provide the partition key to Dagster in a way that tells it which partitions of the partitioned assets to materialize.

In Dagster, when you define a job that includes partitioned assets, you can use the `with_resources` method on the job to provide the resources configuration, including any dbt-specific variables, if needed. For the partition key, however, you would specify it for the Dagster job itself, not for dbt: since the `@schedule` decorator is used to trigger the job, the run request should carry the information Dagster needs to know which partitions of the partitioned assets to run. Here's an updated example of how you might define a schedule that computes the partition key and triggers the job with that key:
```python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight
    job=my_asset_job,
    execution_timezone='UTC'
)
def my_daily_schedule(context):
    # Compute the partition key for the current day
    partition_key = datetime.now().strftime('%Y-%m-%d')
    
    # Return a RunRequest to trigger the job with the computed partition key
    return RunRequest(
        run_key=partition_key,
        run_config={
            # Include any necessary run configuration for the job here
        },
        tags={
            "dagster/partition": partition_key,
        }
    )
```
In this updated example, the `RunRequest` includes a `run_key` and `tags` that specify the partition key. The `run_config` is left as a comment because it may or may not be necessary to include additional configuration depending on your job's requirements. When the schedule triggers the job, Dagster will use the partition key provided in the `tags` to determine which partitions of the partitioned assets to materialize. The unpartitioned assets will run as usual.

Please ensure that your job and assets are set up to handle the partition key provided in the `tags`. If your partitioned assets are defined using the `@asset` decorator with a `partitions_def`, Dagster will use the partition key to materialize the correct partitions. Again, I apologize for any confusion, and I hope this clarifies how to use the `@schedule` decorator to trigger a job with partitioned assets in Dagster.
a
@Scout Ok, this makes more sense. I have the following:
```python
manifest = json.loads(dbt_manifest_path.read_text())
daily_dbt_asset_selection = DbtManifestAssetSelection.build(
    manifest=manifest,
    select="tag:daily",
)
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        partition_key=partition_key,
    )
```
However, I am getting an error from the schedule: `dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key 2024-02-29`. This partition does not exist; however, I naively assumed we could calculate today's partition this way? I'm not sure why this wouldn't work?
s
The error you're encountering, `dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key 2024-02-29`, indicates that Dagster is trying to find a partition with the specified key in the partition set defined for your assets, but it cannot find one. This happens when the partition key you compute does not correspond to any of the defined partitions for your assets. When you define partitions for an asset in Dagster, you specify a range of valid partition keys (e.g., dates). If you attempt to run a job with a partition key that is not within this range, Dagster raises an error because it expects all partition keys to be predefined and known ahead of time.

In your schedule function `daily_configurable_job`, you compute the partition key as today's date using `context.scheduled_execution_time.strftime("%Y-%m-%d")`. This approach is generally correct for generating a partition key for the current day. However, if the current day's partition key has not been defined in the partition set for your assets, Dagster will not be able to find the partition and will raise the `DagsterUnknownPartitionError`. To resolve this, ensure that the partition key you generate corresponds to a valid partition in your partition set. A few things to check:

1. Partition Set Definition: Make sure that the `DailyPartitionsDefinition` (or other partition definitions) you're using for your assets includes the current day's date. If you're using a static partition set, you may need to update it to include new partitions as time goes on.
2. Partition Generation: If you're using dynamic partitioning, ensure that the logic for generating partitions is correct and includes the current day.
3. Time Zone: Be aware of time zone differences. The `scheduled_execution_time` is in UTC by default, and if your partitions are defined in a different time zone, you may need to adjust the computation of the partition key accordingly.
4. Leap Year: The date 2024-02-29 is a leap day. Ensure that your partition set accounts for leap years if you're running this schedule in a leap year.
5. Validation: Before returning the `RunRequest`, you can add validation logic to check whether the computed partition key exists in the partition set. If it doesn't, handle the situation appropriately, such as by skipping the run or logging a warning (a sketch of this follows below).

If you continue to face issues, review the partition definitions for your assets and ensure they include all expected partition keys. If necessary, you can also reach out to the Dagster community for further assistance.
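As a minimal sketch of that validation step, reusing the `daily_partition` and `daily_dbt_job` names from earlier and assuming the `get_last_partition_key` helper that time-window partitions definitions expose, the schedule can ask the partitions definition for the most recent key that actually exists rather than formatting today's date directly:

```python
from dagster import RunRequest, ScheduleEvaluationContext, SkipReason, schedule

@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    # Latest partition key that exists as of the scheduled execution time
    # (for a default DailyPartitionsDefinition this is yesterday's date).
    partition_key = daily_partition.get_last_partition_key(
        current_time=context.scheduled_execution_time
    )
    if partition_key is None:
        return SkipReason("No partitions exist yet")
    return RunRequest(partition_key=partition_key)
```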
a
@Scout
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:partitioned,tag:daily",
    partitions_def=daily_partition,
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def daily_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    start, end = context.partition_time_window

    dbt_vars = {
        "min_date": start.isoformat(),
        "max_date": end.isoformat(),
    }

    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
Above is my definition of the partitioned assets. I assume the max_date is set to yesterday? How else should I define this?
s
In the code snippet you provided, the `daily_partitioned_dbt_assets` function is defined as a set of dbt assets that are partitioned daily. The `context.partition_time_window` provides start and end datetime objects that represent the time window for the current partition being materialized, and the `start` and `end` variables are set to the start and end of that window, respectively. When you use `start.isoformat()` and `end.isoformat()`, you are converting these datetime objects to ISO 8601 string representations, which are then passed as variables to dbt.

The `min_date` and `max_date` variables are used to parameterize your dbt models. For example, if you have incremental models in dbt that filter data based on a date range, you would use these variables to ensure that each run of the dbt models only processes data for the specific time window of the current partition.

Regarding `max_date`, it is not necessarily set to "yesterday" by default. The actual value of `max_date` depends on how the `DailyPartitionsDefinition` is set up and how the partition time window is calculated for each partition. If you want to ensure that `max_date` is always set to "yesterday," you would need to adjust the partition definition or the logic in your asset function to calculate the end of the time window accordingly. Here's an example of how you might adjust the `end` time to ensure it represents "yesterday":
```python
from datetime import datetime, timedelta

def daily_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Get the current date and time
    now = datetime.now()
    # Calculate "yesterday" by subtracting one day from the current date
    yesterday = now - timedelta(days=1)
    # Convert "yesterday" to ISO format
    max_date = yesterday.isoformat()

    # Use the start of the partition time window as the min_date
    min_date = context.partition_time_window.start.isoformat()

    dbt_vars = {
        "min_date": min_date,
        "max_date": max_date,
    }

    # ... rest of your dbt asset function ...
```
In this adjusted example, `max_date` is explicitly set to "yesterday," regardless of the partition time window. You would need to ensure that this logic aligns with your data processing requirements and the expectations of your dbt models. Remember to test your asset definitions to verify that the `min_date` and `max_date` variables are being set correctly and that your dbt models are using these variables as intended. If you have any further questions or need additional clarification, please let me know.
a
@Scout Sorry, to be clear, I am saying the code I gave you has resulted in partitions defined daily from 25th Feb 2024 to 28th Feb 2024. Today is the 29th Feb 2024, so it looks like it does use yesterday's date by default for the max date. What is recommended? If we make it so the partitions include today, then running a materialisation of that partition will miss the rest of the day. Of course, we could just always run for the last X partitions to account for this while enabling people to look at the latest data when they run it? Does this make sense?
s
Based on the documentation, the `DailyPartitionsDefinition` by default creates partitions up to the current time, excluding the end date. If `end_offset` is set to 0 (the default), the last partition ends before the current time. This means that if you run a materialization for the current day's partition, it will not include data for the entire day, as the partition represents the time window up to the start of the current day. To handle this, you have a couple of options:

1. Run for the Last X Partitions: As you suggested, you can run materializations for the last X partitions to ensure that you're capturing the most recent complete data. This approach lets you include the latest available data while acknowledging that the current day's partition will not be complete until the day is over.
2. Include Today's Partition with an Offset: If you want to include today's partition, adjust the `end_offset` parameter when defining your `DailyPartitionsDefinition`. Setting `end_offset` to 1 extends the partitions forward so that the schedule tick that runs on day N fills in day N's partition instead of day N-1's (a small example follows below). Keep in mind that materializing today's partition before the day is over will result in partial data for the current day.
3. Materialize After the Day Ends: Another approach is to schedule the materialization to occur after the end of the day, ensuring that the partition for the current day is complete. This could be done by scheduling the materialization to run early in the morning of the following day.
4. Dynamic Partitioning: If your data ingestion pattern requires more flexibility, you might consider a `DynamicPartitionsDefinition` to create partitions dynamically based on the data available. This lets you define partitions on the fly and is useful when the data arrival pattern is irregular.

Choose an approach that aligns with your data processing requirements and the expectations of your downstream consumers. If the data for the current day is needed as soon as possible, you might opt for more frequent materializations with the understanding that the data will be partial. If complete data is required, you would wait until the day is over before materializing the partition for that day. Remember to clearly communicate the partitioning strategy to users of the data so they understand the implications of the chosen approach on the completeness and freshness of the data.
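As a small illustration of option 2, assuming the same daily cadence used earlier, the only change is the `end_offset` argument on the partitions definition:

```python
from dagster import DailyPartitionsDefinition

# Default: the last partition is the most recent fully elapsed day (yesterday).
daily_partition = DailyPartitionsDefinition(start_date="2024-02-20")

# With end_offset=1, the partition set extends one partition past the current time,
# so a run on day N can target day N's (still-filling) partition instead of day N-1's.
daily_partition_including_today = DailyPartitionsDefinition(
    start_date="2024-02-20",
    end_offset=1,
)
```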
a
@Scout Great, I have it working now with partitioned and unpartitioned assets. Final question, given this:
Copy code
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        partition_key=partition_key,
    )
Can we also configure the job with a config_schema such that I could parameterise the job so that I can modify the underlying dbt assets say every sunday to run a full refresh? Focusing on the unpartitioned assets, they are defined like this:
Copy code
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails
To keep things simple, I simply want to add
--full-refresh
arg to the
run
dbt command. I know the RunRequest object can accept a run_config.
s
To parameterize the job to allow for a full refresh of the dbt assets on a specific schedule, such as every Sunday, you can define a
config_schema
for your job that includes a configuration option for a full refresh. Then, you can use this configuration in your schedule to conditionally add the
--full-refresh
argument to the dbt command. Here's how you can define a
config_schema
for your job and use it in the schedule: 1. Define a
config_schema
for your job that includes a
full_refresh
option:
Copy code
python
from dagster import Bool, Field, define_asset_job, AssetSelection

daily_dbt_asset_selection = AssetSelection(...)

daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
    config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)},
)
2. Modify your dbt asset function to accept a configuration and conditionally add the
--full-refresh
argument based on the
full_refresh
configuration:
Copy code
python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Check if full_refresh is set in the job's configuration
    full_refresh = context.op_config.get("full_refresh", False)
    
    # Add --full-refresh argument conditionally
    dbt_run_args = ["run"]
    if full_refresh:
        dbt_run_args.append("--full-refresh")
    
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
3. Update your schedule to include the
run_config
with the
full_refresh
option set to
True
on Sundays:
Copy code
python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(job=daily_dbt_job, cron_schedule="0 0 * * *")  # This runs daily at midnight
def daily_configurable_job(context):
    # Compute the partition key for the current day
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    
    # Determine if today is Sunday
    is_sunday = context.scheduled_execution_time.weekday() == 6  # 6 is Sunday
    
    # Set full_refresh to True if today is Sunday
    run_config = {"full_refresh": is_sunday}
    
    return RunRequest(
        run_key=partition_key,
        run_config=run_config,
    )
In this example, the
daily_configurable_job
schedule checks if the current day is Sunday and sets the
full_refresh
configuration accordingly. The
RunRequest
includes this configuration, which is then used by the dbt asset function to determine whether to add the `--full-refresh` argument to the dbt command. Please ensure that the
context.op_config
access in the dbt asset function matches the way you have configured your job and assets. The exact key to access the configuration may vary depending on your setup. This approach allows you to have a configurable job that can perform a full refresh of dbt assets based on the schedule's logic, providing flexibility in how and when dbt models are materialized.
a
@Scout This makes sense. However, would you rather have two jobs? One called daily and another called sunday_full_refresh for example? We then schedule the daily for all days except sunday and then the sunday job for just sunday. Going beyond that, can you explain the config_schema a little more. Is it related to the higher-level asset config for which we create classes and pass it via the asset function signature? Would we ever define config_schemas as standalone objects or always within the define_asset_job function signature?
s
Whether to have two separate jobs or a single configurable job depends on the complexity of your workflow and your preference for managing schedules and configurations. Here are some considerations for each approach:

1. Two Separate Jobs: Having a daily job and a sunday_full_refresh job can make the scheduling logic simpler and more explicit. Each job has its own dedicated schedule, and there's no need for conditional logic within the schedule to determine the behavior. This can be easier to understand and maintain, especially for teams with multiple members who might be managing the schedules.

2. Single Configurable Job: A single job with a config_schema that includes a full_refresh option can be more flexible and reduce duplication in job definitions. The schedule's logic determines whether to perform a full refresh based on the day of the week. This approach centralizes the logic for the full refresh in one place, which can be beneficial if the logic for determining a full refresh is more complex than just the day of the week.

Regarding the
config_schema
, it is a way to define the configuration options for a job or asset in Dagster. The
config_schema
specifies the shape and type of the configuration data that can be passed to a job or asset when it runs. This configuration is then accessible within the job or asset through the execution context. The
config_schema
is typically defined within the
define_asset_job
function signature or the
@asset
decorator for assets. It can also be defined for ops using the
@op
decorator. The schema is defined using Dagster's field system, which allows you to specify types, defaults, and validation for configuration values. Here's an example of defining a
config_schema
within the
define_asset_job
function:
Copy code
python
from dagster import Field, Int, define_asset_job, AssetSelection

my_asset_selection = AssetSelection(...)

my_job = define_asset_job(
    "my_job",
    selection=my_asset_selection,
    config_schema={
        "example_config": Field(Int, is_required=False, default_value=42),
    },
)
In this example, the
config_schema
for
my_job
includes an
example_config
option, which is an integer with a default value of 42. You wouldn't typically define
config_schema
as a standalone object; it's usually defined in the context of a job, asset, or op where it's used to specify the configuration options for that entity. The higher-level asset config that you mentioned, where you create classes and pass them via the asset function signature, is related to the
config_schema
in that it defines the configuration for an asset. However, the
config_schema
is more about defining the structure and validation of the configuration data, while the classes you pass to the asset function are about how you use that configuration data within the asset's logic. In summary, the choice between separate jobs or a single configurable job depends on your workflow needs and preferences. The
config_schema
is a way to define configuration options for jobs and assets, and it's typically defined within the context of the job or asset definition.
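For reference, here is a minimal sketch of that class-based style (the class and field names are purely illustrative): the configuration is still validated, but it arrives as a typed object through the function signature rather than through context.op_config.
Copy code
python
from dagster import Config, asset

class MyAssetConfig(Config):
    # Pythonic config: the schema is derived from the typed fields
    lookback_days: int = 7

@asset
def my_configured_asset(config: MyAssetConfig) -> int:
    # The validated config object is passed directly into the asset function
    return config.lookback_days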
a
@Scout Can you show me the basics of a sensor in dagster. Do we always return a run request (assuming the condition has been met). Please give a simple example.
s
In Dagster, a sensor is a function that periodically checks for certain conditions and triggers a job run when those conditions are met. A sensor can return a
RunRequest
if the conditions for a run are met, or it can return a
SkipReason
if there is no work to be done. You can also return
None
if you want to skip without providing a reason. Here's a simple example of a sensor that triggers a job run every time it evaluates:
Copy code
python
from dagster import sensor, RunRequest

@sensor(job=my_job)
def my_simple_sensor(context):
    # You can access the sensor's context here, if needed
    # For example, you can read from the context to get the last run time
    # last_run_time = context.last_run_time

    # If the conditions are met, return a RunRequest to trigger a job run
    return RunRequest(
        run_key="unique_run_key",  # A unique identifier for this run request
        run_config={},  # The run configuration for the job
    )
In this example,
my_job
is the job that you want to trigger with this sensor. The
run_key
is a unique identifier for the run request, which helps Dagster to deduplicate runs and ensure that the same conditions don't trigger multiple runs. The
run_config
is the configuration that will be used for the job run; it should match the
config_schema
defined for the job. If you want the sensor to only trigger under certain conditions, you can add logic to check for those conditions and only return a
RunRequest
when they are met. If the conditions are not met, you can return a
SkipReason
or `None`:
Copy code
python
from dagster import sensor, RunRequest, SkipReason

@sensor(job=my_job)
def my_conditional_sensor(context):
    # Logic to check if conditions are met
    if are_conditions_met():
        # Conditions are met, trigger a job run
        return RunRequest(
            run_key="unique_run_key",
            run_config={},
        )
    else:
        # Conditions are not met, skip this evaluation
        return SkipReason("Conditions are not met for triggering the job.")
In this conditional sensor,
are_conditions_met()
is a placeholder for your actual condition-checking logic. If the function returns
True
, a
RunRequest
is returned, and if it returns
False
, a
SkipReason
is returned. Sensors are evaluated at a regular interval defined by the
minimum_interval_seconds
parameter, which defaults to 30 seconds. You can adjust this interval to control how often the sensor checks for conditions. Remember to replace
my_job
with your actual job definition and to implement the
are_conditions_met
function with the logic specific to your use case.
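For example, a minimal sketch of adjusting that interval (my_job stands in for your own job definition):
Copy code
python
from dagster import sensor, RunRequest

# Evaluate this sensor at most once every five minutes instead of the default 30 seconds
@sensor(job=my_job, minimum_interval_seconds=300)
def my_throttled_sensor(context):
    return RunRequest(run_key=None, run_config={})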
a
@Scout actually going back to the previous point, define_asset_job does not have an argument for config_schema?
s
You are correct; the
define_asset_job
function does not have an argument for
config_schema
. The
config_schema
is used at the asset or op level to define the configuration for those components. When you define an asset job using
define_asset_job
, it will automatically include all the assets within the provided selection, and the configuration for those assets is defined at the individual asset level. If you want to provide configuration for the assets within a job, you would do so by passing a
run_config
when you execute the job. The
run_config
should match the
config_schema
defined on the individual assets. Here's an example of how you might define an asset with a
config_schema
and then provide a
run_config
when executing the job:
Copy code
python
from dagster import asset, Field, define_asset_job, AssetSelection

@asset(config_schema={"param": Field(str)})
def my_asset(context):
    param = context.op_config["param"]
    # Use the param in your asset computation
    ...

# Define an asset job that includes my_asset
my_asset_job = define_asset_job(
    "my_asset_job",
    selection=AssetSelection.assets(my_asset),
)

# When executing the job, provide a run_config that includes the necessary configuration
run_config = {
    "ops": {
        "my_asset": {
            "config": {
                "param": "value",
            }
        }
    }
}

# Execute the job with the provided run_config
result = my_asset_job.execute_in_process(run_config=run_config)
In this example,
my_asset
has a
config_schema
that expects a string parameter
param
. When you execute
my_asset_job
, you provide a
run_config
that includes the configuration for
my_asset
. Please note that the structure of the
run_config
must match the expectations of the assets within the job. If your assets have different configuration schemas, the
run_config
should include the configuration for each asset accordingly.
a
@Scout Well, this is how I define the assets:
Copy code
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    full_refresh = context.op_config.get("full_refresh", False)

    dbt_run_args = ["run"]
    if full_refresh:
        dbt_run_args.append("--full-refresh")
    
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails
The intention was to use the scheduled function here:
Copy code
@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")

    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    is_minute_event = context.scheduled_execution_time.minute % 2 == 0
    run_config = {
        "full_refresh": is_minute_event,
    }

    return RunRequest(
        partition_key=partition_key,
        run_config=run_config,
    )
To determine whether we should run a full refresh or not. However, it seems that doesn't quite work as you explained prior?
s
I apologize for the confusion. The
run_config
in the
RunRequest
should match the structure expected by the assets within the job. In the case of dbt assets defined with the
@dbt_assets
decorator, the
run_config
should specify the configuration for the dbt resource, not directly at the top level. Here's how you can adjust the
run_config
in the
RunRequest
to pass the
full_refresh
configuration to the dbt assets:
Copy code
python
from dagster import schedule, RunRequest

@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")

    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    is_minute_event = context.scheduled_execution_time.minute % 2 == 0
    run_config = {
        "ops": {
            "non_partitioned_dbt_assets": {
                "config": {
                    "full_refresh": is_minute_event or is_sunday,
                }
            }
        }
    }

    return RunRequest(
        run_key=f"run_{partition_key}",
        run_config=run_config,
    )
In this updated example, the
run_config
is structured with an
"ops"
key, which then contains the configuration for the specific asset. The asset name (
"non_partitioned_dbt_assets"
) must match the name used within the job. The
"config"
for the asset includes the
full_refresh
option, which is set based on whether the current minute is an even number or if it's Sunday. Please ensure that the asset name in the
run_config
matches the name used in the job. If you have multiple dbt assets or if the asset names are different, you will need to adjust the
run_config
accordingly. The
run_key
in the
RunRequest
is a unique identifier for the run, which I've constructed using the
partition_key
to ensure uniqueness. You can adjust this as needed for your use case. This approach should allow you to conditionally pass the
full_refresh
configuration to your dbt assets based on the logic defined in your schedule.
a
@Scout So is it actually better to define two jobs, and then use the config parameter of the define_asset_job function? Or define the run_config within the function used to determine how to launch the job? We keep our asset definition the same; it can still decide whether to full refresh or not depending on the op_config.
s
Defining two separate jobs, one for the regular runs and another for the full refresh, can be a cleaner and more explicit approach, especially if the logic for determining when to do a full refresh is straightforward (e.g., every Sunday). This way, you can have a dedicated job with its own schedule and run configuration, making it clear when and how each job should be executed. Here's how you could define two separate jobs: 1. A regular job for daily runs without a full refresh:
Copy code
python
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
    # No need to specify run_config here since it's the regular run
)

@schedule(job=daily_dbt_job, cron_schedule="0 0 * * 1-6")  # Monday to Saturday
def daily_job_schedule(context):
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
    )
2. A separate job for the full refresh:
Copy code
python
sunday_full_refresh_dbt_job = define_asset_job(
    "sunday_full_refresh_dbt",
    selection=daily_dbt_asset_selection,
    config={
        "ops": {
            "non_partitioned_dbt_assets": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    },
)

@schedule(job=sunday_full_refresh_dbt_job, cron_schedule="0 0 * * 0")  # Sunday
def sunday_full_refresh_schedule(context):
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
    )
In this setup, you have two jobs with their own schedules and run configurations. The
daily_dbt_job
runs every day except Sunday without a full refresh, and the
sunday_full_refresh_dbt_job
runs only on Sunday with a full refresh. Using the
config
parameter of the
define_asset_job
function allows you to specify the run configuration for the job at definition time. This configuration will apply to all runs of the job unless overridden at runtime. If you prefer to keep a single job and decide at runtime whether to perform a full refresh, you can continue using the
run_config
within the schedule function to dynamically set the configuration based on the day of the week or other conditions. Both approaches are valid, and the choice depends on your preference for organizing jobs and schedules. Separate jobs can make the system more understandable, as the intent of each job is clear from its name and associated schedule. On the other hand, a single job with dynamic configuration can reduce duplication and keep all related logic in one place.
a
@Scout Ok, so I did this:
Copy code
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")

    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    is_minute_event = context.scheduled_execution_time.minute % 2 == 0
    run_config = {
        "ops": {
            "non_partitioned_dbt_assets": {
                "config": {
                    "full_refresh": is_minute_event,
                },
            },
        },
    }

    return RunRequest(
        partition_key=partition_key,
        run_config=run_config,
    )
However, I am getting an error
Copy code
dagster._core.errors.DagsterInvalidConfigError: Error in config for job daily_dbt
    Error 1: Received unexpected config entry "full_refresh" at path root:ops:non_partitioned_dbt_assets:config. Expected: "{ }".
s
The error you're encountering indicates that Dagster is not expecting a
full_refresh
configuration entry for the
non_partitioned_dbt_assets
within the
run_config
. This is because the
@dbt_assets
decorator does not expose a
config_schema
that defines
full_refresh
as a valid configuration key. In Dagster, the
config_schema
for an asset or op defines the structure and type of configuration that can be passed to it. Since
@dbt_assets
generates assets based on your dbt project, and there isn't a direct way to pass additional configuration like
full_refresh
through the
@dbt_assets
decorator, you'll need to handle this differently. To resolve this issue, you can modify the
non_partitioned_dbt_assets
function to accept a custom configuration class that includes the
full_refresh
option. However, since
@dbt_assets
does not support passing a custom configuration class directly, you would need to use the
context.resources
to access the configuration for
full_refresh
. Here's an example of how you might adjust your asset and schedule definitions: 1. Define a custom resource for the
full_refresh
configuration:
Copy code
python
from dagster import resource, Field, Bool

@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_full_refresh_resource(context):
    return context.resource_config["full_refresh"]
2. Update your
non_partitioned_dbt_assets
to use the custom resource:
Copy code
python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    full_refresh = context.resources.dbt_full_refresh_resource

    dbt_run_args = ["run"]
    if full_refresh:
        dbt_run_args.append("--full-refresh")
    
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
3. Update your schedule to provide the
full_refresh
configuration via resources:
```python
@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")

    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday

    run_config = {
        "resources": {
            "dbt_full_refresh_resource": {
                "config": {
                    "full_refresh": is_sunday,
                },
            },
        },
    }

    return RunRequest(
        run_key=f"run_{partition_key}",
        run_config=run_config,
    )
```
In this updated example, the
dbt_full_refresh_resource
is defined as a resource with a
config_schema
that includes the
full_refresh
option. The
non_partitioned_dbt_assets
function accesses the
full_refresh
configuration through
context.resources
. The schedule provides the
full_refresh
configuration via the
resources
section of the
run_config
. Please ensure that you include the
dbt_full_refresh_resource
in your repository definition so that it's available to the job when it runs. This approach allows you to pass custom configuration to your dbt assets indirectly through resources.
a
@Scout I'm not following. We define the dbt resource like this:
Copy code
dbt = DbtCliResource(project_dir=os.fspath(dbt_project_dir))
Now, technically it can take a list of global flags, including one for full-refresh. However, we need to support being able to run incremental and full_refresh, so we don't want to define the same underlying model twice as an asset. Perhaps you could show an example of defining the dbt resource, adding it to the Definitions object and then how we can configure our dbt assets, job and schedule to handle being able to dynamically run full refresh or not
s
To support dynamic full refreshes for your dbt assets without defining the same model twice, you can use a resource that conditionally sets the
full_refresh
flag based on tags or configuration passed at runtime. Here's how you can set up your dbt resource, assets, job, and schedule to handle this: 1. Define a dbt resource that accepts a
full_refresh
flag:
Copy code
python
import os

from dagster import resource, Field, Bool

@resource({
    "full_refresh": Field(Bool, is_required=False, default_value=False)
})
def dbt_cli_with_refresh(context):
    full_refresh_flag = ["--full-refresh"] if context.resource_config["full_refresh"] else []
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=full_refresh_flag  # DbtCliResource accepts dbt global flags via global_config_flags
    )
2. Update your
non_partitioned_dbt_assets
to use the
dbt_cli_with_refresh
resource:
Copy code
python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
3. Add the
dbt_cli_with_refresh
resource to your repository definition:
Copy code
python
from dagster import repository, define_asset_job, AssetSelection

@repository
def my_repository():
    return [
        dbt_cli_with_refresh,
        non_partitioned_dbt_assets,
        # ... other assets and jobs ...
    ]
4. Define your job without specifying `full_refresh`:
Copy code
python
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)
5. Define a schedule that sets the
full_refresh
flag based on the desired condition:
Copy code
python
from dagster import schedule, RunRequest

@schedule(job=daily_dbt_job, cron_schedule="0 0 * * *")  # Daily at midnight
def daily_configurable_job(context):
    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    run_config = {
        "resources": {
            "dbt_cli_with_refresh": {
                "config": {
                    "full_refresh": is_sunday,
                }
            }
        }
    }

    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
        run_config=run_config,
    )
In this setup, the
dbt_cli_with_refresh
resource is configured to include the
--full-refresh
flag when the
full_refresh
configuration is
True
. The
daily_configurable_job
schedule checks if it's Sunday and sets the
full_refresh
flag accordingly in the
run_config
. When the job runs, it will use the
dbt_cli_with_refresh
resource, which will have the
--full-refresh
flag included if the condition is met. This approach allows you to dynamically control whether a full refresh should occur without needing to define the same dbt model as multiple assets. The
full_refresh
behavior is controlled at the resource level, which is then used by the dbt assets during job execution.
a
@Scout I'm starting to think just bundling up this logic in the asset definition itself is cleaner. However, it feels like it obfuscates a lot. You tend to look at jobs and schedules at a high level. You schedule a job and if you add some dynamic logic to that schedule, they you can see how the job run might change given some condition. However, if I bundle it in the dbt_assets decorated function, then at the top level I see one job and one schedule which actually changes over time. I don't think the provided solution works well: you've just moved the dynamic nature to the resource, which feels odd
s
I understand your concerns about clarity and maintainability when embedding dynamic logic within asset definitions. It's important to strike a balance between flexibility and the ease of understanding your data pipelines. Given your scenario, you might consider the following approaches:

1. Explicit Jobs and Schedules: Define separate jobs for the regular and full-refresh runs, each with its own schedule. This approach makes the behavior explicit and easily discoverable at the job and schedule level.

2. Dynamic Scheduling: Use a single job with a dynamic schedule that adjusts the run_config based on the conditions you specify. This keeps the dynamic logic at the scheduling level, which can be more transparent than embedding it within assets.

3. Custom Sensor Logic: Instead of using a schedule, you could use a sensor to trigger runs with more complex logic. The sensor can check for conditions and decide whether to trigger a run with a full refresh or not.

Here's an example of how you might use a sensor to trigger a job with or without a full refresh:
Copy code
python
from datetime import datetime

from dagster import sensor, RunRequest, repository

@sensor(job=daily_dbt_job)
def full_refresh_sensor(context):
    # Sensor contexts have no scheduled_execution_time, so check the wall clock
    is_sunday = datetime.now().weekday() == 6  # sunday
    if is_sunday:
        run_config = {
            "resources": {
                "dbt": {
                    "config": {
                        "global_flags": ["--full-refresh"]
                    }
                }
            }
        }
        return RunRequest(
            run_key=datetime.now().strftime("%Y-%m-%d"),
            run_config=run_config,
        )
    else:
        # Skip the full refresh on other days
        return None

@repository
def my_repository():
    return [
        daily_dbt_job,
        full_refresh_sensor,
        # ... other assets and jobs ...
    ]
In this example, the
full_refresh_sensor
checks if it's Sunday and triggers a run with the
--full-refresh
flag if the condition is met. On other days, it returns
None
, indicating no run is requested. This approach allows you to encapsulate the dynamic full-refresh logic within the sensor, keeping your job and asset definitions static and easier to understand. The sensor becomes the place where dynamic behavior is defined, which can be a more natural fit for such logic. Ultimately, the best approach depends on your specific requirements and preferences for organizing and maintaining your Dagster codebase. Each method has its trade-offs, and you should choose the one that aligns best with your team's practices and the complexity of your workflows.
a
@Scout Passing a run config doesn't appear to be additive, so we lose all existing configuration... How can we get around this?
s
When you pass a
run_config
to a job execution in Dagster, it replaces the existing configuration for that run. If you want to modify only a part of the configuration without losing the existing settings, you need to ensure that the
run_config
you provide includes all necessary configuration values, not just the changes. To get around the issue of losing existing configuration, you can:

1. Merge Configurations: Programmatically merge the new configuration with the existing one before passing it to the job execution. This way, you can update only the parts of the configuration that need to change while preserving the rest.

2. Use Defaults and Overrides: Define default configurations within your assets or resources, and use the run_config to override specific values as needed. This allows you to maintain a base configuration that is always applied unless explicitly overridden.

3. Configurable Resources: If the dynamic part of your configuration is related to resources, you can define resources with a config_schema that includes default values. Then, you can override these defaults with the run_config at runtime.

Here's an example of how you might define a resource with a config_schema that includes a default value:
Copy code
python
import os

from dagster import resource, Field, Bool
from dagster_dbt import DbtCliResource

@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    # Return the dbt resource with the appropriate configuration
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=["--full-refresh"] if full_refresh else [],
    )
When you want to trigger a full refresh, you can provide a
run_config
that overrides the default value:
Copy code
python
run_config = {
    "resources": {
        "dbt_resource": {
            "config": {
                "full_refresh": True,
            }
        }
    }
}
If you need to merge configurations programmatically, you can use Python's dictionary update methods or other utilities to combine the existing configuration with the new values before passing it to the job execution. Keep in mind that if you're using the Dagster UI (Dagit) to launch runs, you can manually adjust the
run_config
for each run as needed. However, for automated runs triggered by schedules or sensors, you'll need to ensure that the
run_config
is constructed correctly in code.
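As a quick illustration of that programmatic merge (plain Python, not a Dagster API; the nested keys must still match what your job expects):
Copy code
python
import copy

base_run_config = {
    "resources": {"dbt_resource": {"config": {"full_refresh": False}}}
}

# Deep-copy the base config, then change only the value we care about
run_config = copy.deepcopy(base_run_config)
run_config["resources"]["dbt_resource"]["config"]["full_refresh"] = True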
a
@Scout Are resources reinstantiated when required for a job run? Otherwise this dynamic stuff wouldn't work?
s
Yes, resources in Dagster are instantiated at the beginning of each job run when they are required. This means that each time a job is executed, the resources it requires are created anew using the configuration provided for that run. This behavior allows you to dynamically configure resources on a per-run basis. For example, if you have a resource that connects to a database, the resource will be instantiated with the connection information provided in the
run_config
when the job starts. If the next run of the job provides different connection information in the
run_config
, a new instance of the resource will be created with the new configuration. This instantiation behavior is what enables dynamic configuration of resources, such as deciding whether to perform a full refresh in a dbt job based on a condition evaluated at runtime. Each run can have its own
run_config
that specifies whether the
full_refresh
flag should be set for the dbt resource, and the resource will be instantiated accordingly. Here's a simplified example of how a resource might be defined and used in a job:
Copy code
python
from dagster import resource, Field, Bool, job, op

@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    # Logic to handle the full_refresh flag
    ...

@op(required_resource_keys={"dbt_resource"})
def my_op(context):
    # This op can access the dbt_resource which will be instantiated with the current run's config
    ...

@job(resource_defs={"dbt_resource": dbt_resource})
def my_job():
    my_op()
When you execute
my_job
, the
dbt_resource
will be instantiated with the configuration provided in the
run_config
. If you trigger another run of
my_job
with a different
run_config
, a new instance of
dbt_resource
will be created with that configuration. This design allows for flexibility and ensures that each job run can behave differently based on the provided configuration, making it suitable for scenarios where job behavior needs to change dynamically.
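As a small illustration of that per-run behavior, assuming the my_job and dbt_resource definitions above, two back-to-back in-process executions would each get a freshly configured resource:
Copy code
python
# Each call instantiates dbt_resource anew with that run's configuration
result_regular = my_job.execute_in_process(
    run_config={"resources": {"dbt_resource": {"config": {"full_refresh": False}}}}
)

result_full_refresh = my_job.execute_in_process(
    run_config={"resources": {"dbt_resource": {"config": {"full_refresh": True}}}}
)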
a
@Scout In that case, create CustomDbtCliResource from a ConfigurableResource. It has a single option which is full_refresh. When full_refresh is on, we need to set the DbtCliResource _global_config_flags_ to include full_refresh. Then show how we can use this to periodically run full refreshes.
s
To create a custom
DbtCliResource
that includes a
full_refresh
option, you can subclass
ConfigurableResource
and set the `DbtCliResource`'s
_global_config_flags_
based on the
full_refresh
configuration. Here's how you can define such a resource and use it to periodically run full refreshes: 1. Define the custom `DbtCliResource`:
Copy code
python
import os

from dagster import resource, Field, Bool
from dagster_dbt import DbtCliResource

@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def custom_dbt_cli_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    global_flags = ["--full-refresh"] if full_refresh else []
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=global_flags,
    )
2. Use the custom resource in your dbt assets:
Copy code
python
from dagster import AssetExecutionContext
from dagster_dbt import dbt_assets, DbtCliResource

@dbt_assets
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
3. Define a job that uses the custom dbt resource:
Copy code
python
from dagster import define_asset_job, AssetSelection

my_asset_selection = AssetSelection.assets(my_dbt_assets)

my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=my_asset_selection,
    resource_defs={"dbt": custom_dbt_cli_resource}
)
4. Create a schedule that triggers the job with the
full_refresh
option set periodically (e.g., every Sunday):
Copy code
python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    }

    return RunRequest(
        run_key=datetime.now().isoformat(),
        run_config=run_config,
    )
In this setup, the
weekly_full_refresh_schedule
is configured to trigger the
my_dbt_job
every Sunday with the
full_refresh
option set to
True
. The
custom_dbt_cli_resource
will be instantiated with the
--full-refresh
flag included in the
global_config_flags
for that run, causing dbt to perform a full refresh of the models. This approach allows you to have a configurable dbt resource that can be dynamically controlled by the schedule, enabling periodic full refreshes without the need to define multiple jobs or assets.
a
@Scout Can you not use the resource decorator, instead create a subclass of
ConfigurableResource
. Additionally, that hasn't solved the issue of the entire configuration being overwritten when you dynamically provide the run config. So to reiterate:
• I define my dbt assets using the dbt_assets decorator
• I define a dbt resource using the DbtCliResource
• I want to run a selection of assets daily; some will be incremental and others not. Always run incremental for those that are, though.
• I then want to run a full_refresh once a week. This can be configured in many places: the resource itself, not the run config due to it overwriting things, or the dbt_assets decorated function itself.
We cannot define multiple assets for the same underlying model. I need to trigger the job run via a schedule which returns a RunRequest.
s
To create a subclass of
ConfigurableResource
that includes a
full_refresh
option and use it to periodically run full refreshes without overwriting the entire configuration, you can follow these steps: 1. Define a subclass of
ConfigurableResource
that includes the
full_refresh
option:
Copy code
python
from dagster import ResourceDefinition
from dagster_dbt import dbt_cli_resource
from dagster import InitResourceContext

class CustomDbtCliResource(ResourceDefinition):
    def __init__(self, project_dir):
        self.project_dir = project_dir
        super().__init__(resource_fn=self.create_resource)

    def create_resource(self, context: InitResourceContext):
        full_refresh = context.resource_config.get("full_refresh", False)
        global_flags = ["--full-refresh"] if full_refresh else []
        return dbt_cli_resource(
            project_dir=self.project_dir,
            global_flags=global_flags
        )
2. Use the custom resource in your dbt assets:
Copy code
python
from dagster_dbt import dbt_assets

@dbt_assets
def my_dbt_assets(context, dbt):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
3. Define a job that uses the custom dbt resource:
Copy code
python
from dagster import define_asset_job, AssetSelection

my_asset_selection = AssetSelection.assets(my_dbt_assets)

my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=my_asset_selection,
    resource_defs={"dbt": CustomDbtCliResource(project_dir=dbt_project_dir)}
)
4. Create a schedule that triggers the job with the
full_refresh
option set periodically (e.g., every Sunday):
Copy code
python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    # Only set the full_refresh option in the resource config
    resource_config = {
        "dbt": {
            "config": {
                "full_refresh": True,
            }
        }
    }

    # Merge the new resource config with the existing base config
    run_config = merge_dicts(base_run_config, {"resources": resource_config})  # base_run_config: whatever run config you already pass to this job

    return RunRequest(
        run_key=datetime.now().isoformat(),
        run_config=run_config,
    )
In this setup, the
weekly_full_refresh_schedule
is configured to trigger the
my_dbt_job
every Sunday with the
full_refresh
option set to
True
. The
CustomDbtCliResource
will be instantiated with the
--full-refresh
flag included in the
global_flags
for that run, causing dbt to perform a full refresh of the models. The
merge_dicts
function is a utility function that you would need to implement to merge the new resource configuration
with the existing base configuration. This function should handle deep merging of nested dictionaries to ensure that only the necessary parts of the configuration are updated. This approach allows you to have a configurable dbt resource that can be dynamically controlled by the schedule, enabling periodic full refreshes without the need to define multiple jobs or assets and without overwriting the entire configuration.
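For completeness, here is a minimal sketch of such a helper; this is not a Dagster API, just a recursive dictionary merge you would own and test yourself:
Copy code
python
def merge_dicts(base: dict, overrides: dict) -> dict:
    """Recursively merge `overrides` into a copy of `base` without mutating either."""
    merged = dict(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged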
a
@Scout I think we've ventured too far. Given we don't have access to the config_schema for the dbt assets, I think we just need to add the logic for full_refresh or not to:
Copy code
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

    dbt_run_args = ["run"]
    if full_refresh:  # replace with check day?
        dbt_run_args.append("--full-refresh")
    
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails
Is there anything wrong with this approach? What are the best practices in Dagster that you would suggest given the constraints?
s
The approach you've outlined is a practical solution given the constraints. By embedding the logic for a full refresh directly within the
dbt_assets
function, you can control the behavior of the dbt run based on conditions evaluated at runtime. This method keeps your job and asset definitions static and moves the dynamic logic into the asset execution context, which can be a good practice when you want to avoid duplicating asset definitions. Here are some best practices to consider:

1. Clear Logic: Ensure that the logic for determining whether to perform a full refresh is clear and easy to understand. For example, you could encapsulate the logic in a well-named function:
Copy code
python
from datetime import datetime

def should_perform_full_refresh() -> bool:
    # Implement the logic to determine if a full refresh should be performed
    # For example, check if today is Sunday using the wall clock
    return datetime.now().weekday() == 6
2. Documentation: Document the behavior of your assets, especially if the logic for triggering a full refresh is not immediately visible in the job or schedule definitions. This helps other developers understand when and why a full refresh will occur.

3. Testing: Test the asset logic under different conditions to ensure that the full refresh behavior works as expected.

4. Scheduling: If you're using a schedule to trigger the job that includes these assets, make sure the schedule aligns with the refresh logic. For example, if the full refresh is supposed to happen every Sunday, the schedule should reflect that.

5. Resource Configuration: If the full refresh flag is something that could be used by multiple assets or jobs, consider using a resource to hold this configuration. This allows you to centralize the configuration and avoid repeating the same logic in multiple places.

6. Monitoring: Implement monitoring or alerting to track when full refreshes occur, especially if they are resource-intensive operations. This can help you quickly identify and address any issues that arise during these operations.

Given these best practices, the approach you've taken is reasonable. It allows you to maintain a single set of dbt assets and control the full refresh behavior dynamically, without complicating your job and schedule definitions. Just make sure to document the behavior and test it thoroughly to ensure it meets your needs.
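As a concrete sketch of the first practice, here is your asset with the helper from above inlined (same decorator arguments as your snippet; only the refresh check changes):
Copy code
python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_run_args = ["run"]
    if should_perform_full_refresh():  # e.g. True only on Sundays
        dbt_run_args.append("--full-refresh")

    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests run even if an upstream test fails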
a
@Scout The resource configuration doesn't really work though, does it? Theoretically, I could define a DbtCliResource and a DbtCliFullRefreshResource. However, when defining assets, I need to pass a specific resource. This is where a configurable resource might be useful? Having said that, where would it be configurable from? I can't specify a resource at the job definition or schedule level. I can try the run_config hack in the RunRequest, but I don't want to deal with merging with the existing config, so ignore that for now.
s
You're correct that when defining assets with the
@dbt_assets
decorator, you need to specify which resource to use, and it's not straightforward to switch between different resources (like
DbtCliResource
and
DbtCliFullRefreshResource
) at the job or schedule level in Dagster. Configurable resources can indeed be useful in this scenario. A configurable resource allows you to define a resource that can be configured at runtime, typically through the
run_config
. However, as you've noted, you want to avoid dealing with merging configurations in the
RunRequest
. Here's how you could use a configurable resource to control the
full_refresh
behavior: 1. Define a configurable resource that includes the
full_refresh
option:
Copy code
python
from dagster import resource, Field, Bool
from dagster_dbt import dbt_cli_resource

@resource({
    "full_refresh": Field(Bool, is_required=False, default_value=False)
})
def configurable_dbt_cli_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    global_flags = ["--full-refresh"] if full_refresh else []
    return dbt_cli_resource(
        project_dir=os.fspath(dbt_project_dir),
        global_flags=global_flags
    )
2. Use the configurable resource in your dbt assets:
Copy code
python
from dagster_dbt import dbt_assets

@dbt_assets
def my_dbt_assets(context, dbt):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
3. Define a job that uses the configurable dbt resource:
Copy code
python
from dagster import define_asset_job, AssetSelection

my_asset_selection = AssetSelection.assets(my_dbt_assets)

my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=my_asset_selection,
    resource_defs={"dbt": configurable_dbt_cli_resource}
)
4. Create a schedule that triggers the job with the
full_refresh
option set periodically (e.g., every Sunday):
Copy code
python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    }

    return RunRequest(
        run_key=datetime.now().isoformat(),
        run_config=run_config,
    )
In this setup, the
weekly_full_refresh_schedule
is configured to trigger the
my_dbt_job
every Sunday with the
full_refresh
option set to
True
. The
configurable_dbt_cli_resource
will be instantiated with the
--full-refresh
flag included in the
global_flags
for that run, causing dbt to perform a full refresh of the models. This approach allows you to control the full refresh behavior dynamically at the schedule level without the need to define multiple resources for different refresh behaviors. The
full_refresh
configuration is provided directly in the
run_config
of the
RunRequest
, and there's no need to merge configurations since the
run_config
provided at the schedule level is intended to be the complete configuration for that run.
a
@Scout Can you respond again, but not using the @resource decorator, I'm fairly sure this has been deprecated.
s
Certainly, the
@resource
decorator is part of the legacy resource system in Dagster. To define a configurable resource without using the
@resource
decorator, you can subclass
ConfigurableResource
directly. Here's how you can create a subclass of
ConfigurableResource
for the
DbtCliResource
that includes a
full_refresh
option: 1. Define a subclass of `ConfigurableResource`:
Copy code
python
from dagster import ConfigurableResource
from dagster_dbt import dbt_cli_resource

class CustomDbtCliResource(ConfigurableResource):
    def __init__(self, project_dir):
        self.project_dir = project_dir

    def setup_for_execution(self, context):
        full_refresh = context.resource_config.get("full_refresh", False)
        global_flags = ["--full-refresh"] if full_refresh else []
        return dbt_cli_resource(
            project_dir=self.project_dir,
            global_flags=global_flags
        )
2. Use the custom resource in your dbt assets:
Copy code
python
from dagster_dbt import dbt_assets

@dbt_assets
def my_dbt_assets(context, dbt):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
3. Define a job that uses the custom dbt resource:
Copy code
python
from dagster import define_asset_job, AssetSelection

my_asset_selection = AssetSelection.assets(my_dbt_assets)

my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=my_asset_selection,
    resource_defs={"dbt": CustomDbtCliResource(project_dir=dbt_project_dir)}
)
4. Create a schedule that triggers the job with the
full_refresh
option set periodically (e.g., every Sunday):
Copy code
python
from dagster import schedule, RunRequest
from datetime import datetime

@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    }

    return RunRequest(
        run_key=datetime.now().isoformat(),
        run_config=run_config,
    )
In this setup, the
weekly_full_refresh_schedule
is configured to trigger the
my_dbt_job
every Sunday with the
full_refresh
option set to
True
. The
CustomDbtCliResource
will be instantiated with the
--full-refresh
flag included in the
global_flags
for that run, causing dbt to perform a full refresh of the models. This approach allows you to control the full refresh behavior dynamically at the schedule level without the need to define multiple resources for different refresh behaviors. The
full_refresh
configuration is provided directly in the
run_config
of the
RunRequest
, and there's no need to merge configurations since the `run_config` provided at the schedule level is intended to be the complete configuration for that run.
a
@Scout Can you respond again, but not using the resource decorator, I'm fairly sure this has been deprecated
s
The
@resource
decorator is not deprecated in Dagster. However, if you prefer not to use the
@resource
decorator and instead want to use a class-based approach for defining resources, you can directly subclass
ResourceDefinition
. This allows you to create more complex resource classes with custom initialization logic. Here's an example of how you can define a custom
DbtCliResource
by subclassing `ResourceDefinition`:
Copy code
python
from dagster import ResourceDefinition
from dagster_dbt import dbt_cli_resource

class CustomDbtCliResource(ResourceDefinition):
    def __init__(self, project_dir, full_refresh):
        self.project_dir = project_dir
        self.full_refresh = full_refresh
        super().__init__(resource_fn=self._resource_fn)

    def _resource_fn(self, _):
        global_flags = ["--full-refresh"] if self.full_refresh else []
        return dbt_cli_resource(
            project_dir=self.project_dir,
            global_flags=global_flags
        )

# Usage in job definition
my_dbt_resource = CustomDbtCliResource(project_dir=dbt_project_dir, full_refresh=False)

my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=my_asset_selection,
    resource_defs={"dbt": my_dbt_resource}
)
In this example,
CustomDbtCliResource
is a subclass of
ResourceDefinition
that takes
project_dir
and
full_refresh
as initialization arguments. The
_resource_fn
method is used to create an instance of the
DbtCliResource
with the appropriate
global_flags
. To dynamically control the
full_refresh
behavior, you can create instances of
CustomDbtCliResource
with different
full_refresh
values and pass them to different job definitions or use a factory pattern to create the resource instance based on runtime conditions. Regarding schedules and dynamic behavior, you would still need to manage the
full_refresh
flag at runtime. One way to do this is by using a sensor or a schedule that checks the condition and triggers a job with the appropriate resource instance. Please note that the class-based approach for resources is more complex and less common than using the
@resource
decorator. The decorator-based approach is generally recommended for its simplicity and ease of use, unless you have specific requirements that necessitate a class-based approach.
a
@Scout resource_defs is not a parameter
s
I apologize for the oversight. You are correct that
resource_defs
is not a parameter you can pass directly to
define_asset_job
. Instead, resources are associated with assets or jobs at a higher level in Dagster, typically when constructing a repository or defining a job using the
to_job
method on a graph. Given the constraints and the need to dynamically control whether a full refresh should occur, you can consider using a sensor to trigger the job with the appropriate configuration. The sensor can check the current day and decide whether to include the
--full-refresh
flag in the
run_config
for the dbt CLI resource. Here's an example of how you might use a sensor to achieve this:
Copy code
python
from dagster import sensor, RunRequest, repository, define_asset_job, AssetSelection
from datetime import datetime

# Define your dbt assets using the dbt_assets decorator
@dbt_assets(...)
def my_dbt_assets(...):
    ...

# Define your job that includes the dbt assets
my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=AssetSelection.assets(my_dbt_assets),
    # Resources are defined at the repository level, not here
)

# Define a sensor that checks the current day and decides whether to run a full refresh
@sensor(job=my_dbt_job)
def my_dbt_full_refresh_sensor(context):
    # Check if today is Sunday (or any other condition you want to use)
    is_sunday = datetime.now().weekday() == 6

    # Construct the run configuration with the full_refresh flag for the dbt CLI resource
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "global_flags": ["--full-refresh"] if is_sunday else []
                }
            }
        }
    }

    # Trigger a run with the constructed run configuration
    return RunRequest(
        run_key=context.cursor,
        run_config=run_config,
    )

# Define your repository that includes the job and sensor
@repository
def my_repository():
    return [
        my_dbt_job,
        my_dbt_full_refresh_sensor,
        # Include other assets, jobs, sensors, and resources as needed
    ]
In this setup, the
my_dbt_full_refresh_sensor
checks if it's Sunday and constructs a
run_config
that includes the
--full-refresh
flag for the dbt CLI resource if the condition is met. When the sensor triggers a run, it will use the provided
run_config
, which will apply the full refresh only if it's Sunday. This approach allows you to maintain a single job definition and control the full refresh behavior dynamically using a sensor, without the need to define multiple jobs or manage merging configurations.