dwall

02/14/2020, 12:18 AM
can user-defined types be used for solid config?

alex

02/14/2020, 12:19 AM
in short no, config schema and dagster types are separate

dwall

02/14/2020, 12:19 AM
darn. So I take that to mean I can't do any kind of custom type checking on solid config either

alex

02/14/2020, 12:19 AM
can you sketch out an example of what you are trying to do?
you might be looking for `input_hydration_config`, which is how you define loading inputs via the config system

dwall

02/14/2020, 12:22 AM
of course. I'm writing a number of solids that leverage a Google resource (basically a wrapper around a custom GoogleClient I wrote). Part of this client's job is to handle the OAuth flow, and I want to write the resource so that the user provides a pickled Google OAuth Credentials object (https://google-auth.readthedocs.io/en/stable/reference/google.oauth2.credentials.html#google.oauth2.credentials.Credentials) as config
I just wanted to see if I could write a custom dagster type for that google Credentials object and do some type checking on the resource config
everywhere above where I said solid config I meant resource config btw

max

02/14/2020, 12:23 AM
i believe that's all very achievable with the input hydration config

dwall

02/14/2020, 12:24 AM
interesting - I'll check it out
Okay, can't quite figure out what I'm doing wrong here. Here is the code for my custom Dagster type:
```python
import pickle

from dagster import DagsterType, Field, Path, Selector, input_hydration_config
from google.oauth2.credentials import Credentials


def google_oauth_credentials_type_check(_, value):
    # the type check sees the already-loaded object, so check its class
    # instead of trying to unpickle it a second time
    return isinstance(value, Credentials)


@input_hydration_config(
    config_cls=Selector({"credentials": Field(Path)})
)
def google_oauth_credentials_hydration_config(context, selector):
    # selector["credentials"] is a file path: open it in binary mode before unpickling
    with open(selector["credentials"], "rb") as f:
        return pickle.load(f)


GoogleCredentials = DagsterType(
    name="GoogleCredentials",
    type_check_fn=google_oauth_credentials_type_check,
    input_hydration_config=google_oauth_credentials_hydration_config,
    description="A dagster type representing a pickled Google OAuth Credentials object (https://google-auth.readthedocs.io/en/stable/reference/google.oauth2.credentials.html#google.oauth2.credentials.Credentials)."
)
```
and here is the code referencing the type in config:
```python
import os

from dagster import Field, ModeDefinition, PresetDefinition, String, pipeline, resource
from dagster.loggers import colored_console_logger

# GoogleCredentials is the custom type from the previous snippet;
# GoogleSheetsClient is the custom client class (not shown)


@resource(
    config={
        "client_id": Field(config=String, is_required=True, description="A valid google client id."),
        "client_secret": Field(config=String, is_required=True, description="A valid google client secret."),
        "credentials": Field(config=GoogleCredentials, is_required=True, description="A pickled google oauth credentials object.")
    }
)
def google_sheets_resource(context) -> GoogleSheetsClient:
    return GoogleSheetsClient(
        client_id=context.resource_config["client_id"], client_secret=context.resource_config["client_secret"]
    )

@pipeline(
    description="A dagster pipeline orchestrating the loading of a variety of VC-related google sheets into Snowflake with subsequent running of dependent dbt models.",
    mode_defs=[
        ModeDefinition(
            name="dev",
            description="A mode meant to be used during development.",
            resource_defs={"google_sheets": google_sheets_resource},
            logger_defs={"console": colored_console_logger},
        ),
    ],
    preset_defs=[
        PresetDefinition(
            name="dev",
            environment_dict={
                "loggers": {"console": {"config": {"log_level": "DEBUG"}}},
                "resources": {
                    "google_sheets": {
                        "config": {
                            "client_id": os.getenv("GOOGLE_CLIENT_ID"),
                            "client_secret": os.getenv("GOOGLE_CLIENT_SECRET"),
                            "credentials": "google.token.fake.pickle"
                        }
                    }
                },
                "solids": {
                    "gsheet_to_dataframe": {
                        "config": {
                            "spreadsheet_id": "1yoigoSE2CGNAHPNqse7Ml7BCWujfKISWeu4HPIDkbhU",
                            "range": "!TEST:C1:D1",
                        },
                    },
                },
            },
            mode="dev",
        ),
    ],
)
def test_gsheet_client():
    ...  # pipeline body not included in the original message
```

alex

02/14/2020, 5:33 PM
what's the error you're getting?

dwall

02/14/2020, 5:33 PM
yep incoming
```
Traceback (most recent call last):
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/cli/__init__.py", line 38, in main
    cli(obj={})  # pylint:disable=E1123
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/cli/pipeline.py", line 313, in pipeline_execute_command
    return execute_execute_command_with_preset(preset, kwargs, mode)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/cli/pipeline.py", line 326, in execute_execute_command_with_preset
    pipeline = handle_for_pipeline_cli_args(cli_args).build_pipeline_definition()
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 531, in build_pipeline_definition
    obj = self.entrypoint.perform_load()
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 554, in entrypoint
    return self.data.get_pipeline_entrypoint(from_handle=self)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 661, in get_pipeline_entrypoint
    return LoaderEntrypoint.from_file_target(
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 232, in from_file_target
    module = imp.load_source(module_name, python_file)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 702, in _load
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/dwall/repos/dataland-dagster/pipelines/test_gsheet_client.py", line 6, in <module>
    from dagster_google import google_sheets_resource, gsheet_to_dataframe
  File "/Users/dwall/repos/dataland-dagster/dagster_google/__init__.py", line 1, in <module>
    from .resources import google_sheets_resource
  File "/Users/dwall/repos/dataland-dagster/dagster_google/resources.py", line 19, in <module>
    "credentials": Field(config=GoogleCredentials, is_required=True, description="A pickled google oauth credentials object.")
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/config/field.py", line 208, in __init__
    self.config_type = check.inst(self._resolve_config_arg(config), ConfigType)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/config/field.py", line 187, in _resolve_config_arg
    config_type = resolve_to_config_type(config)
  File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/config/field.py", line 93, in resolve_to_config_type
    raise DagsterInvariantViolationError(
dagster.core.errors.DagsterInvariantViolationError: Cannot resolve Dagster Type GoogleCredentials to a config type. Repr of type: <dagster.core.types.dagster_type.DagsterType object at 0x10715a280>
```

alex

02/14/2020, 5:34 PM
`"credentials": Field(config=GoogleCredentials`
this line from your resource is the source of the error
it doesn't look like you are using it either

dwall

02/14/2020, 5:36 PM
right but is there any indication of why what I'm doing isn't allowed?
I'm failing to understand how to leverage input hydration config to load resource configs into user-defined types, I think

alex

02/14/2020, 5:37 PM
this (should be improved) error message, `Cannot resolve Dagster Type GoogleCredentials to a config type`, is trying to indicate that you are passing a Dagster type in a place expecting a config schema, which is not valid

dwall

02/14/2020, 5:37 PM
ohhhh

alex

02/14/2020, 5:38 PM
your custom type `GoogleCredentials` is to be used in `solid`s
one sec

dwall

02/14/2020, 5:38 PM
ah okay, I think that's the misunderstanding. So I can't use custom types to load resource config

alex

02/14/2020, 5:39 PM
right - you could just do more complicated stuff in your resource, i.e. take the `credentials_path` and do the pickle load in the resource function
or you could use your custom type as an input to your `solid`
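A minimal sketch of alex's first option, written in plain Python so it runs without Dagster or google-auth installed: the resource config carries only a string path, and the resource function does the unpickling and type checking itself, raising on any problem. `load_pickled_credentials`, `google_sheets_resource_fn`, the `credentials_path` key (borrowed from alex's message) and the `FakeCredentials` stand-in are all hypothetical; in real code the expected class would be `google.oauth2.credentials.Credentials`, and the body would sit inside the `@resource` function, reading from `context.resource_config`.

```python
import os
import pickle


class FakeCredentials:
    """Hypothetical stand-in for google.oauth2.credentials.Credentials."""

    def __init__(self, token):
        self.token = token


def load_pickled_credentials(path, expected_cls=FakeCredentials):
    """Unpickle the file at `path` and check that it holds an `expected_cls`."""
    if not os.path.exists(path):
        raise FileNotFoundError(f"Credentials file at path {path} could not be found.")
    try:
        with open(path, "rb") as f:
            creds = pickle.load(f)
    except (EOFError, pickle.UnpicklingError) as e:
        # an empty file raises EOFError; many malformed files raise UnpicklingError
        raise ValueError(f"Credentials file at path {path} is not a valid pickle: {e}") from e
    if not isinstance(creds, expected_cls):
        raise TypeError(
            f"Credentials file at path {path} must contain a pickled "
            f"{expected_cls.__name__}, got {type(creds).__name__}."
        )
    return creds


def google_sheets_resource_fn(resource_config):
    # Hypothetical resource body: the config schema only needs a plain string
    # path, and all type checking happens here, when the resource is created.
    # Inside a real Dagster resource function, raising here fails resource init.
    return load_pickled_credentials(resource_config["credentials_path"])
```

Passing `expected_cls` explicitly keeps the loader testable with any picklable class, and the config schema itself stays a simple string field.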

dwall

02/14/2020, 5:41 PM
got it got it
Okay this makes sense
so all of my type checking and stuff like that would need to be done in the instantiation of the resource

alex

02/14/2020, 5:45 PM
```python
from dagster import Field, lambda_solid, solid

# given your dagster type GoogleCredentials from above, you can use it in a solid

@solid(required_resource_keys={'google_sheets'}, config={'spreadsheet_id': Field(str)})
def example(context, credentials: GoogleCredentials):
    # use credentials somehow
    context.resources.google_sheets.do_a_thing(context.solid_config['spreadsheet_id'])

# the input hydration means you can now provide the input via config like this:
environment_config = {
    'solids': {
        'example': {
            'inputs': {
                'credentials': {
                    'credentials': 'path/to/thing'  # might want to name the selector key "path" instead
                }
            }
        }
    }
}

# or you can return it from another solid and pass it over
@lambda_solid
def load_creds() -> GoogleCredentials:
    return ...
```

dwall

02/14/2020, 5:46 PM
yeah, that makes sense. However, it doesn't intuitively make sense to have credentials be an input to a solid. I feel like it makes more sense to have the credentials be a config to a resource
which is why I was aiming to be able to define custom types for resource config

alex

02/14/2020, 5:46 PM
agree

dwall

02/14/2020, 5:46 PM
but it's cool - I can do the type checking at instantiation I think
doesn't seem like there are limitations for when you can do `TypeCheck` events

alex

02/14/2020, 5:47 PM
we don't let you `yield` any events from those resource functions

dwall

02/14/2020, 5:47 PM
ah womp

alex

02/14/2020, 5:47 PM
so they won't make their way to the event stream - but you could use some of those utilities - it's just up to you to throw if the resource init fails

dwall

02/14/2020, 5:47 PM
all right well I'll probably punt on this then. Thanks for the help @alex

alex

02/14/2020, 5:49 PM
ya see how just doing the checks you need in the resource function goes

Eric

02/18/2020, 7:02 PM
@dwall I've been working with Dagster in an attempt to corral some of our script soup. One use case we have is executing a SQL query and pulling data from our data warehouse nightly. After reading through the docs and examples, I created a custom resource that accepts the credentials for connecting to our data warehouse. I could show you more of how it all connects, but first, here is an example of the pipeline_config.yaml using our custom resource (taken from the dagster pandas example). It looks like instead of supplying an actual username and password you were trying to give it a path to a pickle, but is this essentially what you were trying to get at with your Google creds?
```yaml
resources:
  db_info:
    config:
      db2_username: username
      db2_password: password
      db2_db_name: db_name
solids:
  get_sql_data:
    inputs:
      query:
        value: >-
          SELECT
            *
          FROM
            TABLE
```
and are you aiming for something like this?
```yaml
resources:
  googlecreds:
    config:
      path: ...
```

dwall

02/18/2020, 7:07 PM
yep - essentially. I was hoping to do some type checking that just makes sure that the file at the Path is a pickled Google Credentials object. I'm achieving approx. the same functionality just by validating at instantiation:
```python
import os
import pickle
from pathlib import Path

import attr
from google.oauth2.credentials import Credentials


@attr.s
class GoogleClient(object):
    credentials: Path = attr.ib()

    @credentials.validator
    def google_credentials_validator(self, attribute, value):
        if not os.path.exists(value):
            raise FileNotFoundError(f"Credentials file at path {value} could not be found.")
        else:
            try:
                with open(value, "rb") as f:
                    creds = pickle.load(f)
            except EOFError as e:
                raise EOFError(
                    f"Could not open credentials file at path {value}. Check to make sure file is not blank: {e}"
                ) from e
            except TypeError as e:
                raise TypeError(f"Credentials file at path {value} must be pickled: {e}") from e
            except pickle.UnpicklingError as e:
                raise pickle.UnpicklingError(f"Credentials file at path {value} not pickled correctly: {e}") from e

            if not isinstance(creds, Credentials):
                raise TypeError(
                    f"Credentials file at path {value} must be a pickled instance of a google auth Credentials object."
                )
```

alex

02/18/2020, 7:09 PM
did you verify that these errors get propagated up through dagster effectively? I believe that they should.
you should get a PIPELINE_INIT_FAILURE with the right error class, message & stack trace
d

dwall

02/18/2020, 7:10 PM
yep they do

alex

02/18/2020, 7:10 PM
sweeeeeeet

Eric

02/18/2020, 7:28 PM
Looks like you've got it all figured out already, which is great! As a summary for others who might be lurking, here is a condensed example, which again was largely copied from the examples already in the dagster git repo.
```python
# start with a custom data type. In this case a named tuple that stores the sqlalchemy engine
from collections import namedtuple
from urllib.parse import quote_plus

import sqlalchemy
from dagster import Field, ModeDefinition, pipeline, resource

DbInfo = namedtuple('DbInfo', 'engine url dialect host port db_name')


# functions to be used when creating the custom resource
def create_db2_db_url(username, password, hostname, port, db_name):
    # quote_plus escapes characters such as '@' or '/' that would otherwise break the URL
    return 'db2://{username}:{password}@{hostname}:{port}/{db_name}'.format(
        username=quote_plus(username), password=quote_plus(password), hostname=hostname, port=port, db_name=db_name
    )

def create_sql_engine(db_url):
    return sqlalchemy.create_engine(db_url)

# define the resource that returns an instance of our custom data type.
# in this case, connect to a SQL db. Instead of username, password, etc., it could take the path to a pickle for the google creds with type checking, e.g.
#
# @resource(
#     config={
#         'path': Field(str),
#     }
# )
@resource(
    config={
        'db2_username': Field(str),
        'db2_password': Field(str),
        'db2_hostname': Field(str, default_value="servername.domain", is_optional=True),
        'db2_port': Field(int, default_value=1234, is_optional=True),
        'db2_db_name': Field(str)
    }
)
def db2_db_info_resource(init_context):
    host = init_context.resource_config['db2_hostname']
    db_name = init_context.resource_config['db2_db_name']
    port = init_context.resource_config['db2_port']

    db_url = create_db2_db_url(
        init_context.resource_config['db2_username'],
        init_context.resource_config['db2_password'],
        host,
        port,
        db_name
    )

    # return instance of the custom datatype. note that engine is a sqlalchemy engine
    return DbInfo(
        url=db_url,
        engine=create_sql_engine(db_url),
        dialect='db2',
        host=host,
        port=port,
        db_name=db_name
    )

# finally, when defining the pipeline add an entry to the 'resource_defs' keyword arg
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="local",
            resource_defs={
                'db_info': db2_db_info_resource
            }
        )
    ]
)
def hello_db2_pipeline():
    # sql_solid and sql_df_row_count come from the example code and are not shown here
    get_sql_data = sql_solid.alias("get_sql_data")
    sql_df_row_count(get_sql_data())
```
The YAML config for this pipeline would then look something like this, where you specify the custom resource:
```yaml
resources:
  db_info:
    config:
      db2_username: username
      db2_password: password
      db2_db_name: db_name
solids:
  get_sql_data:
    inputs:
      query:
        value: >-
          SELECT
            *
          FROM
            TABLE
```

alex

02/18/2020, 7:33 PM
If you are feeling adventurous, you could check out this custom config type that allows a value or `env: ENV_VAR` to load from the environment (https://dagster.phacility.com/D1956). We're not confident in the "custom" config type API yet, so there's a reasonable chance this will break / move.
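The diff linked above isn't reproduced here, so as a rough, library-free illustration of the idea only: a config value that is either a literal or a mapping like `{"env": "VAR_NAME"}` can be resolved as below. `resolve_value_or_env` is a hypothetical helper, not the API from that diff, and raising `KeyError` on an unset variable is an assumed design choice.

```python
import os


def resolve_value_or_env(value, environ=None):
    """Return `value` unchanged, or look it up when given as {"env": "VAR_NAME"}."""
    environ = os.environ if environ is None else environ
    if isinstance(value, dict):
        # only the single-key form {"env": NAME} is treated as an env lookup
        if set(value) != {"env"}:
            raise ValueError(f"expected a literal or {{'env': VAR_NAME}}, got {value!r}")
        name = value["env"]
        if name not in environ:
            raise KeyError(f"environment variable {name} is not set")
        return environ[name]
    # anything else is a literal config value
    return value
```

Compared with calling `os.getenv` inside the preset, as dwall's pipeline does, which quietly yields `None` when a variable is unset, this fails loudly at the moment the value is resolved.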