Justin Bricker
02/14/2024, 8:02 PM
[...] `observable_source_asset` to a downstream asset through the asset's function parameters?
Scout
02/14/2024, 8:02 PM
[...] `observable_source_asset` directly to a downstream asset through the asset's function parameters. In Dagster, assets can depend on other assets, and a downstream asset can declare a dependency on an upstream asset, including an `observable_source_asset`, in two ways: with the `deps` parameter of the `@asset` decorator, or by using the upstream asset's key as an argument name. With `deps`, no upstream data is passed to the function; Dagster only records the dependency, and the function body reads whatever it needs. With a matching argument name, Dagster's execution engine loads the upstream value through the upstream asset's I/O manager and passes it in. Here's an example from the documentation showing how to define a dependency on a `SourceAsset`:
```python
from dagster import AssetKey, SourceAsset, asset

my_source_asset = SourceAsset(key=AssetKey("a_source_asset"))


@asset(deps=[my_source_asset])
def my_derived_asset():
    # execute_query is a placeholder from the docs example, not a Dagster API
    return execute_query("SELECT * from a_source_asset").as_list() + [4]
```
In this example, `my_derived_asset` depends on `my_source_asset`, but `my_source_asset` is not passed as a function parameter to `my_derived_asset`. Instead, the dependency is declared with `deps`, so Dagster only tracks the relationship; the body of `my_derived_asset` reads the source data itself, here through the query.
Justin Bricker
02/14/2024, 8:05 PM
```python
from dagster import AssetKey, Definitions, SourceAsset, asset

code_location_1_source_asset = SourceAsset(key=AssetKey("code_location_1_asset"))


@asset
def code_location_2_asset(code_location_1_asset):
    return code_location_1_asset + 6


defs = Definitions(
    assets=[code_location_2_asset, code_location_1_source_asset],
)
```
Scout
02/14/2024, 8:05 PM
[...] (`code_location_2_asset`) that depends on a `SourceAsset` (`code_location_1_source_asset`). In this case, the `SourceAsset` is defined with a specific `AssetKey`, and the downstream asset function `code_location_2_asset` takes an argument with the same name as the `AssetKey` of the `SourceAsset`.
Here's how it works:
1. `code_location_1_source_asset` is defined as a `SourceAsset` with an `AssetKey` of `"code_location_1_asset"`.
2. `code_location_2_asset` is defined as an asset function that takes an argument named `code_location_1_asset`. Dagster uses the argument name to infer that `code_location_2_asset` has a dependency on an asset with the key `AssetKey("code_location_1_asset")`.
3. When `code_location_2_asset` is executed, Dagster will ensure that the data for `code_location_1_asset` is available and will provide it to the function `code_location_2_asset` as the argument `code_location_1_asset`.
The key point here is that the argument name in the function definition of `code_location_2_asset` matches the `AssetKey` of the `SourceAsset`. This is how Dagster knows to pass the data from `code_location_1_source_asset` to `code_location_2_asset` when it runs.
The `Definitions` object is then used to bundle these assets together, which allows Dagster to understand the relationship between them and manage their execution accordingly.
So, in this context, the `code_location_1_asset` argument in the `code_location_2_asset` function does two jobs: its name declares the dependency on the upstream asset, and at run time it receives the value that Dagster's execution engine loads for that dependency.
Justin Bricker
02/14/2024, 10:33 PM
Scout
02/14/2024, 10:33 PM
[...] `observable_source_asset` to represent and track the version of a file on Google Cloud Storage (GCS) based on its modification time (mtime) and size. To achieve this, you would define an observation function that computes a `DataVersion` based on the file's metadata.
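As a rough sketch of the version computation on its own, assuming the modification time arrives as a `datetime` and the size as an integer number of bytes (the helper name `file_version` and the sample values are made up for illustration):

```python
from datetime import datetime, timezone


def file_version(updated: datetime, size: int) -> str:
    """Combine modification time and size into a single version string."""
    return f"{updated.isoformat()}-{size}"


t0 = datetime(2024, 2, 14, 20, 0, tzinfo=timezone.utc)
t1 = datetime(2024, 2, 14, 21, 0, tzinfo=timezone.utc)

v_original = file_version(t0, 1024)
v_same = file_version(t0, 1024)     # unchanged file, same version
v_touched = file_version(t1, 1024)  # newer mtime, different version
v_resized = file_version(t0, 2048)  # different size, different version
```

In an observation function, a string like this would be wrapped in a `DataVersion`, so a change in either value shows up as a new version of the source asset.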
To ensure that a downstream asset can act on this file without having to redefine the file's location, you can leverage Dagster's asset dependency system. When you define a downstream asset, you can specify its dependency on the `observable_source_asset`. This way, the location of the source file is only defined once, within the `observable_source_asset`, and the downstream asset implicitly knows about this location through the dependency.
Here's an example of how you might set this up:
```python
from dagster import (
    DataVersion,
    Definitions,
    InputContext,
    IOManager,
    OutputContext,
    asset,
    observable_source_asset,
)
from google.cloud import storage

# The file's location is defined once and reused below
GCS_BUCKET = "my-bucket"
GCS_KEY = "path/to/myfile"


# Define an IOManager that knows how to handle GCS assets
class GCSIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj):
        # Logic to upload data to GCS
        pass

    def load_input(self, context: InputContext):
        # Logic to download data from GCS
        pass


# Define an observable_source_asset that tracks a GCS file's version
@observable_source_asset(
    io_manager_key="gcs_io_manager",
    metadata={"gcs_bucket": GCS_BUCKET, "gcs_key": GCS_KEY},
)
def gcs_file_asset():
    client = storage.Client()
    bucket = client.get_bucket(GCS_BUCKET)
    blob = bucket.get_blob(GCS_KEY)
    # Use the blob's metadata to compute a DataVersion
    mtime = blob.updated
    size = blob.size
    version = f"{mtime}-{size}"
    return DataVersion(version)


def do_something_with(data):
    # Placeholder for the real processing step
    return data


# Define a downstream asset that depends on gcs_file_asset.
# Naming the argument after the upstream asset makes Dagster load it
# through GCSIOManager.load_input and pass the result in.
@asset
def downstream_asset(gcs_file_asset):
    # Perform operations on the data
    processed_data = do_something_with(gcs_file_asset)
    return processed_data


# Bundle the assets and the IOManager resource
defs = Definitions(
    assets=[gcs_file_asset, downstream_asset],
    resources={"gcs_io_manager": GCSIOManager()},
)
```
In this example:
- `gcs_file_asset` is an `observable_source_asset` that represents a file on GCS. It uses an `IOManager` to interact with GCS, and its observation function computes a `DataVersion` based on the file's mtime and size.
- `downstream_asset` is a downstream asset that depends on `gcs_file_asset`. It uses the `GCSIOManager` to load the data from GCS. The location of the file is not redefined in `downstream_asset`; it is implicitly known through the dependency on `gcs_file_asset`.
- The `GCSIOManager` is a custom `IOManager` that knows how to handle GCS assets. It is responsible for uploading and downloading data to and from GCS.
By setting up your assets and dependencies in this way, you avoid duplicating the file location definition and ensure that the downstream asset can act on the data from the `observable_source_asset` without needing to know the specifics of the file location.
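To make the "define the location once" idea concrete, here is a toy sketch in plain Python. It does not use Dagster or GCS: `SourceFile`, `InMemoryLoader`, and the in-memory `objects` dict are hypothetical stand-ins for the source asset, the I/O manager, and the bucket.

```python
from dataclasses import dataclass, field


@dataclass
class SourceFile:
    """Stand-in for a source asset: a key plus metadata holding its location."""
    key: str
    metadata: dict = field(default_factory=dict)


class InMemoryLoader:
    """Stand-in for an I/O manager: resolves the location from metadata."""

    def __init__(self, objects):
        self._objects = objects  # maps (bucket, key) to bytes

    def load(self, source: SourceFile) -> bytes:
        location = (source.metadata["gcs_bucket"], source.metadata["gcs_key"])
        return self._objects[location]


# The location appears only here, on the source definition.
gcs_file = SourceFile(
    key="gcs_file_asset",
    metadata={"gcs_bucket": "my-bucket", "gcs_key": "path/to/myfile"},
)

# Made-up bucket contents for the example.
loader = InMemoryLoader({("my-bucket", "path/to/myfile"): b"hello"})


def downstream(data: bytes) -> int:
    # The downstream step only sees the loaded data, never the path.
    return len(data)


result = downstream(loader.load(gcs_file))
```

The downstream function never mentions the bucket or key; moving the file only requires changing the metadata on the source definition.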