dwall
10/09/2019, 4:26 PM
@resource(config={
'token': Field(
String,
description='Your Stitch Connect API Token.',
is_optional=False,
is_secret=True
)},
description='This resource is for connecting to the Stitch Data Connect API.'
)
def stitch_resource(context):
return PyStitch(token=context.resource_config.get("token"))
# Solid that enumerates every stream reachable through the 'stitch' resource.
# Returns a list of (source name, stream name) tuples, in the order the client
# yields sources and, within each source, its streams.
# NOTE(review): the `sources` input is declared but never read in the body --
# confirm whether it was meant to restrict which sources are listed.
@solid(required_resource_keys={'stitch'})
def get_stitch_streams(context, sources: List[Int]) -> List:
    client = context.resources.stitch
    return [
        (src.get("name"), strm.get("stream_name"))
        for src in client.list_sources()
        for strm in client.list_streams(source_id=src.get("id"))
    ]
# Pipeline with a single mode whose 'stitch' resource key is bound to
# stitch_resource; its only step is the get_stitch_streams solid.
@pipeline(mode_defs=[ModeDefinition(resource_defs={'stitch': stitch_resource})])
def resources_pipeline():
    get_stitch_streams()
# Run configuration handed to execute_pipeline below.
# NOTE: this is the configuration that produces the DagsterInvalidConfigError
# quoted after this snippet -- 'sources' is a single {'value': [...]} mapping,
# while the error reports that the value at that path must be a list.
environment_dict = {
    'resources': {'stitch': {'config': {'token': STITCH_TOKEN}}},
    'solids': {
        'list_stitch_sources': {
            'inputs': {'sources': {'value': [1234]}},
        },
    },
}

if __name__ == '__main__':
    execute_pipeline(resources_pipeline, environment_dict=environment_dict)
dagster.core.errors.DagsterInvalidConfigError: Pipeline "resources_pipeline" config errors:
Error 1: Type failure at path "root:solids:list_stitch_sources:inputs:sources" on type "None". Value at path root:solids:list_stitch_sources:inputs:sources must be list. Expected: [{ json: { path: Path } pickle: { path: Path } value: Int }].
List[Int]
via the python API?
max
10/09/2019, 4:27 PM
dwall
10/09/2019, 4:28 PM
max
10/09/2019, 5:12 PM
environment_dict = {
'resources': {
'stitch': {
'config': {
'token': STITCH_TOKEN
}
}
},
'solids': {
'list_stitch_sources': {
'inputs': {
'sources': [{'value': 1234}]
}
}
}
}
[{'path': '/foo.csv', 'sep': ';', ...}, ...]