#ask-ai

Justin Bricker

02/14/2024, 8:02 PM
@Scout Can you pass an `observable_source_asset` to a downstream asset through the asset's function parameters?

Scout

02/14/2024, 8:02 PM
No, you cannot pass an `observable_source_asset` directly to a downstream asset through the asset's function parameters. In Dagster, assets can depend on other assets, but the dependencies are declared in the asset definition rather than passed as function parameters. When you define a downstream asset, you can specify its dependencies on other assets, including an `observable_source_asset`, by using the `deps` parameter, or by using the asset's key as an argument name if you are using the implicit dependency via argument passing. However, the actual data from the upstream asset is not passed as a function parameter; instead, it is managed by Dagster's execution engine, which orders execution so that upstream assets Dagster materializes are computed before the downstream asset (a `SourceAsset` itself is never materialized by Dagster). Here's an example from the documentation showing how to define a dependency on a `SourceAsset`:
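```python
from dagster import AssetKey, SourceAsset, asset

my_source_asset = SourceAsset(key=AssetKey("a_source_asset"))


@asset(deps=[my_source_asset])
def my_derived_asset():
    # `execute_query` stands in for your own database helper; it is not part of Dagster.
    # With `deps`, Dagster passes no data in, so the function reads the source itself.
    return execute_query("SELECT * from a_source_asset").as_list() + [4]
```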
In this example, `my_derived_asset` depends on `my_source_asset`, but `my_source_asset` is not passed as a function parameter to `my_derived_asset`. Instead, the dependency is declared with `deps`, which records the relationship (for lineage and ordering) only. Dagster does not load any data from `my_source_asset` here; `my_derived_asset` reads the source itself through `execute_query`.

Justin Bricker

02/14/2024, 8:05 PM
@Scout if that's true, then how does the following example from the documentation work:
```python
from dagster import AssetKey, Definitions, SourceAsset, asset

code_location_1_source_asset = SourceAsset(key=AssetKey("code_location_1_asset"))


@asset
def code_location_2_asset(code_location_1_asset):
    return code_location_1_asset + 6


defs = Definitions(
    assets=[code_location_2_asset, code_location_1_source_asset],
)
```

Scout

02/14/2024, 8:05 PM
The example you've provided from the documentation demonstrates how to define an asset (`code_location_2_asset`) that depends on a `SourceAsset` (`code_location_1_source_asset`). In this case, the `SourceAsset` is defined with a specific `AssetKey`, and the downstream asset function `code_location_2_asset` takes an argument with the same name as the `AssetKey` of the `SourceAsset`. Here's how it works:

1. `code_location_1_source_asset` is defined as a `SourceAsset` with an `AssetKey` of `"code_location_1_asset"`.
2. `code_location_2_asset` is defined as an asset function that takes an argument named `code_location_1_asset`. Dagster uses the argument name to infer that `code_location_2_asset` has a dependency on an asset with the key `AssetKey("code_location_1_asset")`.
3. When `code_location_2_asset` is executed, Dagster loads the value of `code_location_1_asset` through the source asset's IO manager (its `load_input` method) and passes it to `code_location_2_asset` as the argument `code_location_1_asset`.

The key point is that the argument name in the function definition of `code_location_2_asset` matches the `AssetKey` of the `SourceAsset`. This is how Dagster knows to pass the data from `code_location_1_source_asset` to `code_location_2_asset` when it runs. The `Definitions` object bundles these assets together, which lets Dagster understand the relationship between them and manage their execution accordingly.

So, in this context, the `code_location_1_asset` argument in the `code_location_2_asset` function is not an ordinary parameter that you pass yourself. It both declares the asset dependency and receives the upstream data, which Dagster's execution engine loads and passes in based on that dependency.

Justin Bricker

02/14/2024, 10:33 PM
@Scout I want to use an `observable_source_asset` to represent the location of a file on GCS and to track its version based on the mtime and size of the file. I want to be able to pass this asset to a downstream asset to act on, but I don't want to have to specify the location of the source file in the downstream asset (which would mean defining the same file location in two places).

Scout

02/14/2024, 10:33 PM
In Dagster, you can use an `observable_source_asset` to represent and track the version of a file on Google Cloud Storage (GCS) based on its modification time (mtime) and size. To do this, you define an observation function that returns a `DataVersion` computed from the file's metadata. To let a downstream asset act on this file without redefining its location, store the location once on the `observable_source_asset` (for example, in its metadata) and have an IO manager read it when loading the asset for downstream consumers. The downstream asset then only declares its dependency on the `observable_source_asset`; it never needs to know where the file lives. Here's an example of how you might set this up:
```python
from dagster import (
    ConfigurableIOManager,
    DataVersion,
    Definitions,
    InputContext,
    OutputContext,
    asset,
    observable_source_asset,
)
from google.cloud import storage

# The file's location is defined once, here, and attached to the source asset as metadata
GCS_BUCKET = "my-bucket"
GCS_KEY = "path/to/myfile"


class GCSIOManager(ConfigurableIOManager):
    """Loads a GCS file whose location is stored in the upstream asset's metadata."""

    def handle_output(self, context: OutputContext, obj):
        # Source assets are observed, never materialized, so nothing is uploaded in this sketch
        raise NotImplementedError("Uploading is not implemented in this sketch")

    def load_input(self, context: InputContext):
        # Assumes Dagster exposes the source asset's metadata on `upstream_output`
        metadata = context.upstream_output.metadata
        bucket = storage.Client().bucket(metadata["gcs_bucket"])
        return bucket.blob(metadata["gcs_key"]).download_as_bytes()


# Observable source asset that tracks the GCS file's version
@observable_source_asset(
    io_manager_key="gcs_io_manager",
    metadata={"gcs_bucket": GCS_BUCKET, "gcs_key": GCS_KEY},
)
def gcs_file_asset():
    blob = storage.Client().bucket(GCS_BUCKET).get_blob(GCS_KEY)  # None if missing
    if blob is None:
        raise FileNotFoundError(f"gs://{GCS_BUCKET}/{GCS_KEY} does not exist")
    # The version changes whenever the blob's update time or size changes
    return DataVersion(f"{blob.updated.isoformat()}-{blob.size}")


# The parameter name matches the source asset's key, so Dagster loads the
# file's bytes through GCSIOManager and passes them in
@asset
def downstream_asset(gcs_file_asset: bytes) -> int:
    # Placeholder processing: replace with your own logic
    return len(gcs_file_asset)


defs = Definitions(
    assets=[gcs_file_asset, downstream_asset],
    resources={"gcs_io_manager": GCSIOManager()},
)
```
In this example:

- `gcs_file_asset` is an `observable_source_asset` that represents a file on GCS. Its observation function computes a `DataVersion` from the file's mtime and size, and its `io_manager_key` tells Dagster which IO manager loads the file for downstream assets.
- `downstream_asset` depends on `gcs_file_asset` and receives the file's data through the `GCSIOManager`. The location of the file is not redefined in `downstream_asset`; it is known only through the dependency on `gcs_file_asset`.
- The `GCSIOManager` is a custom `IOManager` that knows how to handle GCS assets. It is responsible for reading data from GCS (and, for assets that Dagster materializes, writing it back).

By setting up your assets and dependencies in this way, you avoid duplicating the file location definition and ensure that the downstream asset can act on the data from the `observable_source_asset` without needing to know the specifics of the file location.
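The versioning rule itself is plain Python and can be checked without touching GCS. The following is a minimal sketch, assuming the version only needs to change whenever the blob's update time or size changes; `gcs_data_version` is a hypothetical helper whose result you would wrap in `DataVersion(...)` inside the observation function.

```python
from datetime import datetime, timezone


def gcs_data_version(updated: datetime, size: int) -> str:
    """Build a version string from a blob's last-update time and its size in bytes.

    Normalizing to UTC keeps the string identical regardless of the timezone
    the timestamp was reported in.
    """
    return f"{updated.astimezone(timezone.utc).isoformat()}-{size}"


v1 = gcs_data_version(datetime(2024, 2, 14, 22, 33, tzinfo=timezone.utc), 1024)
v2 = gcs_data_version(datetime(2024, 2, 14, 22, 33, tzinfo=timezone.utc), 2048)
print(v1)        # 2024-02-14T22:33:00+00:00-1024
print(v1 != v2)  # True: a size change alone yields a new version
```

Because Dagster compares each new observation's version with the previous one, any deterministic string that changes with the file works; the timestamp-plus-size pair is just a cheap choice that avoids downloading the file to hash it.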