dwall
02/14/2020, 12:18 AMalex
02/14/2020, 12:19 AMdwall
02/14/2020, 12:19 AMalex
02/14/2020, 12:19 AMinput_hydration_config
- which is how you define loading inputs via the config systemdwall
02/14/2020, 12:22 AMmax
02/14/2020, 12:23 AMdwall
02/14/2020, 12:24 AMdef google_oauth_credentials_type_check(_, value):
try:
pickle.load(value)
except:
return False
else:
return True
@input_hydration_config(
    config_cls=Selector({"credentials": Field(Path)})
)
def google_oauth_credentials_hydration_config(context, selector):
    """Load a pickled credentials object from the path given in config.

    Args:
        context: Hydration context supplied by dagster (unused here).
        selector: Selector config; ``selector["credentials"]`` is the path
            of the pickle file to load.

    Returns:
        The object unpickled from that file.
    """
    # pickle.load() needs an open binary file object, not a path string.
    # NOTE(security): unpickling executes arbitrary code; only point this at
    # credential files you created yourself.
    with open(selector["credentials"], "rb") as credentials_file:
        return pickle.load(credentials_file)
# Dagster type for pickled Google OAuth credentials. Values are checked with
# google_oauth_credentials_type_check and can be supplied through config via
# google_oauth_credentials_hydration_config.
GoogleCredentials = DagsterType(
name="GoogleCredentials",
type_check_fn=google_oauth_credentials_type_check,
input_hydration_config=google_oauth_credentials_hydration_config,
description="A dagster type representing a pickled Google OAuth [Credentials object](<https://google-auth.readthedocs.io/en/stable/reference/google.oauth2.credentials.html#google.oauth2.credentials.Credentials>)."
)
@resource(
    config={
        "client_id": Field(config=String, is_required=True, description="A valid google client id."),
        "client_secret": Field(config=String, is_required=True, description="A valid google client secret."),
        # A config schema only accepts config types. Passing the
        # GoogleCredentials DagsterType here raises
        # DagsterInvariantViolationError at import time, so the field is
        # declared as a plain string holding the path of the pickle file.
        "credentials": Field(config=String, is_required=True, description="Path to a pickled google oauth credentials object."),
    }
)
def google_sheets_resource(context) -> GoogleSheetsClient:
    """Build the Google Sheets client from resource config.

    Args:
        context: Resource init context; ``context.resource_config`` supplies
            ``client_id`` and ``client_secret``.

    Returns:
        A ``GoogleSheetsClient`` constructed from the configured client id
        and client secret.
    """
    # NOTE(review): the configured "credentials" path is not consumed yet;
    # unpickle it here once GoogleSheetsClient accepts a credentials argument.
    return GoogleSheetsClient(
        client_id=context.resource_config["client_id"], client_secret=context.resource_config["client_secret"]
    )
@pipeline(
description="A dagster pipeline orchestrating the loading of a variety of VC-related google sheets into Snowflake with subsequent running of dependent dbt models.",
mode_defs=[
ModeDefinition(
name="dev",
description="A mode meant to be used during development.",
resource_defs={"google_sheets": google_sheets_resource},
logger_defs={"console": colored_console_logger},
),
],
preset_defs=[
PresetDefinition(
name="dev",
environment_dict={
"loggers": {"console": {"config": {"log_level": "DEBUG"}}},
"resources": {
"google_sheets": {
"config": {
"client_id": os.getenv("GOOGLE_CLIENT_ID"),
"client_secret": os.getenv("GOOGLE_CLIENT_SECRET"),
"credentials": "google.token.fake.pickle"
}
}
},
"solids": {
"gsheet_to_dataframe": {
"config": {
"spreadsheet_id": "1yoigoSE2CGNAHPNqse7Ml7BCWujfKISWeu4HPIDkbhU",
"range": "!TEST:C1:D1",
},
},
},
},
mode="dev",
),
],
)
def test_gsheet_client():
alex
02/14/2020, 5:33 PMdwall
02/14/2020, 5:33 PMTraceback (most recent call last):
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/bin/dagster", line 8, in <module>
sys.exit(main())
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/cli/__init__.py", line 38, in main
cli(obj={}) # pylint:disable=E1123
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/cli/pipeline.py", line 313, in pipeline_execute_command
return execute_execute_command_with_preset(preset, kwargs, mode)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/cli/pipeline.py", line 326, in execute_execute_command_with_preset
pipeline = handle_for_pipeline_cli_args(cli_args).build_pipeline_definition()
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 531, in build_pipeline_definition
obj = self.entrypoint.perform_load()
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 554, in entrypoint
return self.data.get_pipeline_entrypoint(from_handle=self)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 661, in get_pipeline_entrypoint
return LoaderEntrypoint.from_file_target(
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 232, in from_file_target
module = imp.load_source(module_name, python_file)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 702, in _load
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/dwall/repos/dataland-dagster/pipelines/test_gsheet_client.py", line 6, in <module>
from dagster_google import google_sheets_resource, gsheet_to_dataframe
File "/Users/dwall/repos/dataland-dagster/dagster_google/__init__.py", line 1, in <module>
from .resources import google_sheets_resource
File "/Users/dwall/repos/dataland-dagster/dagster_google/resources.py", line 19, in <module>
"credentials": Field(config=GoogleCredentials, is_required=True, description="A pickled google oauth credentials object.")
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/config/field.py", line 208, in __init__
self.config_type = check.inst(self._resolve_config_arg(config), ConfigType)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/config/field.py", line 187, in _resolve_config_arg
config_type = resolve_to_config_type(config)
File "/Users/dwall/.local/share/virtualenvs/dataland-dagster-Z2VR7MFq/lib/python3.8/site-packages/dagster/config/field.py", line 93, in resolve_to_config_type
raise DagsterInvariantViolationError(
dagster.core.errors.DagsterInvariantViolationError: Cannot resolve Dagster Type GoogleCredentials to a config type. Repr of type: <dagster.core.types.dagster_type.DagsterType object at 0x10715a280>
alex
02/14/2020, 5:34 PM“credentials”: Field(config=GoogleCredentials — this line from your resource is the source of the error
dwall
02/14/2020, 5:36 PMalex
02/14/2020, 5:37 PM"Cannot resolve Dagster Type GoogleCredentials to a config type" is trying to indicate that you are passing a
Dagster Type
in a place expecting config schema
which is not validdwall
02/14/2020, 5:37 PMalex
02/14/2020, 5:38 PMGoogleCredentials
is to be used in solid
sdwall
02/14/2020, 5:38 PMalex
02/14/2020, 5:39 PMcredentials_path
and do the pickle load in the resource function solid
dwall
02/14/2020, 5:41 PMalex
02/14/2020, 5:45 PM# given your dagster type GoogleCredentials from above, you can use it in a solid
# The solid config must be a schema (a dict mapping each key to its type),
# not a set of key names.
@solid(required_resource_keys={'google_sheets'}, config={'spreadsheet_id': str})
def example(context, credentials: GoogleCredentials):
    """Example solid that takes GoogleCredentials as an input.

    Args:
        context: Solid execution context; provides the ``google_sheets``
            resource and the ``spreadsheet_id`` solid config value.
        credentials: Input typed as ``GoogleCredentials``.
    """
    # use credentials somehow
    context.resources.google_sheets.do_a_thing(context.solid_config['spreadsheet_id'])
# the input hydration means you can now provide the input via config like this:
# Environment config that supplies the `credentials` input of the `example`
# solid through config.
environment_config = {
    'solids': {
        'example': {
            'inputs': {
                'credentials': {
                    # might want to name the selector key "path" instead
                    'credentials': 'path/to/thing',
                },
            },
        },
    },
}
# or you can return it from another solid and pass it over
@lambda_solid
def load_creds() -> GoogleCredentials:
    """Placeholder solid whose output is annotated as ``GoogleCredentials``.

    The body is elided in this example (it returns ``...``).
    """
    return ...
dwall
02/14/2020, 5:46 PMalex
02/14/2020, 5:46 PMdwall
02/14/2020, 5:46 PMTypeCheck
eventsalex
02/14/2020, 5:47 PMyield
any events from those resource functionsdwall
02/14/2020, 5:47 PMalex
02/14/2020, 5:47 PMdwall
02/14/2020, 5:47 PMalex
02/14/2020, 5:49 PMEric
02/18/2020, 7:02 PMresources:
db_info:
config:
db2_username: username
db2_password: password
db2_db_name: db_name
solids:
get_sql_data:
inputs:
query:
value: >-
SELECT
*
FROM
TABLE
and are you aiming for something like this?:
resources:
googlecreds:
config:
path: ...
dwall
02/18/2020, 7:07 PM@attr.s
class GoogleClient(object):
credentials: Path = attr.ib()
@credentials.validator
def google_credentials_validator(self, attribute, value):
if not os.path.exists(value):
raise FileNotFoundError(f"Credentials file at path {value} could not be found.")
else:
try:
with open(value, "rb") as f:
creds = pickle.load(f)
except EOFError as e:
raise EOFError(
f"Could not open credentials file at path {value}. Check to make sure file is not blank: {e}"
)
except TypeError as e:
raise TypeError(f"Credentials file at path {value} must be pickled: {e}")
except pickle.UnpicklingError as e:
raise pickle.UnpicklingError(f"Credentials file at path {value} not pickled correctly: {e}")
if not isinstance(creds, Credentials):
raise TypeError(
f"Credentials file at path {value} must be a pickled instance of a google auth Credentials object."
alex
02/18/2020, 7:09 PMdwall
02/18/2020, 7:10 PMalex
02/18/2020, 7:10 PMEric
02/18/2020, 7:28 PM# start with a custom data type. In this case a named tuple that stores the sqlalchemy engine
from collections import namedtuple
# Immutable record describing a database connection; `engine` holds the
# SQLAlchemy engine, the remaining fields describe where it points.
DbInfo = namedtuple(
    'DbInfo',
    ['engine', 'url', 'dialect', 'host', 'port', 'db_name'],
)
# functions to be used when creating the custom resource
def create_db2_db_url(username, password, hostname, port, db_name):
    """Build a ``db2://`` connection URL from its individual parts.

    The username and password are percent-encoded so that reserved
    characters such as ``@``, ``:`` or ``/`` cannot be mistaken for URL
    delimiters. Values made only of unreserved characters are unchanged.

    Args:
        username: Database login name.
        password: Database password.
        hostname: Host name of the database server.
        port: Port of the database server.
        db_name: Name of the database.

    Returns:
        A string of the form ``db2://username:password@hostname:port/db_name``.
    """
    # Imported locally so this snippet does not depend on a file-level import.
    from urllib.parse import quote

    return 'db2://{username}:{password}@{hostname}:{port}/{db_name}'.format(
        # safe='' also encodes "/", which quote() leaves alone by default.
        username=quote(str(username), safe=''),
        password=quote(str(password), safe=''),
        hostname=hostname,
        port=port,
        db_name=db_name,
    )
def create_sql_engine(db_url):
    """Return the result of ``sqlalchemy.create_engine`` for ``db_url``."""
    engine = sqlalchemy.create_engine(db_url)
    return engine
# define the resource that returns an instance of our custom data type.
# In this case it connects to a SQL db, but instead of username, password, etc. it could take the path to a pickle for Google creds with type checking, e.g.:
#
# @resource(
# config={
# 'path': Field(str),
# }
# )
@resource(
    config={
        'db2_username': Field(str),
        'db2_password': Field(str),
        'db2_hostname': Field(str, default_value="servername.domain", is_optional=True),
        'db2_port': Field(int, default_value=1234, is_optional=True),
        'db2_db_name': Field(str),
    }
)
def db2_db_info_resource(init_context):
    """Resource that builds a ``DbInfo`` for a DB2 database from resource config.

    Reads the ``db2_*`` keys from ``init_context.resource_config``, assembles
    the connection URL with ``create_db2_db_url`` and returns a ``DbInfo``
    carrying that URL and the engine created from it.
    """
    settings = init_context.resource_config
    hostname = settings['db2_hostname']
    database = settings['db2_db_name']
    port_number = settings['db2_port']

    connection_url = create_db2_db_url(
        settings['db2_username'],
        settings['db2_password'],
        hostname,
        port_number,
        database,
    )

    # Instance of the custom data type; `engine` is a sqlalchemy engine.
    return DbInfo(
        url=connection_url,
        engine=create_sql_engine(connection_url),
        dialect='db2',
        host=hostname,
        port=port_number,
        db_name=database,
    )
# finally, when defining the pipeline add an entry to the 'resources_defs' keyword arg
@pipeline(
    mode_defs=[
        ModeDefinition(name="local", resource_defs={'db_info': db2_db_info_resource}),
    ]
)
def hello_db2_pipeline():
    """Pipeline that feeds the aliased ``get_sql_data`` solid into ``sql_df_row_count``.

    The single "local" mode registers ``db2_db_info_resource`` under the
    ``db_info`` resource key.
    """
    # Alias sql_solid as "get_sql_data", invoke it, and pass its output on.
    sql_df_row_count(sql_solid.alias("get_sql_data")())
the yaml config for this pipeline would then look something like this where you can specify the custom resource:
resources:
db_info:
config:
db2_username: username
db2_password: password
db2_db_name: db_name
solids:
get_sql_data:
inputs:
query:
value: >-
SELECT
*
FROM
TABLE
alex
02/18/2020, 7:33 PMenv: ENV_VAR
to load from the environment: https://dagster.phacility.com/D1956. We’re not confident in the “custom” config type API yet, so there is a reasonable chance this will break / move.