#ask-ai

Peter Mburu

03/01/2024, 2:24 PM
@Scout how to patch an upstream asset that gets external data when testing the entire pipeline, from getting the data through processing to ingestion into the database

Scout

03/01/2024, 2:24 PM
When testing a pipeline that involves external data sources, you can use a technique known as "smoke testing" to ensure that each part of your pipeline is functioning correctly without having to rely on the actual external data sources. This can be particularly useful during local development or continuous integration (CI) to prevent bugs from making it into production.

To patch an upstream asset that gets external data, you can use stubs or mocks for the external data sources. This allows you to simulate the behavior of the external systems and provide controlled input to your pipeline. Here's a general approach based on the documentation:

1. Create Observable Source Assets: Define observable source assets that represent the external data sources. These assets should have a user-defined function that computes and returns a data version, which can be a hash of the file contents or any other relevant identifier.
2. Stubbing External Data: Instead of connecting to the real external data sources, you can create stubs that return hardcoded or dynamically generated data that mimics the structure and content of the expected external data. This allows you to control the input to your pipeline and ensure it is consistent across tests.
3. Versioning: Add versioning to your `SourceAsset` by making it observable. This allows you to track changes in the external data and ensure that your pipeline behaves correctly when the data changes.
4. Testing the Pipeline: Run your pipeline using the stubbed data sources. You can use the `materialize` function to execute the pipeline with the assets loaded from your module. This will run the pipeline as if it were using the actual external data, but with the controlled input you have provided.
5. Database Ingestion: For the database ingestion part, you can use a similar approach by creating a stubbed database or using an in-memory database that mimics the behavior of your production database.

Here's a simplified example of how you might define an observable source asset and use it in a pipeline:
```python
from dagster import observable_source_asset, SourceAsset, DataVersion

@observable_source_asset
def input_number(context):
    # This function would compute a hash of the file contents and return it as a `DataVersion`.
    # For testing, you can return a hardcoded `DataVersion` or generate one based on stubbed data.
    return DataVersion("stubbed_data_version")

# Define your processing and ingestion assets here, using `input_number` as an upstream dependency.
```
And then you would run your pipeline like this:
```python
from dagster import load_assets_from_current_module, materialize

assets = load_assets_from_current_module()
materialize(
    assets,
    resources={
        # Define your resources here, such as a stubbed database or other dependencies.
    },
)
```
Remember to replace the stubbed data and resources with the actual implementations when you're ready to run your pipeline in a production environment.

Please note that the code examples above are based on the concepts described in the documentation and should be adapted to fit the specifics of your pipeline and testing needs.

Peter Mburu

03/01/2024, 2:26 PM
@Scout
Traceback (most recent call last):
  File "/home/pmburu/git/fdw/dagster_pipelines/assets/ingestion/price/tests/test_pipelines.py", line 239, in test_import_imf_data
    result = materialize(assets)
  File "/home/pmburu/.venvs/fntfdwlcl/lib/python3.8/site-packages/dagster/_core/definitions/materialize.py", line 88, in materialize
    assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
  File "/home/pmburu/.venvs/fntfdwlcl/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1125, in sequence_param
    return _check_iterable_items(obj, of_type, "sequence")
  File "/home/pmburu/.venvs/fntfdwlcl/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1768, in _check_iterable_items
    raise CheckError(
dagster._check.CheckError: Member of sequence mismatches type. Expected (<class 'dagster._core.definitions.assets.AssetsDefinition'>, <class 'dagster._core.definitions.source_asset.SourceAsset'>). Got <MagicMock name='remote_imf_data' id='140682095545600'> of type <class 'unittest.mock.MagicMock'>.

Scout

03/01/2024, 2:26 PM
The error message you're encountering indicates that the `materialize` function is expecting a sequence of `AssetsDefinition` or `SourceAsset` objects, but it received a `MagicMock` object instead. This typically happens when you're trying to pass a mocked object where the function expects a specific type of object related to Dagster's asset system.

Here's a step-by-step breakdown of the error message:

1. The `materialize` function is called with an argument named `assets`.
2. The `check.sequence_param` function is used to validate that `assets` is a sequence of objects of type `AssetsDefinition` or `SourceAsset`.
3. The validation fails because an item in the sequence is a `MagicMock` object, which is not one of the expected types.

To resolve this issue, you need to ensure that the `assets` argument you pass to the `materialize` function is a sequence of `AssetsDefinition` or `SourceAsset` objects. Here's what you can do:

- Review the `assets` variable you're passing to `materialize` and make sure that it contains the correct types. If you're using mocks for testing, you'll need to replace the `MagicMock` with an appropriate `AssetsDefinition` or `SourceAsset` object or a suitable mock that mimics these types.
- If you're trying to mock the behavior of an asset for testing purposes, you should create a mock that adheres to the interface of an `AssetsDefinition` or `SourceAsset`. This means the mock should behave like an actual asset in the context of the `materialize` function.
- Ensure that any setup code for the test is correctly creating the assets that should be materialized. If you're using a factory or builder pattern to create test assets, double-check that the objects being produced are of the correct type.

Here's an example of how you might define a test asset for use with `materialize`:
```python
from dagster import asset

@asset
def test_asset():
    # Your test asset logic here
    return "test_data"
```
And then you would use it in your test like this:
```python
from dagster import materialize

def test_my_pipeline():
    result = materialize([test_asset])
    # Assertions on the result of materialization
```
Remember to replace the `test_asset` with the actual asset you want to test, and ensure that the asset function is decorated with `@asset` to be recognized by Dagster's asset system.

Peter Mburu

03/01/2024, 2:29 PM
@Scout How do I create a mock that adheres to the interface of an `AssetsDefinition` or `SourceAsset`?

Scout

03/01/2024, 2:29 PM
Creating a mock that adheres to the interface of an `AssetsDefinition` or `SourceAsset` requires understanding the expected behavior of these objects within the Dagster framework. However, the documentation does not provide explicit examples of mocking these interfaces directly. Instead, it suggests using the `materialize_to_memory` function for testing multiple assets together and provides examples of how to use mocks for resources.

Here's a general approach to creating a mock based on the principles of mocking in Python, tailored to the Dagster context:

1. Understand the Interface: Determine which methods and properties of `AssetsDefinition` or `SourceAsset` are being used in your code. You will need to ensure that your mock provides these.
2. Use a Mocking Library: Use a mocking library like `unittest.mock` to create a mock object. You can specify return values for methods or properties that your code accesses.
3. Configure the Mock: Set up the mock to return the expected values or to simulate the behavior of the real object.

Here's an example of how you might create a mock for an `AssetsDefinition`:
```python
from unittest.mock import Mock

from dagster import AssetKey, materialize

# Create a mock object for AssetsDefinition
mock_assets_definition = Mock()

# Configure the mock to have an asset key
mock_asset_key = AssetKey(["my_mock_asset"])
mock_assets_definition.asset_keys.return_value = [mock_asset_key]

# Configure other properties and methods as needed
# For example, if your code checks for resource keys, you might do:
mock_assets_definition.required_resource_keys = set()

# Use the mock in your test
def test_my_pipeline():
    # Caution: `materialize` type-checks its inputs, so a plain Mock is rejected
    # with the same CheckError shown in the traceback earlier in this thread.
    result = materialize([mock_assets_definition])
    # Assertions on the result of materialization
```
Please note that this is a simplified example. The actual configuration of the mock will depend on how your code interacts with the `AssetsDefinition` or `SourceAsset`. You will need to mock additional methods and properties based on the specific requirements of your tests.

For more complex scenarios or if you need to mock the behavior of asset computation, you might need to create a custom class that mimics the behavior of an `AssetsDefinition` or `SourceAsset` and use instances of this class in your tests.

Keep in mind that the best practice for testing Dagster assets is to use the `materialize_to_memory` function with actual asset definitions, as shown in the documentation. Mocking should be reserved for cases where you cannot use real assets or need to simulate specific conditions that are difficult to replicate with real assets.

Peter Mburu

03/01/2024, 2:36 PM
@Scout fix the mock for upstream assets:
def test_import_imf_data(self):
    with conditional_logging():
        with patch(
            "dagster_pipelines.assets.ingestion.price.imf_ingestion.remote_imf_data"
        ) as mock_process, patch(
            "dagster_pipelines.assets.ingestion.price.imf_ingestion.get_imf_datasource_documents"
        ) as mock_filter, patch(
            "dagster_pipelines.assets.ingestion.price.imf_ingestion.imf_metadata"
        ) as mock_metadata:
            mock_process.return_value.ok = True
            mock_process.return_value = self.mock_data
            mock_filter.return_value.ok = True
            mock_filter.return_value = self.datasourcedocument
            mock_metadata.return_value.ok = True
            mock_metadata.return_value = self.mock_metadata
            assets = [
                imf_ingestion.remote_imf_data,
                imf_ingestion.normalize_remote_imf_data,
                imf_ingestion.validate_imf_data,
                imf_ingestion.import_imf_data,
            ]
            result = -materialize_to_memory(assets)

Scout

03/01/2024, 2:37 PM
It looks like you're trying to mock external function calls within your assets and then materialize those assets to memory for testing. However, there's a small typo in your code that needs to be fixed. The `-` before `materialize_to_memory` should be removed. Additionally, you should ensure that the `mock_data`, `datasourcedocument`, and `mock_metadata` are set up correctly to return the expected values. Here's the corrected version of your test function:
```python
def test_import_imf_data(self):
    with conditional_logging():
        with patch(
            "dagster_pipelines.assets.ingestion.price.imf_ingestion.remote_imf_data"
        ) as mock_process, patch(
            "dagster_pipelines.assets.ingestion.price.imf_ingestion.get_imf_datasource_documents"
        ) as mock_filter, patch(
            "dagster_pipelines.assets.ingestion.price.imf_ingestion.imf_metadata"
        ) as mock_metadata:
            mock_process.return_value.ok = True
            mock_process.return_value = self.mock_data
            mock_filter.return_value.ok = True
            mock_filter.return_value = self.datasourcedocument
            mock_metadata.return_value.ok = True
            mock_metadata.return_value = self.mock_metadata
            assets = [
                imf_ingestion.remote_imf_data,
                imf_ingestion.normalize_remote_imf_data,
                imf_ingestion.validate_imf_data,
                imf_ingestion.import_imf_data,
            ]
            result = materialize_to_memory(assets)
```
Make sure that `self.mock_data`, `self.datasourcedocument`, and `self.mock_metadata` are initialized before the test runs and that they contain the data structures expected by your assets.

When using `materialize_to_memory`, you can also provide mock resources directly if your assets require them. If your assets depend on external resources, you should mock those resources and pass them to `materialize_to_memory` as shown in the documentation.

If you encounter any issues with the `materialize_to_memory` function or with mocking, please ensure that you're following the documentation closely and that your mocks are set up to mimic the behavior of the real objects or functions as closely as possible. If the problem persists, you may need to provide more context or review the specific implementation details of your assets and the functions you're trying to mock.

Peter Mburu

03/01/2024, 3:32 PM
@Scout
ERROR [0.004s]: test_import_imf_data (dagster_pipelines.assets.ingestion.price.tests.test_pipelines.IMFIngestionPipelineTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/pmburu/git/fdw/dagster_pipelines/assets/ingestion/price/tests/test_pipelines.py", line 239, in test_import_imf_data
    result = materialize_to_memory(assets)
  File "/home/pmburu/.venvs/fntfdwlcl/lib/python3.8/site-packages/dagster/_core/definitions/materialize.py", line 172, in materialize_to_memory
    assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
  File "/home/pmburu/.venvs/fntfdwlcl/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1125, in sequence_param
    return _check_iterable_items(obj, of_type, "sequence")
  File "/home/pmburu/.venvs/fntfdwlcl/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1768, in _check_iterable_items
    raise CheckError(
dagster._check.CheckError: Member of sequence mismatches type. Expected (<class 'dagster._core.definitions.assets.AssetsDefinition'>, <class 'dagster._core.definitions.source_asset.SourceAsset'>). Got <MagicMock name='remote_imf_data' id='139829079873760'> of type <class 'unittest.mock.MagicMock'>.
----------------------------------------------------------------------
Ran 1 test in 0.561s
FAILED (errors=1)
Generating XML reports...
Preserving test database for alias 'default'...

Scout

03/01/2024, 3:32 PM
The error you're encountering indicates that the `materialize_to_memory` function is expecting a sequence of `AssetsDefinition` or `SourceAsset` objects, but it received a `MagicMock` object instead. This typically happens when you're trying to pass a mocked object where the function expects a specific type of object related to Dagster's asset system.

To resolve this issue, you need to ensure that the `assets` argument you pass to the `materialize_to_memory` function is a sequence of `AssetsDefinition` or `SourceAsset` objects. Here's what you can do:

- Review the `assets` variable you're passing to `materialize_to_memory` and make sure that it contains the correct types. If you're using mocks for testing, you'll need to replace the `MagicMock` with an appropriate `AssetsDefinition` or `SourceAsset` object or a suitable mock that mimics these types.
- If you're trying to mock the behavior of an asset for testing purposes, you should create a mock that adheres to the interface of an `AssetsDefinition` or `SourceAsset`. This means the mock should behave like an actual asset in the context of the `materialize_to_memory` function.
- Ensure that any setup code for the test is correctly creating the assets that should be materialized. If you're using a factory or builder pattern to create test assets, double-check that the objects being produced are of the correct type.

Here's an example of how you might define a test asset for use with `materialize_to_memory`:
```python
from dagster import asset

@asset
def test_asset():
    # Your test asset logic here
    return "test_data"
```
And then you would use it in your test like this:
```python
from dagster import materialize_to_memory

def test_my_pipeline():
    result = materialize_to_memory([test_asset])
    # Assertions on the result of materialization
```
Remember to replace the `test_asset` with the actual asset you want to test, and ensure that the asset function is decorated with `@asset` to be recognized by Dagster's asset system.

If you are still encountering issues, please ensure that you're following the documentation closely and that your mocks are set up to mimic the behavior of the real objects or functions as closely as possible. If the problem persists, you may need to provide more context or review the specific implementation details of your assets and the functions you're trying to mock.
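A likely reason the `MagicMock` named `remote_imf_data` ends up in the asset list: `patch("...imf_ingestion.remote_imf_data")` swaps the module attribute itself, so reading `imf_ingestion.remote_imf_data` inside the `with` block returns the mock rather than the asset. The sketch below illustrates the mechanism with the standard library only. The `pipeline` module and the `fetch_remote` and `remote_data` functions are hypothetical stand-ins, not the real `imf_ingestion` code.

```python
import types
from unittest.mock import MagicMock, patch

# Hypothetical stand-in for a module such as `imf_ingestion`.
pipeline = types.ModuleType("pipeline")


def fetch_remote():
    raise RuntimeError("no network access in tests")


def remote_data():
    # Looks the helper up on the module at call time, so patches take effect.
    return pipeline.fetch_remote()


pipeline.fetch_remote = fetch_remote
pipeline.remote_data = remote_data

# Patching the object you intend to run replaces it with a MagicMock.
with patch.object(pipeline, "remote_data"):
    replaced_by_mock = isinstance(pipeline.remote_data, MagicMock)

# Patching the helper keeps the original object and only changes its input.
with patch.object(pipeline, "fetch_remote", return_value=[1, 2, 3]):
    still_original = pipeline.remote_data is remote_data
    stubbed_output = pipeline.remote_data()
```

Applied to the test in this thread, that would mean patching only the helpers the assets call (such as `get_imf_datasource_documents`), or substituting a stub asset, instead of patching the asset objects that are handed to `materialize_to_memory`.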

Peter Mburu

03/01/2024, 3:34 PM
@Scout how to mock an asset that gets external data in Dagster testing

Scout

03/01/2024, 3:34 PM
To mock an asset that gets external data in Dagster testing, you can follow these steps:

1. Create Mock Data: Define the data that you expect to receive from the external source. This data should mimic the structure and content of the actual data as closely as possible.
2. Create a Mock Asset: Write a new asset function that returns the mock data instead of fetching data from the external source. Use the `@asset` decorator to define this function as an asset.
3. Use `materialize_to_memory`: Use the `materialize_to_memory` function to execute your asset graph in memory, which is useful for testing. This function allows you to materialize the results of assets into memory without interacting with external systems.
4. Write Your Test: In your test, use the mock asset in place of the real asset that fetches external data. Then, call `materialize_to_memory` with the assets you want to test.

Here's an example of how you might implement this:
```python
from dagster import asset, materialize_to_memory

# Define your mock data
mock_external_data = {
    "data": [
        # ... your mock data here ...
    ]
}

# Create a mock asset that returns the mock data
@asset
def mock_external_data_asset():
    return mock_external_data

# Define your downstream assets that use the external data
@asset
def process_data(mock_external_data_asset):
    # ... process the mock data ...
    # Placeholder so the example runs; replace with your real processing.
    processed_data = mock_external_data_asset["data"]
    return processed_data

# Write a test for your pipeline
def test_pipeline():
    # Materialize your assets to memory, using the mock asset
    result = materialize_to_memory([mock_external_data_asset, process_data])

    # Perform assertions to verify the behavior of your pipeline
    assert result.success
    # ... additional assertions ...
```
In this example, `mock_external_data_asset` is a mock asset that you would use in place of the real asset that fetches data from an external source. The `process_data` asset represents a downstream asset that depends on the data provided by the mock asset.

When writing tests, you can use the `materialize_to_memory` function to execute the asset graph and verify that the assets behave as expected with the mock data. This approach allows you to test the behavior of your pipeline without relying on external data sources, which can be unpredictable and slow down your tests.

Remember to replace the `mock_external_data` and `process_data` with the actual data and assets relevant to your pipeline. The key is to isolate the external dependencies by replacing them with mock assets that provide controlled input for your tests.