Corbett Carrel
02/09/2024, 12:33 AM
Scout
02/09/2024, 12:33 AM
1. Define your partitioned asset: Use a DailyPartitionsDefinition so that each day corresponds to one partition of your asset.
2. Create a custom schedule: You will need to write a custom schedule function that determines which partitions are missing and should be backfilled. This function will be invoked according to the schedule you set (every 30 days in your case).
3. Calculate the partitions to backfill: Inside your custom schedule function, calculate the date range that covers the last 30 partitions. You can use datetime operations to find the start and end dates for the partitions you want to backfill.
4. Create RunRequests for the missing partitions: For each of the missing partitions within the calculated date range, create a RunRequest object. This object should specify the partition key that corresponds to the partition you want to backfill.
5. Set the schedule's cron expression: Define the cron expression for your schedule to trigger every 30 days. For example, if you want the schedule to run on the first day of every month, your cron expression would be "0 0 1 * *".
6. Register the schedule: Add the schedule to your repository so that Dagster is aware of it and can manage its execution.
Here is a conceptual example of what the custom schedule function might look like:
```python
from datetime import datetime, timedelta

from dagster import RunRequest, schedule


# Assumes my_partitioned_job is defined elsewhere
@schedule(cron_schedule="0 0 1 * *", job=my_partitioned_job)
def my_monthly_backfill_schedule(context):
    today = datetime.today()
    thirty_days_ago = today - timedelta(days=30)

    # Assuming you have a function to get the list of missing partition keys
    # for the last 30 days
    missing_partition_keys = get_missing_partition_keys(thirty_days_ago, today)

    run_requests = []
    for partition_key in missing_partition_keys:
        run_requests.append(RunRequest(run_key=partition_key, run_config={}))
    return run_requests
```
Please note that the function get_missing_partition_keys is hypothetical and would need to be implemented based on your specific logic to determine which partitions are missing and need to be backfilled. Remember to replace my_partitioned_job with the actual job that you want to schedule for backfilling. The cron expression "0 0 1 * *" is just an example; you should adjust it to match your exact scheduling needs.
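For step 6, a minimal sketch of registering the schedule, reusing the hypothetical names from the example above:
```python
from dagster import repository


@repository
def my_repository():
    # Registering the job and schedule makes Dagster aware of them
    return [my_partitioned_job, my_monthly_backfill_schedule]
```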
Scout
02/09/2024, 12:34 AM
Corbett Carrel
02/09/2024, 12:36 AM
Scout
02/09/2024, 12:36 AM
There is a method, all_partitions_materialized, that can be used to check whether a provided list of partitions has been materialized for a particular asset. However, this method checks all materializations for the asset and does not directly provide a list of missing partitions.
To find the missing partitions, you would typically need to:
1. Define the range of partitions you want to check.
2. Use the Dagster instance to get the materialization events for the asset.
3. Compare the expected partitions with the materialized ones to identify which are missing.
Here's a conceptual example of how you might write a function to find missing partitions:
```python
from dagster import AssetKey, DagsterEventType, DagsterInstance, EventRecordsFilter


def find_missing_partitions(asset_key, partition_keys):
    instance = DagsterInstance.get()
    materialized_partitions = set(
        record.event_log_entry.dagster_event.partition
        for record in instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=asset_key,
            )
        )
        if record.event_log_entry.is_dagster_event
        and record.event_log_entry.dagster_event.partition
    )
    missing_partitions = [
        partition_key
        for partition_key in partition_keys
        if partition_key not in materialized_partitions
    ]
    return missing_partitions


# Example usage:
# Define your asset key and the range of partition keys you want to check
my_asset_key = AssetKey(["my_asset"])
my_partition_keys = ["2023-01-01", "2023-01-02", "2023-01-03"]  # etc.

# Find the missing partitions
missing_partitions = find_missing_partitions(my_asset_key, my_partition_keys)
print("Missing partitions:", missing_partitions)
```
In this example, my_asset_key should be replaced with the actual AssetKey of your partitioned asset, and my_partition_keys should be a list of partition keys that you expect to be materialized. Please note that this is a simplified example. Depending on your specific use case and the partitioning scheme of your asset, you may need to adjust the logic to correctly identify the range of expected partitions (one option is sketched below) and to handle any nuances related to your asset's partitioning.
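One way to build the list of expected partition keys, assuming your asset uses a DailyPartitionsDefinition (the start date here is a hypothetical example), is to ask the partitions definition itself:
```python
from dagster import DailyPartitionsDefinition

# Hypothetical: the same partitions definition your asset was declared with
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")

# All partition keys from the start date up to the current time
expected_partition_keys = partitions_def.get_partition_keys()
```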
Corbett Carrel
02/09/2024, 12:50 AM
Scout
02/09/2024, 12:50 AM
```python
from dagster import AssetKey, DagsterInstance


def get_last_materialization_time(asset_key):
    instance = DagsterInstance.get()
    latest_materialization = instance.get_latest_materialization_event(asset_key=asset_key)
    if latest_materialization is not None:
        return latest_materialization.timestamp
    else:
        return None


# Example usage:
# Define your asset key
my_asset_key = AssetKey(["my_asset"])

# Get the last materialization time
last_materialization_time = get_last_materialization_time(my_asset_key)
if last_materialization_time:
    print(f"The asset was last materialized at: {last_materialization_time}")
else:
    print("The asset has not been materialized yet.")
```
In this example, my_asset_key should be replaced with the actual AssetKey of your asset. The function get_last_materialization_time will return the timestamp of the last materialization event if it exists, or None if the asset has never been materialized. Please ensure that you have the appropriate context to access the Dagster instance, as this code assumes you are running within a Dagster environment where DagsterInstance.get() can be called to retrieve the current instance.
Scout
02/09/2024, 12:51 AM
To find when an asset was last materialized, you can use the get_latest_materialization_event method provided by the Dagster instance. This method allows you to query for the most recent materialization event of a specific asset. Here's an example of how you can use this method:
```python
from dagster import AssetKey, DagsterInstance

# Get the Dagster instance
instance = DagsterInstance.get()

# Define the asset key for which you want to find the last materialization time
asset_key = AssetKey("my_asset")

# Get the latest materialization event for the asset
latest_materialization = instance.get_latest_materialization_event(asset_key=asset_key)

# Extract the timestamp from the latest materialization event
if latest_materialization:
    last_materialization_time = latest_materialization.timestamp
    print(f"The asset was last materialized at: {last_materialization_time}")
else:
    print("The asset has not been materialized yet.")
```
In this code snippet, replace "my_asset" with the name of your actual asset. The get_latest_materialization_event method will return the latest materialization event for the given AssetKey, from which you can extract the timestamp to find out when the asset was last materialized. If there is no materialization event for the asset, it means the asset has not been materialized yet.
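The timestamp on the event log entry is a Unix timestamp (a float), so if you want a human-readable time you can convert it with the standard library; a small sketch with a hypothetical value:
```python
from datetime import datetime, timezone

# Hypothetical float timestamp as returned on the materialization event
last_materialization_time = 1707436800.0
readable = datetime.fromtimestamp(last_materialization_time, tz=timezone.utc)
print(f"The asset was last materialized at: {readable.isoformat()}")
```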
Corbett Carrel
02/12/2024, 3:29 PM
Scout
02/12/2024, 3:29 PM
1. Define your DBT asset: Use the @asset decorator, or the @dbt_assets decorator if you are working with DBT models, to define your DBT asset within Dagster.
2. Create a job: Define a job that includes the DBT asset. This job will be responsible for running the DBT commands that materialize your asset.
3. Create a schedule or sensor: Define a schedule or sensor that will create run requests for the job. A schedule runs at a specific time interval, while a sensor triggers runs based on certain conditions.
Here's a conceptual example of how you might set this up:
```python
from dagster import RunRequest, job, repository, schedule
from dagster_dbt import dbt_cli_resource, dbt_run_op


# Define a job that includes the DBT asset
@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_job():
    dbt_run_op()


# Define a schedule that creates run requests for the DBT job
@schedule(cron_schedule="0 0 * * *", job=my_dbt_job, execution_timezone="UTC")
def my_dbt_schedule(context):
    # You can customize the run config or tags as needed
    run_config = {"resources": {"dbt": {"config": {"project_dir": "path/to/dbt/project"}}}}
    return RunRequest(
        run_key=context.scheduled_execution_time.strftime("%Y-%m-%d-%H-%M"),
        run_config=run_config,
    )


# Collect jobs and schedules in a repository
@repository
def my_repository():
    return [my_dbt_job, my_dbt_schedule]
```
In this example, dbt_run_op is an operation provided by the dagster_dbt library that runs the dbt run command. The dbt_cli_resource is a resource that configures how the DBT CLI is invoked. The my_dbt_schedule schedule is set to run every day at midnight UTC, and it creates a RunRequest with a specific run_key and run_config. Please adjust the run_config and the cron_schedule according to your specific DBT project configuration and scheduling needs. Also, ensure that the project_dir points to the correct location of your DBT project. Remember that this is a high-level example, and you will need to adapt the code to fit your specific use case, including the correct setup for your DBT assets and resources.
Corbett Carrel
02/13/2024, 6:33 PM
Scout
02/13/2024, 6:33 PM
To run a single dbt model, you can use the dbt run command with the --select flag to specify the particular model you want to run. In Dagster, you can create a job that includes an operation to execute this dbt command, and then create a run request for that job with the appropriate configuration. Here's a conceptual example of how you might set up a Dagster job to run a single dbt model:
```python
from dagster import job, op, repository
from dagster_dbt import dbt_cli_resource


# Define an operation that runs a single dbt model
@op(required_resource_keys={"dbt"})
def run_single_dbt_model(context):
    # Conceptual: invoke `dbt run --select my_model` via the dbt resource
    # Replace 'my_model' with the name of your dbt model
    context.resources.dbt.run(["run", "--select", "my_model"])


# Define a job that includes the operation
@job(resource_defs={"dbt": dbt_cli_resource})
def my_single_dbt_model_job():
    run_single_dbt_model()


# Collect jobs in a repository
@repository
def my_repository():
    return [my_single_dbt_model_job]
```
In this example, the run_single_dbt_model operation uses the dbt_cli_resource to run the dbt run command with the --select flag to target a specific model named my_model. You should replace 'my_model' with the actual name of the dbt model you want to run. To create a run request for this job, you would typically use a schedule or sensor. However, if you want to create a run request manually, you can do so through the Dagster UI or the GraphQL API. Here's how you might create a run request manually through the Dagster UI:
1. Navigate to the Dagster UI.
2. Go to the "Jobs" section and find my_single_dbt_model_job.
3. Click on the "Launchpad" tab for the job.
4. Configure any necessary run configuration, if needed.
5. Click the "Launch Run" button to execute the job.
Please note that the above example assumes you have set up the dbt_cli_resource with the appropriate configuration for your dbt project; adjust the resource configuration to match your dbt project's directory and profiles as needed. If you need to create a run request programmatically, you can use the GraphQL API provided by Dagster to trigger a run for the job with the desired configuration, as sketched below.
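As one hedged illustration of the programmatic route, the dagster-graphql package ships a Python client that can submit job runs over the GraphQL API; the host and port below are assumptions for a locally running webserver:
```python
from dagster_graphql import DagsterGraphQLClient

# Assumes the Dagster webserver is reachable at localhost:3000
client = DagsterGraphQLClient("localhost", port_number=3000)

# Submit a run of the job defined above; mirror any Launchpad config here
new_run_id = client.submit_job_execution(
    "my_single_dbt_model_job",
    run_config={},
)
print(f"Launched run: {new_run_id}")
```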
Corbett Carrel
02/13/2024, 7:26 PM
Scout
02/13/2024, 7:26 PM
To define a run_config that specifies running a single dbt model or asset, you can use the dbt_cli_resource in Dagster and provide the necessary configuration to the dbt run command. The run_config should include the dbt run command with the --select flag followed by the name of the model or asset you want to run. Here's an example of how you might define a run_config for a job that runs a single dbt model:
```python
from dagster import job, op, repository
from dagster_dbt import dbt_cli_resource


# Define an operation that runs a single dbt model
@op(required_resource_keys={"dbt"}, config_schema={"model_name": str})
def run_single_dbt_model(context):
    # The model name is provided through the run config
    model_name = context.op_config["model_name"]
    context.resources.dbt.run(["run", "--select", model_name])


# Define a job that includes the operation
@job(resource_defs={"dbt": dbt_cli_resource})
def my_single_dbt_model_job():
    run_single_dbt_model()


# Define the run_config for the job
run_config = {
    "ops": {
        "run_single_dbt_model": {
            "config": {
                "model_name": "my_model"  # Replace with your dbt model name
            }
        }
    },
    "resources": {
        "dbt": {
            "config": {
                # Replace with the path to your dbt project
                "project_dir": "path/to/your/dbt/project",
                # Include other necessary dbt configuration here
            }
        }
    },
}


# Collect jobs in a repository
@repository
def my_repository():
    return [my_single_dbt_model_job]
```
In this run_config, the ops section specifies the configuration for the run_single_dbt_model operation, which includes the name of the dbt model to run. The resources section includes the configuration for the dbt_cli_resource, such as the project_dir, which should be set to the path of your dbt project. When you launch a run for the my_single_dbt_model_job job, you would provide this run_config to specify that you want to run the my_model dbt model. You can launch the run through the Dagster UI or programmatically via the Dagster API. Please ensure that you replace "my_model" with the actual name of your dbt model and "path/to/your/dbt/project" with the correct path to your dbt project directory. Additionally, you may need to include other dbt configuration options specific to your setup.
Corbett Carrel
02/13/2024, 8:25 PM
Scout
02/13/2024, 8:25 PM
The Dagster instance provides a method, get_event_records, that can be used to fetch materialization events, which you can filter by the asset key and event type. Here's a conceptual example of how you might write a function to find out which partitions have been materialized for a given asset:
```python
from dagster import AssetKey, DagsterEventType, DagsterInstance, EventRecordsFilter


def get_materialized_partitions(asset_key):
    instance = DagsterInstance.get()
    materialization_events = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
        )
    )
    # Extract the partition keys from the materialization events
    materialized_partitions = {
        record.event_log_entry.dagster_event.partition
        for record in materialization_events
        if record.event_log_entry.is_dagster_event
        and record.event_log_entry.dagster_event.partition
    }
    return materialized_partitions


# Example usage:
# Define your asset key
my_asset_key = AssetKey(["my_asset"])

# Get the set of materialized partitions
materialized_partitions = get_materialized_partitions(my_asset_key)
print("Materialized partitions:", materialized_partitions)
```
In this example, my_asset_key should be replaced with the actual AssetKey of your partitioned asset. The function get_materialized_partitions will return a set of partition keys that have been materialized for the given asset. Please ensure that you have the appropriate context to access the Dagster instance, as this code assumes you are running within a Dagster environment where DagsterInstance.get() can be called to retrieve the current instance.
Corbett Carrel
02/13/2024, 8:46 PM
Scout
02/13/2024, 8:46 PM
To trigger a job when a dbt asset is materialized, you can use the @asset_sensor decorator provided by Dagster. This sensor will monitor materialization events for a specific asset key and can be configured to trigger a job or some other action when a materialization occurs. Here's an example of how you might define an asset sensor for a dbt asset:
```python
from dagster import AssetKey, RunRequest, SensorEvaluationContext, asset_sensor

# Define the job that should be triggered by the asset sensor
my_downstream_job = ...


@asset_sensor(asset_key=AssetKey("my_dbt_asset"), job=my_downstream_job)
def my_dbt_asset_sensor(context: SensorEvaluationContext, asset_event):
    if asset_event:
        # Optionally, you can use the materialization timestamp or other metadata
        # from the asset_event to customize the run request or run config
        timestamp = asset_event.timestamp

        # Create a RunRequest to trigger the job
        yield RunRequest(
            run_key=f"my_dbt_asset_{timestamp}",
            run_config={
                # Include any necessary run configuration here
            },
        )
```
In this example, replace "my_dbt_asset" with the actual name of your dbt asset. The my_downstream_job should be the job that you want to trigger when the dbt asset is materialized. The sensor function my_dbt_asset_sensor checks for the latest materialization event of the asset and yields a RunRequest to trigger the specified job. The run_key is constructed using the materialization timestamp to ensure uniqueness, but you can customize it as needed. The run_config can also be customized to provide any necessary configuration for the job being triggered. Please note that you will need to replace my_downstream_job with the actual job you want to run when the dbt asset is materialized. Also, ensure that the run_config is properly set up for your specific use case. Once you have defined the sensor, you need to include it in your repository definition so that Dagster can execute it:
```python
from dagster import repository


@repository
def my_repository():
    return [my_downstream_job, my_dbt_asset_sensor]
```
This sensor will now monitor for materializations of the specified dbt asset and trigger the downstream job accordingly.
Corbett Carrel
02/14/2024, 2:16 PM
Scout
02/14/2024, 2:16 PM
To run a specific dbt model when another asset is materialized, you can use the @asset_sensor decorator in Dagster. This sensor will monitor for materialization events of a particular asset and can trigger a dbt job that includes the specific dbt model you want to run. Here's a conceptual example based on the documentation:
```python
from dagster import AssetKey, RunRequest, SensorEvaluationContext, asset_sensor

# Define the job that should be triggered by the asset sensor
# This job should include the dbt model you want to run
my_dbt_job = ...


@asset_sensor(asset_key=AssetKey("my_asset"), job=my_dbt_job)
def my_dbt_model_sensor(context: SensorEvaluationContext, asset_event):
    if asset_event:
        # Optionally, you can use the materialization timestamp or other metadata
        # from the asset_event to customize the run request or run config
        timestamp = asset_event.timestamp

        # Create a RunRequest to trigger the dbt job
        yield RunRequest(
            run_key=f"my_dbt_model_{timestamp}",
            run_config={
                # Specify the dbt model to run using the `models` parameter
                # in your dbt run config
                "ops": {
                    "dbt_run": {
                        "config": {
                            "models": ["my_specific_dbt_model"]
                        }
                    }
                }
            },
        )
```
In this example, replace "my_asset" with the actual name of the asset whose materialization should trigger the dbt model run. The my_dbt_job should be the job that includes the dbt operation to run the dbt model. The my_dbt_model_sensor function checks for the latest materialization event of the asset and yields a RunRequest to trigger the dbt job with a specific run configuration. The run_config is set up to include the models parameter, which specifies the dbt model you want to run. Replace "my_specific_dbt_model" with the actual name of your dbt model. Please note that you will need to replace my_dbt_job with the actual job you want to run when the asset is materialized. Also, ensure that the run_config is properly set up for your specific use case, including the correct dbt CLI configuration. Once you have defined the sensor, you need to include it in your repository definition so that Dagster can execute it:
```python
from dagster import repository


@repository
def my_repository():
    return [my_dbt_job, my_dbt_model_sensor]
```
This sensor will now monitor for materializations of the specified asset and trigger the dbt job to run the specific dbt model accordingly.
Corbett Carrel
02/14/2024, 2:32 PM
Scout
02/14/2024, 2:32 PM
To trigger a job only after all partitions of an asset have been materialized, you can use the @multi_asset_sensor decorator provided by Dagster. This sensor can monitor multiple assets or partitions and trigger a job only when certain conditions are met, such as all partitions being materialized. Here's a conceptual example based on the documentation:
```python
from dagster import AssetKey, MultiAssetSensorEvaluationContext, RunRequest, multi_asset_sensor

# Define the job that should be triggered by the asset sensor
# This job should be the one you want to run after the last partition is materialized
my_job = ...


@multi_asset_sensor(monitored_assets=[AssetKey("my_partitioned_asset")], job=my_job)
def my_partitioned_asset_sensor(context: MultiAssetSensorEvaluationContext):
    # Get the latest materialization records for each partition
    materialization_records = context.latest_materialization_records_by_partition(
        AssetKey("my_partitioned_asset")
    )

    # Check if all expected partitions have been materialized
    expected_partitions = [...]  # List of expected partition keys
    if all(partition in materialization_records for partition in expected_partitions):
        # All expected partitions have been materialized, trigger the job
        yield RunRequest(
            run_key="run_after_all_partitions_materialized",
            run_config={
                # Include any necessary run configuration here
            },
        )
```
In this example, replace "my_partitioned_asset" with the actual name of your partitioned asset. The expected_partitions should be a list of partition keys that you expect to be materialized before triggering the job. The my_partitioned_asset_sensor function checks if all expected partitions have materialization records, and if so, it yields a RunRequest to trigger the job. Please note that you will need to replace my_job with the actual job you want to run when all partitions are materialized. Also, ensure that the expected_partitions list accurately reflects the partitions you are monitoring. Once you have defined the sensor, you need to include it in your repository definition so that Dagster can execute it:
```python
from dagster import repository


@repository
def my_repository():
    return [my_job, my_partitioned_asset_sensor]
```
This sensor will now monitor for materializations of the specified partitioned asset and trigger the job only after the last partition has been materialized.
Corbett Carrel
02/14/2024, 3:09 PM
Scout
02/14/2024, 3:09 PM
To pass values from one asset to another, you can declare the dependency on the @asset decorator using the ins parameter. Here's an example of how you might define assets with dependencies:
```python
from dagster import AssetIn, asset


# Define an upstream asset that produces a list of values
@asset
def upstream_asset():
    return [1, 2, 3]


# Define a downstream asset that takes the list of values from the upstream asset
@asset(ins={"upstream_values": AssetIn("upstream_asset")})
def downstream_asset(upstream_values):
    # Use the list of values from the upstream asset
    return sum(upstream_values)
```
In this example, downstream_asset depends on upstream_asset, and the list of values produced by upstream_asset is passed to downstream_asset as the upstream_values input. If you want to add additional context or metadata to be used by another asset, you can use the AssetExecutionContext within the asset function to access the context and pass metadata or other information to downstream assets. Here's an example of using the context within an asset:
```python
from dagster import AssetExecutionContext, asset


@asset
def asset_with_context(context: AssetExecutionContext):
    # You can access the context to log information, access resources, etc.
    context.log.info("Producing a list of values.")
    # You can also add metadata or other context-related information here
    return [1, 2, 3]


@asset
def another_asset(asset_with_context):
    # The parameter name matches the upstream asset, which wires the dependency
    return sum(asset_with_context)
```
In this example, asset_with_context uses the AssetExecutionContext to log information, and it could also use the context to add metadata or access resources. The values produced by asset_with_context are then passed to another_asset. Please note that while you can pass data directly between assets, the context itself is not passed between assets. Instead, the context is used within each asset to access execution-related information and capabilities. If you need to pass additional metadata or configuration, you should include it in the data produced by the assets or use resources to share common configuration or state.
Corbett Carrel
02/14/2024, 3:10 PM
Scout
02/14/2024, 3:10 PM
Sensors in Dagster yield RunRequest objects that can include run configuration data to parameterize the runs they trigger. If you want to pass a list of values from a sensor to a job or asset, you can include that list in the run_config of the RunRequest. The job or asset can then access this configuration when it executes. Here's an example of how you might create a sensor that includes a list of values in the run_config:
```python
from dagster import RunRequest, job, repository, sensor


@job
def my_job():
    # Job definition that will use the list of values from the sensor
    pass


@sensor(job=my_job)
def my_sensor(context):
    # Define the list of values you want to pass to the job
    list_of_values = [1, 2, 3, 4, 5]

    # Create a RunRequest with a run_config that includes the list of values
    run_request = RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "my_op": {  # Replace "my_op" with the actual op name in your job
                    "config": {
                        "list_of_values": list_of_values
                    }
                }
            }
        },
    )
    yield run_request


@repository
def my_repository():
    return [my_job, my_sensor]
```
In this example, my_sensor creates a RunRequest that includes a run_config with a list of values under the ops configuration for an operation named my_op. When the sensor triggers a run of my_job, the job will receive this configuration and can access the list of values. Please note that you need to replace "my_op" with the name of the operation in your job that should receive the list of values. The operation should be defined to accept configuration that includes the list_of_values field, as shown in the sketch below. Keep in mind that sensors are not designed to pass data between each other but to trigger runs based on external events. The pattern shown here is for passing configuration data from a sensor to a job at the time the run is triggered.
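For completeness, here is a minimal sketch of what such an op could look like so that the run_config above validates; the op name and config field are the same hypothetical ones used in the sensor:
```python
from dagster import Array, Field, op


# A hypothetical op that accepts the list passed in from the sensor
@op(config_schema={"list_of_values": Field(Array(int))})
def my_op(context):
    values = context.op_config["list_of_values"]
    context.log.info(f"Received values: {values}")
```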
Scout
02/14/2024, 3:11 PM
Sensors in Dagster yield RunRequest objects that can trigger pipeline runs with a specific configuration. If you want to pass a list of values from one sensor to another, you would typically do this by triggering a run with a specific configuration that includes the values you want to pass. These values can be included in the run_config of the RunRequest and then accessed within the pipeline run. Here's a conceptual example of how you might create a sensor that triggers a run with a list of values:
```python
from dagster import RunRequest, sensor


# Assumes my_job is defined elsewhere
@sensor(job=my_job)
def my_sensor():
    # Define the list of values you want to pass to the pipeline run
    values_to_pass = [1, 2, 3]

    # Create a RunRequest with a run_config that includes the list of values
    yield RunRequest(
        run_key="unique_run_key",  # Unique identifier for this run request
        run_config={
            "solids": {
                "my_solid": {
                    "config": {
                        "my_values": values_to_pass
                    }
                }
            }
        },
    )
```
In this example, my_job is the job that will be triggered by the sensor, and my_solid is a solid within that job that expects a configuration field called my_values. The values_to_pass list is included in the run_config under the appropriate solid's config. When the sensor triggers a run of my_job, the my_solid solid within that job will receive the values_to_pass list as part of its configuration and can use it during execution. Please note that this example is conceptual, and you will need to adapt the code to fit your specific use case, including the correct job and solid names, as well as the structure of your run_config. Sensors themselves do not have a mechanism for directly passing data to each other; they are designed to observe external conditions and trigger runs accordingly.
Corbett Carrel
02/23/2024, 4:03 PM
Scout
02/23/2024, 4:03 PM
For local testing, you can use the DagsterInstance.ephemeral() method to create an in-memory instance of Dagster that is suitable for ephemeral execution. This is particularly useful in test contexts where you don't want to persist any data or configuration to disk. Here's an example of how you might use DagsterInstance.ephemeral() in a test:
```python
from dagster import DagsterInstance


def test_my_thing():
    # Create an ephemeral Dagster instance
    with DagsterInstance.ephemeral() as instance:
        # Your test code here, using the ephemeral instance
        ...
```
When using DagsterInstance.ephemeral(), the instance is created for the duration of the context manager block and is automatically cleaned up when the block exits. This means that any runs, events, or other data associated with the instance are only stored in memory and are not persisted after the test completes. This approach is helpful when writing unit tests for Dagster code, as it allows you to test your pipelines, solids, and assets without affecting your actual Dagster instance or requiring any external resources like a database. Remember to replace ... with your actual test code that interacts with the ephemeral Dagster instance.
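To make this concrete, here is a minimal sketch of a test that materializes a hypothetical asset against an ephemeral instance:
```python
from dagster import DagsterInstance, asset, materialize


@asset
def my_test_asset():
    return 42


def test_my_test_asset():
    # Nothing is persisted: runs and events live only in memory
    with DagsterInstance.ephemeral() as instance:
        result = materialize([my_test_asset], instance=instance)
        assert result.success
```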
Corbett Carrel
02/27/2024, 2:54 PM
Scout
02/27/2024, 2:54 PM
To build a job that materializes dbt assets, you can use the define_asset_job function provided by Dagster. This function allows you to define a job that materializes a set of assets, which can include dbt assets loaded into Dagster. Here's a step-by-step guide on how to build a job for dbt assets:
1. Load dbt assets into Dagster: Use the load_assets_from_dbt_project or load_assets_from_dbt_manifest function provided by the dagster-dbt library to load your dbt models as Dagster assets.
2. Define a job for dbt assets: Use the define_asset_job function to create a job that materializes the dbt assets. You can specify which assets to include in the job using AssetSelection.
3. Register the job in your repository: Include the job in your repository definition so that Dagster can execute it.
Here's an example of how you might do this:
```python
from dagster import define_asset_job, repository, with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

# Load dbt assets into Dagster
dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/your/dbt/project",
    profiles_dir="path/to/your/dbt/profiles",
)

# Define a job that materializes all dbt assets
dbt_assets_job = define_asset_job(
    name="dbt_assets_job",
    selection=dbt_assets,
)


# Collect assets and jobs in a repository, binding the dbt resource
@repository
def my_repository():
    return [
        *with_resources(
            dbt_assets,
            {"dbt": dbt_cli_resource.configured({"project_dir": "path/to/your/dbt/project"})},
        ),
        dbt_assets_job,
    ]
```
In this example, replace 'path/to/your/dbt/project' and 'path/to/your/dbt/profiles' with the actual paths to your dbt project and profiles directories. The dbt_assets_job is configured to materialize all dbt assets loaded into Dagster. Please note that the dbt_cli_resource is configured with the path to your dbt project directory. You may need to configure additional settings for the dbt_cli_resource based on your dbt project's requirements. This job can now be executed through the Dagster UI, CLI, or scheduled to run at specific intervals using Dagster's scheduling system, as sketched below.
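For example, a minimal sketch of scheduling the asset job on a daily cadence (the cron string is an assumption) could look like this:
```python
from dagster import ScheduleDefinition

# Run the dbt asset job every day at midnight (hypothetical cadence)
dbt_assets_schedule = ScheduleDefinition(
    job=dbt_assets_job,
    cron_schedule="0 0 * * *",
)
```
The schedule would then be returned from the repository alongside dbt_assets_job.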
Corbett Carrel
02/28/2024, 2:09 AM
Scout
02/28/2024, 2:09 AM
1. Determine the most recent partition: Calculate the partition key for the most recent partition of your asset.
2. Check whether it has been materialized: Query the Dagster instance for the asset's materialization events and collect the partition keys that have been materialized.
3. Yield a RunRequest: If the most recent partition is missing, the sensor should yield a RunRequest to trigger a pipeline run that will materialize that partition.
Here's an example of how you might implement this:
```python
from datetime import datetime

from dagster import (
    AssetKey,
    DagsterEventType,
    DagsterInstance,
    EventRecordsFilter,
    RunRequest,
    sensor,
)

# Define the partitioned asset key
partitioned_asset_key = AssetKey("my_partitioned_asset")


# Define the sensor (assumes my_job is defined elsewhere)
@sensor(job=my_job)
def my_partition_sensor(context):
    instance = DagsterInstance.get()

    # Define the function to calculate the most recent partition
    def get_most_recent_partition():
        # This is an example and should be replaced with your actual logic
        # to calculate the most recent partition key
        today = datetime.today()
        return today.strftime("%Y-%m-%d")

    # Get the most recent partition key
    most_recent_partition = get_most_recent_partition()

    # Check if the most recent partition has been materialized
    materializations = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=partitioned_asset_key,
        )
    )
    materialized_partitions = {
        record.event_log_entry.dagster_event.partition
        for record in materializations
        if record.event_log_entry.is_dagster_event
        and record.event_log_entry.dagster_event.partition
    }

    # If the most recent partition is not materialized, yield a RunRequest
    if most_recent_partition not in materialized_partitions:
        yield RunRequest(
            run_key=f"fill_missing_partition_{most_recent_partition}",
            run_config={
                # Include any necessary run configuration here
                # For example, you might need to specify the partition to materialize
                "solids": {
                    "my_solid": {
                        "config": {
                            "partition_key": most_recent_partition
                        }
                    }
                }
            },
        )
```
In this example, my_job is the job that will be triggered by the sensor to materialize the missing partition. The get_most_recent_partition function is a placeholder and should be replaced with your actual logic for calculating the most recent partition key.
Scout
02/28/2024, 2:09 AM
The run_config in the RunRequest should be configured to include any necessary configuration for the job to materialize the specific partition. The run_key is constructed using the most recent partition key to ensure uniqueness. Please note that this example is conceptual, and you will need to adapt the code to fit your specific use case, including the correct job, asset key, and partition calculation logic.
Corbett Carrel
02/28/2024, 2:11 AM
Scout
02/28/2024, 2:12 AM
Regarding specifying op configuration in the run_config within a sensor: in Dagster, you can indeed specify the configuration for a job's ops (or solids) when creating a RunRequest in a sensor. A job in Dagster can be composed of ops or a graph of ops, and you can configure these ops in the run_config to pass specific parameters or settings that they need to execute. Here's a conceptual example of how you might specify op configuration in a run_config within a sensor:
```python
from dagster import RunRequest, sensor


# Assumes my_job is defined elsewhere
@sensor(job=my_job)
def my_sensor(context):
    # Define the configuration for the ops within the job
    op_config = {
        "ops": {
            "my_op": {
                "config": {
                    "param1": "value1",
                    "param2": "value2",
                }
            }
        }
    }

    # Create a RunRequest with the specified op configuration
    yield RunRequest(
        run_key="unique_run_key",
        run_config=op_config,
    )
```
In this example, my_job is the job that will be triggered by the sensor. The my_op is an op within that job, and we are specifying its configuration in the run_config dictionary under the ops key. The param1 and param2 are hypothetical configuration parameters for my_op. Please replace my_job, my_op, param1, and param2 with the actual job, op, and configuration parameters relevant to your use case. If "grap operated" refers to something else or if you have additional context or a different question, please provide more details so I can assist you further.
Scout
02/28/2024, 2:12 AM
You can also include op inputs and configuration in the run_config when creating a RunRequest in a sensor. This allows you to configure the behavior of the ops that will be executed when the sensor triggers a pipeline run. Here's an example of how you might include op configurations in a run_config within a sensor:
```python
from dagster import RunRequest, sensor


# Assumes my_job is defined elsewhere
@sensor(job=my_job)
def my_sensor(context):
    # Define the configuration for the ops within the job
    op_config = {
        "ops": {
            "my_op": {
                "inputs": {
                    "input_name": {"value": "some_value"}
                },
                "config": {
                    "param1": "value1",
                    "param2": "value2",
                },
            }
        }
    }

    # Create a RunRequest with the specified op configuration
    yield RunRequest(
        run_key="unique_run_key",
        run_config=op_config,
    )
```
In this example, my_job is the job that will be triggered by the sensor. The my_op is an op within that job, and we are specifying its configuration in the run_config dictionary under the ops key. The input_name is a hypothetical input to my_op, and param1 and param2 are hypothetical configuration parameters for my_op. Please replace my_job, my_op, input_name, param1, and param2 with the actual job, op, input, and configuration parameters relevant to your use case. When the sensor triggers a run of my_job, the my_op op within that job will receive the specified inputs and configuration parameters as part of its execution context. Keep in mind that the structure of the run_config should match the expectations of the job and its ops. The configuration keys and values should correspond to the defined inputs and configuration schema of the ops in your job.
Corbett Carrel
02/28/2024, 2:15 AM
Scout
02/28/2024, 2:15 AM
You can use the run_config in the RunRequest yielded by a sensor to specify which assets or operations should be executed when the job is triggered. This is done by specifying the relevant op configurations or asset keys within the run_config. Here's a conceptual example of how you might trigger multiple assets from a sensor:
```python
from dagster import RunRequest, sensor


# Assumes my_assets_job is defined elsewhere
@sensor(job=my_assets_job)
def my_sensor(context):
    # Define the configuration for the job that includes the assets
    run_config = {
        "solids": {
            "my_solid": {
                "config": {
                    # Configuration specific to the solid that computes the asset
                }
            }
        },
        "assets": {
            "my_asset_1": {
                # Configuration specific to my_asset_1
            },
            "my_asset_2": {
                # Configuration specific to my_asset_2
            },
            # Add additional assets as needed
        },
    }

    # Create a RunRequest with the specified run configuration
    yield RunRequest(
        run_key="unique_run_key",
        run_config=run_config,
    )
```
In this example, my_assets_job is the job that includes the asset computations you want to trigger. The run_config specifies the configuration for the solids or assets that are part of the job. When the sensor triggers a run of my_assets_job, the specified assets will be computed according to the provided configuration. Please replace my_assets_job, my_solid, my_asset_1, my_asset_2, and the configuration details with the actual job, solids, assets, and configuration relevant to your use case. Remember that the run_config should match the structure expected by your job and its assets. The configuration keys and values should correspond to the defined configuration schema of the assets and their associated computations.
Scout
02/28/2024, 2:16 AM
Sensors yield RunRequest objects that specify the job to run and the run_config to use, which can include configurations for dbt assets. To trigger multiple dbt assets from a sensor, you would typically have a job that includes the dbt assets you want to run. The sensor would then yield a RunRequest for that job, and you can use the run_config to specify which dbt models to run using the models parameter in the dbt CLI configuration. Here's a conceptual example of how you might set up a sensor to trigger a job that runs multiple dbt assets:
```python
from dagster import RunRequest, sensor

# Define the job that includes multiple dbt assets
my_dbt_job = ...


@sensor(job=my_dbt_job)
def my_dbt_sensor(context):
    # Define the configuration for the dbt CLI to run specific models
    run_config = {
        "resources": {
            "dbt": {
                "config": {
                    "models": ["+my_first_dbt_model", "+my_second_dbt_model"]
                }
            }
        }
    }

    # Create a RunRequest with the specified dbt CLI configuration
    yield RunRequest(
        run_key="unique_run_key_for_dbt_assets",
        run_config=run_config,
    )
```
In this example, my_dbt_job is the job that includes the dbt assets you want to run. The run_config specifies the dbt CLI configuration to run the models my_first_dbt_model and my_second_dbt_model, including their dependencies (as indicated by the + prefix). Please replace my_dbt_job, my_first_dbt_model, and my_second_dbt_model with the actual job and model names relevant to your dbt project. When the sensor triggers a run of my_dbt_job, the specified dbt models will be run according to the configuration provided in the run_config. Keep in mind that the structure of the run_config should match the expectations of the dbt CLI and your dbt project configuration. The models parameter should list the dbt models you want to run, and you can use dbt's selection syntax to specify models and their dependencies.
Scout
02/28/2024, 2:16 AM
```python
from dagster import RunRequest, job, repository, sensor
from dagster_dbt import dbt_cli_resource


# Define a job that runs multiple dbt assets
@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_assets_job():
    # Your dbt asset operations go here
    ...


# Define a sensor that triggers the dbt assets job
@sensor(job=my_dbt_assets_job)
def my_dbt_assets_sensor(context):
    # Logic to determine when to trigger the job
    # For example, you might check for certain conditions or events
    if should_trigger_dbt_assets():
        yield RunRequest(
            run_key="unique_run_key_for_this_trigger",
            run_config={
                # You can specify run config for the dbt job here if needed
            },
        )


# Include the job and sensor in your repository
@repository
def my_repository():
    return [my_dbt_assets_job, my_dbt_assets_sensor]
```
In this example, my_dbt_assets_job is a job that includes the dbt asset operations you want to run. The my_dbt_assets_sensor sensor checks for certain conditions using a hypothetical should_trigger_dbt_assets function, and when those conditions are met, it yields a RunRequest to trigger my_dbt_assets_job. Please replace the placeholder code with the actual logic for your dbt assets and the conditions under which you want to trigger them. The run_config in the RunRequest can be used to pass any necessary configuration to the dbt job when it runs. Keep in mind that the run_key should be unique for each trigger to avoid collisions with other runs. The run_config should match the structure expected by your dbt job and its assets. The configuration keys and values should correspond to the defined configuration schema of the dbt assets and their associated computations.
Corbett Carrel
03/06/2024, 11:40 PM
Scout
03/06/2024, 11:40 PM
The Snowflake resource is provided by the dagster-snowflake library, which is an integration between Dagster and Snowflake.
The Snowflake resource abstracts away the details of connecting to Snowflake, allowing you to focus on writing your data processing logic. It manages the lifecycle of the connection to the Snowflake database, ensuring that connections are properly opened and closed.
Here's an example of how you might use the Snowflake resource in a Dagster asset:
```python
from dagster import asset


@asset(required_resource_keys={"snowflake"})
def my_snowflake_asset(context):
    # Use the Snowflake resource to execute a query
    with context.resources.snowflake.get_connection() as conn:
        result = conn.cursor().execute("SELECT * FROM my_table").fetchall()

    # Do something with the result
    return result
```
In this example, the my_snowflake_asset asset requires the Snowflake resource, which is specified by the required_resource_keys parameter. Inside the asset function, the get_connection method is used to obtain a connection to the Snowflake database, and a query is executed using that connection.
To configure the Snowflake resource, you would typically provide the necessary connection parameters, such as account, user, password, database, schema, and warehouse. These parameters can be set directly or through environment variables for better security and flexibility.
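For the environment-variable route, a hedged sketch using Dagster's EnvVar (the variable names are hypothetical):
```python
from dagster import EnvVar
from dagster_snowflake import SnowflakeResource

# Pull credentials from the environment instead of hard-coding them
snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),
    user=EnvVar("SNOWFLAKE_USER"),
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    database="my_snowflake_database",
    warehouse="my_snowflake_warehouse",
)
```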
Here's an example of how you might configure the Snowflake resource when defining your code location:
```python
from dagster import Definitions
from dagster_snowflake import SnowflakeResource

defs = Definitions(
    assets=[my_snowflake_asset],
    resources={
        "snowflake": SnowflakeResource(
            account="my_snowflake_account",
            user="my_snowflake_user",
            password="my_snowflake_password",
            database="my_snowflake_database",
            schema="my_snowflake_schema",
            warehouse="my_snowflake_warehouse",
        )
    },
)
```
In this definition, the Snowflake resource is configured with the necessary connection parameters. These parameters are then used by the my_snowflake_asset asset when it is executed.
The Snowflake resource provides a Pythonic way to interact with Snowflake, and it can be used in conjunction with other Dagster features, such as IO managers and asset materializations, to build robust data pipelines.
Scout
03/06/2024, 11:40 PM
Corbett Carrel
04/01/2024, 4:53 PM
Scout
04/01/2024, 4:53 PM
Within an asset, you can use Python's tempfile module to create temporary files or directories. These temporary files and directories are useful for storing intermediate data that you don't need to persist beyond the lifetime of the asset computation. Here's an example of how you might use tempfile within an asset:
```python
import tempfile

from dagster import asset


@asset
def my_asset_with_temp_storage():
    # Create a temporary file
    with tempfile.NamedTemporaryFile(mode="w+", delete=True) as tmp_file:
        # Write some data to the temporary file
        tmp_file.write("some temporary data")
        tmp_file.flush()

        # Read the data back from the temporary file
        tmp_file.seek(0)
        data = tmp_file.read()

    # The temporary file is automatically deleted after exiting the block
    # Do something with the data
    return data
```
In this example, NamedTemporaryFile is used to create a temporary file that is automatically deleted when the file is closed (when exiting the with block). The delete=True argument ensures that the file is deleted after use. You can write to and read from the temporary file as needed. If you need a temporary directory instead of a file, you can use tempfile.TemporaryDirectory in a similar way:
```python
import os
import tempfile

from dagster import asset


@asset
def my_asset_with_temp_dir():
    # Create a temporary directory
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Use the temporary directory to store files or other data
        tmp_file_path = os.path.join(tmp_dir, "temp_file.txt")
        with open(tmp_file_path, "w") as tmp_file:
            tmp_file.write("some temporary data")

        # The temporary directory and all its contents are automatically
        # deleted after exiting the block
        # Continue with your asset logic
        ...
```
In this example, TemporaryDirectory is used to create a temporary directory that is automatically cleaned up when the context manager exits. Please note that temporary files and directories created with tempfile are usually stored in the system's default temporary directory (e.g., /tmp on Unix-like systems). The exact location can vary based on the operating system and environment configuration. If you need to know the path to the temporary file or directory, you can use the .name attribute of the object returned by NamedTemporaryFile or the string value provided to the context manager by TemporaryDirectory.
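A small sketch of reading those paths:
```python
import tempfile

with tempfile.NamedTemporaryFile(mode="w+", delete=True) as tmp_file:
    # .name is the full path of the temporary file on disk
    print(f"Temporary file: {tmp_file.name}")

with tempfile.TemporaryDirectory() as tmp_dir:
    # tmp_dir is the path string for the temporary directory
    print(f"Temporary directory: {tmp_dir}")
```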
Corbett Carrel
04/02/2024, 3:51 PM
Scout
04/02/2024, 3:51 PM
To make a Python asset depend on a dbt asset, you can use the AssetIn and AssetKey classes to specify dependencies between assets. Here's a step-by-step guide on how to set up this relationship:
1. Define your dbt asset: Use the @dbt_assets decorator to load your dbt models as Dagster assets.
2. Define your Python asset: Use the @asset decorator to define your Python asset. Specify the dbt asset as a dependency using the ins parameter.
3. Access the dbt asset in the Python asset: The dbt asset will be passed as an argument to the Python asset function, where you can use it.
Here's a conceptual example:
```python
from dagster import AssetIn, AssetKey, asset
from dagster_dbt import dbt_assets


# Load dbt models as Dagster assets
@dbt_assets(...)
def my_dbt_assets():
    ...


# Define a Python asset that depends on a specific dbt model
@asset(ins={"my_dbt_model": AssetIn(key=AssetKey("my_dbt_model"))})
def my_python_asset(my_dbt_model):
    # Use the dbt model data in the Python asset
    # Perform your Python logic here, using the dbt model data
    ...
```
In this example, my_dbt_assets is a function decorated with @dbt_assets that loads dbt models as Dagster assets. my_python_asset is a Python asset that depends on the dbt model named my_dbt_model. The AssetIn object specifies that my_python_asset has an input asset with the key AssetKey("my_dbt_model"), which corresponds to the dbt model you want to use. When you execute my_python_asset, Dagster will ensure that my_dbt_model is materialized first, and its output will be passed to my_python_asset as an argument. Please note that you need to replace "my_dbt_model" with the actual name of your dbt model, and you should define the @dbt_assets function according to your dbt project's configuration. Additionally, you may need to configure an IO manager to handle how the output of the dbt asset is stored and passed to the Python asset. The IO manager will be responsible for loading the materialized dbt model data into a format that your Python asset can use, such as a Pandas DataFrame or a CSV file.
Corbett Carrel
04/23/2024, 11:23 AM
Scout
04/23/2024, 11:24 AM
1. Define your asset: Use the @asset decorator to define your asset and specify a configuration schema if needed.
2. Access the configuration within the asset: Within the asset function, you can access the configuration values through the context parameter.
3. Provide the configuration when materializing the asset: When you materialize the asset, you can provide the configuration values, which can include the variable you want to use.
Here's a conceptual example:
```python
from dagster import Field, asset


# Define an asset that expects a configuration value
@asset(config_schema={"external_variable": Field(str)})
def my_asset(context):
    # Access the external variable from the asset's configuration
    external_variable = context.op_config["external_variable"]
    # Use the external variable in your asset logic
    ...
```
In this example, my_asset is defined with a configuration schema that expects a field named external_variable. Within the asset function, you can access this configuration value through context.op_config. When you materialize the asset, you can provide the configuration, including the value for external_variable:
```python
from dagster import materialize

result = materialize(
    [my_asset],
    run_config={
        "ops": {
            "my_asset": {
                "config": {
                    "external_variable": "value_from_outside"
                }
            }
        }
    },
)
```
Replace "value_from_outside" with the actual value you want to pass to the asset. Please note that while you can use external variables in this way, it's important to manage them carefully to maintain the reproducibility and clarity of your asset computations. Using environment variables, resource configurations, or other Dagster mechanisms can help manage external dependencies in a more controlled manner.
Corbett Carrel
04/23/2024, 4:11 PM
Scout
04/23/2024, 4:11 PM
The message indicates that Dagster skipped the user_streak_s3 step because one of its dependencies, specifically the my_dbt_assets.model_dbt_consumer_app_fct_user_streak asset, has been skipped. In Dagster, steps or assets can be skipped for several reasons:
1. Conditional Execution: If the execution logic of your pipeline includes conditions that determine whether certain steps should run, and those conditions evaluate to False, the dependent steps will be skipped.
2. Upstream Failures: If an upstream step or asset fails or is skipped, all downstream steps that depend on it will also be skipped to prevent execution with incomplete or incorrect data.
3. Unmet Expectations: If an asset has expectations or checks that are not met, it may result in the asset being skipped, along with any downstream assets that depend on it.
4. Missing Inputs: If an asset or step expects inputs that are not provided or generated by upstream steps, it will be skipped.
To resolve this issue, you should investigate why the my_dbt_assets.model_dbt_consumer_app_fct_user_streak asset is being skipped. Here are some steps you can take:
- Check Upstream Assets: Ensure that all upstream assets that the my_dbt_assets.model_dbt_consumer_app_fct_user_streak asset depends on have been successfully materialized.
- Review Asset Definitions: Look at the definitions of the skipped asset and its dependencies to ensure they are configured correctly and that any conditions or expectations are met.
- Examine Logs: Review the logs for the run to find any error messages or warnings that occurred before the skip message. This can provide clues as to why the asset was skipped.
- Check Configuration: Verify that the run_config provided for the run includes all necessary configurations for the assets and that any required inputs are available.
- Review Code Changes: If the skipping behavior started after a code change, review recent changes to the asset definitions or pipeline code to identify any modifications that could lead to skipped executions.
By investigating these areas, you should be able to determine the cause of the skipped asset and take appropriate action to ensure that it and its downstream dependencies are executed as expected. If the issue persists, you may want to reach out to the Dagster community or support channels for further assistance.
Corbett Carrel
05/22/2024, 7:03 PM
Scout
05/22/2024, 7:03 PM
To trigger a Python asset when a dbt asset is materialized, you can use the @asset_sensor decorator. This sensor will monitor the materialization events of the dbt asset and trigger a job that includes the Python asset when the dbt asset is materialized.
Here's a step-by-step guide on how to set this up:
1. Define your dbt asset: Use the @dbt_assets decorator to load your dbt models as Dagster assets.
2. Define your Python asset: Use the @asset decorator to define your Python asset.
3. Define a job that includes the Python asset: Create a job that includes the Python asset.
4. Create a sensor that monitors the dbt asset: Use the @asset_sensor decorator to define a sensor that monitors the dbt asset and triggers the job with the Python asset.
Here's an example:
```python
from dagster import AssetKey, RunRequest, SensorEvaluationContext, asset, asset_sensor, job
from dagster_dbt import dbt_assets


# Step 1: Define your dbt asset
@dbt_assets(
    manifest="path/to/your/dbt/manifest.json",
    select="model_dbt_consumer_app_fct_user_streak",
)
def my_dbt_assets():
    pass


# Step 2: Define your Python asset
@asset
def my_python_asset(context, dbt_output):
    # Use the dbt_output in your Python asset logic
    context.log.info(f"Received dbt output: {dbt_output}")
    # Perform your Python logic here
    return dbt_output


# Step 3: Define a job that includes the Python asset
@job
def my_python_job():
    my_python_asset()


# Step 4: Create a sensor that monitors the dbt asset
@asset_sensor(asset_key=AssetKey("model_dbt_consumer_app_fct_user_streak"), job=my_python_job)
def my_dbt_asset_sensor(context: SensorEvaluationContext, asset_event):
    # Extract the dbt output from the asset event
    dbt_output = asset_event.dagster_event.step_materialization_data.materialization.metadata_entries

    # Trigger the job with the Python asset
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "my_python_asset": {
                    "inputs": {
                        "dbt_output": dbt_output
                    }
                }
            }
        },
    )
```
In this example:
- my_dbt_assets is a function decorated with @dbt_assets that loads the dbt model model_dbt_consumer_app_fct_user_streak as a Dagster asset.
- my_python_asset is a Python asset that takes the output of the dbt asset as an input.
- my_python_job is a job that includes the my_python_asset.
- my_dbt_asset_sensor is a sensor that monitors the materialization of the dbt asset model_dbt_consumer_app_fct_user_streak and triggers my_python_job when the dbt asset is materialized.
Make sure to replace "path/to/your/dbt/manifest.json" with the actual path to your dbt manifest file.
Scout
05/22/2024, 7:04 PM
When the dbt asset model_dbt_consumer_app_fct_user_streak is materialized, the sensor will trigger the job that runs the Python asset my_python_asset, passing the dbt output to it.
Corbett Carrel
05/22/2024, 7:17 PM
Scout
05/22/2024, 7:17 PM
To find the most recent materialized partition of an asset, you can fetch the latest materialization events for each partition and then determine the most recent one.
Here's a step-by-step guide on how to achieve this:
1. Define your partitioned asset: Ensure that your asset is partitioned using a PartitionsDefinition.
2. Query the latest materialization events: Use the DagsterInstance to fetch the latest materialization events for the asset.
3. Determine the most recent partition: Extract the partition keys from the materialization events and determine the most recent one.
Here's an example of how you might implement this:
```python
from datetime import datetime

from dagster import (
    AssetKey,
    DagsterEventType,
    DagsterInstance,
    DailyPartitionsDefinition,
    EventRecordsFilter,
    asset,
)

# Step 1: Define your partitioned asset
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")


@asset(partitions_def=partitions_def)
def my_partitioned_asset():
    # Asset logic here
    pass


# Step 2: Query the latest materialization events
def get_most_recent_materialized_partition(asset_key):
    instance = DagsterInstance.get()
    materialization_events = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
        )
    )

    # Step 3: Determine the most recent partition
    materialized_partitions = [
        record.event_log_entry.dagster_event.partition
        for record in materialization_events
        if record.event_log_entry.is_dagster_event
        and record.event_log_entry.dagster_event.partition
    ]
    if not materialized_partitions:
        return None

    # Assuming partition keys are date strings, sort them to find the most recent one
    most_recent_partition = max(
        materialized_partitions, key=lambda p: datetime.strptime(p, "%Y-%m-%d")
    )
    return most_recent_partition


# Example usage
asset_key = AssetKey("my_partitioned_asset")
most_recent_partition = get_most_recent_materialized_partition(asset_key)
print(f"The most recent materialized partition is: {most_recent_partition}")
```
In this example:
- `my_partitioned_asset` is defined with a daily partitioning scheme starting from January 1, 2023.
- The `get_most_recent_materialized_partition` function queries the latest materialization events for the specified asset key and extracts the partition keys.
- The function then determines the most recent partition by sorting the partition keys (assuming they are date strings in the format `%Y-%m-%d`).
Replace `"my_partitioned_asset"` with the actual name of your partitioned asset and adjust the partition key format if necessary.
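If your partition keys are not date strings, a minimal alternative sketch (reusing `partitions_def` and `materialized_partitions` from the example above) is to rely on the partition definition's own ordering instead of parsing dates:
```python
# get_partition_keys returns the definition's keys ordered oldest to newest,
# so the most recent materialized partition is the last ordered key that
# appears in the materialized set -- no date parsing required.
all_keys = partitions_def.get_partition_keys()
materialized = set(materialized_partitions)
most_recent = next((key for key in reversed(all_keys) if key in materialized), None)
```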
This approach ensures that you reliably identify the most recent materialized partition for the asset.
Corbett Carrel
05/23/2024, 12:58 PM
Scout
05/23/2024, 12:58 PM
You can use the `@asset_sensor` decorator. This sensor will monitor the materialization events of the dbt asset and trigger a job that includes the partitioned asset when the dbt asset is materialized.
Here's a step-by-step guide on how to set this up:
1. Define your dbt asset: Use the `@dbt_assets` decorator to load your dbt models as Dagster assets.
2. Define your partitioned asset: Use the `@asset` decorator to define your partitioned asset with a partitioning scheme.
3. Define a job that includes the partitioned asset: Create a job that includes the partitioned asset.
4. Create a sensor that monitors the dbt asset: Use the `@asset_sensor` decorator to define a sensor that monitors the dbt asset and triggers the job with the partitioned asset, specifying the latest partition to backfill.
Here's an example:
python
from datetime import datetime

from dagster import (
    AssetKey,
    DailyPartitionsDefinition,
    RunRequest,
    SensorEvaluationContext,
    asset,
    asset_sensor,
    job,
)
from dagster_dbt import dbt_assets

# Step 1: Define your dbt asset
@dbt_assets(
    manifest="path/to/your/dbt/manifest.json",
    select="model_dbt_consumer_app_fct_user_streak",
)
def my_dbt_assets():
    pass

# Step 2: Define your partitioned asset
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=partitions_def)
def my_partitioned_asset(context, partition_key: str):
    # Use the partition_key in your asset logic
    context.log.info(f"Processing partition: {partition_key}")
    # Perform your partitioned asset logic here
    ...

# Step 3: Define a job that includes the partitioned asset
@job
def my_partitioned_job():
    my_partitioned_asset()

# Step 4: Create a sensor that monitors the dbt asset
@asset_sensor(asset_key=AssetKey("model_dbt_consumer_app_fct_user_streak"), job=my_partitioned_job)
def my_dbt_asset_sensor(context: SensorEvaluationContext, asset_event):
    # Extract the latest partition key from the dbt asset materialization
    latest_partition_key = datetime.now().strftime("%Y-%m-%d")  # Example: use the current date as the partition key
    # Trigger the job with the partitioned asset, specifying the latest partition to backfill
    yield RunRequest(
        run_key=f"backfill_partition_{latest_partition_key}",
        run_config={
            "ops": {
                "my_partitioned_asset": {
                    "config": {
                        "partition_key": latest_partition_key
                    }
                }
            }
        },
    )
In this example:
- `my_dbt_assets` is a function decorated with `@dbt_assets` that loads the dbt model `model_dbt_consumer_app_fct_user_streak` as a Dagster asset.
- `my_partitioned_asset` is a partitioned asset defined with a daily partitioning scheme starting from January 1, 2023.
- `my_partitioned_job` is a job that includes `my_partitioned_asset`.
- `my_dbt_asset_sensor` is a sensor that monitors the materialization of the dbt asset `model_dbt_consumer_app_fct_user_streak` and triggers `my_partitioned_job` when the dbt asset is materialized. The sensor specifies the latest partition to backfill using the current date as an example partition key.
Make sure to replace `"path/to/your/dbt/manifest.json"` with the actual path to your dbt manifest file and adjust the asset names, paths, and partition key logic as needed.
This setup ensures that whenever the dbt asset `model_dbt_consumer_app_fct_user_streak` is materialized, the sensor will trigger the job that runs the partitioned asset `my_partitioned_asset`, specifying the latest partition to backfill.
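A note on that example partition key: if the upstream dbt asset is itself partitioned, you can often read the partition directly from the materialization event that triggered the sensor instead of using the current date. A minimal sketch of that substitution inside the sensor body:
```python
# Replacement for the datetime.now() line inside the sensor: read the
# partition recorded on the triggering materialization event, falling back
# to the current date if the event carries no partition.
event_partition = asset_event.dagster_event.partition  # None if unpartitioned
latest_partition_key = event_partition or datetime.now().strftime("%Y-%m-%d")
```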
Corbett Carrel
05/24/2024, 12:33 PM
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
Scout
05/24/2024, 12:33 PM
The error `dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run` indicates that the code is trying to access a `partition_key` in a context where the run is not partitioned. This typically happens when you attempt to access the `partition_key` in a run that does not have partitioning configured.
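One way to make this failure mode explicit in your own code is to guard partition access in the asset or op body. A minimal sketch using the execution context's `has_partition_key` property:
```python
# Inside an asset/op body: only touch partition_key when the run is partitioned.
if context.has_partition_key:
    key = context.partition_key
    context.log.info(f"Processing partition: {key}")
else:
    context.log.warning("Run is not partitioned; partition_key is unavailable")
```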
To fix this error, you need to ensure that the run is actually launched as a partitioned run, i.e., that the `RunRequest` carries the necessary partition key when triggering the partitioned asset. Here’s how you can address this issue step-by-step:
1. Ensure the Partitioned Asset is Correctly Defined: Make sure your partitioned asset is defined with a partitioning scheme.
2. Modify the Sensor to Include the Partition Key: When creating a `RunRequest` in the sensor, pass the `partition_key` parameter on the `RunRequest` (in addition to any `run_config` your assets need).
3. Check the Job Configuration: Ensure that the job configuration correctly passes the partition key to the partitioned asset.
Here's an example of how you might modify the sensor to correctly pass the partition key:
python
from datetime import datetime

from dagster import (
    AssetKey,
    DailyPartitionsDefinition,
    RunRequest,
    SensorEvaluationContext,
    asset,
    asset_sensor,
    job,
)
from dagster_dbt import dbt_assets

# Define your dbt asset
@dbt_assets(
    manifest="path/to/your/dbt/manifest.json",
    select="model_dbt_consumer_app_fct_user_streak",
)
def my_dbt_assets():
    pass

# Define your partitioned asset
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=partitions_def)
def my_partitioned_asset(context, partition_key: str):
    # Use the partition_key in your asset logic
    context.log.info(f"Processing partition: {partition_key}")
    # Perform your partitioned asset logic here
    ...

# Define a job that includes the partitioned asset
@job
def my_partitioned_job():
    my_partitioned_asset()

# Create a sensor that monitors the dbt asset
@asset_sensor(asset_key=AssetKey("model_dbt_consumer_app_fct_user_streak"), job=my_partitioned_job)
def my_dbt_asset_sensor(context: SensorEvaluationContext, asset_event):
    # Determine the latest partition key (this example uses the current date)
    latest_partition_key = datetime.now().strftime("%Y-%m-%d")
    # Trigger the job with the partitioned asset, specifying the latest partition to backfill
    yield RunRequest(
        run_key=f"backfill_partition_{latest_partition_key}",
        run_config={
            "ops": {
                "my_partitioned_asset": {
                    "config": {
                        "partition_key": latest_partition_key
                    }
                }
            }
        },
        partition_key=latest_partition_key,  # Ensure the partition_key is specified here
    )
In this example:
- `my_partitioned_asset` is defined with a daily partitioning scheme.
- The `my_dbt_asset_sensor` sensor monitors the materialization of the dbt asset and triggers `my_partitioned_job` when the dbt asset is materialized.
- The `RunRequest` includes the `partition_key` in both the `run_config` and the `partition_key` parameter to ensure that the partitioned asset receives the correct partition key.
Make sure to replace `"path/to/your/dbt/manifest.json"` with the actual path to your dbt manifest file and adjust the asset names, paths, and partition key logic as needed.
By ensuring that the `partition_key` is correctly specified in the `RunRequest`, you can avoid the `DagsterInvariantViolationError` and ensure that the partitioned asset is correctly triggered with the appropriate partition key.
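As a side note, a more idiomatic way to avoid plumbing the partition through `run_config` at all is to define the job with `define_asset_job` using the same `partitions_def`, let the asset read `context.partition_key`, and rely solely on the `RunRequest`'s `partition_key` parameter. This is a minimal sketch under those assumptions, not a drop-in replacement for the example above:
```python
from datetime import datetime

from dagster import (
    AssetKey,
    DailyPartitionsDefinition,
    RunRequest,
    asset,
    asset_sensor,
    define_asset_job,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=partitions_def)
def my_partitioned_asset(context):
    # The partition arrives on the context; no config plumbing needed
    context.log.info(f"Processing partition: {context.partition_key}")

# A partitioned asset job that targets the asset above
my_partitioned_job = define_asset_job(
    "my_partitioned_job",
    selection=[my_partitioned_asset],
    partitions_def=partitions_def,
)

@asset_sensor(
    asset_key=AssetKey("model_dbt_consumer_app_fct_user_streak"),
    job=my_partitioned_job,
)
def my_dbt_asset_sensor(context, asset_event):
    latest_partition_key = datetime.now().strftime("%Y-%m-%d")
    # partition_key alone is enough for a partitioned job
    yield RunRequest(
        run_key=f"backfill_partition_{latest_partition_key}",
        partition_key=latest_partition_key,
    )
```
Because the job itself is partitioned, Dagster can resolve the run's partition from the `partition_key` on the `RunRequest` alone, with no per-op config.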
Corbett Carrel
05/24/2024, 12:40 PM
Scout
05/24/2024, 12:40 PM
To unit test an asset sensor, you can:
1. Create an Ephemeral Dagster Instance: Use `DagsterInstance.ephemeral()` to create an in-memory instance for testing.
2. Build the Sensor Context: Use `build_sensor_context` to create a context for the sensor evaluation.
3. Invoke the Sensor: Call the sensor function with the test context and evaluate the results.
Here's an example of how you might write a unit test for an asset sensor:
```python
from dagster import (
    AssetKey,
    DagsterInstance,
    RunRequest,
    SensorEvaluationContext,
    asset,
    asset_sensor,
    build_sensor_context,
    job,
    materialize_to_memory,
)
from dagster_dbt import dbt_assets

# Define your dbt asset
@dbt_assets(
    manifest="path/to/your/dbt/manifest.json",
    select="model_dbt_consumer_app_fct_user_streak",
)
def my_dbt_assets():
    pass

# Define your partitioned asset
@asset
def my_partitioned_asset(context, partition_key: str):
    context.log.info(f"Processing partition: {partition_key}")
    return partition_key

# Define a job that includes the partitioned asset
@job
def my_partitioned_job():
    my_partitioned_asset()

# Create a sensor that monitors the dbt asset
@asset_sensor(asset_key=AssetKey("model_dbt_consumer_app_fct_user_streak"), job=my_partitioned_job)
def my_dbt_asset_sensor(context: SensorEvaluationContext, asset_event):
    latest_partition_key = "2023-01-01"  # Example partition key for testing
    yield RunRequest(
        run_key=f"backfill_partition_{latest_partition_key}",
        run_config={
            "ops": {
                "my_partitioned_asset": {
                    "config": {
                        "partition_key": latest_partition_key
                    }
                }
            }
        },
        partition_key=latest_partition_key,
    )

# Test the asset sensor
def test_my_dbt_asset_sensor():
    instance = DagsterInstance.ephemeral()
    # Materialize the dbt asset to create a materialization event
    materialize_to_memory([my_dbt_assets], instance=instance)
    # Build the sensor context
    context = build_sensor_context(instance=instance)
    # Invoke the sensor and collect the run requests
    run_requests = list(my_dbt_asset_sensor(context))
    # Assert that the sensor produced the expected run request
    assert len(run_requests) == 1
    assert run_requests[0].run_key == "backfill_partition_2023-01-01"
    assert run_requests[0].partition_key == "2023-01-01"
    assert (
        run_requests[0].run_config["ops"]["my_partitioned_asset"]["config"]["partition_key"]
        == "2023-01-01"
    )
```
Scout
05/24/2024, 12:41 PM
In this test:
1. Define the Assets, Job, and Sensor: The dbt asset, partitioned asset, job, and sensor are defined as in the previous examples.
2. Create an Ephemeral Instance: `DagsterInstance.ephemeral()` is used to create an in-memory instance for testing.
3. Materialize the dbt Asset: The `materialize_to_memory` function is used to create a materialization event for the dbt asset. This simulates the condition that the sensor is monitoring.
4. Build the Sensor Context: The `build_sensor_context` function creates a context for the sensor evaluation.
5. Invoke the Sensor: The sensor function is called with the test context, and the resulting `RunRequest` objects are collected.
6. Assert the Results: The test asserts that the sensor produced the expected `RunRequest`, including the correct `run_key` and `partition_key`.
Make sure to replace `"path/to/your/dbt/manifest.json"` with the actual path to your dbt manifest file and adjust the asset names, paths, and partition key logic as needed.
This approach allows you to test the behavior of your asset sensor in isolation, ensuring that it correctly triggers the expected runs based on the monitored asset's materialization events.
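If direct invocation of the sensor function is awkward in your Dagster version, an alternative is to evaluate the sensor the way the daemon does, via `SensorDefinition.evaluate_tick`. A minimal sketch, assuming the same definitions as the test above:
```python
# evaluate_tick runs the sensor's evaluation function and returns a
# SensorExecutionData whose run_requests can be inspected directly.
def test_sensor_via_evaluate_tick():
    instance = DagsterInstance.ephemeral()
    materialize_to_memory([my_dbt_assets], instance=instance)
    context = build_sensor_context(instance=instance)
    result = my_dbt_asset_sensor.evaluate_tick(context)
    assert result.run_requests
    assert result.run_requests[0].partition_key == "2023-01-01"
```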