Alexander Gregory
02/29/2024, 2:18 AM

Scout
02/29/2024, 2:18 AM
`@dbt_assets` definitions, one with a `partitions_def` for the partitioned assets and one without it for the unpartitioned assets. You can then use dbt tags to distinguish which dbt models are partitioned and assign them accordingly to the respective `@dbt_assets` definition using the `select` and `exclude` parameters.
Here's a snippet from the documentation that outlines this approach:
I worked around this by having two `@dbt_assets` definitions, one with a `partitions_def` and one without. I then used dbt tags to mark which dbt models are partitioned daily (`partitioned_daily`). Then I could assign those models to the partitioned `@dbt_assets` definition using the `select="tag:partitioned_daily"` parameter. The unpartitioned `@dbt_assets` definition excludes these models with `exclude="tag:partitioned_daily"`.
This method ensures that your partitioned Dagster assets map to partitioned dbt models, and your unpartitioned Dagster assets map to unpartitioned dbt models. If you are encountering errors, review the error messages and check that your job definition correctly separates the partitioned and non-partitioned assets as described. If the errors persist, look more closely at the specific error messages and the job configuration to diagnose the issue further.

Alexander Gregory
02/29/2024, 2:21 AM
```python
import json

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import (
    DagsterDbtTranslator,
    DagsterDbtTranslatorSettings,
    DbtCliResource,
    dbt_assets,
)

# dbt_manifest_path is defined elsewhere (a pathlib.Path to the dbt manifest.json)


@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=DagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()
    # raise_on_error=False: don't fail the step when dbt exits with test failures
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()


@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:partitioned,tag:daily",
    partitions_def=DailyPartitionsDefinition(start_date="2024-02-20"),
    dagster_dbt_translator=DagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def daily_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Time window (start, end) of the partition being materialized
    start, end = context.partition_time_window
    dbt_vars = {
        "min_date": start.isoformat(),
        "max_date": end.isoformat(),
    }
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
```
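The `--vars` payload that `daily_partitioned_dbt_assets` hands to dbt can be checked without running Dagster. This sketch assumes a daily partition spans midnight to midnight UTC, which is what `context.partition_time_window` yields for a `DailyPartitionsDefinition` with default settings; `daily_window` and `build_dbt_vars` are hypothetical helpers, not Dagster or dbt APIs.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, Tuple


def daily_window(partition_key: str) -> Tuple[datetime, datetime]:
    """Hypothetical stand-in for context.partition_time_window on a UTC daily partition."""
    start = datetime.strptime(partition_key, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def build_dbt_vars(partition_key: str) -> str:
    """Build the JSON string passed to `dbt run --vars` for one daily partition."""
    start, end = daily_window(partition_key)
    dbt_vars: Dict[str, str] = {"min_date": start.isoformat(), "max_date": end.isoformat()}
    return json.dumps(dbt_vars)


print(build_dbt_vars("2024-02-20"))
# {"min_date": "2024-02-20T00:00:00+00:00", "max_date": "2024-02-21T00:00:00+00:00"}
```

Inside a dbt model, these values would be read with `{{ var('min_date') }}` and `{{ var('max_date') }}`.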
I have then defined the job like this:
```python
import json

from dagster import DefaultScheduleStatus, ScheduleDefinition, define_asset_job
from dagster_dbt import DbtManifestAssetSelection

manifest = json.loads(dbt_manifest_path.read_text())
daily_dbt_asset_selection = DbtManifestAssetSelection.build(
    manifest=manifest,
    select="tag:daily",
)
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
    # partitions_def=DailyPartitionsDefinition("2024-01-01"),
)
daily_dbt_job_schedule = ScheduleDefinition(
    job=daily_dbt_job,
    cron_schedule="*/1 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)
```
Yet when this job runs, it fails with: `non_partitioned_dbt_assets - Dependencies for step non_partitioned_dbt_assets failed: ['daily_partitioned_dbt_assets']. Not executing.`

Scout
02/29/2024, 2:21 AM
The error `non_partitioned_dbt_assets - Dependencies for step non_partitioned_dbt_assets failed: ['daily_partitioned_dbt_assets']. Not executing.` suggests a dependency issue within your job definition. Specifically, it indicates that the `non_partitioned_dbt_assets` step cannot execute because it depends on the `daily_partitioned_dbt_assets` step, which appears to have failed or not executed.
In Dagster, assets can depend on other assets, and within a run a step is skipped when a step it depends on fails. This prevents downstream assets from being computed from missing or failed upstream data.
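To see why `non_partitioned_dbt_assets` reports "Not executing", here is a toy executor in plain Python (not Dagster's actual executor): steps run in dependency order, and a step is skipped when an upstream step did not succeed. The dependency layout is an assumption; it presumes some unpartitioned dbt models sit downstream of partitioned ones, which makes one `@dbt_assets` step depend on the other.

```python
from typing import Dict, List, Set


def run_steps(deps: Dict[str, List[str]], failing: Set[str]) -> Dict[str, str]:
    """Toy executor: visit steps in the given (topological) order.

    A step is skipped if any upstream step did not succeed, fails if it is
    listed in `failing`, and succeeds otherwise.
    """
    status: Dict[str, str] = {}
    for step, upstream in deps.items():
        if any(status[u] != "success" for u in upstream):
            status[step] = "skipped"  # "Dependencies for step ... failed. Not executing."
        elif step in failing:
            status[step] = "failed"
        else:
            status[step] = "success"
    return status


# Hypothetical layout mirroring the thread
deps = {
    "daily_partitioned_dbt_assets": [],
    "non_partitioned_dbt_assets": ["daily_partitioned_dbt_assets"],
}
print(run_steps(deps, failing={"daily_partitioned_dbt_assets"}))
# {'daily_partitioned_dbt_assets': 'failed', 'non_partitioned_dbt_assets': 'skipped'}
```

The skip is a symptom: the step to investigate is the upstream one that failed.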
Given that you have partitioned and non-partitioned assets, it's important to ensure that the dependencies between them are correctly defined. If `non_partitioned_dbt_assets` is intended to run independently of `daily_partitioned_dbt_assets`, you should make sure that there are no implicit dependencies between them unless that is your intention.
Here are a few steps you can take to troubleshoot this issue:
1. Check Asset Dependencies: Verify that `non_partitioned_dbt_assets` does not unintentionally depend on `daily_partitioned_dbt_assets`. If there is a dependency, you need to ensure that `daily_partitioned_dbt_assets` executes successfully before `non_partitioned_dbt_assets` can run.
2. Review Asset Selection: Look at `daily_dbt_asset_selection` and ensure that it is selecting the correct assets for execution. If the selection inadvertently includes assets that should not be part of the job, it could lead to dependency issues.
3. Examine Partition Definitions: Since you have partitioned assets, ensure that the partition definitions are correctly set up and that the job is being run with the correct partition context. If there is a mismatch in the partition context, it could cause the partitioned assets to not execute as expected.
4. Check Asset Materialization: Ensure that the assets are materializing correctly during execution. If an asset fails to materialize, it could cause downstream assets to not execute.
5. Review Job and Schedule Definitions: Double-check the `daily_dbt_job` and `daily_dbt_job_schedule` definitions to ensure they are set up correctly and that the schedule is triggering the job as expected.
6. Inspect Logs and Error Messages: Look at the full logs and any error messages associated with `daily_partitioned_dbt_assets` to understand why it might be failing or not executing.
7. Isolate the Issue: Try running the partitioned and non-partitioned assets separately to determine if they can execute successfully on their own. This can help isolate whether the issue is with a…

Scout
02/29/2024, 2:22 AM

Alexander Gregory
02/29/2024, 2:27 AM
`Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range`. I can only assume this is because no `partitions_def` is passed to the `define_asset_job` function. However, if I do pass a definition, I get an error from the non-partitioned assets saying they aren't partitioned.

Scout
02/29/2024, 2:27 AM
The error `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range` indicates that there is a mismatch between the expected partition information and what is being provided during the execution of the partitioned asset. This error typically arises when a partitioned asset is expected to run with a specific partition key or range, but none is provided. On the other hand, if you pass a `partitions_def` to the `define_asset_job` function and receive an error from the non-partitioned assets stating that they aren't partitioned, it suggests that the job is being configured in a way that expects all included assets to be partitioned. To resolve this, you can consider the following approaches:

1. Separate Jobs: Create separate jobs for partitioned and non-partitioned assets. This way, you can run each job with the appropriate partition context (or lack thereof) without causing conflicts between partitioned and non-partitioned assets.
2. Partition Mapping: If you need to have both partitioned and non-partitioned assets in the same job, you may need to use a `PartitionMapping` that correctly maps the dependencies between partitioned and non-partitioned assets. For example, `AllPartitionMapping` can be used when a non-partitioned asset depends on all partitions of an upstream partitioned asset.
3. Dynamic Partitioning: If the partitioned and non-partitioned assets must be in the same job and run together, consider using dynamic partitioning, which allows for more flexible partitioning schemes that can accommodate varying partition setups.
4. Job Configuration: Ensure that the job configuration correctly reflects the partitioning scheme of the assets. If only some assets are partitioned, the job should be configured to handle this hybrid setup without imposing partition requirements on non-partitioned assets.
5. Debugging: Run the partitioned assets with explicit partition keys to ensure they work as expected. Then, try running the non-partitioned assets separately. This can help you identify where the configuration is going wrong when combining them.
6. Documentation and Community: Check the Dagster documentation for examples of jobs that include both partitioned and non-partitioned assets. You may also reach out to the Dagster community for advice on handling such cases, as other users may have encountered similar scenarios.

Given the complexity of the issue, it may require a detailed review of your Dagster and dbt configurations. If the problem persists, consider providing more context or seeking direct support from the Dagster community or maintainers.

Alexander Gregory
02/29/2024, 2:30 AM

Scout
02/29/2024, 2:30 AM
The `define_asset_job` function in Dagster is designed to work with assets that have a uniform partitioning scheme. When you have a mix of partitioned and non-partitioned assets, the job needs to be configured to handle this mix correctly.
Unfortunately, the documentation does not provide a direct example of using partition mappings to resolve the specific issue you're facing with a mix of partitioned and non-partitioned dbt models within the same Dagster job. The use of partition mappings is more straightforward when all assets within a job are partitioned in a compatible way.
Since the documentation does not offer a clear solution for this scenario, I would recommend the following general debugging steps:
1. Review dbt and Dagster Configurations: Ensure that your dbt models are correctly configured for incremental runs and that the partitioning logic within dbt aligns with the partitioning scheme you're trying to implement in Dagster.
2. Test Partitioned Runs Separately: Try running your partitioned dbt models separately with explicit partition keys to ensure that the partitioning logic within dbt is functioning as expected.
3. Test Non-Partitioned Runs Separately: Similarly, run your non-partitioned dbt models separately to confirm that they execute without requiring partition information.
4. Explore Dagster's Partitioning Documentation: Review Dagster's documentation on partitioning to understand the available partition mappings and how they might be applied to your use case.
5. Seek Community Input: Since the documentation does not provide a specific example for your case, consider reaching out to the Dagster community for advice. Other users or the maintainers may have experience with similar scenarios and can offer guidance.
6. Consider Custom Logic: If Dagster's built-in partitioning system does not support your use case, you may need to implement custom logic to orchestrate the execution of your dbt models, potentially bypassing Dagster's partitioning system for this particular job.
If you continue to face challenges with this setup, it may be beneficial to seek direct support from the Dagster community, where you can s…

Scout
02/29/2024, 2:31 AM

Alexander Gregory
02/29/2024, 12:13 PM

Scout
02/29/2024, 12:13 PM
The `DagsterDbtTranslator.get_partition_mapping` method lets you customize the partition mapping for dbt assets in Dagster. This can be useful when you have a mix of partitioned and non-partitioned assets and need to define how partitions should relate to each other.
The documentation mentions several implementations of `PartitionMapping` that you can use depending on your use case:
- `TimeWindowPartitionMapping`: The default mapping between two `TimeWindowPartitionsDefinition`s. It maps a partition in the downstream partitions definition to all partitions in the upstream asset whose time windows overlap it.
- `IdentityPartitionMapping`: Expects that the upstream and downstream assets are partitioned in the same way, and maps partitions in the downstream asset to the same partition in the upstream asset.
- `AllPartitionMapping`: Maps every partition in the downstream asset to every partition in the upstream asset. This is commonly used when the downstream asset is not partitioned, and the entire downstream asset depends on all partitions of the upstream asset.
- `LastPartitionMapping`: Maps all dependencies to the last partition in the upstream asset.
For your specific case, where you have an upstream asset that is partitioned and a downstream asset that is not, you might consider using `AllPartitionMapping`. This mapping indicates that the non-partitioned downstream asset depends on all partitions of the upstream partitioned asset.
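The difference between the mappings listed above can be illustrated with a small plain-Python function that returns the upstream partition keys one downstream partition reads. It models the semantics only; the string mapping names and the `upstream_keys` helper are hypothetical and not part of Dagster's `PartitionMapping` API. `None` stands for an unpartitioned downstream asset.

```python
from typing import List, Optional


def upstream_keys(mapping: str, downstream_key: Optional[str], upstream_partitions: List[str]) -> List[str]:
    """Upstream partition keys read by one downstream partition (None = unpartitioned downstream).

    Illustrates the semantics of identity/all/last mappings; not Dagster's API.
    """
    if mapping == "identity":
        # Same key upstream and downstream
        return [downstream_key] if downstream_key in upstream_partitions else []
    if mapping == "all":
        # Every upstream partition, e.g. an unpartitioned asset reading a daily one
        return list(upstream_partitions)
    if mapping == "last":
        # Only the most recent upstream partition
        return upstream_partitions[-1:]
    raise ValueError(f"unknown mapping: {mapping}")


days = ["2024-02-20", "2024-02-21", "2024-02-22"]
print(upstream_keys("all", None, days))  # ['2024-02-20', '2024-02-21', '2024-02-22']
print(upstream_keys("identity", "2024-02-21", days))  # ['2024-02-21']
print(upstream_keys("last", None, days))  # ['2024-02-22']
```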
Here's an example of how you might override the `get_partition_mapping` method in a custom `DagsterDbtTranslator`:
```python
from typing import Any, Mapping, Optional

from dagster import AllPartitionMapping, PartitionMapping
from dagster_dbt import DagsterDbtTranslator, dbt_assets


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_partition_mapping(
        cls,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionMapping]:
        # dbt_resource_props and dbt_parent_resource_props describe the downstream
        # dbt node and its upstream parent, so you can implement custom logic here
        # to return the appropriate PartitionMapping. For example, if the downstream
        # asset is not partitioned and depends on an upstream partitioned asset,
        # you might return an AllPartitionMapping.
        # This simple example always returns an AllPartitionMapping; in practice,
        # decide based on the properties.
        return AllPartitionMapping()


# Then, when defining your dbt assets, use your custom translator:
@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
    # ... other parameters ...
)
def my_dbt_a…
```

Scout
02/29/2024, 12:14 PM
Your `get_partition_mapping` logic may need to be more complex, depending on the relationships between your dbt models and how you want to handle partition dependencies. Keep in mind that custom partition mappings and translators should be carefully tested to ensure that they behave as expected in your Dagster jobs. If you encounter any issues or need further guidance, the Dagster community and maintainers can be a valuable resource.

Alexander Gregory
02/29/2024, 12:24 PM

Scout
02/29/2024, 12:24 PM
1. Upstream partitioned, downstream not: `AllPartitionMapping` is used to indicate that the downstream asset depends on all partitions of the upstream asset. This is a fan-in scenario where the downstream asset is not partitioned.
2. Downstream partitioned, upstream not: For this scenario, the documentation does not provide a specific partition mapping. However, when no partition mapping is specified, Dagster's default is that each partition of the downstream asset depends on the unpartitioned upstream asset. This is a fan-out scenario.
3. Both upstream and downstream partitioned: If both assets are partitioned, you can use `IdentityPartitionMapping` if they are partitioned in the same way, or `TimeWindowPartitionMapping` to map partitions based on overlapping time windows. The specific mapping to use depends on how the partitions in the upstream and downstream assets relate to each other.
4. Neither upstream nor downstream partitioned: If neither asset is partitioned, there is no need for a partition mapping, as the assets simply depend on each other without any partition context.

For the case where the upstream is not partitioned and the downstream is, you typically don't need to do anything special. Each partition of the downstream asset will depend on the unpartitioned upstream asset by default. If you need to customize this behavior, you would likely need to implement custom logic within your asset definitions to handle the dependency relationship.

While the documentation provides guidance on common scenarios, some cases require custom solutions. If the provided partition mappings do not cover your use case, or you need a more complex dependency relationship, you may need to reach out to the Dagster community for support or implement custom logic within your asset definitions. Remember to test any custom partition mappings or logic thoroughly to ensure that they work as expected within your Dagster jobs.

Alexander Gregory
02/29/2024, 12:31 PM

Scout
02/29/2024, 12:31 PM
In `define_asset_job`, you can specify a `selection` parameter to determine which assets the job will materialize. The `partitions_def` parameter is optional. If you include a `partitions_def` in the job definition, it implies that the job is intended to materialize partitioned assets, and you would typically run the job for specific partitions. If you omit it, Dagster infers it from the partitioned assets in the selection, so runs of the job still need a partition key. Here's how you might define a job for a mix of partitioned and non-partitioned dbt models:

```python
from dagster import AssetSelection, define_asset_job

# Define an asset selection that includes both partitioned and non-partitioned dbt models
asset_selection = AssetSelection.assets(
    # ... include your dbt assets here ...
)

# Define the job without specifying a partitions_def
dbt_job = define_asset_job(
    name="dbt_job",
    selection=asset_selection,
)

# Runs of this job are launched with a partition key: picked in the UI,
# or supplied by a schedule or sensor.
```

In this job definition, we do not specify a `partitions_def` because the job includes both partitioned and non-partitioned assets. When you run the job, Dagster will materialize all selected assets. For partitioned assets, you choose which partition to materialize when the run is launched: in the UI's partition picker, or from a schedule or sensor that requests a run with a partition key. For example, a schedule could be set up to run the job for the latest partition of the partitioned assets, while still materializing the non-partitioned assets. Keep in mind that the job's partitioning should be consistent with the partitioning logic defined in your dbt models. If your dbt models run incrementally based on partitions, make sure the partition each run receives is what the models expect. If you have specific requirements that the standard job definition does not cover, you may need further guidance from the Dagster community or a custom solution.

Alexander Gregory
02/29/2024, 12:35 PM

Scout
02/29/2024, 12:35 PM
```python
from dagster import asset


@asset
def non_partitioned_asset():
    return "non_partitioned_data"
```

Next, we'll define a partitioned asset. For simplicity, let's assume daily partitions:

```python
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def partitioned_asset(context: AssetExecutionContext):
    # The context provides the partition key, which you can use to customize behavior
    partition_key = context.partition_key
    return f"partitioned_data_for_{partition_key}"
```

Now, we'll define a job that includes both assets. Since the job includes both partitioned and non-partitioned assets, we won't specify a `partitions_def` in the job definition. Instead, we'll use `AssetSelection` to include both assets:

```python
from dagster import AssetSelection, define_asset_job

# Define an asset selection that includes both the partitioned and non-partitioned assets
asset_selection = AssetSelection.assets(
    non_partitioned_asset,
    partitioned_asset,
)

# Define the job without specifying a partitions_def
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection=asset_selection,
)
```

When you run `my_asset_job`, Dagster will materialize the non-partitioned asset without any partition context. For the partitioned asset, you need to specify which partition to materialize when you trigger the run. This can be done through the Dagster UI, the Dagster GraphQL API, or programmatically with `execute_in_process` and a partition key. Because `define_asset_job` returns an unresolved job, resolve it through `Definitions` first:

```python
from dagster import Definitions

defs = Definitions(
    assets=[non_partitioned_asset, partitioned_asset],
    jobs=[my_asset_job],
)
defs.get_job_def("my_asset_job").execute_in_process(partition_key="2023-01-01")
```

In this example, the non-partitioned asset will run as usual, and the partitioned asset will run for the specified partition key. Please note that this is a simplified example to illustrate the concept. In a real-world scenario, you would need to handle the partitioning logic according to your specific data and processing requirements. If you have complex dependencies or need to handle different partitioning schemes, you may need to implement custom logic or use more advanced features of Dagster's partitioning system.

Alexander Gregory
02/29/2024, 12:46 PM
```python
from typing import Optional

from dagster import AllPartitionMapping, IdentityPartitionMapping, PartitionMapping
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_partition_mapping(cls, dbt_resource_props, dbt_parent_resource_props) -> Optional[PartitionMapping]:
        downstream_incremental = "partitioned" in dbt_resource_props.get("config").get("tags", [])
        upstream_incremental = "partitioned" in dbt_parent_resource_props.get("config").get("tags", [])
        # print(json.dumps(dbt_parent_resource_props.get("config")))
        print(f"downstream: {dbt_resource_props.get('name')} upstream: {dbt_parent_resource_props.get('name')}")
        if downstream_incremental and upstream_incremental:
            # downstream IS partitioned, upstream IS partitioned
            print("both")
            return IdentityPartitionMapping()
        elif downstream_incremental and not upstream_incremental:
            # downstream IS partitioned, upstream NOT partitioned
            print("downstream partitioned")
            return None
            # return AllPartitionMapping() # not sure on this
        elif not downstream_incremental and upstream_incremental:
            # downstream NOT partitioned, upstream IS partitioned
            print("upstream partitioned")
            return AllPartitionMapping()
            # return AllPartitionMapping() # not sure on this
        print("neither")
        return None  # neither partitioned
```
Here I use a tag set on each dbt model to decide whether to map it to a partitioned asset or not. However, this still does not fix anything.
It is worth noting that if I run this job manually, the UI asks me to select some partitions, and beyond that it runs absolutely fine.

Scout
02/29/2024, 12:47 PM
The `DagsterDbtTranslator` subclass you've implemented is a step in the right direction for handling the relationship between partitioned and non-partitioned dbt models in Dagster. Its `get_partition_mapping` method determines the partition mapping between dbt assets based on their tags.
From your description, it seems that the job runs fine when you manually select partitions in the UI. This indicates that the partition selection is working as expected when explicitly provided. However, the issue might be related to how the job is triggered automatically (e.g., via a schedule or sensor) without manual partition selection.
Here are a few things to consider:
1. Automatic Partition Selection: When triggering the job automatically, you need to ensure that the correct partition information is being passed to the job. This could be done through a schedule or sensor that computes the appropriate partition keys based on the current time or other logic.
2. Job Definition: Ensure that the job is defined in a way that allows for both partitioned and non-partitioned assets to be materialized. Since you have a mix of partitioned and non-partitioned dbt models, the job should not have a `partitions_def` specified, as this would imply that all assets in the job are expected to be partitioned.
3. Partition Mapping: The `get_partition_mapping` method should return the correct `PartitionMapping` based on the tags. You're returning `None` when the downstream is partitioned and the upstream is not, so the default partition mapping is used. Because the upstream asset has no partitions, the default makes every downstream partition depend on the whole upstream asset, which is usually the desired behavior here; the choice of a specific `PartitionMapping` matters mainly when the upstream asset is partitioned.
4. Testing: Test the job execution with different scenarios (e.g., different partition keys, different combinations of partitioned and non-partitioned models) to ensure that the partition mappings are working as expected.
5. Logging and Debugging: Use logging to help debug the partition mappings. You've already added print statements in your `get_partition_mapping` method, which can be helpful. Ensure that the logs are giving you the correct information about the relationships between assets.
6. Dagster Community: If the issue persists, consider reaching out to the Dagster community for support. Other users or the maintainers may have encountered similar scenarios and can offer guidance or solutions.
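For the testing step, the tag logic from the `CustomDagsterDbtTranslator` above can be pulled into a pure function and checked without a dbt manifest. This is a sketch: `choose_mapping` is a hypothetical helper that returns mapping class names as strings rather than Dagster objects, and it assumes the `partitioned` tag convention used in the thread.

```python
from typing import Collection, Optional


def choose_mapping(
    downstream_tags: Collection[str],
    upstream_tags: Collection[str],
    tag: str = "partitioned",
) -> Optional[str]:
    """Name of the PartitionMapping to use for one dbt edge, or None for Dagster's default."""
    downstream_partitioned = tag in downstream_tags
    upstream_partitioned = tag in upstream_tags
    if downstream_partitioned and upstream_partitioned:
        # Same daily partitions on both sides: partition N reads partition N
        return "IdentityPartitionMapping"
    if upstream_partitioned:
        # Unpartitioned model reading a partitioned one: depend on every partition
        return "AllPartitionMapping"
    # Upstream unpartitioned (or neither partitioned): the default mapping already
    # makes each downstream partition depend on the whole upstream asset
    return None


print(choose_mapping(["partitioned", "daily"], ["partitioned"]))  # IdentityPartitionMapping
print(choose_mapping([], ["partitioned"]))  # AllPartitionMapping
print(choose_mapping(["partitioned"], []))  # None
```

In the translator, `get_partition_mapping` would pass the tags from `dbt_resource_props` and `dbt_parent_resource_props` to this function and return the matching mapping instance.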
Remember that the partition mappings are there to define dependencies between assets. If the automatic triggering of the job is not working as expected, it's likely an issue with how the partitions are being determined or passed…

Scout
02/29/2024, 12:47 PM

Alexander Gregory
02/29/2024, 12:51 PM
… `partitions_def`, since there is a mixture of partitioned and unpartitioned assets within it, you get the following error: `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range`
This makes sense when you consider running the job manually: you are prompted to pick the partitions for which you wish to run the job, and it works fine in that scenario.

Scout
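02/29/2024, 12:51 PM
The error `Failure condition: Has a PartitionsDefinition, so should either have a partition key or a partition key range` suggests that Dagster expects partition information for the job because it contains partitioned assets, even though the job itself was not defined with a `partitions_def`. When `partitions_def` is omitted, `define_asset_job` infers it from the selected partitioned assets, so every run of the job needs a partition key.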
To address this, you can implement the following:
1. Automatic Partition Selection: Create a schedule or sensor that provides the necessary partition information when triggering the job. This can be done by computing the partition keys based on the current time or other logic and passing them to the job when it runs.
Here's an example of how you might define a schedule that computes the partition key for the current day and triggers the job with that partition key:
```python
# Legacy API: daily_schedule, pipeline_name and "solids" run config come from
# pre-1.0 Dagster; current releases use jobs, ops and @schedule instead.
from datetime import datetime

from dagster import daily_schedule


@daily_schedule(
    pipeline_name='my_dbt_pipeline',
    start_date=datetime(2023, 1, 1),
    execution_timezone='UTC'
)
def my_daily_schedule(date):
    # Compute the partition key for the current day
    partition_key = date.strftime('%Y-%m-%d')
    # Return the run configuration with the partition key
    return {
        'solids': {
            'my_partitioned_solid': {
                'config': {
                    'partition_key': partition_key
                }
            }
        }
    }

# Assuming 'my_dbt_pipeline' is the name of your pipeline that includes both partitioned and non-partitioned assets
```
2. Job Definition Without `partitions_def`: Define the job without a `partitions_def` and ensure that the schedule or sensor provides the partition key when triggering the job. This way, the job itself remains agnostic to partitioning, but the schedule or sensor handles the partition logic.
```python
from dagster import AssetSelection, define_asset_job

# Define an asset selection that includes both the partitioned and non-partitioned assets
asset_selection = AssetSelection.assets(
    # ... include your dbt assets here ...
)

# Define the job without specifying a partitions_def
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection=asset_selection,
)

# The schedule defined above will trigger this job with the appropriate partition key
```
By using a schedule or sensor to provide the partition key, you can ensure that the partitioned assets within the job receive the necessary partition information for execution, while the non-partitioned assets simply run as usual.
Please note that the examples provided are for illustrative purposes and may need to be adjusted to fit your specific use case and Dagster version. If you're using a newer version of Dagster, the way you define schedules or sensors might differ, and you should refer to the lat…

Scout
02/29/2024, 12:52 PM

Alexander Gregory
02/29/2024, 12:56 PM
… `solids` has been phased out of Dagster for a long time.
In addition, can you explain what you are trying to do?
Is it:
• Create a job without a `partitions_def` that contains both partitioned and unpartitioned assets
• Schedule something that computes the current partition and then triggers the job with the relevant partition?

Scout
02/29/2024, 12:56 PM
1. Create a Job Without `partitions_def`: Create a job that includes both partitioned and unpartitioned assets without specifying a `partitions_def`. This job will be able to run both types of assets.
```python
from dagster import AssetSelection, define_asset_job

# Define an asset selection that includes both the partitioned and non-partitioned assets
asset_selection = AssetSelection.assets(
    # ... include your dbt assets here ...
)

# Define the job without specifying a partitions_def
my_asset_job = define_asset_job(
    name="my_asset_job",
    selection=asset_selection,
)
```
2. Create a Schedule to Trigger the Job: Define a schedule that computes the current partition key based on the current time or other logic. This schedule will trigger the job and provide the partition key for the partitioned assets.
```python
from datetime import timedelta

from dagster import RunRequest, ScheduleEvaluationContext, schedule


@schedule(job=my_asset_job, cron_schedule="0 0 * * *", execution_timezone="UTC")
def my_daily_schedule(context: ScheduleEvaluationContext):
    # A daily partition only exists once its day has ended, so a midnight tick
    # targets the previous day's partition
    scheduled_time = context.scheduled_execution_time
    partition_key = (scheduled_time - timedelta(days=1)).strftime("%Y-%m-%d")
    # Launch the job for that partition; unpartitioned assets in the job run as usual
    return RunRequest(partition_key=partition_key)
```

In this example, `my_daily_schedule` is a schedule that triggers `my_asset_job` daily at midnight UTC. `context.scheduled_execution_time` provides the time at which the schedule is set to run, which you can use to compute the partition key. Rather than returning run configuration, the schedule returns a `RunRequest` with `partition_key` set; that is how a run of a job containing partitioned assets is told which partition to materialize. With `@asset` or `@dbt_assets` definitions, no `ops` run configuration is needed for this.
This approach allows you to have a single job that handles both partitioned and unpartitioned assets, with a schedule that provides the necessary partition information for the partitioned assets at runtime. The non-partitioned assets will run as usual without any partition context.
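Which partition key a midnight schedule can actually request can be shown with a plain-Python model of the daily partition keys that exist at a given moment. It assumes UTC and no `end_offset` (the default for `DailyPartitionsDefinition`), under which a day's partition only exists once that day has ended; `complete_daily_partition_keys` is a hypothetical helper, not Dagster code.

```python
from datetime import date, datetime, timedelta, timezone
from typing import List


def complete_daily_partition_keys(start: date, now: datetime) -> List[str]:
    """Daily partition keys (YYYY-MM-DD) whose day has fully ended by `now` in UTC."""
    keys: List[str] = []
    day = start
    today = now.astimezone(timezone.utc).date()
    while day < today:
        keys.append(day.isoformat())
        day += timedelta(days=1)
    return keys


keys = complete_daily_partition_keys(date(2024, 2, 20), datetime(2024, 2, 29, 0, 0, tzinfo=timezone.utc))
print(keys[-1])  # 2024-02-28
```

At a tick at 00:00 UTC on 2024-02-29, the newest complete key is 2024-02-28, so a schedule that formats the scheduled time itself would ask for a partition that does not exist yet.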
Make sure to adjust the code to match the actual names of your assets…

Scout
02/29/2024, 12:56 PM

Alexander Gregory
02/29/2024, 1:04 PM

Scout
02/29/2024, 1:04 PM@dbt_assets
decorator to define dbt models as Dagster assets, and you have a mix of partitioned and non-partitioned assets, you typically don't need to specify the partition key in the run configuration for non-partitioned assets. Dagster will handle the execution of non-partitioned assets without any partition context. For partitioned assets, you need to provide the partition key when triggering the job so that Dagster knows which partition of the asset to materialize. The schedule's responsibility is to compute the partition key and pass it to the job when it triggers a run. The documentation provides guidance on how to access partition information within an asset when it's being materialized. For example, you can use context.partition_key
within the asset function to get the partition key for the current run. Here's an example of how you might define a schedule that triggers a job with partitioned dbt assets: python from dagster import ScheduleDefinition, daily_schedule from datetime import datetime @daily_schedule( job=my_asset_job, start_date=datetime(2023, 1, 1), execution_timezone='UTC' ) def my_daily_schedule(context): # Compute the partition key for the current day partition_key = context.scheduled_execution_time.strftime('%Y-%m-%d') # Return the run configuration with the partition key return { 'resources': { 'dbt': { 'config': { 'vars': { 'partition_key': partition_key, } } } } }
In this example, the schedule computes the partition key and includes it in the run configuration under a `vars` key for the dbt resource, so that the dbt models can be run for the specified partition.

The exact structure of the run configuration depends on how your dbt resource and assets are set up. This snippet assumes a dbt resource with a `vars` config field, which `DbtCliResource` does not have. With `DbtCliResource`, dbt variables are passed on the command line instead, e.g. `dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context)`. Refer to the documentation for the `dbt_assets` decorator and the dbt resource you are using to make sure you provide the partition key in the format your dbt models expect.

Remember to test the schedule to ensure that it triggers the job with the appropriate partition key and that your dbt models receive and use it as expected.
Alexander Gregory
02/29/2024, 1:09 PM
Scout
02/29/2024, 1:09 PM
`@schedule` decorator, which is the current way to define schedules in Dagster. Suppose you have a job that includes both partitioned and unpartitioned dbt assets, and you want to trigger it with a schedule. You then need to compute the partition key for the partitioned assets and pass it to the job at runtime. Here's how you can define a schedule with the `@schedule` decorator that computes the partition key and triggers the job with it:
```python
from dagster import RunRequest, schedule


@schedule(
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight
    job=my_asset_job,
    execution_timezone='UTC',
)
def my_daily_schedule(context):
    # Derive the key from the tick's scheduled time rather than datetime.now(),
    # so a late or retried evaluation still produces the same key
    partition_key = context.scheduled_execution_time.strftime('%Y-%m-%d')
    # Construct the run configuration with the partition key as a dbt var
    run_config = {
        'resources': {
            'dbt': {
                'config': {
                    'vars': {
                        'partition_key': partition_key,
                    }
                }
            }
        }
    }
    # Return a RunRequest that launches the job with this run configuration
    return RunRequest(run_config=run_config)
```
In this example, the `my_daily_schedule` function is decorated with `@schedule` and set to run daily at midnight UTC. It computes the partition key for the tick's day and constructs a `run_config` dictionary with the partition key under the `vars` key, mirroring how dbt accepts runtime variables. The returned `RunRequest` carries this `run_config`, so the key is made available to dbt. The unpartitioned assets will run as usual without any partition context.

Please adjust the `run_config` to match the configuration expected by your dbt assets and resources, since its structure depends on your specific dbt setup. Make sure to test the schedule to ensure that it triggers the job correctly and that the partitioned dbt models are executed with the appropriate partition key.
Alexander Gregory
02/29/2024, 1:11 PM
```python
run_config = {
    'resources': {
        'dbt': {
            'config': {
                'vars': {
                    'partition_key': partition_key,
                }
            }
        }
    }
}
```
That to me looks like you are setting a var in dbt, i.e. a model is parameterised. This makes sense, somewhat, but the issue we are having is dagster needing a partition key, not dbt?
Scout
02/29/2024, 1:12 PM
`run_config` snippet I provided is for setting a variable in dbt, which would be used if your dbt models are parameterized to accept a `partition_key` variable.
However, the issue you're facing is that Dagster needs a partition key for the partitioned assets when the job is triggered. To address this, you need to provide the partition key to Dagster in a way that tells it which partitions of the partitioned assets to materialize.
In Dagster, resource configuration (including any dbt-specific settings) and the partition key are supplied separately. Resources are configured where you define them, for example in `Definitions(resources=...)`, or under the `resources` section of the run config. The partition key, by contrast, is not part of the run config at all: it is attached to the run itself.
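The split can be sketched in plain Python. One partition key reaches Dagster through the run's `dagster/partition` tag, the same tag used in the `RunRequest` example that follows. dbt sees the key only if you pass it explicitly, here via `dbt run --vars`. The helper name is hypothetical. The first dict is meant to be unpacked into `RunRequest(**...)` and the list passed to `dbt.cli(...)`.

```python
import json
from typing import List, Tuple


def split_partition_inputs(partition_key: str) -> Tuple[dict, List[str]]:
    """Hypothetical helper: route one partition key to Dagster and to dbt.

    Returns (run_request_kwargs, dbt_run_args):
    - run_request_kwargs tells *Dagster* which partition the run is for, via
      the dagster/partition run tag; the run config itself stays empty.
    - dbt_run_args passes the same key to *dbt* as a --vars JSON string.
    """
    run_request_kwargs = {
        "run_key": partition_key,
        "run_config": {},
        "tags": {"dagster/partition": partition_key},
    }
    dbt_run_args = ["run", "--vars", json.dumps({"partition_key": partition_key})]
    return run_request_kwargs, dbt_run_args


kwargs, args = split_partition_inputs("2024-02-28")
print(kwargs["tags"])  # {'dagster/partition': '2024-02-28'}
print(args)            # ['run', '--vars', '{"partition_key": "2024-02-28"}']
```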
The `@schedule` decorator is what triggers the job, and you need to provide the partition key for the partitioned assets. The schedule should therefore attach the key to the run it requests, for Dagster rather than for dbt, so that Dagster knows which partition of the partitioned assets to run.
Here's an updated example of how you might define a schedule that computes the partition key and triggers the job with that key:
```python
from dagster import RunRequest, schedule


@schedule(
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight
    job=my_asset_job,
    execution_timezone='UTC',
)
def my_daily_schedule(context):
    # Compute the partition key from the tick's scheduled time (not datetime.now())
    partition_key = context.scheduled_execution_time.strftime('%Y-%m-%d')
    # Return a RunRequest that launches the job for that partition
    return RunRequest(
        run_key=partition_key,
        run_config={
            # Include any necessary run configuration for the job here
        },
        tags={
            # The run tag Dagster reads the partition from; recent versions also
            # accept RunRequest(partition_key=partition_key), which sets it for you
            "dagster/partition": partition_key,
        },
    )
```
In this updated example, the `RunRequest` includes a `run_key` and `tags` that specify the partition key. The `run_config` contains only a placeholder comment because, depending on your job's requirements, you may not need any additional configuration.
When the schedule triggers the job, Dagster will use the partition key in the `dagster/partition` tag to determine which partition of the partitioned assets to materialize. The unpartitioned assets will run as usual.
Please ensure that your job and assets are set up to handle the partition key provided in the tags. If your partitioned assets are defined using the `@asset` decorator with a `partitions_def`, Dagster will use the partition key to materialize the correct partitions.
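For a daily partitions definition, the time window behind a key is half-open. It starts at midnight of that day and ends at midnight of the next. The stdlib sketch below mirrors this for UTC daily partitions, which is an assumption, and the helper name is hypothetical. It shows what `min_date`/`max_date` look like when built from the partition window, as in the dbt asset later in this thread.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def daily_window_vars(partition_key: str) -> Dict[str, str]:
    """Hypothetical helper: dbt vars for a UTC daily partition key.

    The window is half-open, [start, end), so dbt models should filter with
    `>= min_date` and `< max_date` to put each row in exactly one partition.
    """
    start = datetime.strptime(partition_key, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    return {"min_date": start.isoformat(), "max_date": end.isoformat()}


print(daily_window_vars("2024-02-28"))
# {'min_date': '2024-02-28T00:00:00+00:00', 'max_date': '2024-02-29T00:00:00+00:00'}
```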
Again, I apologize for any confusion, and I hope this clarifies how to use the `@schedule` decorator to trigger a job with partitioned assets in Dagster.
Alexander Gregory
02/29/2024, 1:23 PM
```python
manifest = json.loads(dbt_manifest_path.read_text())
daily_dbt_asset_selection = DbtManifestAssetSelection.build(
    manifest=manifest,
    select="tag:daily",
)
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        partition_key=partition_key,
    )
```
However, I am getting an error from the schedule: `dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key 2024-02-29.` This partition does not exist; however, I naively assumed we could calculate today's partition this way? I'm not sure why this wouldn't work?
Scout
02/29/2024, 1:23 PM
`dagster._core.errors.DagsterUnknownPartitionError: Could not find a partition with key 2024-02-29`, indicates that Dagster is looking up the specified key in the partitions definition for your assets but cannot find it. The partitions definition determines the set of valid partition keys (e.g., dates), and if you run a job with a key outside that set, Dagster raises this error.

In your schedule function `daily_configurable_job`, you compute the partition key as today's date using `context.scheduled_execution_time.strftime("%Y-%m-%d")`. The format is correct. However, assuming `daily_partition` is a `DailyPartitionsDefinition` with the default `end_offset=0`, a day's partition only exists once that day has ended. On 2024-02-29 the most recent partition is therefore 2024-02-28, so today's key is not (yet) a valid partition, and Dagster raises `DagsterUnknownPartitionError`.

To resolve this issue, make sure the key you generate corresponds to a valid partition. Here are a few things to check:
1. Partition definition: Check the `start_date` and `end_offset` of the `DailyPartitionsDefinition` (or other partitions definition) you're using. Either request the previous day's key, which is the newest complete day, or set `end_offset=1` so that a partition for the current day exists.
2. Partition generation: If you're using dynamic partitioning, ensure that the logic for adding partitions includes the key you request.
3. Time zone: `scheduled_execution_time` is expressed in the schedule's `execution_timezone` (UTC if not set). If your partitions are defined in a different time zone, adjust the computation of the partition key accordingly.
4. Leap year: 2024-02-29 is a leap day, but `DailyPartitionsDefinition` generates ordinary calendar dates, so this is not the cause.
5. Validation: Before returning the `RunRequest`, you can check whether the computed key exists in the partitions definition. If it doesn't, skip the run (for example by returning a `SkipReason`) or log a warning.

If you continue to face issues, review the partitions definitions for your assets and make sure they include all the keys you expect.
Alexander Gregory
02/29/2024, 1:26 PM
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:partitioned,tag:daily",
    partitions_def=daily_partition,
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def daily_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    start, end = context.partition_time_window
    dbt_vars = {
        "min_date": start.isoformat(),
        "max_date": end.isoformat(),
    }
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run", "--vars", json.dumps(dbt_vars)], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
```
Above is my definition of the partitioned assets. I assume the max_date is set to yesterday? How else should I define this?
Scout
02/29/2024, 1:29 PM
`daily_partitioned_dbt_assets` function is defined as a set of dbt assets that are partitioned daily. `context.partition_time_window` provides start and end datetime objects for the time window of the partition being materialized, and your `start` and `end` variables are set to them, respectively. `start.isoformat()` and `end.isoformat()` convert these datetime objects to ISO 8601 strings, which are then passed as variables to dbt. The `min_date` and `max_date` variables are used to parameterize your dbt models. For example, if you have incremental models in dbt that filter data by a date range, these variables ensure that each run only processes data for the time window of the current partition.

Regarding `max_date`, it is not "yesterday". For a daily partition the time window is half-open. For the partition key `2024-02-28`, `start` is midnight at the beginning of 2024-02-28 and `end` is midnight at the beginning of 2024-02-29, both in the partitions definition's timezone (UTC by default). So `max_date` is an exclusive upper bound, and your dbt models should filter with `>= min_date` and `< max_date` so that each row falls into exactly one partition. If you nevertheless want `max_date` to always be "yesterday," you would need to calculate it in your asset function. Here's an example of how you might adjust the end time:
```python
from datetime import datetime, timedelta


def daily_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Get the current date and time
    now = datetime.now()
    # Calculate "yesterday" by subtracting one day from the current date.
    # Note: this depends on when the run happens, not on which partition is
    # being materialized, so backfilled partitions would all get the same value
    yesterday = now - timedelta(days=1)
    # Convert "yesterday" to ISO format
    max_date = yesterday.isoformat()
    # Use the start of the partition time window as the min_date
    min_date = context.partition_time_window.start.isoformat()
    dbt_vars = {
        "min_date": min_date,
        "max_date": max_date,
    }
    # ... rest of your dbt asset function ...
```
In this adjusted example, `max_date` is explicitly set to "yesterday," regardless of the partition time window. You would need to ensure that this logic aligns with your data processing requirements and the expectations of your dbt models. Remember to test your asset definitions to verify that the `min_date` and `max_date` variables are being set correctly and that your dbt models are using them as intended. If you have any further questions or need additional clarification, please let me know.
Alexander Gregory
02/29/2024, 1:35 PM
Scout
02/29/2024, 1:35 PM
`DailyPartitionsDefinition` by default creates partitions up to the current time, excluding the end date. If `end_offset` is set to 0 (the default), the last partition ends before the current time. The current day's partition therefore does not exist until the day is over, and at any point during a day the most recent partition is the previous day's. To handle this, you have a few options:
1. Run for the last X partitions: As you suggested, you can run materializations for the last X partitions to capture the most recent complete data, accepting that the current day is only covered once its partition exists.
2. Include today's partition with an offset: If you want to include today's partition, you can adjust the `end_offset` parameter when defining your `DailyPartitionsDefinition`. Setting `end_offset` to 1 extends the partitions forward, so the schedule tick that runs on day N fills in day N's partition instead of day N-1's. However, materializing today's partition before the day is over will result in partial data for the current day.
3. Materialize after the day ends: Schedule the materialization to run after the end of the day, so that the day's partition is complete. For example, a run early in the morning of the following day materializes the previous day's partition.
4. Dynamic partitioning: If your data ingestion pattern requires more flexibility, you might consider a `DynamicPartitionsDefinition` to create partitions dynamically based on the data available. This lets you define partitions on the fly and can be useful when data arrives irregularly.

Choose an approach that aligns with your data processing requirements and the expectations of your downstream consumers. If the current day's data is needed as soon as possible, you might opt for more frequent materializations, accepting that the data will be partial. If complete data is required, wait until the day is over before materializing its partition. Clearly communicate the partitioning strategy to users of the data so they understand its implications for completeness and freshness.
Alexander Gregory
02/29/2024, 3:30 PM
```python
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        partition_key=partition_key,
    )
```
Can we also configure the job with a config_schema such that I could parameterise the job so that I can modify the underlying dbt assets say every sunday to run a full refresh?
Focusing on the unpartitioned assets, they are defined like this:
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails
```
To keep things simple, I simply want to add the `--full-refresh` arg to the `run` dbt command. I know the `RunRequest` object can accept a `run_config`.
Scout
02/29/2024, 3:30 PM
`config_schema` for your job that includes a configuration option for a full refresh. Then, you can use this configuration in your schedule to conditionally add the `--full-refresh` argument to the dbt command.
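The conditional flag itself is plain list manipulation. Here is a minimal sketch, with a hypothetical helper name, of building the arguments that `dbt.cli(...)` would receive. `--full-refresh` and `--vars` are standard `dbt run` flags, and `--vars` is included only to show that both can be combined.

```python
import json
from typing import List, Optional


def dbt_run_args(full_refresh: bool = False, dbt_vars: Optional[dict] = None) -> List[str]:
    """Hypothetical helper: arguments for `dbt run`, to pass to dbt.cli(...)."""
    args = ["run"]
    if full_refresh:
        # Rebuild incremental models from scratch instead of incrementally
        args.append("--full-refresh")
    if dbt_vars:
        # dbt accepts --vars as a YAML/JSON string
        args += ["--vars", json.dumps(dbt_vars)]
    return args


print(dbt_run_args())                   # ['run']
print(dbt_run_args(full_refresh=True))  # ['run', '--full-refresh']
```

Inside the asset this would be used as `yield from dbt.cli(dbt_run_args(full_refresh), context=context).stream()`. Where `full_refresh` comes from is the open question that the rest of this thread works through.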
Here's how you can define a `config_schema` for your job and use it in the schedule:
1. Define a `config_schema` for your job that includes a `full_refresh` option:
```python
from dagster import AssetSelection, Bool, Field, define_asset_job

daily_dbt_asset_selection = AssetSelection(...)
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
    config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)},
)
```
2. Modify your dbt asset function to accept a configuration and conditionally add the `--full-refresh` argument based on the `full_refresh` configuration:
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Check if full_refresh is set in the job's configuration
    full_refresh = context.op_config.get("full_refresh", False)
    # Add --full-refresh argument conditionally
    dbt_run_args = ["run"]
    if full_refresh:
        dbt_run_args.append("--full-refresh")
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
```
3. Update your schedule to include the `run_config` with the `full_refresh` option set to `True` on Sundays:
```python
from dagster import RunRequest, schedule


@schedule(job=daily_dbt_job, cron_schedule="0 0 * * *")  # This runs daily at midnight
def daily_configurable_job(context):
    # Compute the partition key for the current day
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    # Determine if today is Sunday
    is_sunday = context.scheduled_execution_time.weekday() == 6  # 6 is Sunday
    # Set full_refresh to True if today is Sunday
    run_config = {"full_refresh": is_sunday}
    return RunRequest(
        run_key=partition_key,
        run_config=run_config,
    )
```
In this example, the `daily_configurable_job` schedule checks whether the current day is Sunday and sets the `full_refresh` configuration accordingly. The `RunRequest` includes this configuration, which the dbt asset function then uses to decide whether to add the `--full-refresh` argument.
Scout
02/29/2024, 3:32 PM
`context.op_config` access in the dbt asset function matches the way you have configured your job and assets. The exact key to access the configuration may vary depending on your setup. This approach allows you to have a configurable job that can perform a full refresh of dbt assets based on the schedule's logic, providing flexibility in how and when dbt models are materialized.
Alexander Gregory
02/29/2024, 3:37 PM
Scout
02/29/2024, 3:37 PM
`daily` job and a `sunday_full_refresh` job can make the scheduling logic simpler and more explicit. Each job has its own dedicated schedule, and there's no need for conditional logic within the schedule to determine the behavior. This can be easier to understand and maintain, especially for teams with multiple members who might be managing the schedules.
2. Single Configurable Job: A single job with a `config_schema` that includes a `full_refresh` option can be more flexible and reduce duplication in job definitions. The schedule's logic determines whether to perform a full refresh based on the day of the week. This approach centralizes the logic for the full refresh in one place, which can be beneficial if the logic for determining a full refresh is more complex than just the day of the week.
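The single-job variant's decision logic can be sketched without Dagster. The sketch decides from the tick's scheduled time whether it is a Sunday and nests the flag under the op that should receive it. It assumes that op accepts a `full_refresh` config field, which the generated dbt op does not do by default (as the error further down this thread shows). The op name and helper are placeholders.

```python
from datetime import datetime


def full_refresh_run_config(scheduled_time: datetime, op_name: str) -> dict:
    """Hypothetical helper: run_config that enables full_refresh on Sundays.

    datetime.weekday() counts Monday as 0, so Sunday is 6. Op config has to be
    nested as ops -> <op name> -> config rather than placed at the top level.
    """
    is_sunday = scheduled_time.weekday() == 6
    return {"ops": {op_name: {"config": {"full_refresh": is_sunday}}}}


print(full_refresh_run_config(datetime(2024, 3, 3), "non_partitioned_dbt_assets"))
# {'ops': {'non_partitioned_dbt_assets': {'config': {'full_refresh': True}}}}
```

In a schedule, the call would be `full_refresh_run_config(context.scheduled_execution_time, "non_partitioned_dbt_assets")`. Note that Python's weekday numbering differs from cron's, where day-of-week 0 is Sunday.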
Regarding the `config_schema`, it is a way to define the configuration options for a job or asset in Dagster. The `config_schema` specifies the shape and type of the configuration data that can be passed to a job or asset when it runs. This configuration is then accessible within the job or asset through the execution context.
The `config_schema` is typically defined within the `define_asset_job` function signature or the `@asset` decorator for assets. It can also be defined for ops using the `@op` decorator. The schema is defined using Dagster's field system, which allows you to specify types, defaults, and validation for configuration values.
Here's an example of defining a `config_schema` within the `define_asset_job` function:
```python
from dagster import AssetSelection, Field, Int, define_asset_job

my_asset_selection = AssetSelection(...)
my_job = define_asset_job(
    "my_job",
    selection=my_asset_selection,
    config_schema={
        "example_config": Field(Int, is_required=False, default_value=42),
    },
)
```
In this example, the `config_schema` for `my_job` includes an `example_config` option, which is an integer with a default value of 42.
You wouldn't typically define `config_schema` as a standalone object; it's usually defined in the context of a job, asset, or op where it's used to specify the configuration options for that entity.
The higher-level asset config that you mentioned, where you create classes and pass them via the asset function signature, is related to the `config_schema` in that it defines the configuration for an asset. However, the `config_schema` is more about defining the structure and validation of the configuration data, while the classes you pass to the asset
Scout
02/29/2024, 3:37 PM
`config_schema` is a way to define configuration options for jobs and assets, and it's typically defined within the context of the job or asset definition.
Alexander Gregory
02/29/2024, 3:46 PM
Scout
02/29/2024, 3:46 PM
`RunRequest` if the conditions for a run are met, or it can return a `SkipReason` if there is no work to be done. You can also return `None` if you want to skip without providing a reason. Here's a simple example of a sensor that requests a run every time it evaluates:
```python
from dagster import RunRequest, sensor


@sensor(job=my_job)
def my_simple_sensor(context):
    # You can access the sensor's context here if needed,
    # e.g. context.cursor or context.last_run_key
    # If the conditions are met, return a RunRequest to trigger a job run
    return RunRequest(
        # Dagster launches at most one run per run_key for a sensor, so with a
        # constant key like this only the first request actually starts a run
        run_key="unique_run_key",
        run_config={},  # The run configuration for the job
    )
```
In this example, `my_job` is the job that you want to trigger with this sensor. The `run_key` identifies the run request. Dagster uses it to deduplicate runs so that the same conditions don't trigger multiple runs, so in practice you would derive it from whatever triggered the run (for example a file name or timestamp). The `run_config` is the configuration that will be used for the job run; it should match the config schema of the job's ops or assets. If you want the sensor to only trigger under certain conditions, you can add logic to check for those conditions and only return a `RunRequest` when they are met. If the conditions are not met, you can return a `SkipReason` or `None`:
```python
from dagster import RunRequest, SkipReason, sensor


@sensor(job=my_job)
def my_conditional_sensor(context):
    # Logic to check if conditions are met
    if are_conditions_met():
        # Conditions are met, trigger a job run
        return RunRequest(
            run_key="unique_run_key",
            run_config={},
        )
    else:
        # Conditions are not met, skip this evaluation
        return SkipReason("Conditions are not met for triggering the job.")
```
In this conditional sensor, `are_conditions_met()` is a placeholder for your actual condition-checking logic. If the function returns `True`, a `RunRequest` is returned, and if it returns `False`, a `SkipReason` is returned. Sensors are evaluated at a regular interval defined by the `minimum_interval_seconds` parameter, which defaults to 30 seconds. You can adjust this interval to control how often the sensor checks for conditions. Remember to replace `my_job` with your actual job definition and to implement the `are_conditions_met` function with the logic specific to your use case.
Alexander Gregory
02/29/2024, 3:47 PM
Scout
02/29/2024, 3:47 PM
`define_asset_job` function does not have an argument for `config_schema`. The `config_schema` is used at the asset or op level to define the configuration for those components. When you define an asset job using `define_asset_job`, it will automatically include all the assets within the provided selection, and the configuration for those assets is defined at the individual asset level. If you want to provide configuration for the assets within a job, you would do so by passing a `run_config` when you execute the job. The `run_config` should match the `config_schema` defined on the individual assets. Here's an example of how you might define an asset with a `config_schema` and then provide a `run_config` when executing it:
```python
from dagster import AssetSelection, Field, asset, define_asset_job, materialize


@asset(config_schema={"param": Field(str)})
def my_asset(context):
    param = context.op_config["param"]
    # Use the param in your asset computation
    ...


# Define an asset job that includes my_asset
my_asset_job = define_asset_job(
    "my_asset_job",
    selection=AssetSelection.assets(my_asset),
)

# When executing, provide a run_config that includes the necessary configuration
run_config = {
    "ops": {
        "my_asset": {
            "config": {
                "param": "value",
            }
        }
    }
}

# define_asset_job returns an unresolved job without execute_in_process;
# for a quick local run, materialize the asset with the same run_config
result = materialize([my_asset], run_config=run_config)
```
In this example, `my_asset` has a `config_schema` that expects a string parameter `param`. When you execute `my_asset_job`, you provide a `run_config` that includes the configuration for `my_asset`. Please note that the structure of the `run_config` must match the expectations of the assets within the job. If your assets have different configuration schemas, the `run_config` should include the configuration for each asset accordingly.
Alexander Gregory
02/29/2024, 3:50 PM
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    full_refresh = context.op_config.get("full_refresh", False)
    dbt_run_args = ["run"]
    if full_refresh:
        dbt_run_args.append("--full-refresh")
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails
```
The intention was to use the scheduled function here:
```python
@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    is_minute_event = context.scheduled_execution_time.minute % 2 == 0
    run_config = {
        "full_refresh": is_minute_event,
    }
    return RunRequest(
        partition_key=partition_key,
        run_config=run_config,
    )
```
To determine whether we should run a full refresh or not. However, it seems that doesn't quite work as you explained prior?
Scout
02/29/2024, 3:50 PM
`run_config` in the `RunRequest` should match the structure expected by the assets within the job. In the case of dbt assets defined with the `@dbt_assets` decorator, the `run_config` should specify the configuration for the asset's op under the `ops` key, not directly at the top level. Here's how you can adjust the `run_config` in the `RunRequest` to pass the `full_refresh` configuration to the dbt assets:
```python
from dagster import RunRequest, schedule


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    is_minute_event = context.scheduled_execution_time.minute % 2 == 0
    run_config = {
        "ops": {
            "non_partitioned_dbt_assets": {
                "config": {
                    "full_refresh": is_minute_event or is_sunday,
                }
            }
        }
    }
    return RunRequest(
        run_key=f"run_{partition_key}",
        run_config=run_config,
    )
```
In this updated example, the `run_config` is structured with an `"ops"` key, which then contains the configuration for the specific asset. The asset name (`"non_partitioned_dbt_assets"`) must match the op name used within the job. The `"config"` for the asset includes the `full_refresh` option, which is set based on whether the current minute is an even number or it's Sunday. If you have multiple dbt assets or if the names are different, you will need to adjust the `run_config` accordingly. The `run_key` in the `RunRequest` identifies the run, and I've constructed it from the `partition_key`; you can adjust this as needed for your use case. This approach should allow you to conditionally pass the `full_refresh` configuration to your dbt assets based on the logic defined in your schedule.
Alexander Gregory
02/29/2024, 3:53 PM
Scout
02/29/2024, 3:53 PM
```python
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
    # No need to specify run_config here since it's the regular run
)


@schedule(job=daily_dbt_job, cron_schedule="0 0 * * 1-6")  # Monday to Saturday
def daily_job_schedule(context):
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
    )
```
2. A separate job for the full refresh:
```python
sunday_full_refresh_dbt_job = define_asset_job(
    "sunday_full_refresh_dbt",
    selection=daily_dbt_asset_selection,
    config={
        "ops": {
            "non_partitioned_dbt_assets": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    },
)


@schedule(job=sunday_full_refresh_dbt_job, cron_schedule="0 0 * * 0")  # Sunday
def sunday_full_refresh_schedule(context):
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
    )
```
In this setup, you have two jobs with their own schedules and run configurations. The `daily_dbt_job` runs every day except Sunday without a full refresh, and the `sunday_full_refresh_dbt_job` runs only on Sunday with a full refresh. The `config` parameter of `define_asset_job` lets you specify the run configuration for the job at definition time. This configuration will apply to all runs of the job unless overridden at runtime. If you prefer to keep a single job and decide at runtime whether to perform a full refresh, you can continue using the `run_config` within the schedule function to set the configuration dynamically based on the day of the week or other conditions.
Both approaches are valid, and the choice depends on your preference for organizing jobs and schedules. Separate jobs can make the system more understandable, as the intent of each job is clear from its name and associated schedule. On the other hand, a single job with dynamic configuration can reduce duplication and keep all related logic in one place.
Alexander Gregory
02/29/2024, 4:00 PM
```python
daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)


@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    is_minute_event = context.scheduled_execution_time.minute % 2 == 0
    run_config = {
        "ops": {
            "non_partitioned_dbt_assets": {
                "config": {
                    "full_refresh": is_minute_event,
                },
            },
        },
    }
    return RunRequest(
        partition_key=partition_key,
        run_config=run_config,
    )
```
However, I am getting an error:
```
dagster._core.errors.DagsterInvalidConfigError: Error in config for job daily_dbt
Error 1: Received unexpected config entry "full_refresh" at path root:ops:non_partitioned_dbt_assets:config. Expected: "{ }".
```
Scout
02/29/2024, 4:00 PM
`full_refresh` configuration entry for `non_partitioned_dbt_assets` within the `run_config`. This is because the op generated by `@dbt_assets` has an empty config schema (hence `Expected: "{ }"` in the error), so nothing declares `full_refresh` as a valid configuration key.
In Dagster, the config schema of an asset or op defines the structure and types of configuration that can be passed to it. Because `@dbt_assets` generates the assets from your dbt project and the decorator itself has no argument for adding keys such as `full_refresh`, you'll need to handle this differently.
One option is to let the `non_partitioned_dbt_assets` function accept a custom configuration class that includes `full_refresh`; check whether your dagster-dbt version supports a `config` parameter on `@dbt_assets` functions. Another option, shown below, is to read the setting from a resource through `context.resources`.
Here's an example of how you might adjust your asset and schedule definitions:
1. Define a custom resource for the `full_refresh` configuration:
```python
from dagster import Bool, Field, resource


@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_full_refresh_resource(context):
    return context.resource_config["full_refresh"]
```
2. Update your `non_partitioned_dbt_assets` to use the custom resource:
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
    # Declare the resource key so context.resources exposes it
    # (check that your dagster-dbt version accepts this argument)
    required_resource_keys={"dbt_full_refresh_resource"},
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    full_refresh = context.resources.dbt_full_refresh_resource
    dbt_run_args = ["run"]
    if full_refresh:
        dbt_run_args.append("--full-refresh")
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
```
3. Update your schedule to provide the `full_refresh` configuration via resources:
```python
@schedule(job=daily_dbt_job, cron_schedule="*/1 * * * *")
def daily_configurable_job(context: ScheduleEvaluationContext):
    partition_key = context.scheduled_execution_time.strftime("%Y-%m-%d")
    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    run_config = {
        "resources": {
            "dbt_full_refresh_resource": {
                "config": {
                    "full_refresh": is_sunday,
                },
            },
        },
    }
    return RunRequest(
        run_key=f"run_{partition_key}",
        run_config=run_config,
    )
```
Scout
02/29/2024, 4:01 PM
`dbt_full_refresh_resource` is defined as a resource with a `config_schema` that includes the `full_refresh` option. The `non_partitioned_dbt_assets` function reads the `full_refresh` value through `context.resources`, and the schedule provides it via the `resources` section of the `run_config`. Please make sure you register `dbt_full_refresh_resource` under that key in your `Definitions` object (or repository definition) so that it's available to the job when it runs. This approach lets you pass custom configuration to your dbt assets indirectly, through resources.

Alexander Gregory
02/29/2024, 4:06 PM
```python
dbt = DbtCliResource(project_dir=os.fspath(dbt_project_dir))
```
Now, technically it can take a list of global flags, including one for full refresh. However, we need to support both incremental and full-refresh runs, so we don't want to define the same underlying model twice as an asset.
Perhaps you could show an example of defining the dbt resource, adding it to the `Definitions` object, and then configuring our dbt assets, job, and schedule so that we can dynamically choose whether to run a full refresh.

Scout
02/29/2024, 4:06 PM
`full_refresh` flag based on tags or configuration passed at runtime. Here's how you can set up your dbt resource, assets, job, and schedule to handle this:
1. Define a dbt resource that accepts a `full_refresh` flag:
```python
import os

from dagster import Bool, Field, resource
from dagster_dbt import DbtCliResource


@resource({"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_cli_with_refresh(context):
    full_refresh_flag = ["--full-refresh"] if context.resource_config["full_refresh"] else []
    # DbtCliResource names this field global_config_flags (not global_flags)
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=full_refresh_flag,
    )
```
2. Update your `non_partitioned_dbt_assets` to use the `dbt_cli_with_refresh` resource:
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # `dbt` is the resource key: whatever is bound to it (here dbt_cli_with_refresh)
    # already includes the full_refresh flag if set
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()
```
3. Register `dbt_cli_with_refresh` under the `dbt` resource key in your `Definitions` object:
```python
from dagster import Definitions

defs = Definitions(
    assets=[non_partitioned_dbt_assets],
    # The key must match the assets' `dbt` parameter name
    resources={"dbt": dbt_cli_with_refresh},
    # ... other assets, jobs (e.g. daily_dbt_job) and schedules ...
)
```
4. Define your job without specifying `full_refresh`:
```python
from dagster import define_asset_job

daily_dbt_job = define_asset_job(
    "daily_dbt",
    selection=daily_dbt_asset_selection,
)
```
5. Define a schedule that sets the `full_refresh` flag based on the desired condition:
```python
from dagster import RunRequest, schedule


@schedule(job=daily_dbt_job, cron_schedule="0 0 * * *")  # Daily at midnight
def daily_configurable_job(context):
    is_sunday = context.scheduled_execution_time.weekday() == 6  # sunday
    run_config = {
        "resources": {
            # Must match the resource key the assets use (their `dbt` parameter)
            "dbt": {
                "config": {
                    "full_refresh": is_sunday,
                }
            }
        }
    }
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d"),
        run_config=run_config,
    )
```
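Dagster resolves a Pythonic resource parameter by its name, so an asset declared with `dbt: DbtCliResource` asks for the resource key `dbt`. The `resources` section of the schedule's run config has to use that same key, whatever the underlying resource function happens to be called. A minimal stdlib sketch that keeps the key in one place; `DBT_RESOURCE_KEY` and `build_run_config` are illustrative names, not Dagster APIs:

```python
from datetime import datetime

# Must match the asset parameter name (dbt: DbtCliResource) and the key used
# when binding the resource, e.g. Definitions(resources={"dbt": ...}).
DBT_RESOURCE_KEY = "dbt"


def build_run_config(scheduled_time: datetime, resource_key: str = DBT_RESOURCE_KEY) -> dict:
    """Run config for one schedule tick: full refresh on Sundays (weekday 6)."""
    is_sunday = scheduled_time.weekday() == 6
    return {
        "resources": {
            resource_key: {
                "config": {
                    "full_refresh": is_sunday,
                }
            }
        }
    }


print(build_run_config(datetime(2024, 3, 3)))
```

Inside the schedule, the body would then reduce to `RunRequest(run_key=..., run_config=build_run_config(context.scheduled_execution_time))`.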
In this setup, the `dbt_cli_with_refresh` resource is configured to include the `--full-refresh` flag when its `full_refresh` configuration is `True`. The `daily_configurable_job` schedule checks whether it's Sunday and sets the
Scout
02/29/2024, 4:07 PM
`full_refresh` flag accordingly in the `run_config`. When the job runs, it uses the `dbt_cli_with_refresh` resource, which includes the `--full-refresh` flag if the condition is met. This approach lets you control dynamically whether a full refresh occurs without defining the same dbt model as multiple assets. The `full_refresh` behavior is controlled at the resource level, which the dbt assets then use during job execution.

Alexander Gregory
02/29/2024, 4:11 PM
Scout
02/29/2024, 4:11 PM
`run_config`
based on the conditions you specify. This keeps the dynamic logic at the scheduling level, which can be more transparent than embedding it within assets.
3. Custom Sensor Logic: Instead of using a schedule, you could use a sensor to trigger runs with more complex logic. The sensor can check for conditions and decide whether to trigger a run with or without a full refresh. Here's an example of how you might use a sensor to trigger a job with or without a full refresh:
```python
from datetime import datetime

from dagster import RunRequest, repository, sensor


@sensor(job=daily_dbt_job)
def full_refresh_sensor(context):
    # Sensor contexts have no scheduled_execution_time, so use the current time
    now = datetime.now()
    is_sunday = now.weekday() == 6  # sunday
    if is_sunday:
        run_config = {
            "resources": {
                "dbt": {
                    "config": {
                        # DbtCliResource's field is named global_config_flags
                        "global_config_flags": ["--full-refresh"]
                    }
                }
            }
        }
        return RunRequest(
            # Date-based key: Dagster skips run_keys this sensor already used
            run_key=now.strftime("%Y-%m-%d"),
            run_config=run_config,
        )
    # Skip the full refresh on other days
    return None


@repository
def my_repository():
    return [
        daily_dbt_job,
        full_refresh_sensor,
        # ... other assets and jobs ...
    ]
```
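A sensor is evaluated repeatedly (by default roughly every 30 seconds), so the date-based `run_key` is what keeps it from launching a run on every evaluation: Dagster skips a `RunRequest` whose `run_key` that sensor has already used. The stdlib simulation below only mimics that de-duplication to show the effect; `sensor_tick` and `simulate_ticks` are hypothetical helpers, not Dagster code.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Set


def sensor_tick(now: datetime) -> Optional[str]:
    """Mimic the sensor body: a date-based run_key on Sundays, otherwise no run."""
    if now.weekday() == 6:  # Sunday
        return now.strftime("%Y-%m-%d")
    return None


def simulate_ticks(start: datetime, ticks: int, interval_s: int = 30) -> List[str]:
    """Evaluate the sensor `ticks` times, launching only run_keys not seen before."""
    seen: Set[str] = set()
    launched: List[str] = []
    for i in range(ticks):
        run_key = sensor_tick(start + timedelta(seconds=i * interval_s))
        if run_key is not None and run_key not in seen:
            seen.add(run_key)
            launched.append(run_key)
    return launched


# A full Sunday of 30-second evaluations (2880 ticks) launches a single run
print(simulate_ticks(datetime(2024, 3, 3), ticks=2880))  # ['2024-03-03']
```

Without a stable `run_key` (for example `None`), every one of those evaluations would request a new run.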
In this example, `full_refresh_sensor` checks whether it's Sunday and, if so, triggers a run with the `--full-refresh` flag. On other days it returns `None`, meaning no run is requested. This approach encapsulates the dynamic full-refresh logic within the sensor, keeping your job and asset definitions static and easier to understand. The sensor becomes the place where the dynamic behavior is defined, which can be a more natural fit for such logic.
Ultimately, the best approach depends on your specific requirements and your preferences for organizing and maintaining your Dagster codebase. Each method has its trade-offs, so choose the one that best fits your team's practices and the complexity of your workflows.

Alexander Gregory
02/29/2024, 4:20 PM
Scout
02/29/2024, 4:20 PM
`run_config` to a job execution in Dagster, it replaces the existing configuration for that run. If you want to modify only part of the configuration without losing the existing settings, the `run_config` you provide must include all necessary configuration values, not just the changes. To get around the issue of losing existing configuration, you can:
1. Merge Configurations: Programmatically merge the new configuration with the existing one before passing it to the job execution. This way, you update only the parts that need to change while preserving the rest.
2. Use Defaults and Overrides: Define default configurations within your assets or resources, and use the `run_config` to override specific values as needed. This lets you maintain a base configuration that always applies unless explicitly overridden.
3. Configurable Resources: If the dynamic part of your configuration relates to resources, you can define resources with a `config_schema` that includes default values, then override those defaults with the `run_config` at runtime.

Here's an example of how you might define a resource with a `config_schema` that includes a default value:
```python
import os

from dagster import Bool, Field, resource
from dagster_dbt import DbtCliResource


@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    # Return the dbt resource with the appropriate configuration
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=["--full-refresh"] if full_refresh else [],
    )
```
When you want to trigger a full refresh, you can provide a `run_config` that overrides the default value:
```python
run_config = {
    "resources": {
        "dbt_resource": {
            "config": {
                "full_refresh": True,
            }
        }
    }
}
```
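When the run config leaves the resource's `full_refresh` key out, the `default_value` declared in the resource's `config_schema` applies, so a weekday run needs no resource config at all and only the Sunday run has to mention it. A stdlib sketch of that lookup; `effective_full_refresh` is a hypothetical helper that mirrors the `default_value=False` declared for `dbt_resource` above, not a Dagster function:

```python
DEFAULT_FULL_REFRESH = False  # mirrors default_value=False in the config_schema above


def effective_full_refresh(run_config: dict, resource_key: str = "dbt_resource") -> bool:
    """Return the full_refresh value a run would see, falling back to the default."""
    resource_cfg = (
        run_config.get("resources", {})
        .get(resource_key, {})
        .get("config", {})
    )
    return bool(resource_cfg.get("full_refresh", DEFAULT_FULL_REFRESH))


weekday_run = {}  # nothing overridden: the default applies
sunday_run = {"resources": {"dbt_resource": {"config": {"full_refresh": True}}}}
print(effective_full_refresh(weekday_run), effective_full_refresh(sunday_run))  # False True
```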
If you need to merge configurations programmatically, you can use Python's dictionary update methods or other utilities to combine the existing configuration with the new values before passing it to the job execution (note that `dict.update` is shallow, so nested keys such as `resources` need a recursive merge). Keep in mind that if you use the Dagster UI (formerly Dagit) to launch runs, you can adjust the `run_config` manually for each run. For automated runs triggered by schedules or sensors, however, you'll need to make sure the `run_config` is constructed correctly in code.

Alexander Gregory
02/29/2024, 4:22 PM
Scout
02/29/2024, 4:22 PM
`run_config` when the job starts. If the next run of the job provides different connection information in its `run_config`, a new instance of the resource is created with the new configuration. This instantiation behavior is what enables dynamic configuration of resources, such as deciding whether to perform a full refresh in a dbt job based on a condition evaluated at runtime. Each run can have its own `run_config` that specifies whether the `full_refresh` flag should be set for the dbt resource, and the resource is instantiated accordingly. Here's a simplified example of how a resource might be defined and used in a job:
```python
from dagster import Bool, Field, job, op, resource


@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def dbt_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    # Logic to handle the full_refresh flag
    ...


@op(required_resource_keys={"dbt_resource"})
def my_op(context):
    # This op can access dbt_resource, which is instantiated with the current run's config
    ...


@job(resource_defs={"dbt_resource": dbt_resource})
def my_job():
    my_op()
```
When you execute `my_job`, `dbt_resource` is instantiated with the configuration provided in the `run_config`. If you trigger another run of `my_job` with a different `run_config`, a new instance of `dbt_resource` is created with that configuration. This design gives you flexibility and ensures that each job run can behave differently based on the provided configuration, which suits scenarios where job behavior needs to change dynamically.

Alexander Gregory
02/29/2024, 4:27 PM
Scout
02/29/2024, 4:27 PM
`DbtCliResource` that includes a `full_refresh` option, you can subclass `ConfigurableResource` and set the `DbtCliResource`'s `global_config_flags` based on the `full_refresh` configuration. Here's how you can define such a resource and use it to periodically run full refreshes:
1. Define the custom `DbtCliResource`:
```python
import os

from dagster import Bool, Field, resource
from dagster_dbt import DbtCliResource


@resource(config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)})
def custom_dbt_cli_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    global_flags = ["--full-refresh"] if full_refresh else []
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=global_flags,
    )
```
2. Use the custom resource in your dbt assets:
```python
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=dbt_manifest_path)  # manifest is a required argument
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
```
3. Define a job that uses the custom dbt resource:
```python
from dagster import AssetSelection, Definitions, define_asset_job

my_asset_selection = AssetSelection.assets(my_dbt_assets)
# define_asset_job has no resource_defs parameter; bind the resource in Definitions
my_dbt_job = define_asset_job("my_dbt_job", selection=my_asset_selection)

defs = Definitions(
    assets=[my_dbt_assets],
    jobs=[my_dbt_job],
    resources={"dbt": custom_dbt_cli_resource},
    # add schedules=[weekly_full_refresh_schedule] once it is defined
)
```
4. Create a schedule that triggers the job with the `full_refresh` option set periodically (e.g., every Sunday):
```python
from dagster import RunRequest, schedule


@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    }
    return RunRequest(
        # The tick time (not datetime.now()) keeps the run_key stable for a tick
        run_key=context.scheduled_execution_time.isoformat(),
        run_config=run_config,
    )
```
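One caveat with putting `--full-refresh` into the resource's global flags: `DbtCliResource` places `global_config_flags` ahead of the subcommand (as in `dbt <flags> run`), while `--full-refresh` is documented as an option of subcommands such as `dbt run`, `dbt build` and `dbt seed`. Whether dbt also accepts it in the global position depends on your dbt version, so it is worth checking before relying on it; appending it to the subcommand's arguments avoids the question. The stdlib sketch below only assembles the two layouts and does not call dbt; `build_dbt_argv` and `FULL_REFRESH_COMMANDS` are hypothetical names.

```python
from typing import List, Optional

# Subcommands known to accept --full-refresh (not an exhaustive list)
FULL_REFRESH_COMMANDS = {"run", "build", "seed"}


def build_dbt_argv(command: str, full_refresh: bool = False,
                   global_flags: Optional[List[str]] = None) -> List[str]:
    """Assemble `dbt [global flags] <command> [command options]` as a list."""
    argv = ["dbt", *(global_flags or []), command]
    if full_refresh and command in FULL_REFRESH_COMMANDS:
        # A subcommand option: it goes after the command, not among global flags
        argv.append("--full-refresh")
    return argv


print(build_dbt_argv("run", full_refresh=True))        # ['dbt', 'run', '--full-refresh']
print(build_dbt_argv("test", full_refresh=True))       # ['dbt', 'test']
print(build_dbt_argv("run", global_flags=["--debug"]))  # ['dbt', '--debug', 'run']
```

With `DbtCliResource`, the equivalent is passing `["run", "--full-refresh"]` to `dbt.cli(...)` rather than adding the flag to `global_config_flags`.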
In this setup, `weekly_full_refresh_schedule` triggers `my_dbt_job` every Sunday with the `full_refresh` option set to `True`. For that run, `custom_dbt_cli_resource` is instantiated with the `--full-refresh` flag included in its `global_config_flags`, causing dbt to perform a full refresh of the models. This approach gives you a configurable dbt resource that the schedule controls dynamically, enabling periodic full refreshes without defining multiple jobs or assets.

Alexander Gregory
02/29/2024, 4:34 PM
`ConfigurableResource`. Additionally, that hasn't solved the issue of the entire configuration being overwritten when you dynamically provide the run config.
So to reiterate:
• I define my dbt assets using the `dbt_assets` decorator.
• I define a dbt resource using `DbtCliResource`.
• I want to run a selection of assets daily; some are incremental and others are not. The incremental ones should always run incrementally.
• I then want to run a full refresh once a week. This could be configured in several places: the resource itself or the `dbt_assets`-decorated function itself, but not the run config, because it overwrites things.
We cannot define multiple assets for the same underlying model. I need to trigger the job run via a schedule that returns a `RunRequest`.

Scout
02/29/2024, 4:34 PM
`ConfigurableResource` that includes a `full_refresh` option and use it to periodically run full refreshes without overwriting the entire configuration, you can follow these steps:
1. Define a resource class (here a `ResourceDefinition` subclass) that includes the `full_refresh` option:
```python
from dagster import Bool, Field, InitResourceContext, ResourceDefinition
from dagster_dbt import DbtCliResource


class CustomDbtCliResource(ResourceDefinition):
    def __init__(self, project_dir):
        self.project_dir = project_dir
        super().__init__(
            resource_fn=self.create_resource,
            # Declare the key so run config may set resources.dbt.config.full_refresh
            config_schema={"full_refresh": Field(Bool, is_required=False, default_value=False)},
        )

    def create_resource(self, context: InitResourceContext):
        full_refresh = context.resource_config.get("full_refresh", False)
        global_flags = ["--full-refresh"] if full_refresh else []
        return DbtCliResource(
            project_dir=self.project_dir,
            global_config_flags=global_flags,
        )
```
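2. Use the custom resource in your dbt assets:
```python
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=dbt_manifest_path)  # manifest is a required argument
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
```
3. Define a job that uses the custom dbt resource:
```python
from dagster import AssetSelection, Definitions, define_asset_job

my_asset_selection = AssetSelection.assets(my_dbt_assets)
# define_asset_job has no resource_defs parameter; bind the resource in Definitions
my_dbt_job = define_asset_job("my_dbt_job", selection=my_asset_selection)

defs = Definitions(
    assets=[my_dbt_assets],
    jobs=[my_dbt_job],
    resources={"dbt": CustomDbtCliResource(project_dir=dbt_project_dir)},
)
```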
4. Create a schedule that triggers the job with the `full_refresh` option set periodically (e.g., every Sunday):
```python
from dagster import RunRequest, schedule


@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    # Only set the full_refresh option in the resource config
    resource_config = {
        "dbt": {
            "config": {
                "full_refresh": True,
            }
        }
    }
    # Merge the override into the job's usual run config. merge_dicts is a helper
    # you implement yourself; BASE_RUN_CONFIG is a hypothetical dict holding that config.
    run_config = merge_dicts(BASE_RUN_CONFIG, {"resources": resource_config})
    return RunRequest(
        run_key=context.scheduled_execution_time.isoformat(),
        run_config=run_config,
    )
```
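The schedule relies on a `merge_dicts` helper that is left for you to implement. A minimal recursive version, assuming it receives two plain nested dicts (the job's base run config and the override) and that values from the override win; `BASE_RUN_CONFIG` and `some_other_op` are hypothetical placeholders for whatever your job normally runs with:

```python
import copy
from typing import Any, Dict


def merge_dicts(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge `override` into a copy of `base`; override wins on conflicts."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_dicts(merged[key], value)  # descend into nested sections
        else:
            merged[key] = copy.deepcopy(value)  # leaf value or new key: take the override
    return merged


# Hypothetical base run config the job normally uses (names are placeholders)
BASE_RUN_CONFIG = {
    "resources": {"dbt": {"config": {"full_refresh": False}}},
    "ops": {"some_other_op": {"config": {"limit": 10}}},
}

override = {"resources": {"dbt": {"config": {"full_refresh": True}}}}
run_config = merge_dicts(BASE_RUN_CONFIG, override)
```

Unlike `dict.update`, this keeps sibling keys such as `ops` intact and leaves `BASE_RUN_CONFIG` unmodified.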
In this setup, `weekly_full_refresh_schedule` triggers `my_dbt_job` every Sunday with the `full_refresh` option set to `True`. For that run, `CustomDbtCliResource` is instantiated with the `--full-refresh` flag included in its global flags, causing dbt to perform a full refresh of the models.
The `merge_dicts` function is a utility function that you would need to implement to merge the new resource configuration
Scout
02/29/2024, 4:35 PM
Alexander Gregory
02/29/2024, 4:45 PM
```python
@dbt_assets(
    manifest=dbt_manifest_path,
    exclude="tag:partitioned",
    dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)),
)
def non_partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_run_args = ["run"]
    if full_refresh:  # replace with check day?
        dbt_run_args.append("--full-refresh")
    yield from dbt.cli(["seed"], context=context).stream()
    yield from dbt.cli(dbt_run_args, context=context).stream()
    yield from dbt.cli(["test"], context=context, raise_on_error=False).stream()  # raise_on_error=False means all tests will run even if upstream test fails
```
Is there anything wrong with this approach? What are the best practices in Dagster that you would suggest given the constraints?

Scout
02/29/2024, 4:45 PM
`dbt_assets` function, you can control the behavior of the dbt run based on conditions evaluated at runtime. This method keeps your job and asset definitions static and moves the dynamic logic into the asset's execution, which can be good practice when you want to avoid duplicating asset definitions. Here are some best practices to consider:
1. Clear Logic: Make sure the logic that decides whether to perform a full refresh is clear and easy to understand. For example, you could encapsulate it in a well-named function:
```python
from datetime import datetime


def should_perform_full_refresh() -> bool:
    # AssetExecutionContext has no scheduled_execution_time, so this checks the
    # wall-clock time when the asset runs: weekday() == 6 means Sunday
    return datetime.now().weekday() == 6
```
2. Documentation: Document the behavior of your assets, especially if the logic for triggering a full refresh is not immediately visible in the job or schedule definitions. This helps other developers understand when and why a full refresh will occur.
3. Testing: Test the asset logic under different conditions to make sure the full refresh behavior works as expected.
4. Scheduling: If you use a schedule to trigger the job that includes these assets, make sure the schedule aligns with the refresh logic. For example, if the full refresh is supposed to happen every Sunday, the schedule should actually run on Sundays.
5. Resource Configuration: If the full refresh flag could be used by multiple assets or jobs, consider holding it in a resource. This centralizes the configuration and avoids repeating the same logic in multiple places.
6. Monitoring: Implement monitoring or alerting to track when full refreshes occur, especially if they are resource-intensive. This helps you quickly identify and address any issues that arise during these operations.

Given these best practices, the approach you've taken is reasonable. It lets you maintain a single set of dbt assets and control the full refresh behavior dynamically, without complicating your job and schedule definitions. Just make sure to document the behavior and test it thoroughly to ensure it meets your needs.

Alexander Gregory
02/29/2024, 4:49 PM
Scout
02/29/2024, 4:49 PM
`@dbt_assets` decorator, you need to specify which resource to use, and it's not straightforward to switch between different resources (such as `DbtCliResource` and `DbtCliFullRefreshResource`) at the job or schedule level in Dagster.
Configurable resources can indeed be useful in this scenario. A configurable resource is one that can be configured at runtime, typically through the `run_config`. However, as you've noted, you want to avoid merging configurations in the `RunRequest`.
Here's how you could use a configurable resource to control the `full_refresh` behavior:
1. Define a configurable resource that includes the `full_refresh` option:
```python
import os

from dagster import Bool, Field, resource
from dagster_dbt import DbtCliResource


@resource({"full_refresh": Field(Bool, is_required=False, default_value=False)})
def configurable_dbt_cli_resource(context):
    full_refresh = context.resource_config["full_refresh"]
    global_flags = ["--full-refresh"] if full_refresh else []
    # Build a DbtCliResource directly: the legacy dbt_cli_resource is a resource
    # definition and is not called with keyword arguments like this
    return DbtCliResource(
        project_dir=os.fspath(dbt_project_dir),
        global_config_flags=global_flags,
    )
```
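2. Use the configurable resource in your dbt assets:
```python
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=dbt_manifest_path)  # manifest is a required argument
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The dbt resource now includes the full_refresh flag if set
    yield from dbt.cli(["run"], context=context).stream()
```
3. Define a job that uses the configurable dbt resource:
```python
from dagster import AssetSelection, Definitions, define_asset_job

my_asset_selection = AssetSelection.assets(my_dbt_assets)
# define_asset_job has no resource_defs parameter; bind the resource in Definitions
my_dbt_job = define_asset_job("my_dbt_job", selection=my_asset_selection)

defs = Definitions(
    assets=[my_dbt_assets],
    jobs=[my_dbt_job],
    resources={"dbt": configurable_dbt_cli_resource},
)
```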
4. Create a schedule that triggers the job with the `full_refresh` option set periodically (e.g., every Sunday):
```python
from datetime import datetime

from dagster import RunRequest, schedule


@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    }
    return RunRequest(
        run_key=datetime.now().isoformat(),
        run_config=run_config,
    )
```
In this setup, `weekly_full_refresh_schedule` triggers `my_dbt_job` every Sunday with the `full_refresh` option set to `True`. For that run, `configurable_dbt_cli_resource` is instantiated with the `--full-refresh` flag included in its global flags, causing dbt to perform a full refresh of the models.
This approach allows you to control the fu
Scout
02/29/2024, 4:50 PM
`full_refresh` configuration is provided directly in the `run_config` of the `RunRequest`, and there's no need to merge configurations, since the `run_config` provided at the schedule level is intended to be the complete configuration for that run.

Alexander Gregory
02/29/2024, 4:52 PM
Scout
02/29/2024, 4:52 PM
`@resource` decorator is part of the legacy resource system in Dagster. To define a configurable resource without the `@resource` decorator, you can subclass `ConfigurableResource` directly. Here's how you can create a subclass of `ConfigurableResource` for the `DbtCliResource` that includes a `full_refresh` option:
1. Define a subclass of `ConfigurableResource`:
```python
from dagster_dbt import DbtCliResource


# DbtCliResource is itself a ConfigurableResource (a pydantic model), so extra
# settings are declared as typed fields rather than set in a custom __init__.
class CustomDbtCliResource(DbtCliResource):
    full_refresh: bool = False
```
2. Use the custom resource in your dbt assets:
```python
from dagster import AssetExecutionContext
from dagster_dbt import dbt_assets


@dbt_assets(manifest=dbt_manifest_path)  # manifest is a required argument
def my_dbt_assets(context: AssetExecutionContext, dbt: CustomDbtCliResource):
    # Add --full-refresh to the run command when the resource is configured for it
    run_args = ["run", "--full-refresh"] if dbt.full_refresh else ["run"]
    yield from dbt.cli(run_args, context=context).stream()
```
3. Define a job that uses the custom dbt resource:
```python
import os

from dagster import AssetSelection, Definitions, define_asset_job

my_asset_selection = AssetSelection.assets(my_dbt_assets)
# define_asset_job has no resource_defs parameter; bind the resource in Definitions
my_dbt_job = define_asset_job("my_dbt_job", selection=my_asset_selection)

defs = Definitions(
    assets=[my_dbt_assets],
    jobs=[my_dbt_job],
    resources={"dbt": CustomDbtCliResource(project_dir=os.fspath(dbt_project_dir))},
)
```
4. Create a schedule that triggers the job with the `full_refresh` option set periodically (e.g., every Sunday):
```python
from datetime import datetime

from dagster import RunRequest, schedule


@schedule(job=my_dbt_job, cron_schedule="0 0 * * 0")  # Every Sunday at midnight
def weekly_full_refresh_schedule(context):
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "full_refresh": True,
                }
            }
        }
    }
    return RunRequest(
        run_key=datetime.now().isoformat(),
        run_config=run_config,
    )
```
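`ConfigurableResource` subclasses are pydantic models: settings are declared as typed fields with defaults and passed as keyword arguments (for example `CustomDbtCliResource(project_dir=..., full_refresh=True)`), and a run that supplies config gets an instance built from that run's values. The stdlib dataclass below is only a stand-in to show that shape without installing Dagster; `DbtRunSettings`, `BASE_SETTINGS` and `settings_for_run` are hypothetical names, and this is not how Dagster builds resources internally.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DbtRunSettings:
    """Stand-in for a ConfigurableResource: typed fields with defaults, no custom __init__."""
    project_dir: str
    full_refresh: bool = False


# Values bound once at definition time (like the instance passed to Definitions)
BASE_SETTINGS = DbtRunSettings(project_dir="dbt_project")


def settings_for_run(run_overrides: dict) -> DbtRunSettings:
    """Instance one run would see: the bound values plus that run's overrides."""
    return replace(BASE_SETTINGS, **run_overrides)


print(settings_for_run({}))
print(settings_for_run({"full_refresh": True}))
```

Because the instance is frozen and each run gets its own copy, a Sunday override never leaks into the base settings used by the other days.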
In this setup, `weekly_full_refresh_schedule` triggers `my_dbt_job` every Sunday with the `full_refresh` option set to `True`. For that run, `CustomDbtCliResource` is instantiated with `full_refresh` enabled, causing dbt to perform a full refresh of the models.
This approach allows you to control the full refresh behavior dynamically at the schedule level without defining multiple resources for different refresh behaviors. The `full_refresh` configuration is provided directly in the `run_config` of the `RunRequest`, and there's no need to merge configurations, since the run_c

Alexander Gregory
02/29/2024, 4:56 PM
Scout
02/29/2024, 4:56 PM
`@resource` decorator is not deprecated in Dagster. However, if you prefer not to use the `@resource` decorator and want a class-based approach instead, you can subclass `ResourceDefinition` directly. This lets you create more complex resource classes with custom initialization logic. Here's an example of how you can define a custom `DbtCliResource` by subclassing `ResourceDefinition`:
```python
from dagster import Definitions, ResourceDefinition, define_asset_job
from dagster_dbt import DbtCliResource


class CustomDbtCliResource(ResourceDefinition):
    def __init__(self, project_dir, full_refresh):
        self.project_dir = project_dir
        self.full_refresh = full_refresh
        super().__init__(resource_fn=self._resource_fn)

    def _resource_fn(self, _):
        global_flags = ["--full-refresh"] if self.full_refresh else []
        # Build a DbtCliResource; the legacy dbt_cli_resource is not called with kwargs
        return DbtCliResource(
            project_dir=self.project_dir,
            global_config_flags=global_flags,
        )


# Usage: define_asset_job has no resource_defs, so bind the resource in Definitions
my_dbt_resource = CustomDbtCliResource(project_dir=dbt_project_dir, full_refresh=False)
my_dbt_job = define_asset_job("my_dbt_job", selection=my_asset_selection)
defs = Definitions(assets=[my_dbt_assets], jobs=[my_dbt_job], resources={"dbt": my_dbt_resource})
```
In this example, `CustomDbtCliResource` is a subclass of `ResourceDefinition` that takes `project_dir` and `full_refresh` as initialization arguments. The `_resource_fn` method creates an instance of `DbtCliResource` with the appropriate global config flags. To control the `full_refresh` behavior dynamically, you can create instances of `CustomDbtCliResource` with different `full_refresh` values and pass them to different job definitions, or use a factory pattern to create the resource instance based on runtime conditions.
Regarding schedules and dynamic behavior, you would still need to manage the `full_refresh` flag at runtime. One way to do this is with a sensor or schedule that checks the condition and triggers a job with the appropriate resource instance.
Please note that the class-based approach for resources is more complex and less common than the `@resource` decorator. The decorator-based approach is generally recommended for its simplicity and ease of use, unless you have specific requirements that call for a class-based approach.

Scout
02/29/2024, 4:58 PM
Alexander Gregory
02/29/2024, 5:01 PM
Scout
02/29/2024, 5:01 PM
`resource_defs` is not a parameter you can pass directly to `define_asset_job`. Instead, resources are bound at a higher level in Dagster, typically in your `Definitions` object or repository, or when defining a job with the `to_job` method on a graph. Given the constraints and the need to control dynamically whether a full refresh occurs, you can consider using a sensor to trigger the job with the appropriate configuration. The sensor can check the current day and decide whether to include the `--full-refresh` flag in the `run_config` for the dbt CLI resource. Here's an example of how you might use a sensor to achieve this:
```python
from datetime import datetime

from dagster import AssetSelection, RunRequest, define_asset_job, repository, sensor

# Your existing dbt assets, defined with @dbt_assets(manifest=..., ...) (elided here),
# e.g. my_dbt_assets

# Define your job that includes the dbt assets
my_dbt_job = define_asset_job(
    "my_dbt_job",
    selection=AssetSelection.assets(my_dbt_assets),
    # Resources are defined at the repository level, not here
)


# Define a sensor that checks the current day and decides whether to run a full refresh
@sensor(job=my_dbt_job)
def my_dbt_full_refresh_sensor(context):
    now = datetime.now()
    # Check if today is Sunday (or any other condition you want to use)
    is_sunday = now.weekday() == 6
    # Construct the run configuration with the full-refresh flag for the dbt CLI resource
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "global_config_flags": ["--full-refresh"] if is_sunday else []
                }
            }
        }
    }
    # One run per day: Dagster skips run_keys this sensor has already used.
    # (context.cursor is None unless the sensor sets one, so it would not dedupe.)
    return RunRequest(
        run_key=now.strftime("%Y-%m-%d"),
        run_config=run_config,
    )


# Define your repository that includes the job and sensor
@repository
def my_repository():
    return [
        my_dbt_job,
        my_dbt_full_refresh_sensor,
        # Include other assets, jobs, sensors, and resources as needed
    ]
```
In this setup, `my_dbt_full_refresh_sensor` checks whether it's Sunday and, if so, constructs a `run_config` that includes the `--full-refresh` flag for the dbt CLI resource. When the sensor triggers a run, that run uses the provided `run_config`, which applies the full refresh only on Sundays. This approach lets you maintain a single job definition and control the full refresh behavior dynamically with a sensor, without defining multiple jobs or merging configurations.
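A different route, closer to the `full_refresh` check inside the asset body that Alexander proposed: dagster-dbt lets the `@dbt_assets`-decorated function take a `config` parameter typed with a `dagster.Config` subclass (please verify this against the dagster-dbt docs for your version). Roughly, `class FullRefreshConfig(Config): full_refresh: bool = False` plus `def non_partitioned_dbt_assets(context, dbt: DbtCliResource, config: FullRefreshConfig)`, with the body building its `dbt run` arguments from `config.full_refresh`. That makes `full_refresh` a valid key under `ops.non_partitioned_dbt_assets.config`, the exact path the earlier `DagsterInvalidConfigError` rejected, and since the flag lives in op config, the schedule's run config does not need to repeat the resource settings bound in `Definitions`. The stdlib-only sketch below covers the two pieces of plain logic involved; `op_run_config`, `dbt_run_args` and `FullRefreshConfig` are hypothetical names.

```python
from datetime import datetime
from typing import Any, Dict, List

# Op name of the dbt assets; by default, the decorated function's name
OP_NAME = "non_partitioned_dbt_assets"


def op_run_config(scheduled_time: datetime, op_name: str = OP_NAME) -> Dict[str, Any]:
    """Run config a schedule could send if the asset function declares a config
    with a `full_refresh: bool = False` field. Full refresh on Sundays (weekday 6)."""
    return {"ops": {op_name: {"config": {"full_refresh": scheduled_time.weekday() == 6}}}}


def dbt_run_args(full_refresh: bool) -> List[str]:
    """Arguments the asset body would pass to dbt.cli(...) for the run step."""
    args = ["run"]
    if full_refresh:
        args.append("--full-refresh")
    return args


cfg = op_run_config(datetime(2024, 3, 3))  # a Sunday
print(dbt_run_args(cfg["ops"][OP_NAME]["config"]["full_refresh"]))  # ['run', '--full-refresh']
```

Because the field has a default, weekday runs could also omit the op config entirely and still run incrementally.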