https://dagster.io/ logo
b

Binh Pham

07/02/2020, 8:32 PM
Hi I'm wanting to use dagster_databricks, but also use spark DataFrame serialization for intermediate storage, currently available in dagster_pyspark. Would I have to rewrite the serialization to work with dagster_databricks?
a

alex

07/02/2020, 9:37 PM
@sandy
s

sandy

07/02/2020, 9:39 PM
Hi Binh - dagster_pyspark and dagster_databricks should work fine together
What filesystem / object store are you planning to use for storing intermediates?
b

Binh Pham

07/02/2020, 10:09 PM
Hi @sandy, I'm planning to use s3 for intermediate storage. Can I target a specific cluster using dagster_databricks?
s

sandy

07/02/2020, 10:14 PM
you can configure which databricks cluster to use with
Copy code
cluster:
  existing: <your cluster id>
👍 1
you can also configure which s3 bucket to use
b

Binh Pham

07/02/2020, 10:16 PM
thanks will give it try
@sandy it was able to upload and execute on cluster, but failed to execute cluster driver error log:
Copy code
KeyError                                  Traceback (most recent call last)
<command--1> in <module>
     13 
     14 with open(filename, "rb") as f:
---> 15   exec(f.read())
     16 

<string> in <module>

<string> in main(step_run_ref_filepath, pipeline_zip)

<string> in setup_storage(step_run_ref)

KeyError: 'storage'
s

sandy

07/02/2020, 11:51 PM
in the configuration for the step launcher resource, you need to specify which storage you're using, e.g.
Copy code
resources:
  pyspark_step_launcher:
    config:
      storage: s3
b

Binh Pham

07/02/2020, 11:54 PM
I have it set, and setup scope secrets on databricks
s

sandy

07/03/2020, 12:43 AM
would you be comfortable sharing your full run config?
b

Binh Pham

07/03/2020, 2:53 AM
s

sandy

07/03/2020, 4:36 PM
ah, so this is a little bit annoying, but you actually need to specify storage in two places. you've specified the s3 storage that you can access from within databricks. you also need to specify the storage that's used for storing the intermediate values that are passed between solids. here's an example (
storage
goes at the same level as
resources
):
Copy code
storage:
  s3:
    config:
      s3_bucket: dagster-scratch-80542c2
      s3_prefix: simple-pyspark
oops - and last of all - I believe you'll need an s3 resource in your mode definition, as in this example: https://docs.dagster.io/examples/emr_pyspark
b

Binh Pham

07/06/2020, 5:14 PM
thanks that worked!
s

sandy

07/06/2020, 5:14 PM
awesome
b

Binh Pham

07/06/2020, 7:16 PM
When I'm reexecuting a pipeline in dagit, it throws a parent id not found error. Is this a drawback from using existing clusters?
Copy code
DagsterRunNotFoundError                   Traceback (most recent call last)
<command--1> in <module>
     13 
     14 with open(filename, "rb") as f:
---> 15   exec(f.read())
     16 

<string> in <module>

<string> in main(step_run_ref_filepath, pipeline_zip)

/databricks/python/lib/python3.7/site-packages/dagster/core/execution/plan/external_step.py in run_step_from_ref(step_run_ref)
    188 
    189 def run_step_from_ref(step_run_ref):
--> 190     step_context = step_run_ref_to_step_context(step_run_ref)
    191     return core_dagster_event_sequence_for_step(step_context, step_run_ref.prior_attempts_count)

/databricks/python/lib/python3.7/site-packages/dagster/core/execution/plan/external_step.py in step_run_ref_to_step_context(step_run_ref)
    175         DagsterInstance.ephemeral(),
    176     )
--> 177     for _ in initialization_manager.generate_setup_events():
    178         pass
    179     pipeline_context = initialization_manager.get_object()

/databricks/python/lib/python3.7/site-packages/dagster/utils/__init__.py in generate_setup_events(self)
    373         try:
    374             while self.object is None:
--> 375                 obj = next(self.generator)
    376                 if isinstance(obj, self.object_cls):
    377                     self.object = obj

/databricks/python/lib/python3.7/site-packages/dagster/core/execution/context_creation_pipeline.py in pipeline_initialization_event_generator(execution_plan, run_config, pipeline_run, instance, scoped_resources_builder_cm, system_storage_data, raise_on_error)
    214         else:
    215             # pipeline teardown failure
--> 216             raise dagster_error
    217 
    218         if raise_on_error:

/databricks/python/lib/python3.7/site-packages/dagster/core/execution/context_creation_pipeline.py in pipeline_initialization_event_generator(execution_plan, run_config, pipeline_run, instance, scoped_resources_builder_cm, system_storage_data, raise_on_error)
    188         )
    189 
--> 190         _validate_plan_with_context(pipeline_context, execution_plan)
    191 
    192         yield pipeline_context

/databricks/python/lib/python3.7/site-packages/dagster/core/execution/context_creation_pipeline.py in _validate_plan_with_context(pipeline_context, execution_plan)
    222 # perform any plan validation that is dependent on access to the pipeline context
    223 def _validate_plan_with_context(pipeline_context, execution_plan):
--> 224     validate_reexecution_memoization(pipeline_context, execution_plan)
    225 
    226 

/databricks/python/lib/python3.7/site-packages/dagster/core/execution/memoization.py in validate_reexecution_memoization(pipeline_context, execution_plan)
     24         raise DagsterRunNotFoundError(
     25             'Run id {} set as parent run id was not found in instance'.format(parent_run_id),
---> 26             invalid_run_id=parent_run_id,
     27         )
     28 

DagsterRunNotFoundError: Run id 5c34579a-2876-497c-9127-d1697f134c2d set as parent run id was not found in instance
s

sandy

07/06/2020, 10:31 PM
hmm, I'm not familiar with this as a related issue - what are you using for run storage on your instance?
b

Binh Pham

07/06/2020, 10:59 PM
the base storage config for intermediates? I'm using s3