
Alexandre Miyazaki

12/04/2020, 7:11 PM
Hey guys, quick question: I am trying to execute two solids sequentially and having some trouble. Here is my code, where `generate_job_params` is a solid (a simple date manipulation) and `spark_solid` is another solid. When I run it, I get an error because `spark_solid` needs APPLICATION_PARAMS to be defined, and it is, but with a `<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x10920c790>` where I expected a formatted datetime.
```python
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"spark": spark_resource})]
)
def pipeline():
    yd = generate_job_params()
    os.environ["APPLICATION_PARAMS"] = f"--app-name CountCSV --input-csv-path /tmp/aksmiyazaki/dummy.csv --group-column name --output-data-path /tmp/aksmiyazaki/{yd}"
    spark_solid = create_spark_solid("spark_process_simple_csv", "GroupedCount")
    spark_solid()
```

alex

12/04/2020, 7:43 PM
The `@pipeline` function is only run at initialization time, and is used to capture the structure of the DAG. The decorator replaces the function with an instance of `PipelineDefinition`. I think the best path is probably to stop using `create_spark_solid` and implement your own solid. If you look at what that solid does, all of the real work is in the resource: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-spark/dagster_spark/solids.py so you should be able to create your own that accepts the `yd` as a proper input.

Alexandre Miyazaki

12/04/2020, 7:48 PM
First of all, thanks for your reply, alex! I will try to create the solid by hand.

alex

12/04/2020, 7:51 PM
Ya, if you start by copying what's linked above, all you need to do is change the `input_defs` section, add the corresponding input to the function, and then use it before invoking `run_spark_job` on the resource.

Alexandre Miyazaki

12/04/2020, 7:53 PM
Nice! I will try it here. Thanks!