Scott Peters
06/16/2021, 10:20 PM
dagster will be running as a background service, where end users simply pass arguments to some kind of CLI. Are there methods or practices within dagster that lower the front-end complexity so that we don't have to map every argument so explicitly? This is especially necessary for our tooling, since the methods will always require user input for every run and will not likely be tied to any scheduler.
owen
06/16/2021, 10:29 PM
@resource(config_schema=Permissive({'thing': str, 'other_thing': str}))
etc.
Scott Peters
06/16/2021, 10:30 PM
owen
06/16/2021, 10:30 PM
Scott Peters
06/16/2021, 10:30 PM
@pipeline
def pipe():
    this_value = do_something()
    print(this_value)
do_another_thing(do_something())
do_another_thing(do_something())
it seems to actually unpack that value, in this case a string... is there a reason that values returned by solids are not accessible from the pipeline scope?
path <dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7faf265c9ee0>
/my/resolved/path
do_a_thing(do_another_thing(do_other_things()))
AttributeError: 'InvokedSolidOutputHandle' object has no attribute 'split'
InvokedSolidOutputHandle
owen
06/16/2021, 10:45 PM
Scott Peters
06/16/2021, 10:46 PM
get_path() -> str and exists() -> bool
pipe():
    path = get_path()
    exists(path)
when the exists method gets ahold of the path value, it attempts to treat it like a string, but it is, of course, still an instance of SolidOutput
str from the first solid?
solids and use them as input for others without having to nest them solid_1(solid_2())
owen
06/16/2021, 10:59 PM
@pipeline
def my_pipe():
    path = get_path()
    exists(path)
    some_other_solid(path)
will work just fine
Scott Peters
06/16/2021, 11:02 PM
@solid(
    config_schema={'create_data': dict},
    required_resource_keys={'create_data'}
)
def get_path(context) -> str:
    endpoint = context.resources.create_data.get('endpoint')
    collection = context.resources.create_data.get('collection')
    asset_type = context.resources.create_data.get('asset_type')
    asset_name = context.resources.create_data.get('asset_name')
    path = f'{endpoint}/{collection}/{asset_type}/{asset_name}'
    print('path', path)
    return path
tokens = path.split(self.path_delimiter)
AttributeError: 'InvokedSolidOutputHandle' object has no attribute 'split'
def create():
    path = get_path()
    exists(path)
{'solid_name': 'get_path', 'output_name': 'result'}
owen
06/16/2021, 11:06 PM
Scott Peters
06/16/2021, 11:08 PM
rclone, so it does some string parsing
if __name__ == '__main__':
    args = {
        'endpoint': '/home/speters',
        'collection': 'stage_manager',
        'asset_type': 'model',
        'asset_name': 'billy'
    }
    output = execute_pipeline(
        create,
        run_config={
            'resources': {
                'create_data': {
                    'config': args
                }
            },
            'solids': {
                'get_path': {
                    'config': {
                        'create_data': args
                    }
                }
            }
        },
        instance=DagsterInstance.get()
    )
if __name__ == '__main__':
    args = {
        'endpoint': '/home/speters',
        'collection': 'stage_manager',
        'asset_type': 'model',
        'asset_name': 'billy'
    }
    output = execute_pipeline(
        create,
        run_config={
            'resources': {
                'create_data': {
                    'config': args
                }
            },
            'solids': {
                'get_path': {
                    'config': {
                        'create_data': args
                    }
                },
                'exists': {'config': {}}
            }
        },
        instance=DagsterInstance.get()
    )
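Since the opening question was about not mapping every argument by hand, here is a minimal sketch of one way to generate the run_config above from CLI arguments. It only reproduces the dictionary shape shown in the message; `parse_args`, `build_run_config`, and the `--endpoint` style flags are hypothetical names for illustration, not part of dagster.

```python
import argparse

# Argument names taken from the args dict in the message above.
ARG_NAMES = ("endpoint", "collection", "asset_type", "asset_name")


def parse_args(argv=None):
    """Collect the per-run arguments from the command line into a plain dict."""
    parser = argparse.ArgumentParser(description="Collect per-run asset arguments")
    for name in ARG_NAMES:
        parser.add_argument(f"--{name}", required=True)
    return vars(parser.parse_args(argv))


def build_run_config(args, key="create_data", solid_name="get_path"):
    """Return a dict with the same shape as the hand-written run_config above.

    Assumption: the same args are passed to both the resource and the solid,
    exactly as in the original message.
    """
    return {
        "resources": {key: {"config": dict(args)}},
        "solids": {solid_name: {"config": {key: dict(args)}}},
    }
```

The resulting dictionary could then be passed as `run_config=build_run_config(parse_args())`, so adding a new argument means touching only `ARG_NAMES`.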
exists in the solids dict
owen
06/16/2021, 11:12 PM
Scott Peters
06/16/2021, 11:13 PM
def exists(path: str) -> bool:
    rco = RcloneObject(path)
    return rco.exists
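A pipeline body runs once at definition time to wire up the graph, so a solid call there returns a handle rather than a value. If a function called in that body is a plain function rather than a solid, it runs immediately and receives the handle. The following is a toy model of that behaviour in plain Python, not dagster's implementation; `OutputHandle`, `step`, `run`, `exists_step`, and `exists_plain` are hypothetical names used only for illustration.

```python
GRAPH = []  # nodes recorded while the composition code runs


class OutputHandle:
    """Placeholder for a value that only exists once the graph is executed."""

    def __init__(self, step_name, output_name="result"):
        self.step_name = step_name
        self.output_name = output_name


def step(fn):
    """Toy decorator: calling the wrapper records a node and returns a handle."""
    def wrapper(*inputs):
        GRAPH.append((fn.__name__, fn, inputs))
        return OutputHandle(fn.__name__)
    return wrapper


@step
def get_path():
    return "/my/resolved/path"


@step
def exists_step(path):
    # Decorated: receives the real string, but only at execution time.
    return len(path.split("/")) > 1


def exists_plain(path):
    # Not decorated: runs immediately with whatever it is handed.
    return len(path.split("/")) > 1


def run(graph):
    """Execute recorded nodes in order, swapping handles for real values."""
    results = {}
    for name, fn, inputs in graph:
        args = [results[i.step_name] if isinstance(i, OutputHandle) else i
                for i in inputs]
        results[name] = fn(*args)
    return results


# Composition phase: nothing is computed yet, only handles are passed around.
path = get_path()
exists_step(path)
try:
    exists_plain(path)  # the handle has no .split, so this fails right away
except AttributeError as err:
    error_message = str(err)

results = run(GRAPH)  # execution phase: exists_step now sees the string
```

In this model the undecorated function fails with an AttributeError about `split` during composition, while the decorated one works because it is deferred until real values exist.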
owen
06/16/2021, 11:14 PM
Scott Peters
06/16/2021, 11:15 PM
printing values as they come back?
path = get_path()
print(path.result)
@solid decorator
owen
06/16/2021, 11:17 PM
Scott Peters
06/16/2021, 11:19 PM
owen
06/16/2021, 11:19 PM
Scott Peters
06/16/2021, 11:19 PM
owen
06/16/2021, 11:20 PM
Scott Peters
06/16/2021, 11:20 PM
owen
06/16/2021, 11:22 PM