Manuel Fideles
03/07/2024, 11:42 AM

Scout
03/07/2024, 11:42 AM
If your asset takes an `AssetExecutionContext` as a parameter, you can use the `build_asset_context` function provided by Dagster to construct a context for testing purposes. This function creates a context that mimics the one provided during actual asset execution. Here's an example from the documentation on how to use `build_asset_context` in a test:
```python
from dagster import build_asset_context

from your_project.assets import your_asset


def test_your_asset():
    with build_asset_context() as context:
        result = your_asset(context=context)
        # Perform assertions on 'result'
```
In `test_your_asset`, `your_asset` is the asset you want to test, and it expects an `AssetExecutionContext` as a parameter. By using `build_asset_context`, you can create a context that is suitable for testing. You can then invoke your asset with this context and perform any necessary assertions on the result to verify that your asset behaves as expected. Remember to replace `your_project.assets` and `your_asset` with the actual module path and asset name from your project. Additionally, you can pass arguments to `build_asset_context` (such as `resources`, `asset_config`, and `instance`) to customize the context with the resources, config, and other dependencies your asset might need for the test.

Manuel Fideles
03/07/2024, 11:44 AM

Scout
03/07/2024, 11:45 AM
If your asset has a `config_schema` defined with default values, you can still use the `build_asset_context` function to create a context for testing. When you use `build_asset_context`, you can pass in the configuration data that your asset expects through its `asset_config` argument. If you want to test with the default values, you can omit those values from the configuration you pass in, and Dagster will use the defaults specified in your `config_schema`. Here's an example of how you might test an asset with a `config_schema`:
```python
from dagster import build_asset_context

from your_project.assets import your_asset


def test_your_asset_with_default_config():
    # Assuming 'your_asset' has a config_schema with default values
    with build_asset_context() as context:
        result = your_asset(context=context)
        # Perform assertions on 'result' using the default config values


def test_your_asset_with_custom_config():
    # asset_config is the asset's own config, not a full run config,
    # so it is not nested under "ops" -> "your_asset" -> "config"
    custom_config = {
        "your_config_key": "your_custom_value",
        # ... other config values
    }
    with build_asset_context(asset_config=custom_config) as context:
        result = your_asset(context=context)
        # Perform assertions on 'result' using the custom config values
```
In the first test function, `test_your_asset_with_default_config`, we don't provide any specific configuration, so the asset uses the default values from its `config_schema`. In the second test function, `test_your_asset_with_custom_config`, we provide a custom configuration dictionary that matches the structure expected by the asset's `config_schema`. This allows you to test how your asset behaves with different configuration values. Remember to replace `your_project.assets`, `your_asset`, `your_config_key`, and `your_custom_value` with the actual values from your project.

Manuel Fideles
03/07/2024, 12:24 PM

Scout
03/07/2024, 12:24 PM
To pass an `S3Resource` from `dagster_aws.s3` to an asset for testing, you can use the `build_asset_context` function and provide the necessary resources in its `resources` parameter. Here's an example of how you might do this:
```python
from dagster import build_asset_context
from dagster_aws.s3 import S3Resource

from your_project.assets import your_asset


def test_your_asset_with_s3_resource():
    # S3Resource configures the boto3 client (region, credentials, endpoint);
    # it has no bucket field, so the bucket name belongs in your asset's logic or config
    s3_resource = S3Resource(
        region_name="us-east-1",  # adjust to your region
        # Include any additional parameters needed to configure the S3Resource
    )
    # Use build_asset_context to create a context with the S3Resource
    with build_asset_context(resources={"s3": s3_resource}) as context:
        result = your_asset(context=context)
        # Perform assertions on 'result'
```
In `test_your_asset_with_s3_resource`, you create an instance of `S3Resource` with the configuration it needs (region, credentials, endpoint, and so on). Then you pass a dictionary to `build_asset_context` with a key that matches the name of the resource your asset expects: in this case `"s3"`, which corresponds either to `required_resource_keys={"s3"}` or to a parameter named `s3` on the asset function. The value is the `s3_resource` instance you created. When you call your asset with this context, it has access to the `S3Resource` as if it were running in a Dagster job. Remember to replace `your_project.assets`, `your_asset`, and the placeholder configuration values with the actual values from your project. Additionally, ensure that any required configuration for the `S3Resource` is provided when you instantiate it.

Manuel Fideles
03/07/2024, 12:27 PM

Scout
03/07/2024, 12:27 PM
If you're using `moto` to mock an S3 bucket for testing with Dagster, you can integrate it by setting up the mock S3 in a pytest fixture and then using the `build_asset_context` function to provide the `S3Resource` to your asset. Here's a step-by-step example:
```python
import boto3
import pytest
from dagster import build_asset_context
from dagster_aws.s3 import S3Resource
from moto import mock_aws  # moto >= 5.0; on moto 4.x use `from moto import mock_s3`

from your_project.assets import your_asset

AWS_REGION = "us-east-1"


@pytest.fixture
def aws_credentials():
    """Mocked AWS credentials for moto."""
    return {
        "aws_access_key_id": "fake_access_key",
        "aws_secret_access_key": "fake_secret_key",
    }


@pytest.fixture
def s3_bucket(aws_credentials):
    # The mock stays active until the test using this fixture finishes
    with mock_aws():
        s3 = boto3.client("s3", region_name=AWS_REGION, **aws_credentials)
        bucket_name = "my-mocked-bucket"
        s3.create_bucket(Bucket=bucket_name)
        # Add any setup code you need here (e.g., uploading test files to the bucket)
        yield bucket_name  # This will be used by your tests


def test_your_asset_with_mocked_s3(s3_bucket, aws_credentials):
    # S3Resource's credential fields have the same names as the fixture's keys
    s3_resource = S3Resource(region_name=AWS_REGION, **aws_credentials)
    # Use build_asset_context to create a context with the mocked S3Resource
    with build_asset_context(resources={"s3": s3_resource}) as context:
        result = your_asset(context=context)
        # Perform assertions on 'result'; the asset should read from `s3_bucket`
```
The `aws_credentials` and `s3_bucket` pytest fixtures set up mocked AWS credentials and a mocked S3 bucket using `moto`. The `s3_bucket` fixture creates a new bucket in the mocked S3 environment and yields the bucket name, which is then used in the test function; because it yields inside the `with` block, the mock stays active for the whole test. The `test_your_asset_with_mocked_s3` function takes the `s3_bucket` and `aws_credentials` fixtures as arguments and creates an `S3Resource` instance with the mocked credentials. The `build_asset_context` function is then used to create a context with the `S3Resource`, which is passed to the asset under test. Remember to replace `your_project.assets` and `your_asset` with the actual values from your project. Also, add any setup code your specific test case requires to the `s3_bucket` fixture, such as uploading files to the mocked bucket.

Manuel Fideles
03/07/2024, 12:52 PM

Scout
03/07/2024, 12:52 PM
You can use the `build_assets_job` function to create a job that includes both assets and then execute that job within your test. This ensures that the upstream asset is materialized before the asset under test. Here's an example of how you might set up a test for an asset that depends on another asset's materialization:
```python
from dagster import AssetGroup, AssetIn, asset, build_assets_job

from your_project.assets import upstream_asset


# Define your assets, assuming upstream_asset is already defined
@asset(ins={"upstream_asset": AssetIn()})
def downstream_asset(upstream_asset):
    # Your logic that depends on upstream_asset
    pass


# Create an AssetGroup and build a job that includes both assets
assets = [upstream_asset, downstream_asset]
asset_group = AssetGroup(assets)
assets_job = build_assets_job("test_assets_job", assets)


def test_downstream_asset():
    result = assets_job.execute_in_process()
    assert result.success
    # You can add more assertions here to validate the downstream_asset's materialization
```
In the `build_assets_job` snippet, `upstream_asset` and `downstream_asset` are the two assets you're working with. `downstream_asset` is defined with an input that depends on `upstream_asset`. We then create an `AssetGroup` with both assets and use `build_assets_job` to create a job named `"test_assets_job"`. The `test_downstream_asset` function executes the job in process using `execute_in_process()`. This runs both assets in the correct order, ensuring that `upstream_asset` is materialized before `downstream_asset`. You can then assert that the job was successful and perform additional checks as needed. Remember to replace `your_project.assets`, `upstream_asset`, and `downstream_asset` with the actual values from your project. If you need to pass specific configurations or resources to the assets for the test, you can do so by providing the appropriate arguments to `execute_in_process()`.

Manuel Fideles
03/07/2024, 12:58 PM

Scout
03/07/2024, 12:58 PM
You can use the `build_asset_context` function to create a context for your asset, and then manually invoke the asset function with the required dependencies. Here's a step-by-step guide on how to do this:

1. Manually create or mock the data that would be produced by the upstream asset.
2. Use the `build_asset_context` function to create a context for your downstream asset.
3. Call the downstream asset function directly, passing in the context followed by the mocked upstream data.

Here's an example of how you might write a test for a downstream asset that depends on an upstream asset:
```python
from dagster import AssetKey, AssetMaterialization, build_asset_context

from your_project.assets import downstream_asset, upstream_asset


def test_downstream_asset():
    # Step 1: Manually create or mock the data for the upstream asset
    upstream_data = create_upstream_data()  # Replace with your data creation or mocking logic

    # Step 2: Use build_asset_context to create a context for the downstream asset
    with build_asset_context() as context:
        # Optionally, you can mock the materialization event for the upstream asset
        context.add_output(
            AssetMaterialization(asset_key=AssetKey("upstream_asset")), upstream_data
        )

        # Step 3: Call the downstream asset function directly; when a context is
        # passed positionally, it must be the first argument
        result = downstream_asset(context, upstream_data)

        # Perform assertions on 'result'
        assert result == expected_result  # Replace with your expected result
```
In this example, `upstream_asset` and `downstream_asset` are your asset functions. The `create_upstream_data` function is a placeholder for your logic to create or mock the data that `upstream_asset` would produce. The `context.add_output` line is optional and can be used to simulate the materialization of the upstream asset if your downstream asset logic requires it. The `test_downstream_asset` function creates a context for testing, mocks the upstream data, and then calls the `downstream_asset` function with the context and the mocked data. Finally, it asserts that the result of the downstream asset is as expected. Remember to replace `your_project.assets`, `upstream_asset`, `downstream_asset`, `create_upstream_data`, and `expected_result` with the actual values and functions from your project.

Manuel Fideles
03/07/2024, 1:04 PM
`# Optionally, you can mock the materialization event for the upstream asset`?

Scout
03/07/2024, 1:04 PM
The `context.add_output` line mentioned in the previous response is not accurate, as there is no `add_output` method on the `context` object in Dagster. Instead, you would typically simulate the materialization of an upstream asset by ensuring that the data it would produce is available to the downstream asset during testing.
To mock the materialization event for the upstream asset, you can create the data that the upstream asset would produce and then pass it directly to the downstream asset function in your test. Here's how you can do it:
```python
from dagster import build_asset_context

from your_project.assets import downstream_asset


def test_downstream_asset():
    # Manually create or mock the data that would be produced by the upstream asset
    upstream_data = create_upstream_data()  # Replace with your data creation or mocking logic

    # Use build_asset_context to create a context for the downstream asset
    with build_asset_context() as context:
        # Call the downstream asset function directly: context first, then the mocked data
        result = downstream_asset(context, upstream_data)

        # Perform assertions on 'result'
        assert result == expected_result  # Replace with your expected result
```
In this example, `create_upstream_data` is a function that you would define to create or mock the data that the upstream asset would normally produce. The `downstream_asset` function is then called with this data as if it were the output of the upstream asset.
If your downstream asset relies on the `AssetMaterialization` event of the upstream asset, you would typically not need to mock the event itself in a unit test. Instead, you would focus on providing the necessary data to the downstream asset function and testing its behavior given that data.
However, if you do need to simulate the presence of an `AssetMaterialization` event (for example, if your logic checks for the existence of materializations), you can record the event on a `DagsterInstance` and pass that instance to `build_asset_context`, so that `context.instance` can see it. Here's an example of how you might do that:
```python
from dagster import AssetKey, AssetMaterialization, build_asset_context

from your_project.assets import downstream_asset


def test_downstream_asset_with_materialization(instance):
    # Manually create or mock the data that would be produced by the upstream asset
    upstream_data = create_upstream_data()  # Replace with your data creation or mocking logic

    # Record a materialization event for the upstream asset that is not tied to any run
    # (report_runless_asset_event is available on DagsterInstance in recent 1.x releases)
    instance.report_runless_asset_event(
        AssetMaterialization(asset_key=AssetKey("upstream_asset"))
    )

    # Use build_asset_context to create a context for the downstream asset
    with build_asset_context(instance=instance) as context:
        result = downstream_asset(context, upstream_data)
        assert result == expected_result  # Replace with your expected result
```

Scout
03/07/2024, 1:05 PM
In this example, `instance` is a `DagsterInstance` object that you would need to provide to your test; it can be a real or a test instance (for example, a pytest fixture that returns `DagsterInstance.ephemeral()`). The `report_runless_asset_event` method records an asset materialization event outside of any run. Note that this is a more advanced use case and typically not necessary for most unit testing scenarios. Please replace `create_upstream_data`, `downstream_asset`, and `expected_result` with the actual functions and values from your project.

Manuel Fideles
03/07/2024, 1:10 PM
```python
def test_manifest_keys(s3_bucket: str, aws_credentials: None):
    mock_config = {"account": ACCOUNT, "src_dir": "test/src", "dest_dir": "test/dest"}
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)
    with build_asset_context(
        asset_config={"account", ACCOUNT}, resources={"s3": s3_resource}
    ) as context:
        keys = manifest_keys(mock_config, context=context)
        assert isinstance(keys, list)
        assert len(keys) > 0
```
But I get the following error:
```
dagster._core.errors.DagsterInvalidInvocationError: Decorated function 'manifest_keys' has context argument, but no context was provided when invoking.
```
Scout
03/07/2024, 1:12 PM
The error indicates that the `manifest_keys` function expects a context argument, but the context was not provided where Dagster looks for it. When a function decorated with `@asset` (or a similar Dagster decorator) declares a context parameter and is invoked directly, Dagster treats the first positional argument as the context. In your call, `manifest_keys(mock_config, context=context)`, the first positional argument is `mock_config`, so Dagster raises this error even though `context=context` is also passed. In addition, `asset_config={"account", ACCOUNT}` is a set, not a dictionary. Here's how you can modify your test to pass the context correctly:
```python
from dagster import build_asset_context
from dagster_aws.s3 import S3Resource

from your_project.assets import manifest_keys


def test_manifest_keys(s3_bucket: str, aws_credentials: None):
    mock_config = {"account": ACCOUNT, "src_dir": "test/src", "dest_dir": "test/dest"}
    # S3Resource has no s3_bucket/s3_session_kwargs fields; this assumes the
    # aws_credentials fixture exports fake credentials as environment variables
    s3_resource = S3Resource(region_name="us-east-1")  # adjust to your region

    asset_config = {"account": ACCOUNT}  # a dict, not the set {"account", ACCOUNT}
    with build_asset_context(
        resources={"s3": s3_resource},
        asset_config=asset_config,
    ) as context:
        # Pass the context as the first positional argument, then mock_config by keyword
        keys = manifest_keys(context, mock_config=mock_config)
        assert isinstance(keys, list)
        assert len(keys) > 0
```
In this corrected version, `asset_config` is a dictionary with the correct syntax. The `context` object created by `build_asset_context` must reach `manifest_keys` either as the first positional argument or by keyword with no other positional arguments before it, and `mock_config` is passed by keyword. Make sure that the `manifest_keys` function actually expects `mock_config` as a separate parameter. If `manifest_keys` is supposed to read its config from the context, you don't need to pass `mock_config` separately; instead, put its values in `asset_config`, and the function will access them through the context object. Please replace `ACCOUNT` and `your_project.assets` with the actual values from your project. If `manifest_keys` is an asset, ensure that it is defined to accept a context parameter, and adjust the function call accordingly.

Manuel Fideles
03/07/2024, 2:31 PM

Scout
03/07/2024, 2:31 PM
To test a graph asset, you can use the `build_assets_job` function to create a job that includes the graph asset and then execute that job within your test. This allows you to test the asset within the context of a job execution, which is similar to how it would run in a production environment. Here's a step-by-step guide on how to test a graph asset:

1. Import the necessary functions and your graph asset.
2. Use `build_assets_job` to create a job that includes your graph asset.
3. Execute the job in process using `execute_in_process` and perform assertions on the result.

Here's an example of how you might write a test for a graph asset:
```python
from dagster import AssetGroup, build_assets_job

from your_project.graph_assets import your_graph_asset

# Create an AssetGroup and build a job that includes your graph asset
assets = [your_graph_asset]
asset_group = AssetGroup(assets)
assets_job = build_assets_job("test_assets_job", assets)


def test_your_graph_asset():
    result = assets_job.execute_in_process()
    assert result.success
    # You can add more assertions here to validate the output of your graph asset
```
In this example, `your_graph_asset` is the graph asset you want to test. We create an `AssetGroup` with the graph asset and use `build_assets_job` to create a job named `"test_assets_job"`. The `test_your_graph_asset` function executes the job in process using `execute_in_process()`, which runs the graph asset as part of the job execution. You can then assert that the job was successful and perform additional checks as needed to validate the output of your graph asset. Remember to replace `your_project.graph_assets` and `your_graph_asset` with the actual values from your project. If you need to pass specific configurations or resources to the graph asset for the test, you can do so by providing the appropriate arguments to `execute_in_process()`.

Manuel Fideles
03/07/2024, 2:59 PM
`build_assets_job` exists anymore

Scout
03/07/2024, 2:59 PM
The `build_assets_job` function is not part of the current Dagster API. Instead, you should use the `build_job` method to construct a job for testing your graph asset. Here's how you can test a graph asset using `build_job`:

1. Define your assets using the `@asset` decorator.
2. Create an `AssetGroup` with the assets you want to include in your test.
3. Use the `build_job` method on the `AssetGroup` to create a job for testing.
4. Execute the job in process using `execute_in_process` and perform assertions on the result.

Here's an example:
```python
from dagster import AssetGroup, asset

from your_project.graph_assets import your_graph_asset


# Define your assets
@asset
def my_asset():
    # Your asset logic here
    return "my_asset_data"


# Create an AssetGroup with your assets
asset_group = AssetGroup(assets=[my_asset])

# Build a job from the AssetGroup
test_job = asset_group.build_job(name="test_job")


def test_my_asset():
    result = test_job.execute_in_process()
    assert result.success
    # Add more assertions here to validate the behavior of your asset
```
In the `test_my_asset` snippet, `my_asset` is a simple asset defined for demonstration purposes; you would replace it with your actual graph asset and any other assets it depends on. The `AssetGroup` is created with the list of assets, and then `build_job` is used to create a job named `"test_job"`. The `test_my_asset` function executes the job in process using `execute_in_process()`, which runs the assets as part of the job execution. You can then assert that the job was successful and perform additional checks as needed to validate the behavior of your assets. Remember to replace `your_project.graph_assets` and `your_graph_asset` with the actual values from your project. If you need to pass specific configurations or resources to the assets for the test, you can do so by providing the appropriate arguments to `execute_in_process()`.

Manuel Fideles
03/07/2024, 3:00 PM

Scout
03/07/2024, 3:00 PM
To test a graph, you can turn it into a job and run it with the `execute_in_process` method. Here's how you can do it:

1. Define your graph using the `@graph` decorator.
2. Create a job from the graph using the `to_job` method.
3. Execute the job in process using `execute_in_process` and perform assertions on the result.

Here's an example of how you might write a test for a graph:
```python
from dagster import graph, instance_for_test, op


@op
def my_upstream_op():
    # Logic for the upstream op
    return "upstream_data"


@op
def my_downstream_op(data):
    # Logic for the downstream op that depends on the upstream op
    return f"processed_{data}"


@graph
def my_graph():
    my_downstream_op(my_upstream_op())


# Create a job from the graph
my_job = my_graph.to_job()


def test_my_graph():
    with instance_for_test() as instance:
        result = my_job.execute_in_process(instance=instance)
        assert result.success
        # Add more assertions here to validate the behavior of your graph
        assert result.output_for_node("my_downstream_op") == "processed_upstream_data"
```
In this example, `my_upstream_op` and `my_downstream_op` are operations (ops) that make up the graph `my_graph`. The `my_graph.to_job()` method is used to create a job from the graph. The `test_my_graph` function uses `instance_for_test` to create a temporary Dagster instance for testing; this is optional, since `execute_in_process` uses an ephemeral instance when none is given. The job is then executed in process using `execute_in_process()`, and assertions are made to ensure the job ran successfully and the graph behaved as expected. Remember to replace the example ops and graph with your actual Dagster graph and ops. If your graph requires specific configurations or resources, you can pass them to `execute_in_process` using the `run_config` and `resources` parameters, respectively.

Manuel Fideles
03/07/2024, 3:03 PM

Scout
03/07/2024, 3:03 PM
To test a graph that takes inputs, you can provide those inputs when calling the `execute_in_process` method. You will need to pass the inputs as part of the `run_config` argument. Here's an example of how you might write a test for a graph that takes an input:
```python
from dagster import graph, op


@op
def my_op(input_value: str):
    # Logic that uses the input_value
    return f"processed_{input_value}"


@graph
def my_graph(input_value: str):
    return my_op(input_value)


# Create a job from the graph
my_job = my_graph.to_job()


def test_my_graph():
    input_value = "test_input"
    run_config = {
        "ops": {
            "my_op": {
                "inputs": {
                    "input_value": input_value,
                }
            }
        }
    }
    result = my_job.execute_in_process(run_config=run_config)
    assert result.success
    # Add more assertions here to validate the behavior of your graph
```
In the `test_my_graph` example, `my_op` is an operation that takes an input value, and `my_graph` is a graph that also takes an input value and passes it to `my_op`. The `my_graph.to_job()` method creates a job from the graph. The `test_my_graph` function defines an `input_value` to test with, and the `run_config` dictionary is structured to provide that value to the `my_op` operation within the graph. The job is then executed in process using `execute_in_process()`, with the `run_config` passed as an argument. Finally, assertions are made to ensure the job ran successfully and the graph processed the input as expected. Remember to replace the example operation, graph, and input value with your actual Dagster graph, operations, and test inputs. If your graph requires additional configurations or resources, you can include them in the `run_config` and pass them to `execute_in_process` as needed.

Manuel Fideles
03/07/2024, 3:11 PM
```python
def test_pdf_data(s3_bucket: str, aws_credentials: None):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)
    result = materialize(
        [pdf_data],
        resources={"s3": s3_resource},
        run_config=RunConfig(ops={"pipeline_config": {"config": {"account": ACCOUNT}}}),
    )
    assert result.success
```
where `pdf_data` is decorated with `@graph_asset`.
How can I provide the `mock_manifest_keys` variable to `materialize` so that `pdf_data` has access to it?

Scout
03/07/2024, 3:11 PM
To provide `mock_manifest_keys` to the `pdf_data` graph asset during materialization, you can use the `run_config` parameter of `materialize` to specify the inputs for the asset, by constructing a `run_config` dictionary that specifies the inputs for the ops within the graph. Here's how you can modify your test to pass `mock_manifest_keys` as an input to the `pdf_data` graph asset:
```python
from dagster import materialize


def test_pdf_data(s3_bucket: str, aws_credentials: None):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

    # Construct the run_config dictionary
    run_config = {
        "ops": {
            "pdf_data": {  # This should match the name of the op within your graph asset
                "inputs": {
                    # The input name should match the expected input in your graph asset
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
    }

    # Materialize the graph asset with the provided run_config; the S3 resource is
    # passed as an object, so it doesn't also need a "resources" section in run_config
    result = materialize(
        [pdf_data],
        resources={"s3": s3_resource},
        run_config=run_config,
    )
    assert result.success
```
In this example, the `run_config` dictionary is structured to provide the `mock_manifest_keys` list as an input to the `pdf_data` graph asset. The key `"pdf_data"` under `"ops"` in the `run_config` should match the name of the op within your graph asset that expects the `manifest_keys` input. The `run_config` argument of `materialize` accepts either a plain dictionary, as shown here, or a `RunConfig` object. Also, ensure that the input name in the `run_config` (`"manifest_keys"` in this case) matches the name of the input parameter in your `pdf_data` graph asset definition. Remember to replace `"pdf_data"` with the actual name of the op within your graph asset if it's different. Additionally, adjust the S3 configuration as needed for your specific test setup.

Manuel Fideles
03/07/2024, 3:16 PM
```
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["manifest_keys"]' for asset '["pdf_data"]' is not produced by any of the provided asset ops and is not one of the provided sources
```

Scout
03/07/2024, 3:16 PM
The error indicates that Dagster expects `manifest_keys` to be an asset, but it's not found among the provided assets or sources. When you're working with assets in Dagster, inputs to assets are typically other assets. However, if you need to provide a raw input value (like your `mock_manifest_keys` list) that isn't an asset, you should use a `SourceAsset`. To resolve this issue, you would define a `SourceAsset` for `manifest_keys` and include it in your asset's dependencies. Here's how you can modify your test to include `mock_manifest_keys` as a `SourceAsset`:
```python
from dagster import AssetGroup, SourceAsset, materialize

# Define a SourceAsset for your mock_manifest_keys
mock_manifest_keys_asset = SourceAsset(
    key="manifest_keys", metadata={"mock_data": mock_manifest_keys}
)

# Create an AssetGroup including your pdf_data asset and the SourceAsset
asset_group = AssetGroup(assets=[pdf_data], source_assets=[mock_manifest_keys_asset])

# Materialize the assets in the asset_group
result = materialize(
    asset_group,
    resources={"s3": s3_resource},
    run_config={
        "ops": {
            "pdf_data": {
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
        "resources": {
            "s3": {
                "config": {
                    "s3_bucket": s3_bucket,
                    # Include any other necessary S3 configuration here
                }
            }
        },
    },
)
assert result.success
```
In this example, `mock_manifest_keys_asset` is a `SourceAsset` that represents the `mock_manifest_keys` list. The `AssetGroup` is created with both your `pdf_data` asset and `mock_manifest_keys_asset` as a source asset. When you call `materialize`, you pass the `asset_group` and the `run_config` that includes `mock_manifest_keys` as an input to the `pdf_data` asset. Please ensure that the `pdf_data` asset is defined to accept `manifest_keys` as an input and that the `mock_manifest_keys` list is structured in the way your asset expects. Additionally, replace `"pdf_data"` with the actual name of your asset if it's different, and adjust the S3 configuration as needed for your specific test setup.

Manuel Fideles
03/07/2024, 3:21 PM
`AssetGroup` isn't on dagster anymore

Scout
03/07/2024, 3:21 PM
Since `AssetGroup` is no longer part of the Dagster API, you would use the `to_job` method on the graph to create a job for testing, and then provide the inputs directly when executing the job. Here's how you can modify your test to pass `mock_manifest_keys` as an input to the `pdf_data` graph asset:
```python
from dagster import materialize


def test_pdf_data(s3_bucket: str, aws_credentials: None):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    s3_resource = S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)

    # Define the run_config with the mock_manifest_keys as an input to the pdf_data asset
    run_config = {
        "ops": {
            "pdf_data": {  # This should match the name of the op within your graph asset
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
        "resources": {
            "s3": {
                "config": {
                    "s3_bucket": s3_bucket,
                    # Include any other necessary S3 configuration here
                }
            }
        },
    }

    # Materialize the pdf_data asset with the provided run_config
    result = materialize(
        [pdf_data.to_job(resource_defs={"s3": s3_resource})],
        run_config=run_config,
    )
    assert result.success
```
In the `to_job` snippet, `pdf_data` is assumed to be a graph asset decorated with `@graph_asset`. The `to_job` method is used to create a job from the `pdf_data` asset, and the `s3_resource` is provided as a resource definition. The `run_config` dictionary is structured to provide the `mock_manifest_keys` list as an input to the `pdf_data` asset. The key `"pdf_data"` under `"ops"` in the `run_config` should match the name of the op within your graph asset that expects the `manifest_keys` input. Please replace `"pdf_data"` with the actual name of the op within your graph asset if it's different. Additionally, adjust the S3 configuration as needed for your specific test setup. If your asset requires additional configurations or resources, you can include them in the `run_config` and pass them to `materialize` as needed.

Manuel Fideles
03/07/2024, 4:01 PM
```python
@op
def pypdf_extract_text(
    context: OpExecutionContext, s3: S3Resource, manifest_keys: list[str]
) -> dict:
    """Ask pypdf to extract the text, check if we got something"""
    s3_client = s3.get_client()
    pdf_uris = set(manifest_keys)
    pdfs_data = {}
    threads = {}
    already_done = 0
    processed_by_pypdf = 0
    for pdf_uri in pdf_uris:
        parsed_pdf_uri: ParseResult = urlparse(pdf_uri)
        bucket = parsed_pdf_uri.netloc
        pdf_key = parsed_pdf_uri.path.lstrip("/")
        base_s3_dir, _ = os.path.split(pdf_key)
        response_json_key = os.path.join(base_s3_dir, "response.json")
        pdfs_data[pdf_uri] = {
            "bucket": bucket,
            "pdf_key": pdf_key,
            "base_s3_dir": base_s3_dir,
            "response_json_key": response_json_key,
            "already_done": False,
        }
        try:
            s3_client.head_object(
                Bucket=bucket,
                Key=response_json_key,
            )
            pdfs_data[pdf_uri]["already_done"] = True
            already_done += 1
            continue
        except ClientError as error:
            if error.response["ResponseMetadata"]["HTTPStatusCode"] != 404:
                raise error
        t = ThreadWithReturnValue(
            target=extract_pdf_text,
            args=(s3_client, pdfs_data[pdf_uri]),
        )
        threads[pdf_uri] = t
        t.start()
        # 100 per second seems good enough, s3 limit seems to be in thousands
        if len(threads) == 100:
            for t_pdf_uri, t in threads.items():
                texts = t.join()
                if len(", ".join(texts)) > 42:
                    pdfs_data[t_pdf_uri]["texts"] = texts
                    processed_by_pypdf += 1
            threads = {}
            time.sleep(1)
    for t_pdf_uri, t in threads.items():
        texts = t.join()
        if len(", ".join(texts)) > 42:
            pdfs_data[t_pdf_uri]["texts"] = texts
            processed_by_pypdf += 1
    context.add_output_metadata(
        metadata={
            "unique_pdf_uris": len(pdf_uris),
            "processed_by_pypdf": processed_by_pypdf,
            "already_done": already_done,
        }
    )
    return pdfs_data
```
```python
@op
def start_textract(
    context: OpExecutionContext,
    pypdf_extract_text: dict,
) -> dict:
    """Ask Textract to OCR the files in S3, skip the ones with previous result or pypdf result"""
    config = BotoConfig(
        retries={
            "mode": "adaptive",  # let boto handle the retries with the latest and greatest
        }
    )
    textract_client = boto3.client("textract", region_name=AWS_REGION, config=config)
    pdfs_data = pypdf_extract_text
    threads = {}
    requested_textract_ocr = 0
    for pdf_uri, pdf_data in pdfs_data.items():
        if pdf_data["already_done"] is True:
            continue
        try:
            pdf_data["texts"]
            continue
        except KeyError:
            pass
        start_textract_thread(
            textract_client,
            threads,
            pdf_uri,
            pdf_data,
        )
        # limit of 10 per second should be safe, i can test and/or ask amazon for more
        if len(threads) == 10:
            for t_pdf_uri, t in threads.items():
                response = t.join()
                pdfs_data[t_pdf_uri]["textract_response_dir"] = os.path.join(
                    pdfs_data[t_pdf_uri]["base_s3_dir"], response["JobId"]
                )
                requested_textract_ocr += 1
            threads = {}
            time.sleep(1)
    for t_pdf_uri, t in threads.items():
        response = t.join()
        pdfs_data[t_pdf_uri]["textract_response_dir"] = os.path.join(
            pdfs_data[t_pdf_uri]["base_s3_dir"], response["JobId"]
        )
        requested_textract_ocr += 1
    context.add_output_metadata(
        metadata={"requested_textract_ocr": requested_textract_ocr}
    )
    return pdfs_data
```
```python
@op
def save_results(
    context: OpExecutionContext,
    s3: S3Resource,
    start_textract: dict,
) -> dict:
    """Move Textract responses, or save pypdf responses, to response.jsons in pdfs dirs"""
    s3_client = s3.get_client()
    threads: list[Thread] = []
    pdfs_data = start_textract
    save_pypdf_response_tries = 0
    move_textract_response_tries = 0
    for _, pdf_data in pdfs_data.items():
        if pdf_data["already_done"] is True:
            continue
        try:
            pdf_data["texts"]
            text_channel = "pypdf"
        except KeyError:
            text_channel = "textract"
        if text_channel == "pypdf":
            t = Thread(
                target=save_pypdf_response,
                args=(s3_client, pdf_data),
            )
            save_pypdf_response_tries += 1
        elif text_channel == "textract":
            t = Thread(
                target=move_textract_response,
                args=(s3_client, pdf_data),
            )
            move_textract_response_tries += 1
        else:
            raise ValueError(f"Unknown text_channel: {text_channel}")
        threads.append(t)
        t.start()
        # 100 per second seems good enough, s3 limit seems to be in thousands
        if len(threads) == 100:
            for t in threads:
                t.join()
            threads = []
            time.sleep(1)
    for t in threads:
        t.join()
    context.add_output_metadata(
        metadata={
            "move_textract_response_tries": move_textract_response_tries,
            "save_pypdf_response_tries": save_pypdf_response_tries,
        }
    )
    return pdfs_data
```
```python
@op
def note_texts(context: OpExecutionContext, s3: S3Resource, pdfs_data: dict) -> dict:
    """
    The texts extracted from the Textract'ed response.json files
    """
    s3_client = s3.get_client()
    threads = {}
    loaded_response_jsons = 0  # already done + textract
    missing_keys = []
    for uri, pdf_data in pdfs_data.items():
        try:
            pdf_data["texts"]  # noqa
            continue
        except KeyError:
            pass
        t = ThreadWithReturnValue(
            target=get_response_json_text,
            args=(s3_client, pdf_data),
        )
        threads[uri] = t
        t.start()
        # 100 per second seems good enough, s3 limit seems to be in thousands
        if len(threads) == 100:
            for t_uri, t in threads.items():
                texts = t.join()
                pdfs_data[t_uri]["texts"] = texts
                loaded_response_jsons += 1
            threads = {}
            time.sleep(1)
    for t_pdf_uri, t in threads.items():
        texts = t.join()  # noqa
        pdfs_data[t_pdf_uri]["texts"] = texts
        loaded_response_jsons += 1
    unique_missing_keys = list(set(missing_keys))
    context.add_output_metadata(
        metadata={
            "loaded_response_jsons": MetadataValue.int(loaded_response_jsons),
            "# of missing_keys": MetadataValue.int(len(unique_missing_keys)),
            "preview missing_keys": MetadataValue.json(unique_missing_keys[:5]),
            "preview (in dataframe format)": MetadataValue.json(tuple(choice(list(pdfs_data.items())))),
        }
    )
    return pdfs_data
```
```python
@graph
def pdf_data_graph(manifest_keys: list[str]):
    return note_texts(save_results(start_textract(pypdf_extract_text(manifest_keys))))


@graph_asset
def pdf_data(manifest_keys: list[str]) -> dict:
    """
    The data extracted w/ Textract from the URIs present
    in the manifest.
    """
    return pdf_data_graph(manifest_keys)
```
What `run_config` should I use when testing with Pytest to ensure that `mock_manifest_keys` takes the place of `manifest_keys`?

Scout
03/07/2024, 4:01 PM
To test your `@graph_asset` with Pytest and ensure that `mock_manifest_keys` is used in place of `manifest_keys`, you will need to provide a `run_config` that specifies the inputs for your asset. Since `pdf_data` is a graph asset, you will need to create a job from it and then execute that job with the appropriate `run_config`. Here's an example of how you might write a test for your `pdf_data` graph asset:
```python
import pytest
from dagster_aws.s3 import S3Resource

from your_project.assets import pdf_data


@pytest.fixture
def s3_resource(s3_bucket: str, aws_credentials: None):
    return S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)


def test_pdf_data(s3_resource):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    # Create a job from the graph asset
    pdf_data_job = pdf_data.to_job(resource_defs={"s3": s3_resource})
    # Define the run_config with the mock_manifest_keys as an input to the pdf_data asset
    run_config = {
        "ops": {
            "pdf_data_graph": {
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        }
    }
    # Execute the job with the provided run_config
    result = pdf_data_job.execute_in_process(run_config=run_config)
    assert result.success
    # Add more assertions here to validate the behavior of your graph asset
```
In this example, `pdf_data` is your graph asset decorated with `@graph_asset`. The `to_job` method is used to create a job from the `pdf_data` asset, and the `s3_resource` is provided as a resource definition. The `run_config` dictionary is structured to provide the `mock_manifest_keys` list as an input to the `pdf_data_graph` operation within the job. The key `"pdf_data_graph"` under `"ops"` in the `run_config` should match the name of the graph within your graph asset that expects the `manifest_keys` input. Please replace `"your_project.assets"` with the actual module path from your project. Additionally, adjust the S3 configuration as needed for your specific test setup. If your asset requires additional configurations or resources, you can include them in the `run_config` and pass them to the job execution as needed.

Manuel Fideles
03/07/2024, 4:04 PM
`pdf_data_job = pdf_data.to_job(resource_defs={"s3": s3_resource})` line of code:
```
AttributeError: 'AssetsDefinition' object has no attribute 'to_job'
```

Scout
03/07/2024, 4:04 PM
`pdf_data`, which is decorated with `@graph_asset`, is an `AssetsDefinition` object, and `AssetsDefinition` does not have a `to_job` method. In Dagster, `@graph_asset` is used to define an asset that is composed of a graph of ops. To test this asset, you should materialize it directly rather than converting it to a job. You can do this by using the `materialize` function and providing the necessary `run_config` and resources. Here's how you can modify your test to materialize the `pdf_data` graph asset:
```python
import pytest
from dagster import materialize
from dagster_aws.s3 import S3Resource

from your_project.assets import pdf_data


@pytest.fixture
def s3_resource(s3_bucket: str, aws_credentials: None):
    return S3Resource(s3_bucket=s3_bucket, s3_session_kwargs=aws_credentials)


def test_pdf_data(s3_bucket: str, s3_resource: S3Resource):
    mock_manifest_keys: list[str] = [
        "s3://aiml-pipeline-training-data/test/src/0/assets-fake-picture-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/1/assets-test.pdf",
        "s3://aiml-pipeline-training-data/test/src/2/assets-test-2-pages.pdf",
    ]
    # Define the run_config with the mock_manifest_keys as an input to the pdf_data asset
    run_config = {
        "ops": {
            "pdf_data": {  # This should match the name of the op within your graph asset
                "inputs": {
                    "manifest_keys": mock_manifest_keys,
                }
            }
        },
        "resources": {
            "s3": {
                "config": {
                    "s3_bucket": s3_bucket,
                    # Include any other necessary S3 configuration here
                }
            }
        },
    }
    # Materialize the pdf_data asset with the provided run_config
    result = materialize(
        [pdf_data],
        resources={"s3": s3_resource},
        run_config=run_config,
    )
    assert result.success
```
In this example, `pdf_data` is assumed to be a graph asset that has been decorated with `@graph_asset`. The `materialize` function is used to execute the asset with the provided `run_config` and resources. Please replace `"your_project.assets"` with the actual module path from your project. Additionally, adjust the S3 configuration as needed for your specific test setup. If your asset requires additional configurations or resources, you can include them in the `run_config` and pass them to the `materialize` function as needed.

Manuel Fideles
03/07/2024, 4:13 PM
`pdf_data` in the `run_config` dict? I'm still getting `dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["manifest_keys"]' for asset '["pdf_data"]' is not produced by any of the provided asset ops and is not one of the provided sources` error when running the test.

Scout
03/07/2024, 4:13 PM
The error indicates that Dagster expects `manifest_keys` to be an asset, but it's not found among the provided assets or sources. This happens because each parameter of a `@graph_asset` function is interpreted as an upstream asset dependency keyed by the parameter name, so `manifest_keys` is not an op input that `run_config` can fill. Since `manifest_keys` is a list of strings and not an asset, you need to make it available as one: for example, define a small asset named `manifest_keys` that returns `mock_manifest_keys` and pass it to `materialize` together with `pdf_data`. Alternatively, test the inner `pdf_data_graph` directly, since a plain graph's top-level inputs can be supplied when it is executed. If you are still encountering issues, I would suggest creating a minimal reproducible example and asking the Dagster community or filing an issue on the Dagster GitHub repository to get more specific guidance on how to test a `@graph_asset` with inputs like `manifest_keys`.

Manuel Fideles
03/11/2024, 4:24 PM
`Output`?

Scout
03/11/2024, 4:24 PM
To test an asset that yields `Output`, you can treat the asset as a Python generator and use the `next()` function to retrieve the outputs it yields. This allows you to capture each `Output` yielded by the asset and perform assertions on it. Here's an example of how you might write a test for an asset that yields `Output`:
```python
import pytest
from dagster import AssetOut, Output, multi_asset


@multi_asset(outs={"output_one": AssetOut(), "output_two": AssetOut()})
def my_asset():
    # Your asset logic here
    yield Output(1, output_name="output_one")
    yield Output(2, output_name="output_two")


def test_my_asset():
    asset_gen = my_asset()
    output_one = next(asset_gen)
    assert output_one.value == 1
    assert output_one.output_name == "output_one"
    output_two = next(asset_gen)
    assert output_two.value == 2
    assert output_two.output_name == "output_two"
    # If you expect no more outputs, you can assert that the generator is exhausted
    with pytest.raises(StopIteration):
        next(asset_gen)
```
In this example, `my_asset` is a multi-asset that yields two outputs. A plain `@asset` has a single output named `"result"`, so yielding several named outputs requires `@multi_asset` with matching `outs`. The `test_my_asset` function invokes `my_asset`, which returns a generator object. It then uses `next()` to get each `Output` and performs assertions on the value and output name. If your asset yields a single `Output`, you can simply call `next()` once and perform your assertions on that output. If your asset yields multiple outputs, you can call `next()` multiple times as shown above. Remember to replace the example asset and assertions with your actual asset logic and expected values. If you're using `pytest`, you can also use the `pytest.raises` context manager to assert that the generator is exhausted after retrieving all expected outputs.

Manuel Fideles
03/11/2024, 4:51 PM
`Output` object."

Scout
03/11/2024, 4:51 PM
The error means that every value yielded from an op or asset generator must be wrapped in an `Output` object. In Dagster, when you have an asset that is a generator and you want to yield multiple outputs, each output must be wrapped in an `Output` object and must correspond to an output declared on the asset. Here's an example of how you should define an asset that yields multiple outputs:
```python
from dagster import AssetOut, Output, multi_asset


@multi_asset(outs={"output_one": AssetOut(), "output_two": AssetOut()})
def my_asset():
    # Your asset logic here
    yield Output(value="first_output", output_name="output_one")
    yield Output(value="second_output", output_name="output_two")
    # Continue yielding Output objects as needed, one per declared output
```
Each `yield` statement returns an `Output` object with a specified `value` and `output_name`. The `output_name` is optional if you have only one output; it defaults to `"result"`. When testing this asset, you should consume the generator to retrieve all the outputs:
```python
def test_my_asset():
    asset_gen = my_asset()
    outputs = list(asset_gen)  # Consume the generator to get a list of Output objects
    assert len(outputs) == 2  # Replace with the expected number of outputs
    assert outputs[0].value == "first_output"
    assert outputs[1].value == "second_output"
    # Add more assertions as needed
```
In the test, you convert the generator to a list to retrieve all the `Output` objects, and then you can perform assertions on each one. If you're still encountering issues, please ensure that every yield statement within your asset function is yielding an `Output` object and that you're not accidentally yielding raw values or other types of objects. If the problem persists, please provide more context or code snippets, and we can look into it further.

Manuel Fideles
03/11/2024, 4:58 PM
```python
def test_training_data_asset(
    pdf_data: dict[str, Any], mock_spec: LabelsAssetFactorySpec
):
    local_csv = read_retrospective_csv(
        LOCAL_LABELS_CSV_PATH,
        col_names=mock_spec.parsing_config.col_names,
        parse_dates=mock_spec.parsing_config.parse_dates,
        cols_to_keep=mock_spec.parsing_config.cols_to_keep,
    )
    labels = mock_spec.parser(local_csv)
    result = list(training_data(labels=labels, pdf_data=pdf_data))
    assert isinstance(result, list)
    assert len(result) > 0
```
yields the error:
`dagster._core.errors.DagsterInvariantViolationError: When yielding outputs from a op generator, they should be wrapped in an Output object.`

Scout
03/11/2024, 4:58 PM
The error indicates that your `training_data` asset is yielding values that are not wrapped in an `Output` object. In Dagster, when an asset function yields values, each value must be wrapped in an `Output` object to be correctly handled by the Dagster framework. To resolve this issue, you need to ensure that the `training_data` asset is correctly yielding `Output` objects. Here's an example of how the `training_data` asset should be defined:
```python
from dagster import Output, asset


@asset
def training_data(labels, pdf_data):
    # Your asset logic here, producing some_value
    some_value = ...
    # A single-output asset yields one Output; output_name defaults to "result"
    yield Output(value=some_value)
    # Multiple outputs require @multi_asset with matching outs, e.g.:
    # yield Output(value=first_value, output_name="first_output")
    # yield Output(value=second_value, output_name="second_output")
```
Each `yield` statement should return an `Output` object with `value` set to the value you want to yield. For a single-output `@asset`, `output_name` can be omitted (it defaults to `"result"`); with multiple outputs, each `output_name` must match an output declared on a `@multi_asset`. Now, in your test, you should be able to consume the generator returned by the `training_data` asset and assert that the results are `Output` objects:
```python
from typing import Any

from dagster import Output


def test_training_data_asset(pdf_data: dict[str, Any], mock_spec: LabelsAssetFactorySpec):
    # ... (your existing setup code that builds `labels`)
    results = list(training_data(labels=labels, pdf_data=pdf_data))
    assert isinstance(results, list)
    assert len(results) > 0
    for result in results:
        assert isinstance(result, Output)
```
In this test, you're converting the generator to a list to retrieve all the `Output` objects, and then you're asserting that each item in the list is an instance of `Output`. Please ensure that the `training_data` asset is updated to yield `Output` objects as shown in the example, and then adjust your test accordingly. If you continue to encounter issues, please review the `training_data` asset's code to ensure that all yielded values are wrapped in `Output` objects and that no raw values are being yielded directly.

Manuel Fideles
03/11/2024, 5:11 PM

Scout
03/11/2024, 5:11 PM
If you're still getting "they should be wrapped in an `Output` object," despite wrapping the yielded values in `Output` objects, it's possible that there is a path in the asset's logic where a raw value is yielded without being wrapped, or there could be an issue with how the asset is invoked during the test. Here are some steps to debug this issue:

1. Review the Asset Code: Double-check the `training_data` asset code to ensure that every yield statement is yielding an `Output` object. There should be no code path that yields a raw value.
2. Check for Conditional Logic: If there is any conditional logic in the asset that might yield values, ensure that all branches of the condition are yielding `Output` objects.
3. Examine the Test Setup: Ensure that the test setup correctly mocks or provides all necessary inputs and resources that the asset expects. If the asset is not receiving the inputs it expects, it might yield unexpected values.
4. Simplify the Test: Try simplifying the test to yield a single, known `Output` object to confirm that the basic mechanism of yielding `Output` objects works as expected.
5. Use Direct Invocation: If you're using a complex test setup, try invoking the asset directly in a simplified manner to isolate the issue.

Here's an example of a simplified test that directly invokes the asset:
```python
from dagster import Output, asset


@asset
def training_data(labels, pdf_data):
    # Simplified logic for debugging; a single-output asset's output is named "result"
    yield Output(value="dummy_value")


def test_training_data_asset():
    # Simplified test inputs
    labels = "dummy_labels"
    pdf_data = {"dummy_key": "dummy_value"}
    results = list(training_data(labels=labels, pdf_data=pdf_data))
    assert isinstance(results, list)
    assert len(results) > 0
    for result in results:
        assert isinstance(result, Output)
        assert result.output_name == "result"
        assert result.value == "dummy_value"
```
If the simplified test passes without errors, it suggests that the issue lies in the more complex logic of the actual `training_data` asset or in the test setup. You would then incrementally add back the complexity to identify where the issue arises. If you've confirmed that all yields in the asset are correctly wrapped in `Output` objects and the simplified test still fails, it might be worth reaching out to the Dagster community or checking the Dagster GitHub issues for similar problems or potential bugs in the version of Dagster you are using.

Manuel Fideles
03/11/2024, 5:13 PM
```python
@asset(
    check_specs=[
        AssetCheckSpec(name="not_many_no_texts", asset="training_data"),
        AssetCheckSpec(name="no_training_data_generated", asset='training_data'),
    ],
)
def training_data(
    labels: pd.DataFrame,
    pdf_data: dict,
):
    """
    Retrospective training data.
    """
    training_data = []
    uris_of_pdfs_with_no_text = []
    for pdf_uri, pdf_data in pdf_data.items():
        note_text: list[str] = pdf_data.get("texts")
        if note_text is None:
            uris_of_pdfs_with_no_text.append(pdf_uri)
        else:
            note_labels: list[dict] = labels[labels["s3_uri"].eq(pdf_uri)].to_dict(
                orient="records"
            )
            for label in note_labels:
                training_sample = NlpTrainSample(
                    account=label["account"],
                    note_type=NoteType.OPERATIVE.value,
                    label_source=LabelSourceType.RETROSPECTIVE.value,
                    specialty="",
                    organization=label["organization"],
                    text=" ".join(note_text),
                    dos=label["date_of_service"],
                    integration_s3_uri=pdf_uri,
                    cpt=label["cpt"],
                    modifiers=label["modifiers"],
                    icd10=label["icd10"],
                )
                training_data.append(training_sample.to_dict())
    yield AssetCheckResult(
        passed=bool(len(uris_of_pdfs_with_no_text) * 1000 < len(pdf_data)),
        check_name='not_many_no_texts',
        metadata={
            "# of pdfs w/ no text": len(uris_of_pdfs_with_no_text),
        },
    )
    yield AssetCheckResult(
        passed=bool(len(training_data) > 0),
        check_name='no_training_data_generated'
    )
    yield Output(
        training_data,
        metadata={
            "Empty pdf URIs": MetadataValue.json(uris_of_pdfs_with_no_text),
            "# of training samples created": MetadataValue.int(len(training_data)),
            "Preview": MetadataValue.json(training_data[:5]),
        },
    )
```
Scout
03/11/2024, 5:13 PM
Based on the `training_data` asset code you shared, it looks like you are correctly yielding `AssetCheckResult` objects for the checks and an `Output` object for the actual training data. The error message you're encountering should not occur if every yield statement in the asset is indeed yielding either an `AssetCheckResult` or an `Output` object. However, the error message suggests that somewhere in the execution, a value is being yielded that is not wrapped in an `Output` object. Here are a few things to check:

1. Ensure Correct Import: Verify that `Output` and `AssetCheckResult` are correctly imported from `dagster`.
2. Check for Typos: Ensure that there are no typos or copy-paste errors in the asset code that might cause a raw value to be yielded.
3. Check Asset Checks: The checks `not_many_no_texts` and `no_training_data_generated` are declared in `check_specs` on the asset itself; make sure each `check_name` you yield matches one of those names (they do in the code above).
4. Check for Shadowing: The loop `for pdf_uri, pdf_data in pdf_data.items():` rebinds `pdf_data` to each per-PDF dictionary, so after the loop, `len(pdf_data)` in the `not_many_no_texts` check counts the keys of the last PDF's dictionary rather than the number of PDFs. Renaming the loop variable avoids this.
5. Check Asset Execution: When testing, ensure that the asset is being executed in a way that is consistent with how Dagster expects assets to be run. For example, if you're using `materialize` to execute the asset, ensure that the `run_config` and any other parameters are correctly specified. Your test calls the asset directly as a function, so comparing against a `materialize` run helps separate a problem in the asset body from one in direct invocation.
6. Simplify the Asset: Temporarily simplify the asset to yield a single `Output` object to confirm that the basic mechanism of yielding `Output` objects works as expected.
7. Check Dagster Version: Ensure that you are using a version of Dagster that supports the features you are using. If you are using a newer feature like `@asset` with `check_specs`, make sure your Dagster version is up to date; support for yielding `AssetCheckResult` when an asset is called directly may vary between versions.

If after checking these items the issue persists, consider reaching out to the Dagster community with a minimal reproducible example for further assistance. It's possible that there might be a bug or a subtle issue that's not immediately apparent from the code snippet provided.