# ask-ai
g
I changed a dagster asset called my_asset_1 to a dagster multi_asset called my_function. Here is my method.
Copy code
from dagster import AssetOut, multi_asset, AssetExecutionContext


@multi_asset(
    outs={
        "my_string_asset": AssetOut(),
        "my_int_asset": AssetOut(),
    }
)
def my_function():
    return "abc", 123
How do I deal with other assets that depend on the original "my_asset_1"?
Copy code
@asset
def my_asset_2(context, my_asset_1):
    call_method()

@asset
def my_asset_3(context, my_asset_1):
    call_method()
I would like to create a new dependency between my_asset_2 and a single asset generated by "my_function". I would also like to create two dependencies to pass to "my_asset_3" with the 2 assets generated by the multi_asset "my_function". The error I get for now is the following: Input asset '["my_asset_1"]' for asset '["my_asset_2"]' is not produced by any of the provided asset ops and is not one of the provided sources
s
When you transition from a single asset to a multi-asset, you need to update the dependencies of the downstream assets that previously depended on the original asset. In your case, `my_asset_1` has been replaced by `my_function`, which now produces two separate assets: `my_string_asset` and `my_int_asset`. To fix the dependency issue, you need to update `my_asset_2` and `my_asset_3` to depend on the new assets produced by `my_function`. Here's how you can do it.
For `my_asset_2`, if you want to depend on `my_string_asset`:
Copy code
python
@asset
def my_asset_2(context, my_string_asset):
    call_method()
For `my_asset_3`, if you want to depend on both `my_string_asset` and `my_int_asset`:
Copy code
python
@asset
def my_asset_3(context, my_string_asset, my_int_asset):
    call_method()
By updating the function signatures of `my_asset_2` and `my_asset_3` to include the new asset keys (`my_string_asset` and `my_int_asset`) as parameters, you establish the correct dependencies. This should resolve the error you're encountering, as the assets `my_string_asset` and `my_int_asset` are now produced by the `my_function` multi-asset. Remember that the names of the arguments in the `@asset`-decorated functions must match the asset keys they depend on. In this case, `my_string_asset` and `my_int_asset` are the asset keys produced by `my_function`, and they are used as argument names in the downstream assets to establish the dependency.
g
@Scout ok but "my_asset_2" and "my_asset_3" are in another module. How do I import `my_string_asset` and `my_int_asset` to use them as parameters of the asset methods?
s
When assets are defined in different modules, you can use the `AssetKey` to specify dependencies across modules. The `AssetKey` is a way to reference assets by their unique identifier, which is typically the asset name or a list representing a hierarchical path. To import `my_string_asset` and `my_int_asset` as dependencies in another module, you don't need to import the actual Python functions or objects. Instead, you define the dependencies using the `AssetKey` or the `AssetIn` object, which allows you to specify the asset keys of the dependencies. Here's an example of how you might define `my_asset_2` and `my_asset_3` in another module, using `AssetKey` to reference the assets produced by `my_function`:
Copy code
python
from dagster import asset, AssetKey, AssetIn

# Assuming 'my_string_asset' and 'my_int_asset' are the asset keys for the assets produced by 'my_function'

@asset(ins={"my_string_asset": AssetIn(key=AssetKey("my_string_asset"))})
def my_asset_2(context, my_string_asset):
    call_method()

@asset(ins={
    "my_string_asset": AssetIn(key=AssetKey("my_string_asset")),
    "my_int_asset": AssetIn(key=AssetKey("my_int_asset"))
})
def my_asset_3(context, my_string_asset, my_int_asset):
    call_method()
In this example, `AssetIn` is used to specify the input assets for `my_asset_2` and `my_asset_3`. The `key` parameter of `AssetIn` takes an `AssetKey`, which you can use to reference the assets by their keys. By using `AssetKey` or `AssetIn` with the appropriate keys, you tell Dagster about the dependencies without needing to import the actual asset functions from the other module. This approach helps to avoid circular dependencies and keeps your asset definitions modular.
g
@Scout I have this asset.
Copy code
@asset(
    description="asset_description",
    group_name="group_1",
    metadata={"name": MetadataValue.text("goal of the asset"),
              "deps": MetadataValue.text("another_asset"),}
    )
def my_asset_2(context, another_asset):
    do_something()
Now I want to transform it into a multi_asset, but how do I deal with metadata? The metadata attribute does not exist for dagster multi_asset.
Copy code
@multi_asset(
    outs={
        "sub_asset_1": AssetOut(is_required=False),
        "sub_asset_2": AssetOut(is_required=False),
    },
    can_subset=True,
    description="goal of the asset",
    group_name="group_1")
def new_asset(context):
    do_something()
I would like to have static metadata for sub_asset_1 and sub_asset_2
s
As of my knowledge cutoff date in early 2023, the `multi_asset` decorator does not support a `metadata` parameter directly. However, you can attach metadata to individual assets within a `multi_asset` by using the `AssetMaterialization` event within the body of the multi-asset computation function. Here's an example of how you might attach metadata to `sub_asset_1` and `sub_asset_2` within the `new_asset` multi-asset function:
Copy code
python
from dagster import multi_asset, AssetOut, AssetMaterialization, MetadataValue, Output

@multi_asset(
    outs={
        "sub_asset_1": AssetOut(is_required=False),
        "sub_asset_2": AssetOut(is_required=False),
    },
    can_subset=True,
    description="goal of the asset",
    group_name="group_1"
)
def new_asset(context):
    # Perform your computation logic here
    result_1, result_2 = do_something()

    # Yield AssetMaterialization events with metadata for each sub-asset
    yield AssetMaterialization(
        asset_key="sub_asset_1",
        metadata={
            "name": MetadataValue.text("goal of sub_asset_1"),
            "deps": MetadataValue.text("another_asset"),
        }
    )
    yield AssetMaterialization(
        asset_key="sub_asset_2",
        metadata={
            "name": MetadataValue.text("goal of sub_asset_2"),
            "deps": MetadataValue.text("another_asset"),
        }
    )

    # Yield the results for each sub-asset as Outputs
    yield Output(result_1, output_name="sub_asset_1")
    yield Output(result_2, output_name="sub_asset_2")
In this example, `do_something()` is a placeholder for your actual computation logic, which should return the results for `sub_asset_1` and `sub_asset_2`. The `AssetMaterialization` events are yielded before the `Output` objects, and each event specifies the `asset_key` and a `metadata` dictionary for the corresponding sub-asset. By yielding `AssetMaterialization` events with metadata, you can attach static metadata to each sub-asset produced by the `multi_asset`. This metadata will be recorded in the Dagster event log and will be associated with the materializations of the respective sub-assets.
g
@Scout My multi_asset is this for now:
Copy code
@multi_asset(
    outs={
        "sub_asset_1": AssetOut(is_required=False),
        "sub_asset_2": AssetOut(is_required=False),
    },
    can_subset=True,
    description="goal of the asset",
    group_name="group_1",
    partitions_def=set_partitions_years,
)
def new_asset(context):
    year = int(context.partition_key)
    euro_prep = ClassPreparation(year=year, logger=context.log)
    df_1, df_2 = euro_prep.prepare_data()

    # Prepare the dictionary for the multi-asset metadata function
    df_dict = {
        'sub_asset_1': df_1,
        'sub_asset_2': df_2,
    }
    # Get dynamic metadata and add them to the context
    am.get_asset_metadata_multi(context, df_dict, euro_prep)

    # Yield multiple outputs, one for each asset
    yield Output(df_1, output_name="sub_asset_1")
    yield Output(df_2, output_name="sub_asset_2")
By changing it with your solution above, will I still be able to yield Output? I need to pass sub_asset_1 and sub_asset_2 as parameters of other assets. Now that I have given you clarifications about my asset, can you give me a working solution to deal with static metadata?
s
Given the structure of your `multi_asset` and the need to pass `sub_asset_1` and `sub_asset_2` as parameters to other assets, you can indeed yield `Output` objects for each sub-asset. However, to include static metadata for each sub-asset, you would typically use `AssetMaterialization` events. Since the `multi_asset` decorator does not support static metadata directly in the same way that the `@asset` decorator does, you would need to handle metadata within the body of the `new_asset` function. However, as of my knowledge cutoff date, there is no built-in way to attach static metadata to the outputs of a `multi_asset` directly in the decorator. Here's how you might modify your `multi_asset` function to yield `AssetMaterialization` events with static metadata:
Copy code
python
from dagster import multi_asset, AssetOut, AssetMaterialization, MetadataValue, Output

@multi_asset(
    outs={
        "sub_asset_1": AssetOut(is_required=False),
        "sub_asset_2": AssetOut(is_required=False),
    },
    can_subset=True,
    description="goal of the asset",
    group_name="group_1",
    partitions_def=set_partitions_years,
)
def new_asset(context):
    year = int(context.partition_key)
    euro_prep = ClassPreparation(year=year, logger=context.log)
    df_1, df_2 = euro_prep.prepare_data()

    # Static metadata for each sub-asset
    static_metadata_1 = {
        "name": MetadataValue.text("goal of sub_asset_1"),
        "deps": MetadataValue.text("another_asset"),
    }
    static_metadata_2 = {
        "name": MetadataValue.text("goal of sub_asset_2"),
        "deps": MetadataValue.text("another_asset"),
    }

    # Yield AssetMaterialization events with static metadata for each sub-asset
    yield AssetMaterialization(
        asset_key="sub_asset_1",
        metadata=static_metadata_1
    )
    yield AssetMaterialization(
        asset_key="sub_asset_2",
        metadata=static_metadata_2
    )

    # Yield multiple outputs, one for each asset
    yield Output(df_1, output_name="sub_asset_1")
    yield Output(df_2, output_name="sub_asset_2")
In this example, `static_metadata_1` and `static_metadata_2` are dictionaries containing the static metadata for `sub_asset_1` and `sub_asset_2`, respectively. The `AssetMaterialization` events are yielded with this metadata before the `Output` objects. Please note that the `asset_key` provided to `AssetMaterialization` should match the keys of the assets you are materializing. If your assets have a different naming scheme or are part of a group, you may need to adjust the `asset_key` accordingly. Keep in mind that the code provided here is based on the information available as of my knowledge cutoff date, and Dagster's features may have evolved since then. If there have been updates to Dagster that allow for a different approach to handling static metadata in `multi_asset`, you should refer to the latest Dagster documentation or community discussions for the most current methods.
g
@Scout Your solution works but is not what I want! Using "AssetMaterialization" I can add info to show in the RUN_VIEW on the dagster UI, as AssetMaterialization events. What I want instead is to display static Metadata in Dagit on the Definition page of the asset before materializing sub_asset_1 and sub_asset_2. I don't want dynamic metadata, but static metadata!
s
I apologize for the confusion. As of my knowledge cutoff date, the `multi_asset` decorator does not support static metadata in the same way that the `@asset` decorator does. Static metadata in Dagster is typically used to describe the asset and its role within an organization, and it is defined at asset creation time rather than dynamically during execution. Since the `multi_asset` decorator lacks a direct way to specify static metadata for each individual asset within it, one approach to include static metadata is to define individual assets with the `@asset` decorator that wrap the outputs of the `multi_asset`. This allows you to attach static metadata to each asset. Here's an example of how you might define wrapper assets with static metadata:
Copy code
python
from dagster import asset, multi_asset, AssetOut

# Define the multi_asset without static metadata
@multi_asset(
    outs={
        "sub_asset_1": AssetOut(is_required=False),
        "sub_asset_2": AssetOut(is_required=False),
    },
    can_subset=True,
    description="goal of the asset",
    group_name="group_1",
    partitions_def=set_partitions_years,
)
def new_asset(context):
    # Your existing logic to produce df_1 and df_2
    ...

# Define wrapper assets with static metadata for sub_asset_1 and sub_asset_2
@asset(metadata={"name": "goal of sub_asset_1", "deps": "another_asset"})
def sub_asset_1_wrapper(sub_asset_1):
    return sub_asset_1

@asset(metadata={"name": "goal of sub_asset_2", "deps": "another_asset"})
def sub_asset_2_wrapper(sub_asset_2):
    return sub_asset_2
In this example, `sub_asset_1_wrapper` and `sub_asset_2_wrapper` are defined as individual assets with the `@asset` decorator, which allows you to specify static metadata. These wrapper assets take the outputs of the `new_asset` multi-asset as inputs, effectively passing through the data while allowing you to define static metadata. This approach allows you to define static metadata that will be displayed on the Definition page of each asset in Dagit before any materializations occur. Please note that this is a workaround given the current capabilities of the `multi_asset` decorator as of my knowledge cutoff date. If Dagster has introduced new features or changes that allow for a different approach to handling static metadata in `multi_asset`, you should consult the latest Dagster documentation or community discussions for updated guidance.
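One thing worth verifying against the Dagster version you have installed: more recent releases expose a `metadata` argument on `AssetOut` itself, which would let you attach definition-time metadata to each output of the `multi_asset` without wrapper assets. This is a minimal sketch under that assumption (check the `AssetOut` API reference for your version before relying on it):
Copy code
python
from dagster import AssetOut, MetadataValue, multi_asset

@multi_asset(
    outs={
        # Assumes AssetOut accepts a `metadata` argument in the installed Dagster version
        "sub_asset_1": AssetOut(
            is_required=False,
            metadata={"name": MetadataValue.text("goal of sub_asset_1"), "deps": MetadataValue.text("another_asset")},
        ),
        "sub_asset_2": AssetOut(
            is_required=False,
            metadata={"name": MetadataValue.text("goal of sub_asset_2"), "deps": MetadataValue.text("another_asset")},
        ),
    },
    can_subset=True,
    description="goal of the asset",
    group_name="group_1",
)
def new_asset(context):
    # Existing logic producing and yielding df_1 and df_2 goes here
    ...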
g
@Scout Is it not possible to modify the dagster source in multi_asset.py to make it work?
s
As a developer relations assistant, I must advise against modifying the source code of the Dagster library directly to add features like static metadata to the `multi_asset` decorator. Modifying the source code can lead to several issues:
1. Maintainability: Your changes would be overwritten when you upgrade Dagster to a newer version, leading to a maintenance burden as you would need to reapply your changes with each upgrade.
2. Compatibility: Your modifications might not be compatible with other parts of the Dagster codebase, potentially leading to unexpected behavior or errors.
3. Support: If you encounter issues with your modified version of Dagster, it would be difficult for the Dagster community or the maintainers to provide support, as your version would differ from the official release.
4. Best Practices: It's generally best practice to use open-source software as provided by the maintainers, and to contribute changes back to the project via the official channels, such as pull requests. This way, the entire community can benefit from improvements, and changes can be properly vetted and tested.
If you believe that static metadata for `multi_asset` would be a valuable feature for Dagster, the recommended approach would be to:
- Open a Feature Request: You can open an issue on the Dagster GitHub repository to request this feature. This allows the maintainers and the community to discuss the potential addition and its implications.
- Contribute to the Project: If you have the capability, you can contribute code to the Dagster project by forking the repository, implementing the feature, and submitting a pull request. Your contribution would then go through the standard review process.
- Use Workarounds: Until such a feature is officially supported, you can use workarounds like the one I previously described, where you define wrapper assets with static metadata.
Remember that contributing to open-source projects like Dagster is a collaborative effort, and changes should be made in a way that benefits the entire community and aligns with the project's goals and roadmap.
g
@Scout Tell me more about multi_assets. Comment this code:
Copy code
@multi_asset(
    outs={
        "ingested_data_1": AssetOut(is_required=False),
        "ingested_data_2": AssetOut(is_required=False),
    })
def big_asset(context):
       do_something()

@asset(
ins={"ingested_data_1": AssetIn(key=AssetKey("ingested_data_1"))},
)
def other_asset(context, ingested_data_1):
       do_something_else()
s
The code you've provided demonstrates the use of the `@multi_asset` decorator to define a computation that produces multiple assets, as well as the use of the `@asset` decorator to define a downstream asset that depends on one of the outputs of the multi-asset. Here's a breakdown of the code:
1. `@multi_asset` decorator: This is used to define a computation that produces more than one asset. In this case, `big_asset` is a multi-asset that produces two assets: `ingested_data_1` and `ingested_data_2`. The `AssetOut` objects are used to define the outputs of the multi-asset, and the `is_required=False` parameter indicates that the production of these assets is optional. This means that the computation may choose not to yield an output for these assets under certain conditions.
Copy code
python
@multi_asset(
    outs={
        "ingested_data_1": AssetOut(is_required=False),
        "ingested_data_2": AssetOut(is_required=False),
    })
def big_asset(context):
    do_something()
2. `do_something()`: This is a placeholder for the actual logic that the `big_asset` multi-asset would perform. This logic would produce the data for `ingested_data_1` and `ingested_data_2`. The actual implementation would likely involve yielding `Output` objects corresponding to each asset (a sketch of this appears after this breakdown).
3. `@asset` decorator: This is used to define a single asset called `other_asset` that depends on the `ingested_data_1` asset produced by the `big_asset` multi-asset. The `ins` parameter is used to define the inputs to the asset, and `AssetIn` is used to specify the dependency on `ingested_data_1` by providing its `AssetKey`.
Copy code
python
@asset(
    ins={"ingested_data_1": AssetIn(key=AssetKey("ingested_data_1"))},
)
def other_asset(context, ingested_data_1):
    do_something_else()
4. `do_something_else()`: This is another placeholder for the actual logic that the `other_asset` would perform, using the data from `ingested_data_1` as input.
In summary, this code snippet defines a multi-asset that produces two assets and a downstream asset that depends on one of those assets. The `@multi_asset` decorator is a powerful feature in Dagster that allows you to encapsulate complex computations that produce multiple assets, while the `@asset` decorator allows you to define downstream assets that can depend on the outputs of multi-assets or other single assets. This composability is a key feature of Dagster's asset-based approach to data orchestration.
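As a concrete illustration of point 2, this is a sketch only (with a hypothetical `do_something()` that returns the two datasets) of how the body of `big_asset` could yield one `Output` per declared output:
Copy code
python
from dagster import AssetOut, Output, multi_asset

@multi_asset(
    outs={
        "ingested_data_1": AssetOut(is_required=False),
        "ingested_data_2": AssetOut(is_required=False),
    })
def big_asset(context):
    # Hypothetical computation producing both datasets
    data_1, data_2 = do_something()

    # Each output_name must match a key declared in `outs`
    yield Output(data_1, output_name="ingested_data_1")
    yield Output(data_2, output_name="ingested_data_2")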
g
@Scout I have noticed that if I remove the ins={...} attribute attached to the asset decorator of other_asset, the materialization still works! Why? Because the multi_asset yields the Output???
s
In Dagster, when you define an asset using the `@asset` decorator, the system can infer dependencies based on the function's parameters. If you have an asset function that takes an input parameter with the same name as an existing asset, Dagster will automatically infer a dependency between the two assets. In the case of your `other_asset`, if you remove the `ins={...}` attribute and the function signature still includes a parameter named `ingested_data_1`, Dagster will infer that `other_asset` depends on an asset named `ingested_data_1`. This is because Dagster uses the parameter names to match with the corresponding asset names. Here's an example:
Copy code
python
# This asset depends on 'ingested_data_1' because of the parameter name
@asset
def other_asset(ingested_data_1):
    do_something_else()
In this example, even without explicitly specifying the `ins` attribute, Dagster understands that `other_asset` depends on the asset `ingested_data_1` because of the parameter name. When the `big_asset` multi-asset yields outputs using the `Output` object, it is effectively materializing the assets `ingested_data_1` and `ingested_data_2`. If `other_asset` is defined in the same repository and has a parameter that matches one of these asset names, Dagster will automatically wire up the dependencies based on the parameter names. This implicit dependency resolution is a convenient feature that reduces boilerplate code when defining assets. However, if you need to reference an asset with a different name or from a different repository, or if you want to make the dependencies more explicit, you can use the `ins` attribute with `AssetIn` and `AssetKey` to specify the dependencies explicitly.
g
@Scout so what is the purpose of the "ins" attribute if it is useless?? Normally in Dagster it is not necessary to import the asset methods from other modules to pass them to other dagster assets. Dagster will handle the dependency by retrieving the asset name.
s
The `ins` attribute in the `@asset` decorator is not useless; it serves several important purposes in Dagster's asset model:
1. Explicit Dependencies: While Dagster can infer dependencies based on parameter names, the `ins` attribute allows you to explicitly define dependencies. This can improve code readability and clarity, making it easier for other developers to understand the relationships between assets.
2. Renaming Inputs: If the name of the input parameter in your asset function does not match the name of the upstream asset, or if you prefer to use a different name for the parameter, you can use the `ins` attribute to map the asset key to the parameter name.
3. Cross-Repository Dependencies: If you have assets spread across multiple repositories or modules, you may need to use the `ins` attribute to specify dependencies on assets that are not directly importable or whose names do not match the parameter names.
4. Asset Selection: The `ins` attribute can be used in conjunction with `AssetKey` to select specific assets, especially when dealing with assets that have hierarchical keys or when selecting assets from a group.
5. Advanced Use Cases: The `ins` attribute allows for more advanced configurations, such as specifying different loading behavior with custom IO managers or defining dependencies on partitioned assets.
Here's an example of using the `ins` attribute to explicitly define a dependency and rename the input parameter:
Copy code
python
from dagster import asset, AssetIn, AssetKey

# Explicitly define a dependency on 'ingested_data_1' with a different parameter name
@asset(
    ins={"data_input": AssetIn(key=AssetKey("ingested_data_1"))},
)
def other_asset(data_input):
    do_something_else(data_input)
In this example, the `other_asset` function takes a parameter named `data_input`, which does not match the name of the upstream asset `ingested_data_1`. By using the `ins` attribute, we explicitly define the dependency and map the `ingested_data_1` asset to the `data_input` parameter. In summary, while Dagster can infer dependencies based on parameter names, the `ins` attribute provides a way to explicitly define and configure dependencies, which is essential for more complex asset setups and when you want to make your asset definitions more explicit and maintainable.
g
@Scout I am referring exactly to the scenario of point 3. The assets are defined in other modules. In general, in my dagster project I don't have to import asset functions from another module if I pass the asset name as a parameter to other assets!!! Cross-Module Dependencies!! Maybe things change with multiple repositories that share assets.
s
I apologize for any confusion. In Dagster, when assets are defined in different modules within the same repository, you do not need to import the asset functions directly to establish dependencies between them. Instead, you can rely on Dagster's ability to infer dependencies based on the names of the input parameters of your asset functions. When you define an asset function and include a parameter with the same name as an existing asset, Dagster will automatically infer that there is a dependency on that asset. This is a feature of Dagster's asset-aware execution engine, which understands the relationships between assets based on their names and the structure of your code. Here's an example to illustrate this:
Copy code
python
# In module_a.py
from dagster import asset

@asset
def ingested_data_1():
    # Logic to produce ingested_data_1
    ...

# In module_b.py
from dagster import asset

@asset
def other_asset(ingested_data_1):
    # Logic for other_asset that depends on ingested_data_1
    ...
In this example, `other_asset` in `module_b.py` has a parameter named `ingested_data_1`, which matches the name of the asset defined in `module_a.py`. Dagster will infer that `other_asset` depends on `ingested_data_1` without the need for explicit imports or the use of the `ins` attribute. The `ins` attribute becomes particularly useful in scenarios where the parameter name does not match the asset name, or when dealing with more complex scenarios such as cross-repository dependencies, partitioned assets, or when you want to make the dependencies explicit for clarity. In the context of multiple repositories that share assets, you would typically use Dagster's `SourceAsset` or similar mechanisms to reference assets across repository boundaries. This is because assets in one repository may not be directly aware of assets in another repository without some form of explicit reference. In summary, within a single repository, Dagster's asset model allows for dependencies to be inferred based on parameter names, simplifying the definition of relationships between assets across different modules. The `ins` attribute is an additional tool that can be used when you need more control over how dependencies are specified.
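For the cross-repository case mentioned above, a rough sketch of the `SourceAsset` approach could look like this (assuming an upstream code location already materializes `ingested_data_1` and that an IO manager is configured here that can load it):
Copy code
python
from dagster import AssetKey, Definitions, SourceAsset, asset

# Declare the externally produced asset so this code location knows about it
ingested_data_1_source = SourceAsset(key=AssetKey("ingested_data_1"))

@asset
def other_asset(ingested_data_1):
    # Dagster loads ingested_data_1 via the IO manager associated with the source asset
    do_something_else(ingested_data_1)

# The SourceAsset must be included alongside the regular assets
defs = Definitions(assets=[ingested_data_1_source, other_asset])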
g
@Scout so in your example above :
Copy code
ins={"data_input": AssetIn(key=AssetKey("ingested_data_1"))},
)
in this case do I then not have to pass the input asset as a parameter? Or is it still necessary?
s
In the example I provided earlier, the `ins` attribute is used to explicitly define the dependency and map the asset key to a parameter with a different name:
Copy code
python
@asset(
    ins={"data_input": AssetIn(key=AssetKey("ingested_data_1"))},
)
def other_asset(data_input):
    # Logic for other_asset that depends on ingested_data_1
    ...
In this case, the `ins` attribute is mapping the asset with the key `ingested_data_1` to the parameter `data_input`. This means that within the body of the `other_asset` function, you should refer to the input data as `data_input`. If you omit the `ins` attribute and the parameter name matches the name of an existing asset, Dagster will automatically infer the dependency:
Copy code
python
@asset
def other_asset(ingested_data_1):
    # Logic for other_asset that depends on ingested_data_1
    ...
In this case, the parameter `ingested_data_1` matches the name of the asset, so Dagster infers that `other_asset` depends on the asset `ingested_data_1`, and you do not need to use the `ins` attribute. To answer your question, even when using the `ins` attribute, you still need to pass the input asset as a parameter to the asset function. The `ins` attribute simply allows you to map the asset key to a parameter with a different name or to provide additional configuration for the input, such as a custom IO manager. The parameter itself is necessary for the asset function to receive and use the input data within its logic.
g
@Scout Here is my asset definition:
Copy code
@asset(
    config_schema={
        "dataset_destination": Field(str, description="The destination", default_value='intended'),
        "dataset_type": Field(str, description="The type", default_value='businesses'),
    },
    group_name="Rawdata_Ingestion_Anbsc",
    partitions_def=set_partitions_years,
    required_resource_keys={
        'partition_year_resource',
        'dataset_resource',
    },
)
def ingested_data_1(context):
    """
    Asset method that ingests conf_dest data for intended businesses case.
    :param context: DagsterContext
    :return:
    """
    # Retrieve the year from the partition in use
    year = int(context.partition_key)
    context.log.info(f'Ingesting data from ANBSC for the year: {year}')
    context.log.info("Calling ingest_conf_dest method")

    # Get type and destination from config_schema
    data_destination = context.op_config["dataset_destination"]
    data_type = context.op_config["dataset_type"]
    # Get type and destination from resource
    dataset_destination = context.resources.dataset_resource['dataset_destination']
    dataset_type = context.resources.dataset_resource['dataset_type']

    ingested_object = ClassIngestion(year=year, logger=context.log)
    df = ingested_object.ingest_data_method(data_type, data_destination)
    # Get dynamic metadata and add them to the context
    context.add_output_metadata(get_asset_metadata(df, ingested_object))
the dataset_destination and dataset_type are not used, since the destination and type that I use are coming from the config_schema, but I would like to also have the possibility to set them from an external resource. How do I add a sort of if in my body function that can get a variable sometimes from the config, sometimes from the resource? Give me some options... maybe checking that the resource is set?
s
To achieve the flexibility of getting configuration values either from the asset's `config_schema` or from an external resource, you can implement conditional logic within the asset function to determine the source of these values. You can check if the resource is set and use its values, or fall back to the values from the `config_schema` if the resource is not provided or does not contain the expected keys. Here's an example of how you might modify the `ingested_data_1` asset function to incorporate this conditional logic:
Copy code
python
from dagster import asset, Field

@asset(
    config_schema={
        "dataset_destination": Field(str, description="The destination", default_value='intended'),
        "dataset_type": Field(str, description="The type", default_value='businesses'),
    },
    group_name="Rawdata_Ingestion_Anbsc",
    partitions_def=set_partitions_years,
    required_resource_keys={
        'partition_year_resource',
        'dataset_resource',
    },
)
def ingested_data_1(context):
    """
    Asset method that ingests conf_dest data for intended businesses case.
    :param context: DagsterContext
    :return:
    """
    # Retrieve the year from the partition in use
    year = int(context.partition_key)
    context.log.info(f'Ingesting data from ANBSC for the year: {year}')
    context.log.info("Calling ingest_conf_dest method")

    # Attempt to get type and destination from resource if available
    dataset_resource = getattr(context.resources, 'dataset_resource', None)
    if dataset_resource and 'dataset_destination' in dataset_resource and 'dataset_type' in dataset_resource:
        data_destination = dataset_resource['dataset_destination']
        data_type = dataset_resource['dataset_type']
    else:
        # Fall back to config_schema values if resource is not set or does not contain the keys
        data_destination = context.op_config["dataset_destination"]
        data_type = context.op_config["dataset_type"]

    ingested_object = ClassIngestion(year=year, logger=context.log)
    df = ingested_object.ingest_data_method(data_type, data_destination)
    # Get dynamic metadata and add them to the context
    context.add_output_metadata(get_asset_metadata(df, ingested_object))
In this modified function, we first attempt to retrieve the `dataset_destination` and `dataset_type` from the `dataset_resource`. If the resource is available and contains the necessary keys, we use the values from the resource. Otherwise, we fall back to the values provided in the `config_schema`. This approach allows you to have a primary source of configuration (in this case, the resource) and a secondary source (the `config_schema`) that acts as a default or fallback. The `getattr(context.resources, 'dataset_resource', None)` call is used to safely access the resource, and the presence of the keys is checked before using the values. Please note that the `required_resource_keys` set indicates that these resources are expected to be provided. If you want to make the resources optional, you should handle the case where the resources might not be provided, as shown in the example above.
g
@Scout the problem is that the dataset_resource is always provided in the dagster Definition to make it work. So I will always enter the body of the first if
Copy code
resources={
    "dataset_resource": {
        "dataset_type": current_dataset_type,
        "dataset_destination": current_dataset_destination
    },
how to check instead if current_dataset_type is None, for example
s
If the `dataset_resource` is always provided in the Dagster definitions, but you want to check if the specific keys within that resource (`dataset_type` and `dataset_destination`) have values or are set to `None`, you can modify the conditional logic to check for `None` values explicitly. Here's how you can adjust the asset function to check if `current_dataset_type` and `current_dataset_destination` are `None`:
Copy code
python
from dagster import asset, Field

@asset(
    config_schema={
        "dataset_destination": Field(str, description="The destination", default_value='intended'),
        "dataset_type": Field(str, description="The type", default_value='businesses'),
    },
    group_name="Rawdata_Ingestion_Anbsc",
    partitions_def=set_partitions_years,
    required_resource_keys={
        'partition_year_resource',
        'dataset_resource',
    },
)
def ingested_data_1(context):
    """
    Asset method that ingests conf_dest data for intended businesses case.
    :param context: DagsterContext
    :return:
    """
    # Retrieve the year from the partition in use
    year = int(context.partition_key)
    context.log.info(f'Ingesting data from ANBSC for the year: {year}')
    context.log.info("Calling ingest_conf_dest method")

    # Get type and destination from the resource if they are not None
    dataset_resource = context.resources.dataset_resource
    data_destination = (
        dataset_resource['dataset_destination']
        if dataset_resource.get('dataset_destination') is not None
        else context.op_config["dataset_destination"]
    )
    data_type = (
        dataset_resource['dataset_type']
        if dataset_resource.get('dataset_type') is not None
        else context.op_config["dataset_type"]
    )

    ingested_object = ClassIngestion(year=year, logger=context.log)
    df = ingested_object.ingest_data_method(data_type, data_destination)
    # Get dynamic metadata and add them to the context
    context.add_output_metadata(get_asset_metadata(df, ingested_object))
In this updated function, we use the `.get()` method on the `dataset_resource` dictionary to safely access the `dataset_destination` and `dataset_type` keys. If the keys are present and their values are not `None`, we use those values. Otherwise, we fall back to the values provided in the `config_schema`. This approach allows you to prioritize the values from the resource if they are explicitly set, and use the `config_schema` values as defaults when the resource values are `None`.
g
@Scout What should I write in the docstring of my asset method? I don't want to say that the function loads a table into a database, since that is the docstring for the called load_data_in_db. Give me some ideas to describe the dagster function appropriately.
Copy code
@asset(
    group_name="OutputPreparation",
    config_schema={
        "data_dir": Field(str, description="The directory location", default_value=data_dir),
    },
)
def loaded_data(context):
    """
    Method that  
    :param context: DagsterContext
    :return:
    """
    context.log.info(f'Starting output generation')
    data_directory = context.op_config['data_dir']
    out = OutputPreparation(data_dir=data_directory, logger=context.log)
    df = out.load_data_in_db()
s
When writing a docstring for a Dagster asset method, it's important to describe the purpose of the asset, what it represents in the context of your data pipeline, and any important behavior or configuration that users of the asset should be aware of. The docstring should provide a clear and concise explanation that helps other developers understand the role of the asset within the system. Here's an example of how you might write the docstring for the `loaded_data` asset method:
Copy code
python
@asset(
    group_name="OutputPreparation",
    config_schema={
        "data_dir": Field(str, description="The directory location", default_value=data_dir),
    },
)
def loaded_data(context):
    """
    Prepares and loads the output data into the database.

    This asset is responsible for orchestrating the preparation of data and
    loading it into the target database. It utilizes the OutputPreparation class
    to perform the necessary steps for data processing and invokes the
    load_data_in_db method to handle the actual data loading.

    The data directory from which the data is sourced can be configured via the
    asset's `data_dir` configuration field. This allows for flexibility in
    specifying the location of the input data.

    Args:
        context (DagsterContext): The context object providing runtime information
            such as configuration data and logging capabilities.

    Returns:
        The dataframe that has been loaded into the database. This return value
        can be used as an input to downstream assets that depend on this data.
    """
    context.log.info('Starting output generation')
    data_directory = context.op_config['data_dir']
    out = OutputPreparation(data_dir=data_directory, logger=context.log)
    df = out.load_data_in_db()
    return df
In this docstring:
- The first paragraph provides a high-level description of the asset's purpose.
- The second paragraph goes into more detail about the behavior of the asset and how it uses the `OutputPreparation` class.
- The third paragraph explains how the asset can be configured, specifically mentioning the `data_dir` configuration field.
- The `Args` section describes the parameters that the function takes.
- The `Returns` section explains what the function returns, which is particularly useful if the return value is intended to be used by other assets.
Remember to replace the placeholder text with actual descriptions that match the behavior and purpose of your asset. The goal is to provide enough information to help others understand what the asset does without having to look at the implementation details.
g
@Scout no, I just want the initial phrase for my docstring regarding the purpose of the asset. Give me 4 sentences.
s
Certainly, here are four different sentences you could use to describe the purpose of your `loaded_data` asset in its docstring:
1. "This asset orchestrates the retrieval and transformation of data for downstream consumption."
2. "Responsible for loading processed data into a designated storage system, this asset acts as a critical step in the data pipeline."
3. "The `loaded_data` asset serves as the bridge between data preparation processes and their final storage destination."
4. "Facilitates the final stage of data processing by ensuring data is correctly formatted and stored for future analysis."
Each of these sentences provides a high-level overview of the asset's role within the data pipeline, focusing on its responsibility for managing data at a particular stage. You can choose the one that best fits the context and purpose of your asset.
g
@Scout why this error? dagster._core.errors.DagsterInvalidSubsetError: AssetKey(s) {AssetKey(['ingested_data'])} were selected, but no AssetsDefinition objects supply these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions are correctly added to the `Definitions`. Everything seems ok.
Copy code
from dagster import (
    Definitions,
    load_assets_from_modules,
    define_asset_job,
    multiprocess_executor
)

from workflows import data_preparation, output_preparation, rawdata_ingestion


all_assets = load_assets_from_modules(
    [
        data_preparation,
        output_preparation,
        rawdata_ingestion
    ]
)

pipeline_example_job = define_asset_job(
    name="pipeline_example_job",
    selection=[
        'ingested_data',
        'enriched_data',
        'encoded_data',
        'plotted_data',
        'stored_json_data',
        'loaded_data',
    ],
    executor_def=multiprocess_executor.configured({"max_concurrent": 4})
)


defs = Definitions(
    assets=all_assets,
    jobs=[pipeline_example_job]
)
s
The error message you're encountering indicates that there is a discrepancy between the assets you're trying to select in your `pipeline_example_job` and the assets that are actually available in the modules you're loading. The error suggests that the `AssetsDefinition` for `AssetKey(['ingested_data'])` is not found within the modules provided to `load_assets_from_modules`. Here are a few steps you can take to troubleshoot this issue:
1. Check Spelling and Naming: Ensure that the asset key `'ingested_data'` is spelled correctly and matches the name of the asset defined in one of the modules you're loading (`data_preparation`, `output_preparation`, or `rawdata_ingestion`). Asset keys are case-sensitive and must match exactly.
2. Verify Asset Definitions: Make sure that the asset named `'ingested_data'` is indeed defined as an asset in one of the specified modules. If it's defined in a different module or not defined at all, you'll need to correct this.
3. Check Module Imports: Ensure that the modules `data_preparation`, `output_preparation`, and `rawdata_ingestion` are correctly importing the assets. If these modules are supposed to import assets from other modules, verify that those imports are working as expected.
4. Inspect Asset Registration: Look inside the `data_preparation`, `output_preparation`, and `rawdata_ingestion` modules to confirm that the assets are being registered correctly. If you're using decorators like `@asset`, ensure they are applied correctly and that the functions are indeed being executed.
5. Review Asset Grouping: If you're using asset groups or asset factories, ensure that they are correctly configured and that the `ingested_data` asset is included in the group or generated by the factory.
6. Check for Typos in Module Names: Make sure that the module names `data_preparation`, `output_preparation`, and `rawdata_ingestion` are correct and that the modules contain the expected asset definitions.
If you've gone through these steps and the issue persists, it may be helpful to isolate the problem by trying to load and select assets one at a time or by checking the asset definitions in an interactive Python session to ensure they are being registered as expected. Remember that the asset keys used in the `selection` parameter must correspond exactly to the asset names defined in your modules. If there's a mismatch, Dagster won't be able to find the assets, resulting in the error you're seeing.
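For the interactive check mentioned above, a quick sketch (assuming each object returned by `load_assets_from_modules` is an `AssetsDefinition` exposing a `keys` property) is to print the keys that were actually loaded and confirm whether `AssetKey(['ingested_data'])` appears among them:
Copy code
python
from dagster import load_assets_from_modules

from workflows import data_preparation, output_preparation, rawdata_ingestion

all_assets = load_assets_from_modules([data_preparation, output_preparation, rawdata_ingestion])

# Print the asset keys each loaded definition supplies; 'ingested_data' should appear here
for assets_def in all_assets:
    print(assets_def.keys)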
g
@Scout I have this in my pyproject.toml file
Copy code
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "dagster-template_workflows"
What other interesting and useful things can I add here?
s
The `pyproject.toml` file is used to configure your project and its dependencies in a standardized way. For a Dagster project, you've already specified the build system and the Dagster module name. Here are some additional things you might consider adding to your `pyproject.toml` file:
1. Project Metadata: You can include metadata about your project, such as the name, version, description, authors, and more. This helps others understand what your project is about and how to use it.
Copy code
toml
[tool.poetry]
name = "dagster-template-workflows"
version = "0.1.0"
description = "A collection of Dagster workflows for data processing."
authors = ["Your Name <you@example.com>"]
2. Dependency Management: If you're using `poetry` for dependency management, you can specify your project's dependencies here. This includes both your direct dependencies and development dependencies.
Copy code
toml
[tool.poetry.dependencies]
python = "^3.8"
dagster = "^0.13.0"
pandas = "^1.3.0"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
black = "^21.7b0"
3. Scripts: You can define scripts that can be run with `poetry run`. These can be useful for running your Dagster jobs or other command-line tools.
Copy code
toml
[tool.poetry.scripts]
run-job = "dagster.cli:main"
4. Package Inclusion/Exclusion: Specify which files or directories should be included or excluded in your package distribution.
Copy code
toml
[tool.poetry]
include = ["README.md", "LICENSE"]
exclude = ["tests/", "*.test.py"]
5. Environment Variables: You can set environment variables that are required for your project.
Copy code
toml
[tool.dagster.environment]
MY_ENV_VAR = "my_value"
6. Dagster-specific Configuration: If you have specific Dagster configurations, such as instance or executor settings, you can include them here.
Copy code
toml
[tool.dagster.instance]
local_artifact_storage = { base_dir = "/path/to/artifacts" }
run_storage = { module = "dagster_postgres.run_storage", class = "PostgresRunStorage", config = { hostname = "localhost", db_name = "dagster" } }
event_log_storage = { module = "dagster_postgres.event_log", class = "PostgresEventLogStorage", config = { hostname = "localhost", db_name = "dagster" } }
7. Tool-specific Configuration: If you're using other tools like `flake8`, `mypy`, or `isort`, you can include their configuration settings in the `pyproject.toml` file as well.
Copy code
toml
[tool.flake8]
max-line-length = 88
exclude = "__init__.py"

[tool.mypy]
ignore_missing_imports = true
Remember that the specific entries you add to your `pyproject.toml` file will depend on the tools you're using and the needs of your project. Always refer to the documentation of the respective tools for the correct configuration options.
g
@Scout How to remove dagster_home to return to tmp storage?
s
The `DAGSTER_HOME` environment variable is used by Dagster to determine the location of the Dagster instance directory, where it stores information such as run history, event logs, and configuration. If you have set `DAGSTER_HOME` and want to revert back to using the default temporary storage, you can simply unset the `DAGSTER_HOME` environment variable. Here's how you can unset `DAGSTER_HOME` in different environments:
- On Unix-like systems (Linux/macOS): You can unset the environment variable in your current shell session by running:
Copy code
bash
unset DAGSTER_HOME
To make this change persistent across sessions, you would need to remove the export statement from your shell configuration files (like `.bashrc`, `.bash_profile`, `.zshrc`, etc.).
- On Windows: You can unset the environment variable for the current Command Prompt session by running:
Copy code
cmd
set DAGSTER_HOME=
To remove the environment variable permanently, you can do so through the System Properties dialog or by using the `setx` command with an empty value:
Copy code
cmd
setx DAGSTER_HOME ""
Note that changes made with `setx` will only take effect in new Command Prompt windows, not the current session. After unsetting `DAGSTER_HOME`, Dagster will revert to using the default temporary storage location for the instance directory. If you're running Dagster through Dagit or the Dagster CLI, you may need to restart these services for the change to take effect. Please be aware that by reverting to temporary storage, you may lose access to the data stored in the previous `DAGSTER_HOME` directory, such as pipeline run history and logs. If you need to retain this data, consider backing it up before making the change.