# ask-ai

Manuel Fideles

03/07/2024, 11:42 AM
@Scout When testing assets with pytest, how can I test an asset that receives a context (AssetExecutionContext) as a param?

Scout

03/07/2024, 11:42 AM
When testing an asset that receives an `AssetExecutionContext` as a parameter, you can use the `build_asset_context` function provided by Dagster to construct a context for testing purposes. This function creates a context that mimics the one provided during actual asset execution. Here's an example from the documentation of how to use `build_asset_context` in a test:

```python
from dagster import build_asset_context
from your_project.assets import your_asset


def test_your_asset():
    with build_asset_context() as context:
        result = your_asset(context=context)
        # Perform assertions on 'result'
```

In this example, `your_asset` is the asset you want to test, and it expects an `AssetExecutionContext` as a parameter. `build_asset_context` creates a context suitable for testing; you invoke the asset with it and assert on the result to verify that the asset behaves as expected. Remember to replace `your_project.assets` and `your_asset` with the actual module path and asset name from your project. You can also pass arguments to `build_asset_context`, such as `resources`, `asset_config`, `instance`, or `partition_key`, to give the context the resources, config, and other dependencies your asset needs for the test.

Manuel Fideles

03/07/2024, 11:44 AM
@Scout What if I have a config_schema defined for the same asset, with default values? How can I test it in that case?

Scout

03/07/2024, 11:45 AM
If your asset has a `config_schema` defined with default values, you can still use the `build_asset_context` function to create a context for testing. Pass the configuration your asset expects through the `asset_config` argument of `build_asset_context`. If you want to test with the default values, omit those keys (or omit `asset_config` entirely), and Dagster will fill in the defaults specified in your `config_schema`. Here's an example of how you might test an asset with a `config_schema`:

```python
from dagster import build_asset_context
from your_project.assets import your_asset


def test_your_asset_with_default_config():
    # Assuming 'your_asset' has a config_schema with default values
    with build_asset_context() as context:
        result = your_asset(context=context)
        # Perform assertions on 'result' using the default config values


def test_your_asset_with_custom_config():
    # asset_config holds the asset's own config directly; no "ops" nesting is needed
    custom_config = {
        "your_config_key": "your_custom_value",
        # ... other config values
    }
    with build_asset_context(asset_config=custom_config) as context:
        result = your_asset(context=context)
        # Perform assertions on 'result' using the custom config values
```

In the first test function, `test_your_asset_with_default_config`, we don't provide any configuration, so the asset uses the default values from its `config_schema`. In the second test function, `test_your_asset_with_custom_config`, we pass a dictionary through `asset_config` whose keys match the fields of the asset's `config_schema`. The `{"ops": {"your_asset": {"config": ...}}}` nesting belongs in a full run config, for example when calling `materialize` or `execute_in_process`, not in `build_asset_context`. This lets you test how your asset behaves with different configuration values. Remember to replace `your_project.assets`, `your_asset`, `your_config_key`, and `your_custom_value` with the actual values from your project.

Manuel Fideles

03/07/2024, 12:24 PM
@Scout How can I pass a resource to an asset to test that asset? I.e., if I want to use `S3Resource` from `dagster_aws.s3`

Scout

03/07/2024, 12:24 PM
To pass a resource such as `S3Resource` from `dagster_aws.s3` to an asset for testing, you can use the `build_asset_context` function and provide the necessary resources in its `resources` parameter. Here's an example of how you might do this:

```python
from dagster import build_asset_context
from dagster_aws.s3 import S3Resource
from your_project.assets import your_asset


def test_your_asset_with_s3_resource():
    # S3Resource holds connection settings (region, credentials, endpoint),
    # not a bucket name; pass the bucket to your asset separately, e.g. via config
    s3_resource = S3Resource(
        region_name="us-east-1",
        # Include any additional parameters needed to configure the S3Resource
    )

    # Use the build_asset_context function to create a context with the S3Resource
    with build_asset_context(resources={"s3": s3_resource}) as context:
        result = your_asset(context=context)
        # Perform assertions on 'result'
```

In this example, you create an instance of `S3Resource` with the connection settings your test needs. Then you pass a dictionary to `build_asset_context` whose key matches the name under which your asset expects the resource (here `"s3"`, either as an entry in `required_resource_keys` or as the name of an `s3: S3Resource` parameter), and whose value is the `s3_resource` instance you created. When you call your asset inside the `with` block, it has access to the `S3Resource` just as it would during a real run. As far as I know, `S3Resource` has no `s3_bucket` field, so the bucket name usually reaches the asset through config or a constant. Remember to replace `your_project.assets`, `your_asset`, and the region with the actual values from your project, and check the `dagster-aws` API reference for the fields your version of `S3Resource` accepts.

Manuel Fideles

03/07/2024, 12:27 PM
@Scout I'm using moto to mock an S3 bucket; how can I integrate that into testing?

Scout

03/07/2024, 12:27 PM
When using `moto` to mock an S3 bucket for testing with Dagster, you can set up the mock S3 inside a pytest fixture and then use the `build_asset_context` function to provide the `S3Resource` to your asset. Here's a step-by-step example:

```python
import boto3
import pytest
from dagster import build_asset_context
from dagster_aws.s3 import S3Resource
from moto import mock_s3  # moto >= 5.0 replaces this with mock_aws
from your_project.assets import your_asset


@pytest.fixture
def aws_credentials():
    """Mocked AWS credentials for moto."""
    return {
        "aws_access_key_id": "fake_access_key",
        "aws_secret_access_key": "fake_secret_key",
        "region_name": "us-east-1",
    }


@pytest.fixture
def s3_bucket(aws_credentials):
    with mock_s3():
        # Set up the mocked S3 environment
        s3 = boto3.client("s3", **aws_credentials)
        bucket_name = "my-mocked-bucket"
        s3.create_bucket(Bucket=bucket_name)

        # Add any setup you need here (e.g., uploading test files to the bucket)

        yield bucket_name  # The mock stays active while the test runs


def test_your_asset_with_mocked_s3(s3_bucket, aws_credentials):
    # Point S3Resource at the mocked credentials; the bucket name goes to the asset separately
    s3_resource = S3Resource(**aws_credentials)

    # Use the build_asset_context function to create a context with the mocked S3Resource
    with build_asset_context(resources={"s3": s3_resource}) as context:
        result = your_asset(context=context)
        # Perform assertions on 'result'
```

In this example, we use `pytest` fixtures to set up the mocked AWS credentials and the mocked S3 bucket using `moto`. The `s3_bucket` fixture creates a new bucket in the mocked S3 environment and yields the bucket name, which keeps the mock active for the duration of the test. The `test_your_asset_with_mocked_s3` function takes the `s3_bucket` and `aws_credentials` fixtures as arguments, creates an `S3Resource` from the mocked credentials, and uses `build_asset_context` to create a context with that resource, which is passed to the asset under test. Because `S3Resource` does not take a bucket name, make the `s3_bucket` value available to your asset some other way, for example through `asset_config`. Remember to replace `your_project.assets` and `your_asset` with the actual values from your project, and add any extra setup your test case needs to the `s3_bucket` fixture, such as uploading files to the mocked bucket. On moto 5.0 or later, import `mock_aws` instead of `mock_s3`.

Manuel Fideles

03/07/2024, 12:52 PM
@Scout what if an asset depends on the materialization of another, and I'm trying to test the latter?

Scout

03/07/2024, 12:52 PM
When testing an asset that depends on the materialization of another asset, you can pass both assets to Dagster's `materialize` function inside your test. `materialize` builds a temporary job from the assets you give it and runs it in process, so the upstream asset is materialized before the asset under test. Here's an example of how you might set up such a test:

```python
from dagster import materialize
from your_project.assets import downstream_asset, upstream_asset

# Assuming downstream_asset is defined along the lines of:
#
#   @asset
#   def downstream_asset(upstream_asset):
#       ...  # logic that depends on upstream_asset


def test_downstream_asset():
    result = materialize([upstream_asset, downstream_asset])
    assert result.success
    # Read the value the downstream asset produced during the run
    downstream_value = result.output_for_node("downstream_asset")
    # Add more assertions here to validate downstream_value
```

In this example, `upstream_asset` and `downstream_asset` are the two assets you're working with, and `downstream_asset` is assumed to declare `upstream_asset` as an input by naming a parameter after it. `materialize` runs both assets in the correct order, ensuring that `upstream_asset` is materialized before `downstream_asset`, and returns a result you can use to check that the run succeeded and to read each asset's output with `output_for_node`. Remember to replace `your_project.assets`, `upstream_asset`, and `downstream_asset` with the actual values from your project. If you need to pass specific configuration or resources to the assets for the test, provide them through the `run_config` and `resources` arguments of `materialize`.

Manuel Fideles

03/07/2024, 12:58 PM
@Scout how can I pass an asset dependency to an asset that needs it in a test?

Scout

03/07/2024, 12:58 PM
To pass an asset dependency to an asset that needs it in a test, you can use the `build_asset_context` function to create a context for your asset, and then manually invoke the asset function with the required dependencies. Here's a step-by-step guide on how to do this:

1. Manually create or mock the data that would be produced by the upstream asset.
2. Use the `build_asset_context` function to create a context for your downstream asset.
3. Call the downstream asset function directly, passing the context first and then the mocked upstream data.

Here's an example of how you might write a test for a downstream asset that depends on an upstream asset:

```python
from dagster import build_asset_context, AssetMaterialization, AssetKey
from your_project.assets import upstream_asset, downstream_asset


def test_downstream_asset():
    # Step 1: Manually create or mock the data for the upstream asset
    upstream_data = create_upstream_data()  # Replace with your data creation or mocking logic

    # Step 2: Use build_asset_context to create a context for the downstream asset
    with build_asset_context() as context:
        # Optionally, you can mock the materialization event for the upstream asset
        context.add_output(
            AssetMaterialization(asset_key=AssetKey("upstream_asset")),
            upstream_data
        )

        # Step 3: Call the downstream asset directly; the context must come first
        result = downstream_asset(context, upstream_data)

        # Perform assertions on 'result'
        assert result == expected_result  # Replace with your expected result
```

In this example, `upstream_asset` and `downstream_asset` are your asset functions. The `create_upstream_data` function is a placeholder for your logic to create or mock the data that `upstream_asset` would produce. The `context.add_output` line is optional and can be used to simulate the materialization of the upstream asset if your downstream asset logic requires it. The `test_downstream_asset` function creates a context for testing, mocks the upstream data, and then calls `downstream_asset` with the context as the first positional argument, followed by the mocked data. Dagster treats the first positional argument of a direct invocation as the context, so passing the data first fails. Finally, the test asserts that the result of the downstream asset is as expected. Remember to replace `your_project.assets`, `upstream_asset`, `downstream_asset`, `create_upstream_data`, and `expected_result` with the actual values and functions from your project.

Manuel Fideles

03/07/2024, 1:04 PM
@Scout how can I, as you said in the last answer, `# Optionally, you can mock the materialization event for the upstream asset`?

Scout

03/07/2024, 1:04 PM
My apologies for any confusion. The `context.add_output` line in the previous response is not accurate: there is no `add_output` method on the `context` object in Dagster. Instead, you would typically simulate the materialization of an upstream asset by making the data it would produce available to the downstream asset during testing. To do that, create the data the upstream asset would produce and pass it directly to the downstream asset function in your test. Here's how you can do it:

```python
from dagster import build_asset_context
from your_project.assets import downstream_asset


def test_downstream_asset():
    # Manually create or mock the data that would be produced by the upstream asset
    upstream_data = create_upstream_data()  # Replace with your data creation or mocking logic

    # Use build_asset_context to create a context for the downstream asset
    with build_asset_context() as context:
        # Call the downstream asset directly: context first, then the mocked upstream data
        result = downstream_asset(context, upstream_data)

        # Perform assertions on 'result'
        assert result == expected_result  # Replace with your expected result
```

In this example, `create_upstream_data` is a function that you would define to create or mock the data that the upstream asset would normally produce. The `downstream_asset` function is then called with this data as if it were the output of the upstream asset. Even if your downstream asset relies on the upstream asset having been materialized, you typically don't need to mock the `AssetMaterialization` event itself in a unit test; focus on providing the necessary data and testing the asset's behavior given that data. However, if you do need to simulate an `AssetMaterialization` event (for example, if your logic checks for the existence of materializations), you can record one on a `DagsterInstance` and hand that instance to `build_asset_context`. Here's an example of how you might do that, assuming Dagster 1.5 or later:

```python
from dagster import AssetKey, AssetMaterialization, DagsterInstance, build_asset_context
from your_project.assets import downstream_asset


def test_downstream_asset_with_materialization():
    # Manually create or mock the data that would be produced by the upstream asset
    upstream_data = create_upstream_data()  # Replace with your data creation or mocking logic

    # A throwaway, in-memory instance so the test does not touch your real Dagster storage
    instance = DagsterInstance.ephemeral()

    # Record an asset materialization event for the upstream asset, outside of any run
    instance.report_runless_asset_event(
        AssetMaterialization(asset_key=AssetKey("upstream_asset"))
    )

    # Use build_asset_context to create a context backed by that instance
    with build_asset_context(instance=instance) as context:
        # Call the downstream asset directly: context first, then the mocked upstream data
        result = downstream_asset(context, upstream_data)

        # Perform assertions on 'result'
        assert result == expected_result  # Replace with your expected result
```

In this example, `instance` is an ephemeral `DagsterInstance` created just for the test. The `report_runless_asset_event` method records an asset materialization event without a run, and the downstream asset can see it through `context.instance`, for example with `context.instance.get_latest_materialization_event(AssetKey("upstream_asset"))`. Note that this is a more advanced use case and typically not necessary for most unit testing scenarios. Please replace `create_upstream_data`, `downstream_asset`, and `expected_result` with the actual functions and values from your project.

Manuel Fideles

03/07/2024, 1:10 PM
@Scout How do I pass downstream data to an asset that needs testing, if that asset has a context being passed to it? I'm currently calling the following code:

```python
def test_manifest_keys(s3_bucket: str, aws_credentials: None):
    mock_config = {"account": ACCOUNT, "src_dir": "test/src", "dest_dir": "test/dest"}
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)
    with build_asset_context(
        asset_config={"account", ACCOUNT}, resources={"s3": s3_resource}
    ) as context:
        keys = manifest_keys(mock_config, context=context)
        assert isinstance(keys, list)
        assert len(keys) > 0
```
But I get the following error:
dagster._core.errors.DagsterInvalidInvocationError: Decorated function 'manifest_keys' has context argument, but no context was provided when invoking.

Scout

03/07/2024, 1:12 PM
The error message indicates that Dagster could not find the context in your call to `manifest_keys`. Because `manifest_keys` is decorated with `@asset` and declares a `context` parameter, Dagster's direct invocation expects the context either as the first positional argument or, if you pass it by keyword, with every other argument passed by keyword too. In your call, `manifest_keys(mock_config, context=context)`, the first positional argument is `mock_config`, so Dagster treats that slot as the context, sees that it isn't one, and raises the error. There is also a second problem: `asset_config={"account", ACCOUNT}` is a set, not a dictionary; it should be `{"account": ACCOUNT}`. Here's how you can modify your test:

```python
from dagster import build_asset_context
from dagster_aws.s3 import S3Resource
from your_project.assets import manifest_keys


def test_manifest_keys(s3_bucket: str, aws_credentials: None):
    mock_config = {"account": ACCOUNT, "src_dir": "test/src", "dest_dir": "test/dest"}
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

    with build_asset_context(
        resources={"s3": s3_resource},
        asset_config={"account": ACCOUNT},  # A dictionary, not a set
    ) as context:
        # Pass the context first, then the value for the asset's input
        keys = manifest_keys(context, mock_config)
        assert isinstance(keys, list)
        assert len(keys) > 0
```

In this corrected version, `asset_config` is a dictionary, and the `context` object created by `build_asset_context` is passed as the first positional argument, followed by `mock_config` as the value for the asset's input. Alternatively, pass everything by keyword, for example `manifest_keys(context=context, pipeline_config=mock_config)`, where `pipeline_config` stands for whatever your input parameter is actually called. If `manifest_keys` reads these values from its config rather than from an input, you don't need to pass `mock_config` at all; put the values in `asset_config` and read them from `context.op_config` inside the asset. Please replace `ACCOUNT` and `your_project.assets` with the actual values from your project.

Manuel Fideles

03/07/2024, 2:31 PM
@Scout how can I test a graph asset with pytest?
s

Scout

03/07/2024, 2:31 PM
To test a graph asset with pytest, you can use the `build_assets_job` function to create a job that includes the graph asset and then execute that job within your test. This allows you to test the asset within the context of a job execution, which is similar to how it would run in a production environment. Here's a step-by-step guide on how to test a graph asset:

1. Import the necessary functions and your graph asset.
2. Use `build_assets_job` to create a job that includes your graph asset.
3. Execute the job in process using `execute_in_process` and perform assertions on the result.

Here's an example of how you might write a test for a graph asset:

```python
import pytest
from dagster import build_assets_job, AssetGroup
from your_project.graph_assets import your_graph_asset

# Create an AssetGroup and build a job that includes your graph asset
assets = [your_graph_asset]
asset_group = AssetGroup(assets)
assets_job = build_assets_job("test_assets_job", assets)


def test_your_graph_asset():
    result = assets_job.execute_in_process()
    assert result.success
    # You can add more assertions here to validate the output of your graph asset
```

In this example, `your_graph_asset` is the graph asset you want to test. We create an `AssetGroup` with the graph asset and use `build_assets_job` to create a job named `"test_assets_job"`. The `test_your_graph_asset` function executes the job in process using `execute_in_process()`. This runs the graph asset as part of the job execution. You can then assert that the job was successful and perform additional checks as needed to validate the output of your graph asset. Remember to replace `your_project.graph_assets` and `your_graph_asset` with the actual values from your project. If you need to pass specific configuration or resources to the graph asset for the test, you can do so by providing the appropriate arguments to `execute_in_process()`.

Manuel Fideles

03/07/2024, 2:59 PM
@Scout I don't think `build_assets_job` exists anymore

Scout

03/07/2024, 2:59 PM
You are correct; my previous response contained outdated information. The `build_assets_job` function is not part of the current Dagster API. Instead, you should use the `build_job` function to construct a job for testing your graph asset. Here's how you can test a graph asset using the `build_job` function:

1. Define your assets using the `@asset` decorator.
2. Create an `AssetGroup` with the assets you want to include in your test.
3. Use the `build_job` method on the `AssetGroup` to create a job for testing.
4. Execute the job in process using `execute_in_process` and perform assertions on the result.

Here's an example:

```python
import pytest
from dagster import asset, AssetGroup, AssetOut, AssetsDefinition, Out
from your_project.graph_assets import your_graph_asset


# Define your assets
@asset
def my_asset():
    # Your asset logic here
    return "my_asset_data"


# Create an AssetGroup with your assets
asset_group = AssetGroup(assets=[my_asset])

# Build a job from the AssetGroup
test_job = asset_group.build_job(name="test_job")


def test_my_asset():
    result = test_job.execute_in_process()
    assert result.success
    # Add more assertions here to validate the behavior of your asset
```

In this example, `my_asset` is a simple asset defined for demonstration purposes. You would replace this with your actual graph asset and any other assets it depends on. The `AssetGroup` is created with the list of assets, and then `build_job` is used to create a job named `"test_job"`. The `test_my_asset` function executes the job in process using `execute_in_process()`. This runs the assets as part of the job execution. You can then assert that the job was successful and perform additional checks as needed to validate the behavior of your assets. Remember to replace `your_project.graph_assets` and `your_graph_asset` with the actual values from your project. If you need to pass specific configuration or resources to the assets for the test, you can do so by providing the appropriate arguments to `execute_in_process()`.

Manuel Fideles

03/07/2024, 3:00 PM
@Scout I don't think AssetGroup exists either
s

Scout

03/07/2024, 3:00 PM
I apologize for the confusion. To test a graph in Dagster, you would typically define a job that includes the graph and then execute that job within your test using the `execute_in_process` method. Here's how you can do it:

1. Define your graph using the `@graph` decorator.
2. Create a job from the graph using the `to_job` method.
3. Execute the job in process using `execute_in_process` and perform assertions on the result.

Here's an example of how you might write a test for a graph:

```python
from dagster import DagsterInstance, graph, op


@op
def my_upstream_op():
    # Logic for the upstream op
    return "upstream_data"


@op
def my_downstream_op(data):
    # Logic for the downstream op that depends on the upstream op
    return f"processed_{data}"


@graph
def my_graph():
    my_downstream_op(my_upstream_op())


# Create a job from the graph
my_job = my_graph.to_job()


def test_my_graph():
    # A temporary, in-memory instance; execute_in_process uses one by default anyway
    instance = DagsterInstance.ephemeral()
    result = my_job.execute_in_process(instance=instance)
    assert result.success
    assert result.output_for_node("my_downstream_op") == "processed_upstream_data"
    # Add more assertions here to validate the behavior of your graph
```

In this example, `my_upstream_op` and `my_downstream_op` are operations (ops) that make up the graph `my_graph`. The `my_graph.to_job()` method is used to create a job from the graph. The `test_my_graph` function uses `DagsterInstance.ephemeral()` to create a temporary, in-memory Dagster instance for testing; `execute_in_process` falls back to such an instance when you don't pass one, so this step only matters if the test wants to inspect the instance afterwards. The job is then executed in process using `execute_in_process()`, and assertions check that the job ran successfully and that `my_downstream_op` produced the expected value. Remember to replace the example ops and graph with your actual Dagster graph and ops. If your graph requires specific configuration or resources, you can pass them to `execute_in_process` using the `run_config` and `resources` parameters, respectively.

Manuel Fideles

03/07/2024, 3:03 PM
@Scout What if my graph has an input value?
s

Scout

03/07/2024, 3:03 PM
If your graph has an input, you can provide its value when you execute the job created from the graph, using the `input_values` argument of `execute_in_process`. Here's an example of how you might write a test for a graph that takes an input:

```python
from dagster import graph, op


@op
def my_op(input_value: str):
    # Logic that uses the input_value
    return f"processed_{input_value}"


@graph
def my_graph(input_value):
    return my_op(input_value)


# Create a job from the graph
my_job = my_graph.to_job()


def test_my_graph():
    input_value = "test_input"
    # input_values maps the graph's top-level input names to Python values
    result = my_job.execute_in_process(input_values={"input_value": input_value})
    assert result.success
    assert result.output_for_node("my_op") == "processed_test_input"
    # Add more assertions here to validate the behavior of your graph
```

In this example, `my_op` is an op that takes an input value, and `my_graph` is a graph that also takes an input value and passes it to `my_op`. The `my_graph.to_job()` method creates a job from the graph. The `test_my_graph` function defines the `input_value` to test with and passes it through `input_values`, a dictionary keyed by the graph's top-level input names. Finally, assertions check that the job ran successfully and that `my_op` produced the expected output. Remember to replace the example op, graph, and input value with your actual Dagster graph, ops, and test inputs. If your graph requires additional configuration or resources, pass them to `execute_in_process` through its `run_config` and `resources` arguments.

Manuel Fideles

03/07/2024, 3:11 PM
@Scout I have the following test

```python
def test_pdf_data(s3_bucket: str, aws_credentials: None):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

    result = materialize(
        [pdf_data],
        resources={"s3": s3_resource},
        run_config=RunConfig(ops={"pipeline_config": {"config": {"account": ACCOUNT}}}),
    )
    assert result.success
```

where `pdf_data` is decorated with `@graph_asset`. How can I provide the `mock_manifest_keys` variable to `materialize` so that `pdf_data` has access to it?
s

Scout

03/07/2024, 3:11 PM
To provide `mock_manifest_keys` to the `pdf_data` graph asset during materialization, you can use the `run_config` parameter of `materialize` to specify the inputs for the asset. `materialize` accepts `run_config` either as a plain dictionary or as a `RunConfig` object; a dictionary is used here because it makes the nested `inputs` section easy to write. Here's how you can modify your test to pass `mock_manifest_keys` as an input to the `pdf_data` graph asset:

```python
from dagster import materialize
from dagster_aws.s3 import S3Resource
from your_project.assets import pdf_data


def test_pdf_data(s3_bucket: str, aws_credentials: None):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

    # Construct the run_config dictionary
    run_config = {
        "ops": {
            "pdf_data": {  # This should match the name of the op within your graph asset
                "inputs": {
                    # The input name should match the expected input in your graph asset
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
    }

    # The S3 resource is passed as an object, so it needs no "resources" section in run_config
    result = materialize(
        [pdf_data],
        resources={"s3": s3_resource},
        run_config=run_config,
    )
    assert result.success
```

In this example, the `run_config` dictionary is structured to provide the `mock_manifest_keys` list as an input to the `pdf_data` graph asset. The key `"pdf_data"` under `"ops"` should match the name of the op within your graph asset that expects the `manifest_keys` input, and the input name (`"manifest_keys"` in this case) should match the name of the input parameter in your `pdf_data` definition. Because `s3_resource` is passed as an object through `resources`, it does not also need to be configured in the run config. Remember to replace `"pdf_data"` with the actual name of the op within your graph asset if it's different, and adjust the S3 setup as needed for your specific test.

Manuel Fideles

03/07/2024, 3:16 PM
@Scout I get the following error message
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["manifest_keys"]' for asset '["pdf_data"]' is not produced by any of the provided asset ops and is not one of the provided sources
s

Scout

03/07/2024, 3:16 PM
The error message you're encountering suggests that Dagster is expecting `manifest_keys` to be an asset, but it's not found among the provided assets or sources. When you're working with assets in Dagster, inputs to assets are typically other assets. However, if you need to provide a raw input value (like your `mock_manifest_keys` list) that isn't an asset, you should use a `SourceAsset`. To resolve this issue, you would define a `SourceAsset` for `manifest_keys` and include it in your asset's dependencies. Here's how you can modify your test to include `mock_manifest_keys` as a `SourceAsset`:
```python
from dagster import SourceAsset, materialize, AssetGroup

# Define a SourceAsset for your mock_manifest_keys
mock_manifest_keys_asset = SourceAsset(
    key="manifest_keys",
    metadata={"mock_data": mock_manifest_keys}
)

# Create an AssetGroup including your pdf_data asset and the SourceAsset
asset_group = AssetGroup(
    assets=[pdf_data],
    source_assets=[mock_manifest_keys_asset]
)

# Materialize the assets in the asset_group
result = materialize(
    asset_group,
    resources={"s3": s3_resource},
    run_config={
        "ops": {
            "pdf_data": {
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
        "resources": {
            "s3": {
                "config": {
                    "s3_bucket": s3_bucket,
                    # Include any other necessary S3 configuration here
                }
            }
        }
    }
)

assert result.success
```
In this example, `mock_manifest_keys_asset` is a `SourceAsset` that represents the `mock_manifest_keys` list. The `AssetGroup` is created with both your `pdf_data` asset and the `mock_manifest_keys_asset` as a source asset. When you call `materialize`, you pass the `asset_group` and the `run_config` that includes the `mock_manifest_keys` as an input to the `pdf_data` asset. Please ensure that the `pdf_data` asset is defined to accept `manifest_keys` as an input and that the `mock_manifest_keys` list is structured in the way your asset expects. Additionally, replace `"pdf_data"` with the actual name of your asset if it's different, and adjust the S3 configuration as needed for your specific test setup.

Manuel Fideles

03/07/2024, 3:21 PM
@Scout The `AssetGroup` isn't in Dagster anymore

Scout

03/07/2024, 3:21 PM
I apologize for the confusion earlier. Since `AssetGroup` is no longer part of the Dagster API, you would use the `to_job` method on the graph to create a job for testing, and then provide the inputs directly when executing the job. Here's how you can modify your test to pass `mock_manifest_keys` as an input to the `pdf_data` graph asset:
```python
from dagster import materialize

def test_pdf_data(s3_bucket: str, aws_credentials: None):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

    # Define the run_config with the mock_manifest_keys as an input to the pdf_data asset
    run_config = {
        "ops": {
            "pdf_data": {  # This should match the name of the op within your graph asset
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
        "resources": {
            "s3": {
                "config": {
                    "s3_bucket": s3_bucket,
                    # Include any other necessary S3 configuration here
                }
            }
        }
    }

    # Materialize the pdf_data asset with the provided run_config
    result = materialize(
        [pdf_data.to_job(resource_defs={"s3": s3_resource})],
        run_config=run_config,
    )
    assert result.success
```
In this example, `pdf_data` is assumed to be a graph asset that has been decorated with `@graph_asset`. The `to_job` method is used to create a job from the `pdf_data` asset, and the `s3_resource` is provided as a resource definition. The `run_config` dictionary is structured to provide the `mock_manifest_keys` list as an input to the `pdf_data` asset. The key `"pdf_data"` under `"ops"` in the `run_config` should match the name of the op within your graph asset that expects the `manifest_keys` input. Please replace `"pdf_data"` with the actual name of the op within your graph asset if it's different. Additionally, adjust the S3 configuration as needed for your specific test setup. If your asset requires additional configurations or resources, you can include them in the `run_config` and pass them to `materialize` as needed.

Manuel Fideles

03/07/2024, 4:01 PM
@Scout I have the following @graph_asset definition
@op
def pypdf_extract_text(
    context: OpExecutionContext, s3: S3Resource, manifest_keys: list[str]
) -> dict:
    """Ask pypdf to extract the text, check if we got something"""
    s3_client = s3.get_client()
    pdf_uris = set(manifest_keys)

    pdfs_data = {}
    threads = {}
    already_done = 0
    processed_by_pypdf = 0
    for pdf_uri in pdf_uris:
        parsed_pdf_uri: ParseResult = urlparse(pdf_uri)
        bucket = parsed_pdf_uri.netloc
        pdf_key = parsed_pdf_uri.path.lstrip("/")
        base_s3_dir, _ = os.path.split(pdf_key)
        response_json_key = os.path.join(base_s3_dir, "response.json")
        pdfs_data[pdf_uri] = {
            "bucket": bucket,
            "pdf_key": pdf_key,
            "base_s3_dir": base_s3_dir,
            "response_json_key": response_json_key,
            "already_done": False,
        }
        try:
            s3_client.head_object(
                Bucket=bucket,
                Key=response_json_key,
            )
            pdfs_data[pdf_uri]["already_done"] = True
            already_done += 1
            continue
        except ClientError as error:
            if error.response["ResponseMetadata"]["HTTPStatusCode"] != 404:
                raise error

        t = ThreadWithReturnValue(
            target=extract_pdf_text,
            args=(s3_client, pdfs_data[pdf_uri]),
        )
        threads[pdf_uri] = t
        t.start()

        # 100 per second seems good enough, s3 limit seems to be in thousands
        if len(threads) == 100:
            for t_pdf_uri, t in threads.items():
                texts = t.join()
                if len(", ".join(texts)) > 42:
                    pdfs_data[t_pdf_uri]["texts"] = texts
                    processed_by_pypdf += 1
            threads = {}
            time.sleep(1)

    for t_pdf_uri, t in threads.items():
        texts = t.join()
        if len(", ".join(texts)) > 42:
            pdfs_data[t_pdf_uri]["texts"] = texts
            processed_by_pypdf += 1

    context.add_output_metadata(
        metadata={
            "unique_pdf_uris": len(pdf_uris),
            "processed_by_pypdf": processed_by_pypdf,
            "already_done": already_done,
        }
    )

    return pdfs_data


@op
def start_textract(
    context: OpExecutionContext,
    pypdf_extract_text: dict,
) -> dict:
    """Ask Textract to OCR the files in S3, skip the ones with previous result or pypdf result"""
    config = BotoConfig(
        retries={
            "mode": "adaptive",  # let boto handle the retries with the latest and greatest
        }
    )
    textract_client = boto3.client("textract", region_name=AWS_REGION, config=config)

    pdfs_data = pypdf_extract_text
    threads = {}
    requested_textract_ocr = 0
    for pdf_uri, pdf_data in pdfs_data.items():
        if pdf_data["already_done"] is True:
            continue
        try:
            pdf_data["texts"]
            continue
        except KeyError:
            pass

        start_textract_thread(
            textract_client,
            threads,
            pdf_uri,
            pdf_data,
        )

        # limit of 10 per second should be safe, i can test and/or ask amazon for more
        if len(threads) == 10:
            for t_pdf_uri, t in threads.items():
                response = t.join()
                pdfs_data[t_pdf_uri]["textract_response_dir"] = os.path.join(
                    pdfs_data[t_pdf_uri]["base_s3_dir"], response["JobId"]
                )
                requested_textract_ocr += 1
            threads = {}
            time.sleep(1)

    for t_pdf_uri, t in threads.items():
        response = t.join()
        pdfs_data[t_pdf_uri]["textract_response_dir"] = os.path.join(
            pdfs_data[t_pdf_uri]["base_s3_dir"], response["JobId"]
        )
        requested_textract_ocr += 1

    context.add_output_metadata(
        metadata={"requested_textract_ocr": requested_textract_ocr}
    )

    return pdfs_data


@op
def save_results(
    context: OpExecutionContext,
    s3: S3Resource,
    start_textract: dict,
) -> dict:
    """Move Textract responses, or save pypdf responses, to response.jsons in pdfs dirs"""
    s3_client = s3.get_client()
    threads: list[Thread] = []
    pdfs_data = start_textract
    save_pypdf_response_tries = 0
    move_textract_response_tries = 0
    for _, pdf_data in pdfs_data.items():
        if pdf_data["already_done"] is True:
            continue

        try:
            pdf_data["texts"]
            text_channel = "pypdf"
        except KeyError:
            text_channel = "textract"

        if text_channel == "pypdf":
            t = Thread(
                target=save_pypdf_response,
                args=(s3_client, pdf_data),
            )
            save_pypdf_response_tries += 1
        elif text_channel == "textract":
            t = Thread(
                target=move_textract_response,
                args=(s3_client, pdf_data),
            )
            move_textract_response_tries += 1
        else:
            raise ValueError(f"Unknown text_channel: {text_channel}")

        threads.append(t)
        t.start()

        # 100 per second seems good enough, s3 limit seems to be in thousands
        if len(threads) == 100:
            for t in threads:
                t.join()
            threads = []
            time.sleep(1)

    for t in threads:
        t.join()

    context.add_output_metadata(
        metadata={
            "move_textract_response_tries": move_textract_response_tries,
            "save_pypdf_response_tries": save_pypdf_response_tries,
        }
    )

    return pdfs_data


@op
def note_texts(context: OpExecutionContext, s3: S3Resource, pdfs_data: dict) -> dict:
    """
    The texts extracted from the Textract'ed response.json files
    """
    s3_client = s3.get_client()
    threads = {}
    loaded_response_jsons = 0  # already done + textract
    missing_keys = []
    for uri, pdf_data in pdfs_data.items():
        try:
            pdf_data["texts"]  # noqa
            continue
        except KeyError:
            pass

        t = ThreadWithReturnValue(
            target=get_response_json_text,
            args=(s3_client, pdf_data),
        )
        threads[uri] = t
        t.start()

        # 100 per second seems good enough, s3 limit seems to be in thousands
        if len(threads) == 100:
            for t_uri, t in threads.items():
                texts = t.join()
                pdfs_data[t_uri]["texts"] = texts
                loaded_response_jsons += 1
            threads = {}
            time.sleep(1)

    for t_pdf_uri, t in threads.items():
        texts = t.join()  # noqa
        pdfs_data[t_pdf_uri]["texts"] = texts
        loaded_response_jsons += 1

    unique_missing_keys = list(set(missing_keys))

    context.add_output_metadata(
        metadata={
            "loaded_response_jsons": MetadataValue.int(loaded_response_jsons),
            "# of missing_keys": MetadataValue.int(len(unique_missing_keys)),
            "preview missing_keys": MetadataValue.json(unique_missing_keys[:5]),
            "preview (in dataframe format)": MetadataValue.json(tuple(choice(list(pdfs_data.items())))),
        }
    )

    return pdfs_data


@graph
def pdf_data_graph(manifest_keys: list[str]):
    return note_texts(save_results(start_textract(pypdf_extract_text(manifest_keys))))


@graph_asset
def pdf_data(manifest_keys: list[str]) -> dict:
    """
    The data extracted w/ Textract from the URIs present
    in the manifest.
    """
    return pdf_data_graph(manifest_keys)
What `run_config` should I use when testing with Pytest to ensure that `mock_manifest_keys` takes `manifest_keys`' place?

Scout

03/07/2024, 4:01 PM
To test your `@graph_asset` with Pytest and ensure that `mock_manifest_keys` is used in place of `manifest_keys`, you will need to provide a `run_config` that specifies the inputs for your asset. Since `pdf_data` is a graph asset, you will need to create a job from it and then execute that job with the appropriate `run_config`. Here's an example of how you might write a test for your `pdf_data` graph asset:
```python
import pytest
from dagster import build_op_context, build_init_resource_context
from your_project.assets import pdf_data, pypdf_extract_text, start_textract, save_results, note_texts

@pytest.fixture
def s3_resource(s3_bucket: str, aws_credentials: None):
    return S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

def test_pdf_data(s3_resource):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]

    # Create a job from the graph asset
    pdf_data_job = pdf_data.to_job(resource_defs={"s3": s3_resource})

    # Define the run_config with the mock_manifest_keys as an input to the pdf_data asset
    run_config = {
        "ops": {
            "pdf_data_graph": {
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        }
    }

    # Execute the job with the provided run_config
    result = pdf_data_job.execute_in_process(run_config=run_config)
    assert result.success
    # Add more assertions here to validate the behavior of your graph asset
```
In this example, `pdf_data` is your graph asset decorated with `@graph_asset`. The `to_job` method is used to create a job from the `pdf_data` asset, and the `s3_resource` is provided as a resource definition. The `run_config` dictionary is structured to provide the `mock_manifest_keys` list as an input to the `pdf_data_graph` operation within the job. The key `"pdf_data_graph"` under `"ops"` in the `run_config` should match the name of the graph within your graph asset that expects the `manifest_keys` input. Please replace `"your_project.assets"` with the actual module path from your project. Additionally, adjust the S3 configuration as needed for your specific test setup. If your asset requires additional configurations or resources, you can include them in the `run_config` and pass them to the job execution as needed.

Manuel Fideles

03/07/2024, 4:04 PM
@Scout I got the following error on the
`pdf_data_job = pdf_data.to_job(resource_defs={"s3": s3_resource})`
line of code:
`AttributeError: 'AssetsDefinition' object has no attribute 'to_job'`

Scout

03/07/2024, 4:04 PM
I apologize for the confusion. The error you're encountering indicates that `pdf_data`, which is decorated with `@graph_asset`, is an `AssetsDefinition` object, which does not have a `to_job` method. In Dagster, `@graph_asset` is used to define an asset that is composed of a graph of ops. To test this asset, you should materialize it directly rather than converting it to a job. You can do this by using the `materialize` function and providing the necessary `run_config` and resources. Here's how you can modify your test to materialize the `pdf_data` graph asset:
```python
import pytest
from dagster import materialize
from dagster_aws.s3 import S3Resource
from your_project.assets import pdf_data

@pytest.fixture
def s3_resource(s3_bucket: str, aws_credentials: None):
    return S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

# Request the s3_resource fixture as a parameter so pytest injects the resource instance
def test_pdf_data(s3_bucket: str, s3_resource: S3Resource):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]

    # Define the run_config with the mock_manifest_keys as an input to the pdf_data asset
    run_config = {
        "ops": {
            "pdf_data": {  # This should match the name of the op within your graph asset
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
        "resources": {
            "s3": {
                "config": {
                    "s3_bucket": s3_bucket,
                    # Include any other necessary S3 configuration here
                }
            }
        }
    }

    # Materialize the pdf_data asset with the provided run_config
    result = materialize(
        [pdf_data],
        resources={"s3": s3_resource},
        run_config=run_config,
    )
    assert result.success
```
In this example, `pdf_data` is assumed to be a graph asset that has been decorated with `@graph_asset`. The `materialize` function is used to execute the asset with the provided `run_config` and resources. Please replace `"your_project.assets"` with the actual module path from your project. Additionally, adjust the S3 configuration as needed for your specific test setup. If your asset requires additional configurations or resources, you can include them in the `run_config` and pass them to the `materialize` function as needed.

Manuel Fideles

03/07/2024, 4:13 PM
@Scout What should I put in place of `pdf_data` in the `run_config` dict? I'm still getting the
`dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["manifest_keys"]' for asset '["pdf_data"]' is not produced by any of the provided asset ops and is not one of the provided sources`
error when running the test

Scout

03/07/2024, 4:13 PM
The error message you're encountering suggests that Dagster is expecting `manifest_keys` to be an asset, but it's not found among the provided assets or sources. Since `manifest_keys` is a list of strings and not an asset, you need to provide it as a source to the graph asset. In Dagster, to provide a source to a graph asset, you typically define a source asset and include it in the set of assets you pass to `materialize`, backed by an IO manager that supplies its value. However, since `@graph_asset` is a relatively new feature and there seems to be some confusion about how to properly test it, I recommend checking the latest Dagster documentation or reaching out to the Dagster community for guidance on the current best practices for testing graph assets with source inputs. As of my knowledge cutoff in early 2023, the typical approach would be to define a job that includes the graph and then execute that job with the appropriate `run_config`. However, if `@graph_asset` is not behaving as expected with `run_config`, this might be an issue that needs to be addressed in the Dagster codebase or documentation. If you are still encountering issues, I would suggest creating a minimal reproducible example and submitting a question to the Dagster community or filing an issue on the Dagster GitHub repository to get more specific guidance on how to test a `@graph_asset` with source inputs like `manifest_keys`.

Manuel Fideles

03/11/2024, 4:24 PM
@Scout how do I test an asset that yields `Output`?

Scout

03/11/2024, 4:24 PM
To test an asset that yields `Output`, you can treat the asset as a Python generator and use the `next()` function to retrieve the outputs it yields. This allows you to capture each `Output` yielded by the asset and perform assertions on them. Here's an example of how you might write a test for an asset that yields `Output`:
```python
import pytest
from dagster import AssetOut, Output, multi_asset

# Two named outputs must be declared; a plain @asset has one output named "result"
@multi_asset(outs={"output_one": AssetOut(), "output_two": AssetOut()})
def my_asset():
    # Your asset logic here
    yield Output(1, "output_one")
    yield Output(2, "output_two")

def test_my_asset():
    asset_gen = my_asset()

    output_one = next(asset_gen)
    assert output_one.value == 1
    assert output_one.output_name == "output_one"

    output_two = next(asset_gen)
    assert output_two.value == 2
    assert output_two.output_name == "output_two"

    # If you expect no more outputs, you can assert that the generator is exhausted
    with pytest.raises(StopIteration):
        next(asset_gen)
```
In this example, `my_asset` yields two outputs. The `test_my_asset` function invokes `my_asset`, which returns a generator object. It then uses `next()` to get each `Output` and performs assertions on the value and output name. If your asset yields a single `Output`, you can simply call `next()` once and perform your assertions on that output. If your asset yields multiple outputs, you can call `next()` multiple times as shown above. Remember to replace the example asset and assertions with your actual asset logic and expected values. If you're using `pytest`, you can also use the `pytest.raises` context manager to assert that the generator is exhausted after retrieving all expected outputs.

Manuel Fideles

03/11/2024, 4:51 PM
@Scout I'm getting the error: "When yielding outputs from a op generator, they should be wrapped in an `Output` object."

Scout

03/11/2024, 4:51 PM
The error message you're encountering indicates that you're yielding something from an asset that is not wrapped in an `Output` object. In Dagster, when an asset is a generator, each output it yields must be wrapped in an `Output` object, and yielding several named outputs requires declaring them, for example with `@multi_asset`. Here's an example of how you should define an asset that yields multiple outputs:
```python
from dagster import AssetOut, Output, multi_asset

@multi_asset(outs={"output_one": AssetOut(), "output_two": AssetOut()})
def my_asset():
    # Your asset logic here
    yield Output(value="first_output", output_name="output_one")
    yield Output(value="second_output", output_name="output_two")
    # Yield one Output object per declared out
```
Each `yield` statement is returning an `Output` object with a specified `value` and `output_name`. The `output_name` is optional if you have only one output; it will default to `"result"`. When testing this asset, you should consume the generator to retrieve all the outputs:
```python
def test_my_asset():
    asset_gen = my_asset()

    outputs = list(asset_gen)  # Consume the generator to get a list of Output objects
    assert len(outputs) == 2  # Replace with the expected number of outputs
    assert outputs[0].value == "first_output"
    assert outputs[1].value == "second_output"
    # Add more assertions as needed
```
In the test, you convert the generator to a list to retrieve all the `Output` objects, and then you can perform assertions on each one. If you're still encountering issues, please ensure that every yield statement within your asset function is yielding an `Output` object and that you're not accidentally yielding raw values or other types of objects. If the problem persists, please provide more context or code snippets, and we can look into it further.

Manuel Fideles

03/11/2024, 4:58 PM
@Scout Running the following test:
def test_training_data_asset(
    pdf_data: dict[str, Any], mock_spec: LabelsAssetFactorySpec
):
    local_csv = read_retrospective_csv(
        LOCAL_LABELS_CSV_PATH,
        col_names=mock_spec.parsing_config.col_names,
        parse_dates=mock_spec.parsing_config.parse_dates,
        cols_to_keep=mock_spec.parsing_config.cols_to_keep,
    )
    labels = mock_spec.parser(local_csv)

    result = list(training_data(labels=labels, pdf_data=pdf_data))
    assert isinstance(result, list)
    assert len(result) > 0
yields the error: `dagster._core.errors.DagsterInvariantViolationError: When yielding outputs from a op generator, they should be wrapped in an Output object.`

Scout

03/11/2024, 4:58 PM
The error message you're encountering indicates that the `training_data` asset is yielding values that are not wrapped in an `Output` object. In Dagster, when an asset function yields values, each value must be wrapped in an `Output` object to be correctly handled by the Dagster framework. To resolve this issue, you need to ensure that the `training_data` asset is correctly yielding `Output` objects. Here's an example of how the `training_data` asset should be defined:
```python
from dagster import asset, Output

@asset
def training_data(labels, pdf_data):
    # Your asset logic here
    # A plain @asset has a single output named "result", so no output_name is needed:
    yield Output(value=some_value)
    # Several named outputs need @multi_asset(outs={...}); then yield them like this:
    # yield Output(value=first_value, output_name="first_output")
    # yield Output(value=second_value, output_name="second_output")
```
Each `yield` statement should return an `Output` object with the `value` set to the value you want to yield and an optional `output_name` if you have multiple outputs. Now, in your test, you should be able to consume the generator returned by the `training_data` asset and assert that the results are `Output` objects:
```python
def test_training_data_asset(pdf_data: dict[str, Any], mock_spec: LabelsAssetFactorySpec):
    # ... (your existing setup code)

    results = list(training_data(labels=labels, pdf_data=pdf_data))
    assert isinstance(results, list)
    assert len(results) > 0
    for result in results:
        assert isinstance(result, Output)
```
In this test, you're converting the generator to a list to retrieve all the `Output` objects, and then you're asserting that each item in the list is an instance of `Output`. Please ensure that the `training_data` asset is updated to yield `Output` objects as shown in the example, and then adjust your test accordingly. If you continue to encounter issues, please review the `training_data` asset's code to ensure that all yielded values are wrapped in `Output` objects and that no raw values are being yielded directly.

Manuel Fideles

03/11/2024, 5:11 PM
@Scout I'm already converting it to a list, but I'm still getting the same error

Scout

03/11/2024, 5:11 PM
If you're receiving the error "When yielding outputs from an op generator, they should be wrapped in an `Output` object," despite wrapping the yielded values in `Output` objects, it's possible that there might be a path in the asset's logic where a raw value is being yielded without being wrapped, or there could be an issue with how the asset is being invoked during the test. Here are some steps to debug this issue:
1. Review the Asset Code: Double-check the `training_data` asset code to ensure that every yield statement is yielding an `Output` object. There should be no code path that yields a raw value.
2. Check for Conditional Logic: If there is any conditional logic in the asset that might yield values, ensure that all branches of the condition are yielding `Output` objects.
3. Examine the Test Setup: Ensure that the test setup correctly mocks or provides all necessary inputs and resources that the asset expects. If the asset is not receiving the inputs it expects, it might yield unexpected values.
4. Simplify the Test: Try simplifying the test to yield a single, known `Output` object to confirm that the basic mechanism of yielding `Output` objects works as expected.
5. Use Direct Invocation: If you're using a complex test setup, try invoking the asset directly in a simplified manner to isolate the issue.
Here's an example of a simplified test that directly invokes the asset:
```python
from dagster import Output, asset

@asset
def training_data(labels, pdf_data):
    # Simplified logic for debugging; a plain @asset's single output is named "result"
    yield Output(value="dummy_value")

def test_training_data_asset():
    # Simplified test inputs
    labels = "dummy_labels"
    pdf_data = {"dummy_key": "dummy_value"}

    results = list(training_data(labels=labels, pdf_data=pdf_data))
    assert isinstance(results, list)
    assert len(results) > 0
    for result in results:
        assert isinstance(result, Output)
        assert result.output_name == "result"
        assert result.value == "dummy_value"
```
If the simplified test passes without errors, it suggests that the issue lies in the more complex logic of the actual `training_data` asset or in the test setup. You would then incrementally add back the complexity to identify where the issue arises. If you've confirmed that all yields in the asset are correctly wrapped in `Output` objects and the simplified test still fails, it might be worth reaching out to the Dagster community or checking the Dagster GitHub issues for similar problems or potential bugs in the version of Dagster you are using.

Manuel Fideles

03/11/2024, 5:13 PM
@Scout The training_data asset code looks like this:
@asset(
    check_specs=[
        AssetCheckSpec(name="not_many_no_texts", asset="training_data"),
        AssetCheckSpec(name="no_training_data_generated", asset='training_data'),
    ],
)
def training_data(
    labels: pd.DataFrame,
    pdf_data: dict,
):
    """
    Retrospective training data.
    """
    training_data = []
    uris_of_pdfs_with_no_text = []

    for pdf_uri, pdf_data in pdf_data.items():
        note_text: list[str] = pdf_data.get("texts")
        if note_text is None:
            uris_of_pdfs_with_no_text.append(pdf_uri)
        else:
            note_labels: list[dict] = labels[labels["s3_uri"].eq(pdf_uri)].to_dict(
                orient="records"
            )
            for label in note_labels:
                training_sample = NlpTrainSample(
                    account=label["account"],
                    note_type=NoteType.OPERATIVE.value,
                    label_source=LabelSourceType.RETROSPECTIVE.value,
                    specialty="",
                    organization=label["organization"],
                    text=" ".join(note_text),
                    dos=label["date_of_service"],
                    integration_s3_uri=pdf_uri,
                    cpt=label["cpt"],
                    modifiers=label["modifiers"],
                    icd10=label["icd10"],
                )
                training_data.append(training_sample.to_dict())

    yield AssetCheckResult(
        passed=bool(len(uris_of_pdfs_with_no_text) * 1000 < len(pdf_data)),
        check_name='not_many_no_texts',
        metadata={
            "# of pdfs w/ no text": len(uris_of_pdfs_with_no_text),
        },
    )

    yield AssetCheckResult(
        passed=bool(len(training_data) > 0),
        check_name='no_training_data_generated'
    )

    yield Output(
        training_data,
        metadata={
            "Empty pdf URIs": MetadataValue.json(uris_of_pdfs_with_no_text),
            "# of training samples created": MetadataValue.int(len(training_data)),
            "Preview": MetadataValue.json(training_data[:5]),
        },
    )

Scout

03/11/2024, 5:13 PM
Based on the provided `training_data` asset code, it looks like you are correctly yielding `AssetCheckResult` objects for the checks and an `Output` object for the actual training data. The error message you're encountering should not occur if every yield statement in the asset is indeed yielding either an `AssetCheckResult` or an `Output` object. However, the error message suggests that somewhere in the execution, a value is being yielded that is not wrapped in an `Output` object. Here are a few things to check:
1. Ensure Correct Import: Verify that `Output` and `AssetCheckResult` are correctly imported from `dagster`.
2. Check for Typos: Ensure that there are no typos or copy-paste errors in the asset code that might cause a raw value to be yielded.
3. Check Asset Checks: Ensure that every check you yield a result for (`not_many_no_texts` and `no_training_data_generated`) is declared in the asset's `check_specs`, and that the asset yields an `AssetCheckResult` for each declared check.
4. Check for Shadowing: The loop `for pdf_uri, pdf_data in pdf_data.items():` rebinds `pdf_data`, so after the loop the `len(pdf_data)` in the `not_many_no_texts` check measures the last per-PDF dict rather than the whole mapping. Rename the loop variable (for example, `pdf_info`). This is a logic bug, not the source of the error.
5. Check Asset Execution: Your test calls `training_data(...)` directly, which goes through Dagster's direct-invocation path rather than a normal run. Try executing the asset with `materialize` (providing its upstream `labels` and `pdf_data` inputs) to see whether the error persists; when using `materialize`, ensure that the `run_config` and any other parameters are correctly specified.
6. Simplify the Asset: Temporarily simplify the asset to yield a single `Output` object to confirm that the basic mechanism of yielding `Output` objects works as expected.
7. Check Dagster Version: Ensure that you are using a version of Dagster that supports the features you are using. If you are using a newer feature like `@asset` with `check_specs`, make sure your Dagster version is up to date.
If after checking these items the issue persists, consider reaching out to the Dagster community with a minimal reproducible example for further assistance. It's possible that there might be a bug or a subtle issue that's not immediately apparent from the code snippet provided.