schrockn (03/25/2020, 6:48 PM)

schrockn (03/25/2020, 6:48 PM)

Basil V (03/25/2020, 6:48 PM)

schrockn (03/25/2020, 6:49 PM)

schrockn (03/25/2020, 6:49 PM)

Basil V (03/25/2020, 6:50 PM)

Basil V (03/25/2020, 11:02 PM)
I'm trying to use execute_pipeline with step_keys_to_execute defined in the RunConfig; however, I'm running into this error:
dagster.core.errors.DagsterStepOutputNotFoundError: When executing transform.compute discovered required outputs missing from previous step: [StepOutputHandle(step_key='load.copy_django_logs_to_redshift.compute', output_name='result')]
I thought maybe I could fix it by passing in rexecution_config to RunConfig, but when I do I get this error:
TypeError: __new__() got an unexpected keyword argument 'rexecution_config'
Could someone give me pointers on how to run a specific pipeline step that depends on previous steps? Thanks so much, really appreciate all the help.
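
For context, a minimal sketch of that kind of call on the 0.7-era API (hedged: my_pipeline is a placeholder, and the exact RunConfig fields vary by version; older releases spelled the re-execution argument reexecution_config, with a double "e", and newer ones may have removed it). The DagsterStepOutputNotFoundError means the upstream step's output was not available, so the previous run needs to have persisted its intermediates somewhere the new run can read them:

from dagster import RunConfig, execute_pipeline

# Persist intermediates so a later run can pick up upstream outputs; without
# persistent storage, running only a downstream step fails because the
# upstream result does not exist anywhere.
environment_dict = {"storage": {"filesystem": {}}}

result = execute_pipeline(
    my_pipeline,  # placeholder for the actual pipeline definition
    environment_dict=environment_dict,
    run_config=RunConfig(step_keys_to_execute=["transform.compute"]),
)

Pointing the new run at a specific previous run's intermediates is version-specific, so it is worth checking the re-execution section of the docs for the installed release.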

Basil V (03/26/2020, 12:08 AM)

Basil V (03/26/2020, 12:09 AM)

Gaetan DELBART (03/26/2020, 9:15 AM)

Gaetan DELBART (03/26/2020, 10:54 AM)

Chris Roth (03/26/2020, 8:25 PM)
/opt/dagster/app if you also follow the Docker tutorial. I also noticed that in an older version of one of the tutorials, if I'm not mistaken, the example was to create an /opt/dagster/data directory. What is the best practice for storing data / intermediate files on the filesystem? I'm assuming S3 or something like that is the best practice, but what if I have to use a local dir?
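
For the local-directory case, a hedged sketch of pointing the built-in filesystem storage at a fixed directory (field names follow the 0.7-era system storage docs and should be verified against the installed version); S3 storage is the usual choice once a bucket is available:

# Intermediates and run artifacts land under base_dir instead of the default
# local artifact location.
environment_dict = {
    "storage": {
        "filesystem": {
            "config": {"base_dir": "/opt/dagster/data"}
        }
    }
}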

Chris Roth (03/26/2020, 8:31 PM)

Chris Roth (03/26/2020, 8:31 PM)

Oliver Mannion (03/29/2020, 12:17 AM)

Ken (03/31/2020, 12:53 AM)
Undefined field "inputs" at path root:solids:augment_5 Expected: "{ outputs?: [{ result?: { json: { path: Path } pickle: { path: Path } } }] }"
Details posted in the Thread.
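
A hedged reading of that error (the solid and input names below are illustrative, not from the thread): the config schema only offers an "inputs" section for inputs that are not fed by an upstream solid, and the expected schema for augment_5 lists only "outputs", which suggests its inputs all come from dependencies. Any inputs config would then belong on the upstream solid whose input is unconnected, for example:

environment_dict = {
    "solids": {
        "first_solid": {  # hypothetical upstream solid with an unconnected input
            "inputs": {"raw_path": {"value": "data.csv"}}  # hypothetical input
        }
    }
}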

Antoine (03/31/2020, 9:32 AM)

Ben Smith (03/31/2020, 11:19 PM)
Error 1: Missing required field "solids" at document config root. Available Fields: "['execution', 'loggers', 'resources', 'solids', 'storage']".
Still on serial_pipeline.py, Windows 10, Anaconda, pip installed:
@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())

def test_serial_pipeline():
    res = execute_pipeline(serial_pipeline, environment_dict=ed)
    assert res.success

if __name__ == '__main__':
    result = execute_pipeline(serial_pipeline)
    assert result.success
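
That error says the environment dict handed to execute_pipeline has no "solids" section, which this pipeline's config schema requires (and the bare execute_pipeline(serial_pipeline) call in the __main__ block passes no config at all). A hedged illustration of the shape ed would need, assuming load_cereals declares a csv_path input; the actual solid and input names depend on your serial_pipeline.py:

# Hypothetical config: replace the input name and path with whatever the
# tutorial solid actually declares.
ed = {
    "solids": {
        "load_cereals": {
            "inputs": {"csv_path": {"value": "cereal.csv"}}
        }
    }
}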

Muthu (04/01/2020, 8:33 AM)

Owen (04/01/2020, 2:17 PM)

Will Brown (04/01/2020, 5:01 PM)

sephi (04/01/2020, 7:48 PM)
We fail to run dagster with pyspark, but succeed in running dagit with the exact same pipeline.
The command below succeeds with dagit:
(conda_env) myuser@myserver:~/projects/formatter> dagit -f dagster_pyspark/compsite_solids_types.py -n pyspark_pipeline -p 3001
Loading repository...
Serving on http://127.0.0.1:3001 in process 73783
2020-04-01 20:45:35 - dagster - DEBUG - pyspark_pipeline - c56443eb-fd20-4d61-863a-7af6a1bfc4a4 - ENGINE_EVENT - Starting initialization of resources [spark].
event_specific_data = {"error": null, "marker_end": null, "marker_start": "resources", "metadata_entries": []}
WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2) overrides detected (/opt/cloudera/parcels/SPARK2/lib/spark2/).
WARNING: Running spark-class from user-defined location.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2020-04-01 20:45:45 - dagster - DEBUG - pyspark_pipeline - c56443eb-fd20-4d61-863a-7af6a1bfc4a4 - ENGINE_EVENT - Finished initialization of resources [spark].
event_specific_data = {"error": null, "marker_end": "resources", "marker_start": null, "metadata_entries": [["spark", "Initialized in 10.05s", ["dagster_pyspark.resources", "SystemPySparkResource"]]]}
2020-04-01 20:45:45 - dagster - DEBUG - pyspark_pipeline - c56443eb-fd20-4d61-863a-7af6a1bfc4a4 - PIPELINE_START - Started execution of pipeline "pyspark_pipeline".
But when running the same pipeline with dagster, the pyspark initialisation fails:
(conda_env) myuser@myserver:~/projects/formatter> dagster pipeline execute -f dagster_pyspark/compsite_solids_types.py -n pyspark_pipeline -e dagster_pyspark/test_pyspark.yaml

sephi (04/01/2020, 7:48 PM)
> dagster - ERROR - pyspark_pipeline - 1d12e4e7-15e0-49f8-a394-27dcf29ef9cb - PIPELINE_INIT_FAILURE - Pipeline failure during initialization of pipeline "pyspark_pipeline". This may be due to a failure in initializing a resource or logger.
event_specific_data = {"error": ["py4j.protocol.Py4JError: An error occurred while calling <http://None.org|None.org>.apache.spark.api.python.PythonAccumulatorV2. Trace:\npy4j.Py4JException: Constructor org.apache.spark.api.python.PythonAccumulatorV2([class java.lang.String, class java.lang.Integer, class java.lang.String]) does not exist\n\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)\n\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)\n\tat py4j.Gateway.invoke(Gateway.java:237)\n\tat py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\n\tat py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n\n\n", [" File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster/core/errors.py\", line 161, in user_code_error_boundary\n yield\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster/core/execution/resources_init.py\", line 138, in single_resource_event_generator\n resource = next(gen)\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster_pyspark/resources.py\", line 66, in pyspark_resource\n pyspark = SystemPySparkResource(init_context.resource_config['spark_conf'])\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster_pyspark/resources.py\", line 30, in __init__\n self._spark_session = spark_session_from_config(spark_conf)\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/dagster_pyspark/resources.py\", line 25, in spark_session_from_config\n return builder.getOrCreate()\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/sql/session.py\", line 173, in getOrCreate\n sc = SparkContext.getOrCreate(sparkConf)\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/context.py\", line 367, in getOrCreate\n SparkContext(conf=conf or SparkConf())\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/context.py\", line 136, in __init__\n conf, jsc, profiler_cls)\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/pyspark/context.py\", line 207, in _do_init\n self._javaAccumulator = self._jvm.PythonAccumulatorV2(host, port, auth_token)\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/py4j/java_gateway.py\", line 1525, in __call__\n answer, self._gateway_client, None, self._fqn)\n", " File \"/opt/anaconda3/envs/dask_pyarrow/lib/python3.7/site-packages/py4j/protocol.py\", line 332, in get_return_value\n format(target_id, \".\", name, value))\n"], "Py4JError", null]}
We tried to play with SPARK_HOME and tried to use findspark in the dagster_pyspark/resources.py file, but had no success.
Any suggestions?
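
A hedged diagnostic sketch (not a confirmed fix from the thread): that Py4JError usually points at a version mismatch between the pip-installed pyspark and the Spark picked up via SPARK_HOME; the three-argument constructor the installed pyspark calls with (host, port, auth_token) does not exist in the Spark 2.3.0 build that the Cloudera parcel in the warning provides. Comparing the two sides is a quick first check:

import os

import pyspark

print("pip pyspark version:", pyspark.__version__)   # Python-side version
print("SPARK_HOME:", os.environ.get("SPARK_HOME"))   # JVM-side Spark being launched
# If they disagree, either pin pyspark to the cluster version
# (e.g. pip install "pyspark==2.3.0") or adjust/unset SPARK_HOME so that
# dagster and dagit resolve the same Spark installation.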

Chris Roth (04/01/2020, 8:18 PM)

Chris Roth (04/01/2020, 9:54 PM)
I'm getting Undefined field "s3" at path root:storage Expected
while trying to set up s3 for intermediate storage, running the solid with this config:
storage:
  s3:
    config:
      s3_bucket: <my bucket name>

Chris Roth (04/01/2020, 9:54 PM)

Chris Roth (04/01/2020, 9:55 PM)
system_storage_defs=s3_plus_default_storage_defs,
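
A hedged sketch of how that fragment fits into a pipeline definition on the 0.7-era API (import paths and argument names may differ by version; my_pipeline and its body are placeholders). The Undefined field "s3" error above goes away once the pipeline's mode actually offers the s3 system storage and an s3 resource:

from dagster import ModeDefinition, pipeline
from dagster_aws.s3.resources import s3_resource
from dagster_aws.s3.system_storage import s3_plus_default_storage_defs

@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={"s3": s3_resource},                  # S3 client resource
            system_storage_defs=s3_plus_default_storage_defs,   # enables "storage: s3"
        )
    ]
)
def my_pipeline():
    ...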

Muthu (04/01/2020, 10:15 PM)

Basil V (04/02/2020, 12:03 AM)

Owen (04/02/2020, 7:28 AM)