d

dansasbu

08/27/2021, 2:44 PM
Hi, it seems that the latest version of great-expectations (0.13.x) doesn't work with Dagster. Are there any suggestions? I would like to use the new features of GE, like modular expectation definitions. I've just started my development, so I don't want to redo it in the near future.
a

alex

08/27/2021, 3:07 PM
(0.13.x) doesn’t work with Dagster
what exactly are you seeing? we are pinned against a few specific versions that had incompatibilities, but the newest should work https://github.com/dagster-io/dagster/blame/master/python_modules/libraries/dagster-ge/setup.py#L38
d

dansasbu

08/27/2021, 3:24 PM
I tried with the latest one, then with 0.13.25, but got the same result. This is the first error I get when running the demo from the Dagster docs:
AttributeError: 'DataContextConfig' object has no attribute 'validation_operators'
If I add
validation_operator_name='action_list_operator'
to the arguments of ge_validation_solid_factory, I get this one instead:
AttributeError: 'Datasource' object has no attribute 'get_batch'
I see that, for some reason, the function _get_data_context_version in data_context.py is returning None instead of 'v3'. Since it is None, it tries to use ._get_batch_v2.
a

alex

08/27/2021, 3:35 PM
cc @owen
👀 2
o

owen

08/27/2021, 4:03 PM
ah ok I was finally able to reproduce this error. It seems like we're running tests against a ge project created with the v2 api, and the solids produced by the factory simply do not work for projects with a v3 config version. I'll poke around and see if there's a way to work around this, but otherwise I can put out a fix for the library for next week's release
❤️ 1
d

dansasbu

08/27/2021, 4:06 PM
Yes, it seems that the get_batch method in data_context.py is parsing the function arguments incorrectly.
o

owen

08/27/2021, 4:20 PM
I'm not seeing a great temporary workaround for this issue, because
_get_data_context_version
is just basing the decision to use v2 or v3 on how get_batch() was called (which is determined entirely by the dagster-ge code, not the config yaml). I'll work on putting out a change that allows you to choose which config version to run your validation against (which will just take the form of a parameter on the solid factory).
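To make the failure mode concrete, here is a deliberately simplified, hypothetical sketch of argument-based version detection. It is not GE's actual implementation: the function name `infer_api_version` and the keyword set are assumptions made purely for illustration.

```python
from typing import Any, Optional

# Hypothetical: keywords that would mark a call as "v3-style".
# The real library's rules may differ.
V3_STYLE_KEYWORDS = {
    "batch_request",
    "datasource_name",
    "data_connector_name",
    "data_asset_name",
}


def infer_api_version(*args: Any, **kwargs: Any) -> Optional[str]:
    """Guess the API version purely from how the caller passed arguments."""
    if V3_STYLE_KEYWORDS.intersection(kwargs):
        return "v3"
    if "batch_kwargs" in kwargs:
        return "v2"
    # Nothing recognizable: the caller's style cannot be determined.
    return None


# A positional, v2-style call carries no recognizable keywords, so the
# project's config version never enters the decision.
print(infer_api_version({"dataset": "df"}, "my_suite"))
print(infer_api_version(batch_request={"datasource_name": "my_ds"}))
```

The point of the sketch is only that the calling code, not the project's yaml, controls which branch is taken.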
❤️ 1
d

dansasbu

09/09/2021, 2:32 PM
Hi @owen, were you able to make the change?
I guess that if validation operators are going to be replaced completely by checkpoints in the future, the
ge_validation_solid_factory
would need to be rewritten? From the GE docs: "As part of the new modular expectations API in Great Expectations, Validation Operators are evolving into Checkpoints. At some point in the future Validation Operators will be fully deprecated." https://docs.greatexpectations.io/docs/reference/checkpoints_and_actions
o

owen

09/09/2021, 3:44 PM
hi @dansasbu -- sorry for the slower-than-advertised turnaround, I just got back from vacation. I'm working on a PR right now that will add a different v3 solid factory (with different configuration options to align better with the v3 apis). It does seem like there are a fair number of differences in how you'll need to set up your ge project in order for this to work (although maybe I'm just not familiar enough with GE?). In particular, it seems like it's now required to specify a RuntimeDataConnector in order to validate a pandas/pyspark dataframe, which did not seem to be required with the v2 apis. Again, not super familiar w/ GE, so I might be misinterpreting this.
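For reference, a v3-style datasource for in-memory dataframes might be declared roughly like this. This is a sketch of the config shape as I understand it from the GE docs, written as a plain Python dict so it runs without a GE project. The datasource name, connector name, and batch identifier key are placeholders.

```python
# Sketch of a v3-style datasource config for validating in-memory dataframes.
datasource_config = {
    "name": "my_pandas_datasource",  # placeholder name
    "class_name": "Datasource",
    "execution_engine": {"class_name": "PandasExecutionEngine"},
    "data_connectors": {
        "my_runtime_connector": {  # placeholder name
            "class_name": "RuntimeDataConnector",
            # Keys that each runtime batch request must supply.
            "batch_identifiers": ["default_identifier_name"],
        }
    },
}

# With a real DataContext this dict would typically be registered with
# context.add_datasource(**datasource_config); that call is omitted here
# so the snippet has no dependency on an initialized GE project.
connector = datasource_config["data_connectors"]["my_runtime_connector"]
print(connector["class_name"], connector["batch_identifiers"])
```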
👍 1
d

dansasbu

09/10/2021, 4:21 PM
Hi @owen, in the meantime I made this solid. It works, but I couldn't get the rendering with ValidationResultsPageRenderer to work to show the results in Dagit. This is the solid:
from dagster import Output, solid
from great_expectations.checkpoint import SimpleCheckpoint

@solid(tags={"kind": "ge"})
def run_ge_validation(_context,
                      ge_context,
                      df,
                      datasource_name,
                      data_connector_name,
                      data_asset_name,
                      expectation_suite_name,
                      ):
    checkpoint = SimpleCheckpoint(name='ge_checkpoint',
                                  data_context=ge_context,
                                  batch_request={'datasource_name': datasource_name,
                                                 'data_connector_name': data_connector_name,
                                                 'data_asset_name': data_asset_name,
                                                 'batch_identifiers': {
                                                     'default_identifier_name': 'default_identifier'}})

    results = checkpoint.run(run_name=f'{expectation_suite_name} run',
                             validations=[
                                 {
                                     "batch_request": {'runtime_parameters': {'batch_data': df}},
                                     "expectation_suite_name": expectation_suite_name,
                                 }
                             ])

    yield Output(results.to_json_dict())
Do you know how I can render the results the way you do for the validation operator? Thanks!
o

owen

09/10/2021, 7:05 PM
I think something like
    validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
    rendered_document_content_list = validation_results_page_renderer.render(
        validation_results=results
    )
    md_str = "".join(DefaultMarkdownPageView().render(rendered_document_content_list))

    meta_stats = EventMetadataEntry.md(md_str=md_str, label="Expectation Results")
    yield ExpectationResult(
        success=bool(results["success"]),
        metadata_entries=[meta_stats],
    )
    yield Output(results.to_json_dict())
should work!
d

dansasbu

09/10/2021, 8:18 PM
This is the whole solid:
from dagster import EventMetadataEntry, ExpectationResult, Output, solid
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView

@solid(tags={"kind": "ge"})
def run_ge_validation(_context,
                      ge_context,
                      df,
                      datasource_name,
                      data_connector_name,
                      data_asset_name,
                      expectation_suite_name,
                      ):
    checkpoint = SimpleCheckpoint(name='ge_checkpoint',
                                  data_context=ge_context,
                                  batch_request={'datasource_name': datasource_name,
                                                 'data_connector_name': data_connector_name,
                                                 'data_asset_name': data_asset_name,
                                                 'batch_identifiers': {
                                                     'default_identifier_name': 'default_identifier'}})

    results = checkpoint.run(run_name=f'{expectation_suite_name} run',
                             validations=[
                                 {
                                     "batch_request": {'runtime_parameters': {'batch_data': df}},
                                     "expectation_suite_name": expectation_suite_name,
                                 }
                             ])

    validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
    rendered_document_content_list = validation_results_page_renderer.render(
        validation_results=results
    )
    md_str = "".join(DefaultMarkdownPageView().render(rendered_document_content_list))

    meta_stats = EventMetadataEntry.md(md_str=md_str, label="Expectation Results")
    yield ExpectationResult(
        success=bool(results["success"]),
        metadata_entries=[meta_stats],
    )
    yield Output(results.to_json_dict())
But I receive this error:
AttributeError: 'CheckpointResult' object has no attribute 'meta'
  File "C:\Users\s2795861\Documents\envs\caps\lib\site-packages\dagster\core\execution\plan\utils.py", line 42, in solid_execution_error_boundary
    yield
  File "C:\Users\s2795861\Documents\envs\caps\lib\site-packages\dagster\utils\__init__.py", line 383, in iterate_with_context
    next_output = next(iterator)
  File "c:\users\s2795861\documents\caps\caps\CAPS\solids\solids_common_validation.py", line 176, in run_ge_validation
    rendered_document_content_list = validation_results_page_renderer.render(
  File "C:\Users\s2795861\Documents\envs\caps\lib\site-packages\great_expectations\render\renderer\page_renderer.py", line 84, in render
    run_id = validation_results.meta["run_id"]
Looks like checkpoints store that metadata in a different way.
o

owen

09/10/2021, 8:21 PM
ah weird. sorry about that -- in that case I'm not sure how to get that info 😞
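One direction that may be worth trying (untested against this setup): a CheckpointResult wraps one validation result per validation, so the renderer may need the inner results rather than the wrapper. The sketch below assumes the result object exposes a `run_results` mapping whose values are dicts with a `validation_result` entry. That layout is an assumption about this GE version, and the helper name and the stub classes are hypothetical stand-ins so the snippet runs without GE installed.

```python
from typing import Any, List


def inner_validation_results(checkpoint_result: Any) -> List[Any]:
    """Collect the per-validation results wrapped by a checkpoint result.

    Assumes `checkpoint_result.run_results` is a mapping whose values are
    dicts containing a "validation_result" key (an assumption, see above).
    """
    return [
        run["validation_result"]
        for run in checkpoint_result.run_results.values()
    ]


# Stand-in objects so the sketch is runnable without GE installed.
class FakeValidationResult:
    def __init__(self, run_id: str, success: bool) -> None:
        self.meta = {"run_id": run_id}
        self.success = success


class FakeCheckpointResult:
    def __init__(self) -> None:
        self.run_results = {
            "validation-1": {
                "validation_result": FakeValidationResult("run-a", True)
            },
        }


for result in inner_validation_results(FakeCheckpointResult()):
    # Each inner result carries the `meta` attribute that the wrapper lacks.
    print(result.meta["run_id"], result.success)
```

If the assumption holds, each inner result could be passed to validation_results_page_renderer.render(validation_results=...) in place of the CheckpointResult, yielding one ExpectationResult per validation.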