Alexandre Miyazaki
12/04/2020, 7:11 PM
generate_job_params is a solid (a simple date manipulation) and spark_solid is another solid. When I run the pipeline I get an error because spark_solid needs APPLICATION_PARAMS to be defined, and it is, but with a <dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x10920c790> where I expected a formatted datetime.
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"spark": spark_resource})]
)
def pipeline():
    yd = generate_job_params()
    os.environ["APPLICATION_PARAMS"] = f"--app-name CountCSV --input-csv-path /tmp/aksmiyazaki/dummy.csv --group-column name --output-data-path /tmp/aksmiyazaki/{yd}"
    spark_solid = create_spark_solid("spark_process_simple_csv", "GroupedCount")
    spark_solid()
alex
12/04/2020, 7:43 PM
The @pipeline function is only run at initialization time, and is used to capture the structure of the DAG. The decorator replaces the function with an instance of PipelineDefinition.
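A minimal pure-Python sketch of that behavior (hypothetical, not Dagster's actual implementation) shows why the pipeline body sees a handle object instead of the computed date:

```python
# Hypothetical sketch: a composition-time decorator pattern like Dagster's.
class InvokedSolidOutputHandle:
    """Placeholder returned when a solid is 'called' during composition."""
    def __init__(self, solid_name):
        self.solid_name = solid_name

def solid(fn):
    # Calling a solid inside a pipeline body returns a handle, not the
    # computed value -- the real computation happens later, at run time.
    def invoke(*args, **kwargs):
        return InvokedSolidOutputHandle(fn.__name__)
    invoke.__name__ = fn.__name__
    return invoke

def pipeline(fn):
    # The decorator runs the body ONCE, at definition time, purely to
    # record the DAG structure; it never sees runtime values.
    fn()
    return fn

@solid
def generate_job_params():
    return "2020-12-04"  # never visible to the pipeline body itself

@pipeline
def my_pipeline():
    yd = generate_job_params()
    # At composition time, yd is a handle; formatting it into a string
    # bakes the handle's repr into APPLICATION_PARAMS -- the reported bug.
    print(type(yd).__name__)  # InvokedSolidOutputHandle
```

This is why the f-string in the original pipeline produced `<...InvokedSolidOutputHandle object at 0x...>`: the environment variable was set during composition, before any solid had actually run.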
alex
12/04/2020, 7:44 PM
Rather than use create_spark_solid, implement your own solid. If you look at what that solid does, all of the real work is in the resource:
https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-spark/dagster_spark/solids.py
alex
12/04/2020, 7:44 PM
That way you can pass yd as a proper parameter.
Alexandre Miyazaki
12/04/2020, 7:48 PM
alex
12/04/2020, 7:51 PM
In your own solid you can just call run_spark_job on the resource.
Alexandre Miyazaki
12/04/2020, 7:53 PM
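The suggested fix, taking yd as a real input to your own solid and letting the solid call run_spark_job on the resource, might look roughly like this. This is a Dagster-free sketch with a stub resource; the real run_spark_job signature in dagster-spark may differ:

```python
from types import SimpleNamespace

class StubSparkResource:
    """Stand-in for the dagster-spark resource; records submitted jobs."""
    def __init__(self):
        self.jobs = []

    def run_spark_job(self, application_arguments, main_class):
        # A real resource would shell out to spark-submit here.
        self.jobs.append((application_arguments, main_class))

def spark_process_simple_csv(context, yd):
    # yd arrives as a plain string because it is a proper solid input,
    # not an InvokedSolidOutputHandle captured at composition time.
    params = (
        "--app-name CountCSV "
        "--input-csv-path /tmp/aksmiyazaki/dummy.csv "
        "--group-column name "
        f"--output-data-path /tmp/aksmiyazaki/{yd}"
    )
    context.resources.spark.run_spark_job(params, "GroupedCount")
    return params

# Wiring one invocation by hand instead of through a pipeline:
spark = StubSparkResource()
context = SimpleNamespace(resources=SimpleNamespace(spark=spark))
params = spark_process_simple_csv(context, "2020-12-04")
```

Because the date is an ordinary function argument, it is only formatted into the application parameters at run time, when the value actually exists.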