# ask-ai
t

Tara Larrue

02/14/2024, 6:23 PM
Is it possible to define a partitioned observable source asset?
s

Scout

02/14/2024, 6:23 PM
Yes, it is possible to define a partitioned observable source asset in Dagster. The `@observable_source_asset` decorator can take a `partitions_def` argument. When defining an observable source asset with partitions, you should return a `DataVersionsByPartition` object from the observe function, which contains the data versions for each partition. Here is an example from the documentation on how to define a partitioned observable source asset:
Copy code
python
from dagster import DataVersionsByPartition, StaticPartitionsDefinition, observable_source_asset

@observable_source_asset(
    partitions_def=StaticPartitionsDefinition(["a", "b"])
)
def foo():
    return DataVersionsByPartition({"a": "alpha", "b": "beta"})
In this example, the `foo` function is decorated as an observable source asset with two static partitions, "a" and "b". The function returns a `DataVersionsByPartition` object with data versions for each of these partitions.
t

Tara Larrue

02/14/2024, 6:29 PM
@Scout how does this change if the partition is multikey? Does the DataVersionsByPartition argument accept multikey names? And, am I able to view partitions in the webserver UI like other assets?
s

Scout

02/14/2024, 6:29 PM
When dealing with multikey partitions, you would use a `MultiPartitionKey` to represent the partition keys for each dimension. The `DataVersionsByPartition` object would then accept these multikey names as keys. Here's a conceptual example based on the documentation:
Copy code
python
from dagster import DataVersionsByPartition, MultiPartitionKey, observable_source_asset

@observable_source_asset(partitions_def=...)
def foo():
    # Assume we have a function that retrieves data versions for each multikey partition
    data_versions = {
        MultiPartitionKey({"dimension1": "value1", "dimension2": "value2"}): data_version_for_value1_value2,
        MultiPartitionKey({"dimension1": "value1", "dimension2": "value3"}): data_version_for_value1_value3,
        # ... other partitions
    }
    return DataVersionsByPartition(data_versions)
In this example, `data_versions` is a dictionary where the keys are `MultiPartitionKey` objects representing each multikey partition, and the values are the corresponding data versions.
Regarding viewing partitions in the webserver UI: yes, you should be able to view partitions for observable source assets just like other assets. The Dagster UI (formerly Dagit) lets you inspect and interact with assets, including partitioned ones. You can see the partitions along with the events and metadata recorded for them.
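A multi-dimensional partition key is ultimately handled as a single string. Later in this thread such a key appears in the UI as `LMP|2024-02-14` for the dimensions `date` and `datatype`, which is consistent with joining the dimension values with `|` in order of dimension name. The sketch below reproduces that layout in plain Python. `multi_key_string` is a hypothetical helper for illustration, not a Dagster API, and the ordering rule is an assumption inferred from that one example.

```python
def multi_key_string(keys_by_dimension: dict[str, str], delimiter: str = "|") -> str:
    """Join dimension values into one key, ordered by dimension name."""
    ordered_dimensions = sorted(keys_by_dimension)
    return delimiter.join(keys_by_dimension[name] for name in ordered_dimensions)


# "datatype" sorts before "date", so the datatype value comes first.
key = multi_key_string({"date": "2024-02-14", "datatype": "LMP"})
print(key)
```

Knowing the string form helps when comparing a key shown in the UI against the keys your observation function produces.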
t

Tara Larrue

02/14/2024, 8:14 PM
@Scout How do I get a partition of a source asset to show a "materialization"? Can this happen thru observation?
s

Scout

02/14/2024, 8:14 PM
In Dagster, materializations and observations are distinct concepts:
- Materializations represent the output of a computation or transformation, typically associated with an asset. When an asset is materialized, it means that the asset's data has been computed and stored somewhere, and this event is recorded in the event log.
- Observations are used to record metadata about an asset without implying that the asset's data has been recomputed. Observations can be used with source assets to track information about the data, such as when new data has arrived or when the data has changed in some way.
For a source asset, you cannot directly create a "materialization" because source assets are typically not the result of a Dagster computation. However, you can create an observation that indicates a new version of the data is available. If you want to represent a change in the source asset's data as a materialization in the UI, you would typically do this by having a downstream asset that processes the source data and creates a materialization event when it runs.
If you have an observable source asset, you can use the observation mechanism to indicate that new data is available or that the data has changed. Here's an example of how you might define an observable source asset with an observation function:
Copy code
python
from dagster import AssetObservation, DataVersion, observable_source_asset

@observable_source_asset
def my_source_asset(context):
    # check_for_new_data() and compute_data_version() are placeholders for your own logic
    new_data_available = check_for_new_data()

    if new_data_available:
        # Log an AssetObservation with metadata describing what changed
        context.log_event(
            AssetObservation(
                asset_key="my_source_asset",
                metadata={"Data Update": "New data available"},
            )
        )

    # The observe function itself must return a DataVersion
    return DataVersion(compute_data_version())
When the observation function is run, it will generate an `AssetObservation` event that can be viewed in the Dagit UI. This observation can be used to indicate to users that the source asset has new data available, even though it's not a materialization in the traditional sense. To trigger observations for source assets, you can use the "Observe sources" button in the Dagit UI, which will execute the observation function for the source asset and record any observations. Observations can help inform users or automated systems that downstream assets may need to be updated due to changes in the source data.
t

Tara Larrue

02/14/2024, 8:20 PM
@Scout do date partitions get added automatically? I am logging AssetObservations for a source asset, but the corresponding date partition isn't being created
s

Scout

02/14/2024, 8:20 PM
In Dagster, partitions for an asset do not get created automatically simply by logging `AssetObservation` events. Partitions must be defined explicitly when you define your asset, and they represent a way to slice the data that the asset represents. For example, you might have daily partitions for an asset that represents logs, where each partition corresponds to one day's worth of logs. When you define a partitioned asset, you specify a `partitions_def` that describes how the partitions should be structured. This could be a static list of partitions, or it could be a dynamic definition based on time (e.g., `DailyPartitionsDefinition`). Here's an example of how you might define a partitioned asset:
Copy code
python
from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2021-01-01"))
def my_partitioned_asset():
    # Asset computation logic here
    ...
In this example, `my_partitioned_asset` is defined with daily partitions starting from January 1, 2021. When you log an `AssetObservation` for a partitioned asset, you should specify the partition key to associate the observation with a particular partition:
Copy code
python
from dagster import AssetObservation, op

@op
def my_op(context):
    # Your logic to check for new data
    new_data_available = True  # Placeholder for actual check

    if new_data_available:
        # Log an observation for a specific partition
        context.log_event(
            AssetObservation(
                asset_key="my_partitioned_asset",
                partition="2021-01-01",  # Specify the partition key here
                metadata={"Data Update": "New data available"},
            )
        )
In this example, the `AssetObservation` is associated with the partition key `"2021-01-01"`. If you are logging observations and not seeing the corresponding partitions being created or recognized in the Dagit UI, there could be a few reasons for this:
1. The `partitions_def` may not be set up correctly for the asset.
2. The partition key provided in the `AssetObservation` may not match any of the defined partitions.
3. There may be a mismatch between the partition keys and the partition definition (e.g., date format issues).
To resolve this, ensure that your partition definition is correct and that the partition keys you are using in your observations match the keys defined by your partition definition. If you continue to experience issues, it may be helpful to review the asset definition and the code that logs the observations to ensure they are aligned with the partitioning scheme.
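The checks above can be scripted before logging an observation. The following is a minimal sketch in plain Python, assuming daily keys in `YYYY-MM-DD` form. `is_valid_daily_key` and its arguments are hypothetical names, and the function only models the idea of a key falling inside a defined range; it is not Dagster's own validation.

```python
from datetime import date, datetime


def is_valid_daily_key(key: str, start: date, last: date) -> bool:
    """Return True if key is a zero-padded YYYY-MM-DD string within [start, last]."""
    try:
        parsed = datetime.strptime(key, "%Y-%m-%d").date()
    except ValueError:
        # Wrong format, e.g. "01/02/2021" or a full timestamp
        return False
    # Reject loosely formatted dates such as "2021-1-5"
    return parsed.isoformat() == key and start <= parsed <= last


start, last = date(2021, 1, 1), date(2021, 1, 31)
print(is_valid_daily_key("2021-01-15", start, last))           # in range
print(is_valid_daily_key("2021-01-15T00:00:00", start, last))  # timestamp, not a date key
print(is_valid_daily_key("2021-02-01", start, last))           # past the last partition
```

A key that fails a check like this is a likely candidate for an observation that never shows up under a partition.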
t

Tara Larrue

02/14/2024, 8:32 PM
@Scout can you set DataVersionsByPartition for a source asset from another function (not within the source asset definition)?
s

Scout

02/14/2024, 8:32 PM
No, you cannot set `DataVersionsByPartition` for a source asset from another function that is external to the source asset definition. In Dagster, the `DataVersionsByPartition` should be returned directly from the observation function that is defined within the `@observable_source_asset` decorator. This is because the observation function is responsible for providing the data versions for each partition of the source asset, and it is invoked by the system to observe changes or new data for the asset.
The observation function is the designated place to encapsulate the logic for determining the data versions for each partition. It is not designed to be set or modified from outside of this function. If you need to use external data or computations to determine the data versions for each partition, you should call those external functions or gather that data within the observation function and then return the `DataVersionsByPartition` object accordingly. Here's a conceptual example of how you might use an external function within the observation function to set `DataVersionsByPartition`:
Copy code
python
from dagster import observable_source_asset, DataVersionsByPartition

# This could be an external function that computes data versions
def compute_data_versions():
    # Logic to compute data versions for each partition
    # ...
    return {
        "partition_1": "version_1",
        "partition_2": "version_2",
        # ...
    }

@observable_source_asset(partitions_def=...)
def my_source_asset():
    data_versions = compute_data_versions()
    return DataVersionsByPartition(data_versions)
In this example, the `compute_data_versions` function is external to the `my_source_asset` definition but is called within the observation function to obtain the data versions, which are then returned as a `DataVersionsByPartition` object. Remember that the observation function is the only place where you can define the behavior for observing a source asset, and it must return either a `DataVersion` or a `DataVersionsByPartition` object.
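A helper like `compute_data_versions` typically turns query results into a partition-to-version mapping. As a rough sketch in plain Python, assuming each row carries a partition key and a last-updated timestamp (the row shape and the name `versions_from_rows` are illustrative assumptions), the latest timestamp per partition can serve as its version string:

```python
from datetime import datetime


def versions_from_rows(rows: list[tuple[str, datetime]]) -> dict[str, str]:
    """Map each partition key to the ISO timestamp of its most recent update."""
    latest: dict[str, datetime] = {}
    for partition_key, updated_at in rows:
        if partition_key not in latest or updated_at > latest[partition_key]:
            latest[partition_key] = updated_at
    return {key: ts.isoformat() for key, ts in latest.items()}


rows = [
    ("partition_1", datetime(2024, 2, 14, 9, 0)),
    ("partition_1", datetime(2024, 2, 14, 17, 30)),
    ("partition_2", datetime(2024, 2, 13, 8, 15)),
]
versions = versions_from_rows(rows)
print(versions)
```

The resulting dictionary has the same shape as the one passed to `DataVersionsByPartition` in the example above.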
t

Tara Larrue

02/14/2024, 9:03 PM
@Scout I'm having trouble with date partitions. My assets only seem to have created the partition for my specified "start_date", and no partitions have been added for the dates after that.
s

Scout

02/14/2024, 9:03 PM
If your asset has only created a partition corresponding to the specified "start_date" and has not added subsequent dates, there are a few potential issues to consider:
1. Partition Definition: Ensure that your partition definition is set up to generate partitions beyond the start date. For example, a `DailyPartitionsDefinition` generates one partition for each day from the start date up to the last complete day (or a specified end date). By default, the current day is not included until it has ended.
2. Partition Generation: Partitions are not automatically materialized; they are definitions of how the data can be sliced. You need to trigger a materialization or an observation for each partition you want to be reflected in the system.
3. Schedules or Sensors: If you expect partitions to be created automatically on a schedule, ensure that you have a schedule or sensor set up to trigger the materialization of the asset for new partitions.
4. Time Zone Issues: If you're using time zones in your partition definitions, ensure that the time zone is correctly specified and that it aligns with the time zone of the data source and the system where Dagster is running.
5. End Offset: The `end_offset` parameter extends the partition set by that number of partitions beyond the default end. With the default of 0, the last partition ends before the current time; an `end_offset` of 1 adds the partition for the current day. If you need to observe or materialize today's partition, set `end_offset=1`.
6. Observation Function: If you're using an observation function with `@observable_source_asset`, ensure that it's correctly observing new data and returning the appropriate `DataVersionsByPartition` object.
Here's an example of how you might define a daily partitioned asset:
Copy code
python
from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2021-01-01"))
def my_daily_partitioned_asset():
    # Asset computation logic here
    ...
In this example, `my_daily_partitioned_asset` should have a partition for each day starting from January 1, 2021. If you've checked these areas and are still experiencing issues, you may need to debug further:
- Review the asset definition to ensure that the partition definition is correct.
- Check the logs to see if there are any errors or warnings related to partition creation or materialization.
- Verify that any schedules or sensors intended to trigger partition materialization are active and configured correctly.
- If you're using Dagit, check the asset catalog to see the list of partitions that have been created and any associated materializations or observations.
If after checking these areas you're still unable to resolve the issue, you may want to provide more specific details about your asset definition, partition definition, and any schedules or sensors you have set up, so that we can offer more targeted assistance.
t

Tara Larrue

02/14/2024, 9:16 PM
@Scout this is my asset definition that is not resulting in new date partitions being created upon observation. What am I doing wrong?
Copy code
day_dt_partitions_def = MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date=START_DATE),
            "datatype": datatype_partitions_def,
        }
    )

@observable_source_asset(partitions_def=day_dt_partitions_def, required_resource_keys={"dp_monitoring_append"})
def datapoint_monitoring(context):
    """Datapoint monitoring database table reference.
    Partitions by date + datatype allow us to observe the time of the most recent successful insert to the table."""

    observe_query = f"""
    SELECT 
        point_time::date as date,
        datatype.name AS datatype,
        MAX(created_at) as latest_update_time
    FROM datapoint_monitoring
    LEFT JOIN datatype ON datapoint_monitoring.point_type = datatype.id
    WHERE point_time BETWEEN '{START_DATE}' AND '{datetime.now()+timedelta(days=1)}'
    GROUP BY 1,2;
    """
    data_versions = {}
    try:
        engine = context.resources.dp_monitoring_append.engine
        conn = engine.raw_connection()
        df = pd.read_sql(observe_query, conn)
    except exc.SQLAlchemyError as e:
        raise Failure(
            description="Query execution failure.",
            metadata={
                "error": str(e),
            }
        )
    else:

        for idx,row in df.iterrows():
            partition_key = MultiPartitionKey({"date": row["date"].isoformat(), "datatype": row["datatype"]})
            data_versions.update({partition_key:row["latest_update_time"].isoformat()})
            context.log_event(
                AssetObservation(
                    asset_key="datapoint_monitoring",
                    partition=partition_key,
                    metadata={"latest_update_time": row["latest_update_time"].isoformat()},
                )
            )

    return DataVersionsByPartition(data_versions)
This results in an observation for partition "LMP|2024-02-14", but when I click on the partition name in the UI, that partition does not exist.
s

Scout

02/14/2024, 9:16 PM
Based on the code snippet you've provided, it seems that you are correctly defining a `MultiPartitionsDefinition` with a daily partition for the "date" dimension and another partition definition for the "datatype" dimension. You are also correctly logging `AssetObservation` events within your `datapoint_monitoring` observable source asset function. However, there are a few things to consider that might be causing the issue with partitions not appearing in the UI:
1. Partition Keys Format: Ensure that the format of the partition keys you are creating in the `MultiPartitionKey` matches the format expected by the `DailyPartitionsDefinition`. For example, if the `DailyPartitionsDefinition` expects a date in the format "YYYY-MM-DD", the `partition_key` should match this format.
2. Observation Logging: When logging an `AssetObservation`, you are specifying the `asset_key` as a string ("datapoint_monitoring"). This is accepted: a string is coerced to an `AssetKey`, so it is equivalent to passing `AssetKey("datapoint_monitoring")`.
3. Partition Creation: Remember that logging an `AssetObservation` does not create a partition; it only logs metadata about a partition. Partitions are defined by the `partitions_def` and are conceptual slices of your asset. If a key is not in the set that `partitions_def` generates, the UI has no partition to attach the observation to.
4. Time Zone Awareness: If your system is running in a different time zone than the data source, there might be discrepancies in the partition dates. Ensure that the time zones are aligned or explicitly handled in your partition definitions and observation logic.
5. Dagit UI: After logging the observation, you may need to refresh the Dagit UI to see the updated partitions and observations.
6. Error Handling: Ensure that your try-except block is not silently failing and that the `AssetObservation` events are indeed being logged without any exceptions.
If you've checked these areas and the partitions still do not appear in the UI, you may want to look into the following additional debugging steps:
- Check the Dagster instance event logs to see if there are any errors or warnings when logging the `AssetObservation` events.
- Verify that the `START_DATE` variable is correctly set and that it aligns with the `DailyPartitionsDefinition` start date.
- Ensure that the `observe_query` is returning the expected results and that the `for` loop is iterating over the dataframe as expected.
If after these checks the issue persists, it might be helpful to reach out to the Dagster community or support with specific details about your Dagster version, the configuration of your Dagster instance, and any relevant logs or error messages. This could provide further insights into why the partitions are not being created as expected in the UI.
t

Tara Larrue

02/14/2024, 9:22 PM
@Scout what is the default time zone for a date partition?
s

Scout

02/14/2024, 9:22 PM
The default time zone for a date partition in Dagster, when not explicitly specified, is UTC. If you want to use a different time zone for your partitions, you need to specify it using the `timezone` parameter in your partition definition. Here's an example from the documentation on how to specify a time zone in a `DailyPartitionsDefinition`:
Copy code
python
from dagster import DailyPartitionsDefinition

daily_partitions_def = DailyPartitionsDefinition(
    start_date="2021-01-01",
    timezone="America/Los_Angeles"
)
In this example, the `timezone` parameter is set to "America/Los_Angeles", which means that each date in the partition set will exist in the Pacific Time Zone. If you do not provide the `timezone` parameter, Dagster will use UTC by default.
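The choice of time zone decides which daily partition a given instant belongs to. Here is a small illustration using only the standard library (`zoneinfo`, Python 3.9+). `daily_key` is a hypothetical helper, not a Dagster function.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def daily_key(instant: datetime, tz_name: str) -> str:
    """Return the YYYY-MM-DD date of an aware datetime as seen in tz_name."""
    return instant.astimezone(ZoneInfo(tz_name)).date().isoformat()


# 02:00 UTC on Feb 15 is still the evening of Feb 14 in Los Angeles.
instant = datetime(2024, 2, 15, 2, 0, tzinfo=timezone.utc)
print(daily_key(instant, "UTC"))
print(daily_key(instant, "America/Los_Angeles"))
```

If the database stores timestamps in one zone and the partition definition uses another, rows near midnight can land on a date key you did not expect.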
t

Tara Larrue

02/14/2024, 9:33 PM
@Scout I solved it by adding the `end_offset=1` parameter to `DailyPartitionsDefinition`
s

Scout

02/14/2024, 9:33 PM
Great to hear that you resolved the issue by adding the `end_offset=1` parameter to your `DailyPartitionsDefinition`. This parameter extends the partition set forward, ensuring that the current day's partition is included in the partition set. By default, without specifying `end_offset`, the last partition would end before the current time, which could exclude the current day's partition depending on when the partition set is evaluated. With `end_offset=1`, you're effectively telling Dagster to include the partition for the current day, which is likely why you're now seeing the expected partitions being created. If you have any more questions or need further assistance, feel free to ask!
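To make the effect of `end_offset` concrete, here is a plain-Python model of the rule described above: by default only days that have fully ended get a partition, and each unit of `end_offset` adds one more. `daily_keys` is a hypothetical function that mirrors the described behaviour while ignoring time zones. It is not Dagster's implementation.

```python
from datetime import date, datetime, timedelta


def daily_keys(start: date, now: datetime, end_offset: int = 0) -> list[str]:
    """List daily keys from start; by default the last key is the last fully ended day."""
    # Days strictly before today's date have ended; end_offset extends past that.
    last_exclusive = now.date() + timedelta(days=end_offset)
    keys = []
    current = start
    while current < last_exclusive:
        keys.append(current.isoformat())
        current += timedelta(days=1)
    return keys


now = datetime(2024, 2, 14, 21, 16)
default_keys = daily_keys(date(2024, 2, 12), now)
extended_keys = daily_keys(date(2024, 2, 12), now, end_offset=1)
print(default_keys)   # today is missing
print(extended_keys)  # today is included
```

This matches the symptom in the thread: an observation keyed to the current day has no partition to land in until `end_offset=1` is set.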
t

Tara Larrue

02/14/2024, 9:38 PM
@Scout can I add metadata directly to `DataVersionsByPartition`?
s

Scout

02/14/2024, 9:38 PM
No, you cannot directly add metadata to `DataVersionsByPartition`. The `DataVersionsByPartition` object is specifically designed to hold a mapping of partition keys to data versions, and it does not support attaching additional metadata. However, you can log `AssetObservation` events with metadata for each partition when you are observing a source asset. When you create an `AssetObservation`, you can include metadata entries that describe the observation. This allows you to attach relevant metadata to each partition at the time you are observing it. Here's an example of how you might log an `AssetObservation` with metadata for a partition:
Copy code
python
from dagster import AssetObservation, MultiPartitionKey

# ... inside your observation function for the source asset

partition_key = MultiPartitionKey({"date": "2023-01-01", "datatype": "example"})
data_version = "some_version_identifier"

context.log_event(
    AssetObservation(
        asset_key="my_partitioned_asset",
        partition=partition_key,
        metadata={
            "Data version": data_version,
            "Additional info": "Some extra details about the observation",
            # ... other metadata
        },
    )
)
In this example, the `AssetObservation` is associated with a specific `MultiPartitionKey` and includes metadata entries that provide additional information about the observation. Remember that `AssetObservation` events are used to record observations about assets, and they are separate from the actual materialization of asset data. If you need to attach metadata to materializations, you would use `AssetMaterialization` events instead, which can also include metadata entries.
t

Tara Larrue

02/14/2024, 9:39 PM
@Scout can I add system tags to `AssetObservation`?
s

Scout

02/14/2024, 9:39 PM
As of my knowledge cutoff date in early 2023, `AssetObservation` events in Dagster do not support system tags in the same way that pipeline runs do. The `AssetObservation` event is used to log metadata about an asset at a point in time, and it includes a set of metadata entries that can be displayed in the Dagit UI, but these are not the same as system tags.
System tags are typically used in Dagster to annotate pipeline runs with additional context, such as the user who triggered the run, the scheduler that scheduled the run, or custom tags that you define for your own purposes. These tags can be used for filtering and organizing runs in the Dagit UI.
However, if you want to include additional context or categorization information with an `AssetObservation`, you can attach custom metadata through the `metadata` argument. Metadata values can hold various types of information, such as text, URLs, markdown, or JSON blobs, and they will be associated with the `AssetObservation` event in the event log and visible in the Dagit UI. Here's an example of how to include custom metadata with an `AssetObservation`:
Copy code
python
from dagster import AssetObservation, op

@op
def my_observation_op(context):
    # Your logic to observe the asset
    # ...

    context.log_event(
        AssetObservation(
            asset_key="my_asset",
            metadata={
                "my_custom_tag": "my_custom_value",
                # ... other metadata
            },
        )
    )
In this example, a custom metadata entry labeled "my_custom_tag" is added to the `AssetObservation`. This entry will be visible in the Dagit UI, but it is not a system tag in the sense of pipeline run tags. If you need to use system tags or have other requirements for tagging that are not covered by the current `AssetObservation` functionality, you may want to check the latest Dagster documentation or reach out to the Dagster community for updates or best practices that may have been introduced after my knowledge cutoff date.
t

Tara Larrue

02/14/2024, 9:43 PM
@Scout how do I observe a source asset on a frequent schedule?
s

Scout

02/14/2024, 9:43 PM
To observe a source asset on a frequent schedule in Dagster, you would typically use a schedule or a sensor. Schedules allow you to run a job at a fixed interval, such as every hour or every day, while sensors can trigger runs based on certain conditions or events. Here's how you can set up a schedule to observe a source asset frequently:
1. Define an observable source asset with an observation function.
2. Define an asset job that includes the observable source asset.
3. Create a schedule that targets the asset job.
Here's an example based on the documentation:
Copy code
python
from dagster import define_asset_job, observable_source_asset, ScheduleDefinition

@observable_source_asset
def my_source_asset():
    # Logic to observe the source asset
    ...

# Define an asset job that includes the observable source asset
observation_job = define_asset_job("observation_job", [my_source_asset])

# Define a schedule that will run the observation job every hour
observation_schedule = ScheduleDefinition(
    name="observation_schedule",
    cron_schedule="0 * * * *",  # This cron schedule means "at the start of every hour"
    job=observation_job,
)
In this example, `my_source_asset` is an observable source asset with an observation function that defines the logic for observing the source asset. The `observation_job` is an asset job that includes this source asset. The `observation_schedule` is a schedule definition that uses a cron expression to run the `observation_job` every hour. To apply this schedule, you would include it in your repository definition:
Copy code
python
from dagster import repository

@repository
def my_repository():
    # Include the source asset itself so the asset job can resolve it
    return [my_source_asset, observation_job, observation_schedule]
Once you have defined the schedule, included it in your repository, and turned the schedule on, Dagster will run the observation job according to the schedule you've set up. Please note that the cron expression `"0 * * * *"` specifies that the job should run at the start of every hour. You can adjust the cron expression to match the frequency with which you want to observe your source asset. Remember to check the latest Dagster documentation or reach out to the Dagster community for any updates or changes to this functionality that may have occurred after my knowledge cutoff date.
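For reference, the cron expression `0 * * * *` fires at minute zero of every hour. The sketch below lists the next few run times it implies, using only the standard library. `next_hourly_runs` is a hypothetical helper for illustration and does not parse cron syntax in general.

```python
from datetime import datetime, timedelta


def next_hourly_runs(now: datetime, count: int) -> list[datetime]:
    """Return the next `count` top-of-the-hour times strictly after `now`."""
    first = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(count)]


runs = next_hourly_runs(datetime(2024, 2, 14, 21, 43), 3)
for run_time in runs:
    print(run_time.isoformat())
```

An expression such as `*/15 * * * *` would instead fire every fifteen minutes, which may suit a source that changes more often.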
t

Tara Larrue

02/14/2024, 10:25 PM
@Scout how can I call a source asset (or source asset job) from inside an IOManager?
s

Scout

02/14/2024, 10:25 PM
As of my knowledge cutoff date, calling a source asset or a source asset job directly from inside an `IOManager` is not a typical pattern in Dagster. `IOManagers` are designed to handle the storage and retrieval of data for assets and ops, not to trigger the execution of jobs or assets.
In Dagster, the execution flow is generally defined at the job or graph level, where you specify the assets and ops that should be run and the order in which they should be executed. If you need to trigger a job or an asset based on certain conditions, you would typically use a sensor or a schedule, as mentioned in the previous answer.
However, if you find yourself needing to trigger an asset computation from within an `IOManager`, you might want to reconsider your design. It could be a sign that the logic you're trying to implement in the `IOManager` should actually be part of an op or an asset. If the logic is complex and involves triggering other computations, it's likely better suited to be implemented as an op that can be part of a job's execution plan.
If you have a use case where you believe you need to call a source asset or source asset job from an `IOManager`, it might be helpful to describe the use case in more detail. This way, we can provide more specific advice or alternative approaches that align with Dagster's design principles. For any advanced or non-standard use cases, it's also a good idea to consult the latest Dagster documentation or seek guidance from the Dagster community to ensure that you're following best practices and to get advice on how to implement your desired functionality.
t

Tara Larrue

02/14/2024, 10:31 PM
@Scout alternatively, how do I trigger an op after an IOManager and pass variables from the handle_output function into that op?
s

Scout

02/14/2024, 10:31 PM
In Dagster, `IOManagers` are responsible for handling the storage and retrieval of data artifacts produced by ops. If you want to trigger an op after an `IOManager` has handled an output and pass variables from the `handle_output` function into that op, you would typically use the output of one op as the input to another op within a job. The `IOManager` would store the output of the first op, and the second op would load this output as its input. Here's a step-by-step guide on how to set this up:
1. Define your ops, making sure that the output of the first op is passed to the second op.
Copy code
python
from dagster import op, Out, In, job

@op(out=Out())
def produce_data():
    # Your logic here
    return some_data

@op(ins={"input_data": In()})
def consume_data(input_data):
    # Your logic here using the input_data
    pass
2. Define a job that sequences the ops, ensuring that the output of `produce_data` is passed to `consume_data`.
Copy code
python
@job
def my_job():
    consume_data(produce_data())
3. Implement your `IOManager` with the `handle_output` method that stores the output of `produce_data`. The `load_input` method of the `IOManager` will be used to load the stored data for `consume_data`.
Copy code
python
from dagster import IOManager, OutputContext, InputContext

class MyIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj):
        # Your logic to store the obj
        # Optionally, you can add metadata or perform other actions here
        pass

    def load_input(self, context: InputContext):
        # Your logic to load the data for the input
        return loaded_data
4. Configure your job to use the custom `IOManager`.
Copy code
python
from dagster import io_manager

@io_manager
def my_io_manager(_init_context):
    # Wrap the IOManager class in a resource definition
    return MyIOManager()

@job(resource_defs={"io_manager": my_io_manager})
def my_job():
    consume_data(produce_data())
In this setup, the `produce_data` op will run and produce some data, which will be handled by `MyIOManager`. Then, the `consume_data` op will automatically run after `produce_data` and will receive the data produced by `produce_data` as its input.
If you need to pass additional variables or metadata from the `handle_output` function to the `consume_data` op, you can store this information alongside the data artifact in a way that the `load_input` method can retrieve it. For example, you could store a tuple or a dictionary that includes both the data and the additional variables, and then unpack this in `consume_data`.
Please note that `IOManagers` are not designed to trigger ops directly; they are part of the data handling layer. The sequencing and triggering of ops are handled at the job or graph level in Dagster. If you have a more complex use case that doesn't fit this pattern, you may need to rethink your pipeline design or consult the latest Dagster documentation for more advanced features that might help with your specific scenario.
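The idea of storing extra values next to the data can be sketched without Dagster at all. Below, a hypothetical in-memory store plays the role of the storage layer: the write step records the payload together with a `rows_affected` count, and the read step hands both back to the consumer. All names here are illustrative assumptions. A real `IOManager` would apply the same idea inside its own `handle_output` and `load_input` methods.

```python
from typing import Any

# Hypothetical stand-in for a database or object store
_store: dict[str, dict[str, Any]] = {}


def handle_output(name: str, obj: list[dict]) -> None:
    """Store the payload together with extra information about the write."""
    _store[name] = {"data": obj, "rows_affected": len(obj)}


def load_input(name: str) -> dict[str, Any]:
    """Return the stored bundle so the consumer sees data and extras together."""
    return _store[name]


handle_output("produce_data", [{"id": 1}, {"id": 2}])
bundle = load_input("produce_data")
print(bundle["rows_affected"], bundle["data"])
```

The consumer then unpacks the bundle and can branch on the extra value, for example skipping follow-up work when nothing was written.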
t

Tara Larrue

02/14/2024, 10:37 PM
@Scout for more context, I have an IO manager that handles an upsert into a database table. The result of the upsert operation tells me how many rows were actually affected. I want to use this information from the operation result in order to trigger certain source asset partitions to be observed. What's the best way to achieve this?
s

Scout

02/14/2024, 10:37 PM
In Dagster, if you want to use the result of an operation performed by an
IOManager
(such as the number of rows affected by an upsert) to trigger the observation of certain source asset partitions, you would typically handle this logic within an op and then use the result of that op to decide whether to trigger the observation of the source asset partitions. Here's a conceptual approach to achieve this: 1. Define an op that performs the upsert operation and returns the number of rows affected.
Copy code
python
from dagster import op

@op
def upsert_data_to_db(context, data):
    # Perform the upsert operation
    rows_affected = perform_upsert(data)
    return rows_affected
2. Define another op that observes the source asset partitions based on the number of rows affected.
Copy code
python
from dagster import op, AssetKey, AssetObservation

@op
def observe_source_asset_partitions(context, rows_affected):
    if rows_affected > 0:
        # Logic to determine which partitions to observe
        partitions_to_observe = determine_partitions_to_observe()

        for partition_key in partitions_to_observe:
            # Log an AssetObservation for each partition
            context.log_event(
                AssetObservation(
                    asset_key=AssetKey("my_source_asset"),
                    partition=partition_key,
                    metadata={"rows_affected": rows_affected},
                )
            )
3. Define a job that sequences these ops.
Copy code
python
from dagster import job

@job
def my_job():
    rows_affected = upsert_data_to_db()
    observe_source_asset_partitions(rows_affected)
4. Implement the
perform_upsert
function that the
upsert_data_to_db
op calls. This function should interact with the database and return the number of rows affected by the upsert operation.
Copy code
python
def perform_upsert(data):
    # Database interaction logic here
    # ...
    return rows_affected
5. Implement the
determine_partitions_to_observe
function that decides which partitions should be observed based on the upsert operation.
Copy code
python
def determine_partitions_to_observe():
    # Logic to determine which partitions to observe
    # ...
    return partitions_to_observe
In this setup, the
upsert_data_to_db
op will perform the upsert operation and return the number of rows affected. The
observe_source_asset_partitions
op will then use this information to determine which source asset partitions need to be observed and log
AssetObservation
events for those partitions. Please note that this approach assumes that the logic for determining which partitions to observe can be derived from the number of rows affected by the upsert operation. If you need more detailed information from the upsert operation to determine the partitions, you should ensure that the
perform_upsert
function returns the necessary details. This approach keeps the upsert logic and the observation logic within the Dagster ops, which is the recommended way to handle such scenarios. The
IOManager
is not directly involved in triggering the observation of partitions; instead, it is used for managing the input and output data artifacts for the ops. If your use case requires a different approach or if you encounter any challenges, you may want to consult the latest Dagster documentation or seek guidance from the Dagster community.
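As a rough sketch of an upsert helper that returns more than a row count, the example below uses the standard-library `sqlite3` module. The `datapoint` table, its columns, and the `UpsertResult` type are assumptions made for illustration; your database driver and schema will differ, and the `ON CONFLICT` syntax shown requires SQLite 3.24 or newer.

```python
import sqlite3
from dataclasses import dataclass, field


@dataclass
class UpsertResult:
    rows_affected: int = 0
    # (day, datatype) pairs whose rows were inserted or changed
    changed_partitions: set = field(default_factory=set)


def perform_upsert(conn, rows):
    """Upsert (day, datatype, value) rows and report what actually changed."""
    result = UpsertResult()
    cur = conn.cursor()
    for day, datatype, value in rows:
        cur.execute(
            "INSERT INTO datapoint (day, datatype, value) VALUES (?, ?, ?) "
            "ON CONFLICT (day, datatype) DO UPDATE SET value = excluded.value "
            "WHERE datapoint.value != excluded.value",
            (day, datatype, value),
        )
        # rowcount is 0 when the stored value was already identical
        if cur.rowcount > 0:
            result.rows_affected += cur.rowcount
            result.changed_partitions.add((day, datatype))
    conn.commit()
    return result


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE datapoint (day TEXT, datatype TEXT, value REAL, "
    "PRIMARY KEY (day, datatype))"
)
first = perform_upsert(conn, [("2024-02-14", "load", 1.0), ("2024-02-15", "load", 2.0)])
second = perform_upsert(conn, [("2024-02-14", "load", 1.0), ("2024-02-15", "load", 5.0)])
print(second.rows_affected, second.changed_partitions)
```

Returning the changed `(day, datatype)` pairs lets the observing op pick partitions directly instead of inferring them from a single count.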
t

Tara Larrue

02/14/2024, 10:40 PM
@Scout it looks like you're suggesting I don't use an IO Manager at all for this use case, is that correct?
s

Scout

02/14/2024, 10:40 PM
Yes, that's correct. Based on the documentation, for the use case you described—where you want to trigger certain source asset partitions to be observed based on the result of a database upsert operation (specifically, the number of rows affected)—it is more appropriate to handle this logic within ops rather than using an
IOManager
. The
IOManager
in Dagster is primarily responsible for handling the storage and retrieval of data artifacts. It is not designed to trigger further computations or observations based on the results of I/O operations. Instead, ops are the building blocks in Dagster that encapsulate the business logic and computation, and they are better suited for this kind of conditional workflow. By using ops to perform the upsert and then conditionally triggering the observation of source asset partitions, you maintain a clear separation of concerns and keep the orchestration logic within the Dagster execution plan. This approach aligns with Dagster's design principles and allows you to leverage the full capabilities of the system for managing dependencies and execution flow. If you need to pass along the result of the upsert operation (such as the number of rows affected) to downstream ops for further processing or decision-making, you can simply return that result from the op that performs the upsert and then pass it as an input to the subsequent ops that need it. This keeps the data flow explicit and manageable within the Dagster job.
t

Tara Larrue

02/15/2024, 3:21 AM
@Scout how do I log metadata + a new data version in the same asset observation for a partitioned source asset?
s

Scout

02/15/2024, 3:21 AM
To log metadata and a new data version in the same
AssetObservation
for a partitioned source asset, you would use the
AssetObservation
event within the observation function of your
@observable_source_asset
. You can include both metadata entries and a data version in the
AssetObservation
. Here's an example of how you might do this:
Copy code
python
from dagster import (
    observable_source_asset,
    AssetObservation,
    EventMetadataEntry,
    MultiPartitionKey,
    DataVersion,
)

@observable_source_asset(partitions_def=...)
def my_partitioned_source_asset(context):
    # Your logic to determine the new data version and metadata
    new_data_version = "new_version_identifier"
    metadata_entries = [
        EventMetadataEntry.text("description", "New data available"),
        # Add more metadata entries as needed
    ]

    # Your logic to determine the relevant partition key
    partition_key = MultiPartitionKey({"date": "2023-01-01", "datatype": "example"})

    # Log the AssetObservation with the new data version and metadata
    context.log_event(
        AssetObservation(
            asset_key="my_partitioned_source_asset",
            partition=partition_key,
            metadata_entries=metadata_entries,
            data_version=DataVersion(new_data_version),
        )
    )

    # Return the new data version for the observed partition
    return DataVersionsByPartition({partition_key: new_data_version})
In this example, the
my_partitioned_source_asset
function is decorated with
@observable_source_asset
and defines the observation logic for a partitioned source asset. Within the function, we determine the new data version and create a list of metadata entries. We then create a
MultiPartitionKey
for the relevant partition. When logging the
AssetObservation
, we include the
asset_key
,
partition
,
metadata_entries
, and
data_version
. The
data_version
is an instance of
DataVersion
that represents the new version of the data for the observed partition. Finally, we return a
DataVersionsByPartition
object that maps the partition key to the new data version. This object is used by Dagster to track the data versions for each partition of the source asset. By following this pattern, you can log both metadata and a new data version for a partitioned source asset in Dagster.
t

Tara Larrue

02/15/2024, 3:29 AM
@Scout data_version seems to be an unexpected argument for AssetObservation. I am using dagster 1.6.3 - is there another way to do this?
s

Scout

02/15/2024, 3:29 AM
In Dagster 1.6.3, the
AssetObservation
event does not have a
data_version
argument. Instead, you should return a
DataVersion
or
DataVersionsByPartition
from the observation function of your
@observable_source_asset
. The
DataVersion
or
DataVersionsByPartition
represents the new data version for the asset or its partitions, and the
AssetObservation
event can include metadata entries related to the observation. Here's how you can log metadata and return a new data version in the observation function:
Copy code
python
from dagster import (
    AssetObservation,
    DataVersionsByPartition,
    MultiPartitionKey,
    observable_source_asset,
)

@observable_source_asset(partitions_def=...)
def my_partitioned_source_asset(context):
    # Your logic to determine the new data version and metadata
    new_data_version = "new_version_identifier"
    # In Dagster 1.6.x, metadata is passed as a plain dict via `metadata`
    metadata = {"description": "New data available"}

    # Your logic to determine the relevant partition key
    partition_key = MultiPartitionKey({"date": "2023-01-01", "datatype": "example"})

    # Log the AssetObservation with metadata
    context.log_event(
        AssetObservation(
            asset_key="my_partitioned_source_asset",
            partition=partition_key,
            metadata=metadata,
        )
    )

    # Return the new data version for the observed partition
    return DataVersionsByPartition({partition_key: new_data_version})
In this updated example, we log an
AssetObservation
with metadata entries but without specifying a
data_version
argument, as it's not supported in the version you're using. Instead, we return a
DataVersionsByPartition
object from the observation function, which maps the partition key to the new data version. This approach aligns with the capabilities of Dagster 1.6.3 and allows you to log metadata about the observation while also indicating the new data version for the asset's partitions. The
DataVersionsByPartition
return value is used by Dagster to track the data versions for each partition of the source asset.
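One way to produce the version strings is to hash each partition's contents, so a version changes only when the data does. The sketch below uses only the standard library; `compute_data_versions`, the partition names, and the sample rows are assumptions for illustration. In an observe function, the resulting dict would be the argument to `DataVersionsByPartition`.

```python
import hashlib
import json


def compute_data_versions(rows_by_partition):
    """Map each partition key to a short hash of its rows."""
    versions = {}
    for partition_key, rows in rows_by_partition.items():
        # Serialize deterministically so equal data gives an equal hash
        payload = json.dumps(rows, sort_keys=True).encode("utf-8")
        versions[partition_key] = hashlib.sha256(payload).hexdigest()[:12]
    return versions


before = compute_data_versions(
    {"a": [{"id": 1, "value": 10}], "b": [{"id": 2, "value": 20}]}
)
after = compute_data_versions(
    {"a": [{"id": 1, "value": 10}], "b": [{"id": 2, "value": 25}]}
)
# Partitions whose contents changed between the two observations
changed = sorted(key for key in after if after[key] != before[key])
print(changed)
```

Compared with a timestamp-based version, a content hash avoids marking a partition as changed when an observation runs but the underlying data is identical.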
t

Tara Larrue

02/15/2024, 3:33 AM
@Scout I have a partitioned observable source asset + a job associated with it. How can I launch this job from inside another job for a particular partition?
s

Scout

02/15/2024, 3:33 AM
Dagster does not provide a direct way to launch one job from inside another job while specifying a particular partition. The older
execute_pipeline
function is not part of the Dagster 1.x API, so it is not an option on 1.6.3. Jobs are typically triggered by schedules or sensors, and the partition is chosen when the schedule or sensor builds its run request. That being said, if you have a partitioned observable source asset and an associated job, and you want to trigger an observation for a specific partition in response to another job, you would typically use a sensor to do this. A sensor can monitor changes to an asset (such as new materializations or observations) and request a job run in response. Here's a conceptual example of how you might set up a sensor to trigger a job for a specific partition:
Copy code
python
from dagster import sensor, RunRequest, repository, define_asset_job, observable_source_asset

# Define your observable source asset
@observable_source_asset(partitions_def=...)
def my_partitioned_source_asset():
    ...

# Define the job associated with the observable source asset
my_asset_observation_job = define_asset_job(
    "my_asset_observation_job",
    selection=[my_partitioned_source_asset],
)

# Define an asset sensor that triggers the job for a specific partition
@sensor(job=my_asset_observation_job)
def my_asset_sensor(context):
    # Logic to check for new observations or materializations
    # ...

    # If a new observation for a specific partition is detected, trigger the job
    if new_observation_detected:  # placeholder for your own detection logic
        # Select the partition with partition_key instead of run config
        return RunRequest(
            run_key="unique_run_key",
            partition_key="specific_partition_key",
        )

# Include the asset, job, and sensor in your repository
@repository
def my_repository():
    return [my_partitioned_source_asset, my_asset_observation_job, my_asset_sensor]
In this example,
my_asset_sensor
is a sensor that checks for new observations or materializations of
my_partitioned_source_asset
. If it detects a new observation for a specific partition, it creates a
RunRequest
whose
partition_key
names that partition. The request launches
my_asset_observation_job
for that partition only. Please note that the example above is conceptual, and the actual implementation may vary based on your specific use case and the version of Dagster you are using. If you need to trigger a job from within another job's op and pass a specific partition key, you might need to use custom logic to handle this, as it's not a typical pattern in Dagster. You may also want to consult the latest Dagster documentation or seek guidance from the Dagster community for updates or best practices related to triggering jobs for specific partitions.
t

Tara Larrue

02/15/2024, 3:39 AM
@Scout If I log an asset observation from an op, like the following, will that trigger an eager auto-materialization of the downstream asset of "datapoint_monitoring"?
Copy code
partition_key = MultiPartitionKey({"date": day, "datatype": datatype})
context.log_event(AssetObservation(asset_key="datapoint",
                                     partition=partition_key,
                                     metadata={"num_inserted_rows": rows_affected},
                                     tags={"data_version": datetime.now().isoformat()})
                )
s

Scout

02/15/2024, 3:39 AM
Logging an
AssetObservation
from an op does not trigger an eager auto-materialization of downstream assets.
AssetObservation
events are used to record metadata about an asset at a specific point in time, but they do not imply that the asset's data has been mutated or that a materialization has occurred. Auto-materialization in Dagster is typically triggered by changes in upstream assets' materializations. If an upstream asset is materialized and there are downstream assets that depend on it, Dagster can automatically materialize those downstream assets if they are configured to do so. However, an
AssetObservation
by itself does not constitute a materialization and therefore does not automatically trigger materializations of downstream assets. If you want to trigger materializations of downstream assets based on certain conditions, you would need to use a different mechanism, such as an asset sensor that monitors for specific events (like materializations or observations) and then triggers a run request for a job that materializes the downstream assets. It's also important to note that the
tags
parameter in the
AssetObservation
event is intended for system-populated tags rather than custom values. Instead, you should use the
metadata
parameter to attach metadata to an
AssetObservation
. Here's an example of how to log an
AssetObservation
with metadata:
Copy code
python
from datetime import datetime

from dagster import AssetObservation, MultiPartitionKey

# Inside your op
partition_key = MultiPartitionKey({"date": day, "datatype": datatype})

context.log_event(
    AssetObservation(
        asset_key="datapoint_monitoring",
        partition=partition_key,
        # Plain values are accepted in the metadata dict
        metadata={
            "num_inserted_rows": rows_affected,
            "data_version": datetime.now().isoformat(),
        },
    )
)
In this corrected example,
metadata
is used to attach metadata to the
AssetObservation
. This metadata will be recorded in the event log and can be viewed in the Dagit UI, but it will not trigger downstream materializations. To trigger downstream materializations, you would need to materialize the asset explicitly or set up a sensor to do so based on the observations.
t

Tara Larrue

02/15/2024, 3:47 AM
@Scout I have 2 assets that have multi-partitions, with only 1 matching dimension between them. The upstream asset's partitions are "region" and "datatype", the downstream asset's partitions are "datatype" and "date" (daily). How do I trigger a materialization of the downstream asset based on the "datatype" partition and the date present in the contents of the dataframe outputted in the upstream asset?
s

Scout

02/15/2024, 3:47 AM
To trigger a materialization of the downstream asset based on the "datatype" partition and the date present in the contents of the dataframe outputted by the upstream asset, you can use an asset sensor. The asset sensor can monitor the materialization of the upstream asset and then trigger the downstream asset's job with the appropriate partition keys. Here's a conceptual approach to how you might set up an asset sensor for this purpose: 1. Define your upstream and downstream assets with their respective partition definitions.
Copy code
python
from dagster import asset, MultiPartitionsDefinition, DailyPartitionsDefinition

# Define your upstream asset with "region" and "datatype" partitions
@asset(partitions_def=MultiPartitionsDefinition({
    "region": ...,
    "datatype": ...,
}))
def upstream_asset():
    # Your logic here
    ...

# Define your downstream asset with "datatype" and "date" partitions
@asset(partitions_def=MultiPartitionsDefinition({
    "datatype": ...,
    "date": DailyPartitionsDefinition(start_date="2021-01-01"),
}))
def downstream_asset():
    # Your logic here
    ...
2. Create an asset sensor that triggers the downstream asset's job based on the upstream asset's materialization.
Copy code
python
from dagster import (
    AssetKey,
    MultiPartitionKey,
    RunRequest,
    define_asset_job,
    sensor,
)

# Sensors target jobs, so wrap the downstream asset in an asset job
downstream_job = define_asset_job("downstream_job", selection=[downstream_asset])

@sensor(job=downstream_job)
def my_asset_sensor(context):
    # Get the latest materialization event for the upstream asset
    event = context.instance.get_latest_materialization_event(AssetKey("upstream_asset"))
    materialization = event.asset_materialization if event else None

    if materialization and materialization.partition:
        # Recover the "datatype" dimension from the upstream multi-partition key
        upstream_key = upstream_asset.partitions_def.get_partition_key_from_str(
            materialization.partition
        )
        datatype = upstream_key.keys_by_dimension["datatype"]

        # Extract the date from the contents of the dataframe in the upstream asset's materialization
        # This assumes you have a way to get the date from the materialization, such as metadata
        date_from_dataframe = extract_date_from_dataframe(materialization)

        # Select the downstream partition with partition_key instead of run config
        return RunRequest(
            run_key=f"{datatype}|{date_from_dataframe}|{event.timestamp}",
            partition_key=MultiPartitionKey(
                {"datatype": datatype, "date": date_from_dataframe}
            ),
        )

def extract_date_from_dataframe(materialization):
    # Your logic to extract the date from the dataframe's contents
    # This is a placeholder for the actual implementation
    return "2021-01-01"
In this example, the
my_asset_sensor
function is a sensor that checks for the latest materialization of the upstream asset. When it finds one, it extracts the "datatype" dimension from the upstream partition key and the date from the dataframe's contents. It then constructs a
RunRequest
whose
partition_key
is a
MultiPartitionKey
built from the "datatype" and "date" values for the downstream asset's job. Please note that the
extract_date_from_dataframe
function is a placeholder for the actual implementation you would need to write to extract the date from the dataframe's contents. This might involve accessing metadata that was logged during the upstream asset's materialization or directly inspecting the dataframe if it's available. This approach allows you to trigger the downstream asset's materialization based on the specific "datatype" partition and the date present in the upstream asset's output. Remember to replace the placeholders with your actual partition definitions and logic for extracting the date from the dataframe.
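When the upstream output spans more than one day, a single date is not enough. The sketch below shows one way to turn the timestamps in the output into one downstream partition per calendar day. It uses plain `datetime` values rather than a dataframe, and `downstream_partitions` is a hypothetical helper; each returned mapping has the shape that would be passed to `MultiPartitionKey`.

```python
from datetime import datetime


def downstream_partitions(datatype, timestamps):
    """Return one {"datatype", "date"} mapping per distinct calendar day."""
    days = sorted({ts.date().isoformat() for ts in timestamps})
    return [{"datatype": datatype, "date": day} for day in days]


timestamps = [
    datetime(2024, 2, 14, 23, 55),
    datetime(2024, 2, 15, 0, 5),
    datetime(2024, 2, 15, 0, 10),
]
keys = downstream_partitions("load", timestamps)
for key in keys:
    print(key)
```

A sensor could then return one run request per mapping instead of a single request.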
t

Tara Larrue

02/15/2024, 4:13 AM
@Scout how do I save metadata with a materialization event?
s

Scout

02/15/2024, 4:13 AM
To save metadata with a materialization event in Dagster, you can use the
AssetMaterialization
class to create a materialization event and attach metadata to it. This metadata can include various types of information, such as text, URLs, markdown, or JSON blobs, and it will be associated with the materialization event in the event log and visible in the Dagit UI. Here's an example of how to log an
AssetMaterialization
with metadata within an op:
Copy code
python
from dagster import AssetMaterialization, MetadataValue, op

@op
def my_op(context):
    # Your logic to produce some data
    data = produce_data()

    # Your logic to persist the data, e.g., to a database or a file system
    persist_data(data)

    # Create metadata to describe the materialization
    metadata = {
        "text_metadata": MetadataValue.text("Text-based metadata for this event"),
        "dashboard_url": MetadataValue.url("http://mycoolsite.com/url_for_my_data"),
        "row_count": MetadataValue.int(len(data)),
        # Add more metadata as needed
    }

    # Log the AssetMaterialization event with the metadata
    context.log_event(
        AssetMaterialization(
            asset_key="my_asset",
            description="Persisted result to storage",
            metadata=metadata,
        )
    )

    # Continue with the rest of your op logic
    ...
In this example, the
my_op
op performs some data production and persistence logic. After persisting the data, it creates a dictionary of
MetadataValue
objects that describe various aspects of the materialization, such as a text description, a URL, and the row count of the data. The op then logs an
AssetMaterialization
event, specifying the
asset_key
for the materialized asset, a description of the materialization, and the metadata dictionary. When this op is executed within a job, Dagster will record the materialization event along with the attached metadata. Remember to replace the placeholders in this example with your actual logic for data production, persistence, and metadata that is relevant to your use case.
t

Tara Larrue

02/15/2024, 4:23 AM
@Scout
latest_materializations = context.latest_materialization_records_by_partition_and_asset()
can I get any information on the dataframe that was materialized in these items?
s

Scout

02/15/2024, 4:23 AM
The
context.latest_materialization_records_by_partition_and_asset()
function returns a mapping of partition keys to a mapping of asset keys to the latest materialization event records for those partitions. However, these materialization event records do not directly contain the dataframe or its contents. Instead, they contain metadata about the materialization event, such as the asset key, partition, and any metadata entries that were logged when the materialization occurred. If you have logged metadata entries that describe the contents of the dataframe during the materialization, such as row counts, column names, or summary statistics, you can access this metadata from the materialization event records. Here's an example of how you might access metadata from the latest materialization records:
Copy code
python
latest_materializations = context.latest_materialization_records_by_partition_and_asset()

for partition_key, asset_materializations in latest_materializations.items():
    for asset_key, materialization_record in asset_materializations.items():
        # Each record wraps an event log entry that holds the materialization
        materialization = materialization_record.event_log_entry.asset_materialization
        if materialization is None:
            continue

        # Metadata is a dict keyed by label; each entry wraps the raw value
        row_count_entry = materialization.metadata.get("row_count")
        if row_count_entry is not None:
            row_count = row_count_entry.value
            context.log.info(
                f"Asset {asset_key} in partition {partition_key} has {row_count} rows."
            )

        # You can access other metadata values in a similar way
        # ...
In this example, we iterate over the latest materialization records, read the
metadata
dictionary from each materialization event, and look up specific values by their label. We then use this metadata for logging or other purposes. If you need to access the actual contents of the dataframe that was materialized, you would typically do this by loading the data from the storage system where it was persisted. This could involve using an
IOManager
or custom logic to read the data from a file, database, or other storage backend based on the asset key and partition. Remember that the ability to access and use the metadata will depend on what metadata was logged during the materialization event. If you need to work with the actual data contents, you will need to retrieve them from where they were stored.
t

Tara Larrue

02/15/2024, 4:25 AM
@Scout using dagster 1.6.3, how would I log the
max(created_at)
stat for this asset materialization?
Copy code
@asset(partitions_def=ba_dt_partitions_def,
       auto_materialize_policy=auto_policy_max5,
       io_manager_key="dp_monitoring_upsert",
       output_required=False)
def datapoint_monitoring(context, input_data_sources):
    """Datapoint monitoring database table, partitioned by date + datatype.
     Transform scraped input datapoints into `datapoint_monitoring` table format + upsert to datapoint_monitoring."""
    cur = get_cur(readonly=False)

    processed_df = input_data_sources.copy()
    abbrev_to_id = get_ba_abbrev_to_id(cur)
    processed_df["ba_id"] = processed_df["abbrev"].map(abbrev_to_id)
    datatype_to_id = get_datapoint_type_to_id(cur)
    processed_df["point_type"] = processed_df["datatype"].map(datatype_to_id)
    processed_df['point_time'] = processed_df["ts"]
    processed_df['frequency'] = 300
    processed_df['meta'] = ''
    processed_df['created_at'] = datetime.utcnow()

    return processed_df[['ba_id', 'point_time', 'point_type', 'value', 'frequency', 'meta', 'created_at']]
s

Scout

02/15/2024, 4:25 AM
To log the
max(created_at)
statistic for the
datapoint_monitoring
asset materialization in Dagster 1.6.3, you can use the
AssetMaterialization
event within your asset function. You would calculate the maximum
created_at
value from the dataframe and then include it as a metadata entry when you log the materialization event. Here's an example of how you might do this:
Copy code
python
from dagster import asset, AssetMaterialization, EventMetadataEntry
from datetime import datetime

# Assuming ba_dt_partitions_def and auto_policy_max5 are defined elsewhere
@asset(partitions_def=ba_dt_partitions_def,
       auto_materialize_policy=auto_policy_max5,
       io_manager_key="dp_monitoring_upsert",
       output_required=False)
def datapoint_monitoring(context, input_data_sources):
    # ... existing logic to process the dataframe ...

    # Calculate the max(created_at) value from the dataframe
    max_created_at = processed_df['created_at'].max()

    # Log the AssetMaterialization event with the max(created_at) metadata
    yield AssetMaterialization(
        asset_key="datapoint_monitoring",
        description="Upserted datapoint_monitoring data",
        metadata_entries=[
            EventMetadataEntry.text(
                label="max_created_at",
                text=str(max_created_at),
                description="The maximum created_at timestamp for the records in this materialization"
            ),
            # Include other metadata entries as needed
        ]
    )

    return processed_df[['ba_id', 'point_time', 'point_type', 'value', 'frequency', 'meta', 'created_at']]
In this example, after processing the dataframe, we calculate the maximum value of the
created_at
column using the
max()
function provided by pandas. We then yield an
AssetMaterialization
event, specifying the
asset_key
, a description, and a list of metadata entries. The metadata entry for
max_created_at
includes the maximum timestamp as a text value. When this asset function is executed within a job, Dagster will record the materialization event along with the attached metadata, which will include the
max(created_at)
statistic. This metadata will be visible in the Dagit UI and can be used for tracking and analysis purposes. Please ensure that the rest of your asset function and any other necessary context (such as the partition definition and auto materialization policy) are correctly defined and that the dataframe
processed_df
is being created as expected.
t

Tara Larrue

02/15/2024, 4:30 AM
@Scout metadata_entries is not an expected argument
s

Scout

02/15/2024, 4:30 AM
I apologize for the confusion. In Dagster 1.6.3, you should use the
metadata
parameter instead of
metadata_entries
when yielding an
AssetMaterialization
. The
metadata
parameter takes a dictionary where the keys are string labels and the values are instances of
MetadataValue
. Here's how you can log the
max(created_at)
statistic for the
datapoint_monitoring
asset materialization using the
metadata
parameter:
Copy code
python
from dagster import asset, AssetMaterialization, MetadataValue, Output

@asset(partitions_def=ba_dt_partitions_def,
       auto_materialize_policy=auto_policy_max5,
       io_manager_key="dp_monitoring_upsert",
       output_required=False)
def datapoint_monitoring(context, input_data_sources):
    # ... existing logic to process the dataframe ...

    # Calculate the max(created_at) value from the dataframe
    max_created_at = processed_df['created_at'].max()

    # Log the AssetMaterialization event with the max(created_at) metadata
    yield AssetMaterialization(
        asset_key="datapoint_monitoring",
        description="Upserted datapoint_monitoring data",
        metadata={
            "max_created_at": MetadataValue.text(str(max_created_at))
        }
    )

    # A function that yields events must also yield its result as an Output
    yield Output(processed_df[['ba_id', 'point_time', 'point_type', 'value', 'frequency', 'meta', 'created_at']])
In this updated example, we use the
metadata
parameter with a dictionary to log the
max(created_at)
statistic. The
MetadataValue.text
method is used to create a text metadata value from the maximum
created_at
timestamp. When this asset function is executed within a job, Dagster will record the materialization event along with the attached metadata, which will include the
max(created_at)
statistic. This metadata will be visible in the Dagit UI and can be used for tracking and analysis purposes.
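Timestamps are easiest to reuse later if they are stored as ISO 8601 strings rather than as the default string form. The sketch below builds a metadata-style dictionary from plain row dicts using only the standard library; `summarize_timestamps` and the extra `row_count`, `min_point_time`, and `max_point_time` fields are assumptions for illustration. In the asset above, the same values would come from the dataframe columns.

```python
from datetime import datetime


def summarize_timestamps(records):
    """Build a dict of plain strings and ints describing the rows."""
    created = [r["created_at"] for r in records]
    points = [r["point_time"] for r in records]
    return {
        "row_count": len(records),
        "max_created_at": max(created).isoformat(),
        "min_point_time": min(points).isoformat(),
        "max_point_time": max(points).isoformat(),
    }


records = [
    {"point_time": datetime(2024, 2, 14, 12, 0), "created_at": datetime(2024, 2, 15, 4, 0)},
    {"point_time": datetime(2024, 2, 15, 1, 0), "created_at": datetime(2024, 2, 15, 4, 5)},
]
metadata = summarize_timestamps(records)
# ISO strings parse back into datetimes when a consumer reads them
latest = datetime.fromisoformat(metadata["max_created_at"])
print(metadata)
```

A consumer such as a sensor can parse these strings back with `datetime.fromisoformat` without guessing the format.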
t

Tara Larrue

02/15/2024, 5:01 AM
@Scout can you fix this function? I'm using 1.6.3 and
dagster_event
is not an attribute of materialization_record.
Copy code
@multi_asset_sensor(
    monitored_assets=[AssetKey("datapoint_monitoring")],
    job=datapoint_refresh_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=45,
)
def datapoint_monitoring_asset_sensor(context):
    """If a datapoint_monitoring partition is updated, refresh downstream datapoint partition."""
    run_requests = []

    # Get the latest materialization records for the monitored assets
    latest_materializations = context.latest_materialization_records_by_partition_and_asset()

    # Iterate over the partitions and check for materializations
    for partition, materialization_record in latest_materializations.items():
        # If there is a materialization record for the partition, trigger the downstream job for matching partitions
        if materialization_record:
            datatype = partition.keys_by_dimension["datatype"]
            min_point_time = materialization_record.dagster_event.event_specific_data.metadata["min_point_time"]
            max_point_time = materialization_record.dagster_event.event_specific_data.metadata["max_point_time"]
            for date in pd.date_range(start=min_point_time, end=max_point_time):
                multikey = {"date": date, "datatype": datatype}
                run_requests.append(RunRequest(partition_key=MultiPartitionKey(multikey)))

    context.update_cursor(context.cursor)

    return run_requests
s

Scout

02/15/2024, 5:01 AM
In Dagster 1.6.3, `context.latest_materialization_records_by_partition_and_asset()` returns a nested mapping: partition key string, then asset key, then `EventLogRecord`. An `EventLogRecord` has no `dagster_event` attribute of its own; the event lives on its `event_log_entry`, and the `asset_materialization` property is a shortcut to the materialization, whose `metadata` is a dictionary mapping each label to a `MetadataValue`. Here's how you can fix the function to access the metadata correctly:
Copy code
python
import pandas as pd
from dagster import (
    AssetKey,
    DefaultSensorStatus,
    MultiPartitionKey,
    RunRequest,
    multi_asset_sensor,
)


@multi_asset_sensor(
    monitored_assets=[AssetKey("datapoint_monitoring")],
    job=datapoint_refresh_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=45,
)
def datapoint_monitoring_asset_sensor(context):
    """If a datapoint_monitoring partition is updated, refresh downstream datapoint partition."""
    run_requests = []

    # Mapping of partition key (str) -> {AssetKey: EventLogRecord}
    latest_materializations = context.latest_materialization_records_by_partition_and_asset()

    for partition, records_by_asset in latest_materializations.items():
        # Assumption: ba_dt_partitions_def is the MultiPartitionsDefinition used by
        # datapoint_monitoring; it turns the stored string key back into a MultiPartitionKey
        datatype = ba_dt_partitions_def.get_partition_key_from_str(partition).keys_by_dimension["datatype"]

        for materialization_record in records_by_asset.values():
            materialization = materialization_record.asset_materialization
            if materialization is None:
                continue

            # metadata maps each label to a MetadataValue; .value holds the raw value
            min_entry = materialization.metadata.get("min_point_time")
            max_entry = materialization.metadata.get("max_point_time")
            if min_entry is None or max_entry is None:
                continue

            # normalize=True snaps both ends to midnight so the final calendar date is included
            for date in pd.date_range(start=min_entry.value, end=max_entry.value, normalize=True):
                multikey = {"date": date.strftime("%Y-%m-%d"), "datatype": datatype}
                run_requests.append(RunRequest(partition_key=MultiPartitionKey(multikey)))

    # Mark the records fetched in this evaluation as consumed
    context.advance_all_cursors()

    return run_requests
In this updated function, we iterate over the latest materialization records for each partition and read `min_point_time` and `max_point_time` from the materialization metadata. We then generate the range of dates between them and create a run request for each date, using a `MultiPartitionKey` with the "date" and "datatype" dimensions. Note that the original `context.update_cursor(context.cursor)` call does not advance the per-asset cursors that a multi-asset sensor tracks; `context.advance_all_cursors()` marks everything fetched in this evaluation as consumed. If you want to advance the cursor for individual materialization records instead, call `context.advance_cursor` with a mapping of asset key to materialization record. Also, ensure that `datapoint_refresh_job` is defined elsewhere in your code, that it is the correct job to run for the downstream asset, and that its partitions use the same "date" and "datatype" dimensions. The `min_point_time` and `max_point_time` metadata must be logged when the `datapoint_monitoring` asset is materialized for this sensor to work as expected.
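The date-expansion step can also be sketched without Dagster or pandas, which makes the boundary behavior easier to see. This is a minimal sketch under stated assumptions: the helper name `partition_keys_for_range` is hypothetical, the metadata values are assumed to be ISO-like strings starting with `YYYY-MM-DD`, and the `"load"` datatype is made up. A daily `pd.date_range` steps in 24-hour increments from the start timestamp, so if the end timestamp falls earlier in the day than the start, the final calendar date can be left out unless the endpoints are truncated to dates first (or `normalize=True` is passed).

```python
from datetime import date, timedelta


def partition_keys_for_range(min_point_time, max_point_time, datatype):
    """Return one {"date", "datatype"} dict per calendar day, inclusive of both ends.

    Hypothetical helper: only the first 10 characters (YYYY-MM-DD) of each
    timestamp string are used, so the time of day never drops the last date.
    """
    start = date.fromisoformat(min_point_time[:10])
    end = date.fromisoformat(max_point_time[:10])

    keys = []
    current = start
    while current <= end:
        keys.append({"date": current.isoformat(), "datatype": datatype})
        current += timedelta(days=1)
    return keys


# The range ends earlier in the day (03:00) than it starts (22:00),
# yet all three calendar dates are covered.
keys = partition_keys_for_range("2024-02-13 22:00:00", "2024-02-15 03:00:00", "load")
```

Each dictionary has the shape expected by `MultiPartitionKey`, so the list can be turned into run requests one entry at a time.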
m

martin o leary

03/04/2024, 8:03 AM
@Scout how can I log properly rendered markdown content to the structured event stream during the execution of an op?
s

Scout

03/04/2024, 8:03 AM
To log properly rendered markdown content to the structured event stream during the execution of an op in Dagster, create a markdown metadata value with `MetadataValue.md` and attach it to an `AssetMaterialization` or `AssetObservation` event through the `metadata` dictionary. The event is then logged with the `context.log_event` method. (`EventMetadataEntry.md` comes from older Dagster versions and is not part of the 1.x API.) Here's an example of how you might log markdown content with an `AssetMaterialization` event:
Copy code
python
from dagster import AssetMaterialization, MetadataValue, op


@op
def my_op(context):
    # Your op logic here

    # Markdown content to attach to the event
    markdown_content = "## This is a header\n\nThis is some *markdown* content."

    # Log the AssetMaterialization event with the markdown metadata
    context.log_event(
        AssetMaterialization(
            asset_key="my_asset",
            description="A description of the materialization event",
            metadata={"my_markdown": MetadataValue.md(markdown_content)},
        )
    )
In this example, the `my_op` op wraps the markdown content with `MetadataValue.md` and logs an `AssetMaterialization` event that carries it under the `my_markdown` label in the `metadata` dictionary. When this op is executed within a job, Dagster records the materialization event along with the attached markdown metadata. The markdown is rendered in the Dagster UI (formerly Dagit), so you can view the formatted content associated with the event. Remember to replace the placeholder markdown with the content you actually want to log.
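The markdown string itself can be built programmatically before it is attached to an event. The following is a small sketch with no Dagster dependency; the helper name `rows_to_markdown_table` and the sample rows are hypothetical.

```python
def rows_to_markdown_table(rows, columns):
    """Render a list of row dicts as a pipe-delimited markdown table."""
    header = "| " + " | ".join(columns) + " |"
    divider = "| " + " | ".join("---" for _ in columns) + " |"
    body = [
        "| " + " | ".join(str(row[col]) for col in columns) + " |"
        for row in rows
    ]
    return "\n".join([header, divider, *body])


# Hypothetical sample data
rows = [
    {"datatype": "load", "points": 96},
    {"datatype": "price", "points": 24},
]
table = rows_to_markdown_table(rows, ["datatype", "points"])
markdown_content = "## Datapoint summary\n\n" + table
```

`markdown_content` can then be used as the markdown text logged with the event.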