Simon Späti
03/22/2020, 9:01 PM
dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline networkscore_incremental_pipeline received invalid type <class 'dagster.core.definitions.composition.CallableSolidNode'> for input "tag_session" (at position 0) in solid invocation "update_tags". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
My pipeline:
@pipeline(...)
def networkscore_incremental_pipeline():
    df_tag_session = load_delta_table_to_df.alias("tag_session_load_delta_table_to_df")
    df = update_tags(df_tag_session, sa_simulation_of_input_nps_categories(),)
I found that the error is in load_delta_table_to_df: if I use something different, it works. I figured maybe the return value of the solid is not typed as DataFrame. I tried `output_defs` and other things. What am I missing?
My very simple update_tags solid:
@solid(
    description='''Update Tag to Session mapping table with new inputs from external applications (e.g. SA)''',
)
def update_tags(context, tag_session: DataFrame, new_tags_sessions: DataFrame) -> DataFrame:
    return tag_session
And the source of my error is load_delta_table_to_df, but why?
@solid(
    required_resource_keys={'spark', 's3'},
    description='''Loads given delta coordinates into a spark data frame''',
    # output_defs=[OutputDefinition(name='df', dagster_type=DataFrame, is_optional=False),],
)
def load_delta_table_to_df(
    context, delta_coordinate: DeltaCoordinate, where_conditions: String,
) -> DataFrame:
    delta_path = get_s3a_path(
        delta_coordinate['s3_coordinate_bucket'], delta_coordinate['s3_coordinate_key']
    )
    df = (
        context.resources.spark.spark_session.read.format("delta")
        .load(delta_path)
        .where(where_conditions)
    )
    return df
Thanks a lot for any help or hints.
schrockn
03/22/2020, 9:26 PM
df_tag_session = load_delta_table_to_df.alias("tag_session_load_delta_table_to_df")
you need
df_tag_session = load_delta_table_to_df.alias("tag_session_load_delta_table_to_df")()
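For context, the error message names `CallableSolidNode`, which suggests that `.alias(...)` only returns a renamed, still-callable node, and that the trailing `()` is what invokes the solid and yields an output that another solid can take as input. The following is a toy model of that distinction in plain Python, not dagster's implementation; `ToySolid`, `ToyCallableNode`, `ToyOutputHandle`, and `accept_input` are hypothetical names invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyOutputHandle:
    """Stands in for the output of an invoked solid (hypothetical)."""
    solid_name: str


@dataclass(frozen=True)
class ToyCallableNode:
    """Stands in for an aliased solid that has not been invoked yet."""
    solid_name: str

    def __call__(self) -> ToyOutputHandle:
        # Invoking the node is what produces something a downstream solid can consume.
        return ToyOutputHandle(self.solid_name)


@dataclass(frozen=True)
class ToySolid:
    name: str

    def alias(self, new_name: str) -> ToyCallableNode:
        # Renaming alone does not invoke anything.
        return ToyCallableNode(new_name)

    def __call__(self) -> ToyOutputHandle:
        return ToyOutputHandle(self.name)


def accept_input(value) -> str:
    """Mimics a composition-time check: only outputs are valid inputs."""
    if not isinstance(value, ToyOutputHandle):
        raise TypeError(f"received invalid type {type(value).__name__}")
    return value.solid_name


load = ToySolid("load_delta_table_to_df")
# Without the trailing (): still a callable node, rejected as an input.
not_invoked = load.alias("tag_session_load_delta_table_to_df")
# With the trailing (): an output handle, accepted as an input.
invoked = load.alias("tag_session_load_delta_table_to_df")()
```

In this sketch, `accept_input(invoked)` succeeds while `accept_input(not_invoked)` raises a `TypeError`, mirroring the shape of the error in the question.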
Chris Roth
03/23/2020, 5:10 PM
pipeline_1 | Error: A scheduler must be configured to run schedule commands.
pipeline_1 | You can configure a scheduler on your instance using dagster.yaml.
pipeline_1 | For more information, see:
even though I have a scheduler defined in dagster.yaml:
scheduler:
  module: dagster_cron.cron_scheduler
  class: SystemCronScheduler
Chris Roth
03/23/2020, 5:54 PM
dagster.check.CheckError: (<class 'dagster.core.launcher.RunLauncher'>, 'class RunLauncher in module dagster.core.launcher', <class 'dagster.core.serdes.ConfigurableClass'>)
Chris Roth
03/23/2020, 5:54 PM
dagster.yaml is:
run_launcher:
  module: dagster.core.launcher
  class: RunLauncher
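The CheckError tuple lists `RunLauncher` alongside `ConfigurableClass`, which reads like a failed subclass check: the class named in dagster.yaml apparently has to derive from `ConfigurableClass`, and the `RunLauncher` named here does not pass that check. Below is a minimal sketch of how a `module`/`class` pair might be resolved and validated, assuming nothing about dagster's actual loader. `load_configured_class` is a hypothetical helper, and standard-library classes stand in for the real ones.

```python
import importlib


def load_configured_class(module_name: str, class_name: str, required_base: type) -> type:
    """Resolve a class from a module/class pair and verify its base class."""
    module = importlib.import_module(module_name)
    klass = getattr(module, class_name)
    if not (isinstance(klass, type) and issubclass(klass, required_base)):
        raise TypeError(f"{klass!r} is not a subclass of {required_base!r}")
    return klass


# OrderedDict derives from dict, so the check passes.
ok = load_configured_class("collections", "OrderedDict", dict)

# Counter does not derive from list, so the check fails,
# analogous to naming a class that lacks the required base.
try:
    load_configured_class("collections", "Counter", list)
    failed = False
except TypeError:
    failed = True
```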
Muthu
03/24/2020, 9:46 PM
instance = DagsterInstance.get()