# ask-community
d
Hello team, we are experiencing some weird behaviour with the dagster `pyspark_step_launcher` and `DynamicPartitions` / partitions overall. Our setup is as follows:
• File partitions based on `DynamicPartitionsDefinition`
• Asset1: executed locally, uses the partition via `partitions_def` in the `@asset` decorator
• Asset2: executed on our Databricks instance using the `pyspark_step_launcher`, also uses the partition via `partitions_def` in the `@asset` decorator
◦ this asset depends on Asset1, by having it as an input parameter in the function
• Dagster version: 1.3.4
I will post a minimal example in the comments; it should make it easy to replicate the error. This is the error message we end up with:
dagster._core.errors.DagsterInvalidInvocationError: Partition range <our_partition_key> to <our_partition_key> is
                not a valid range. Nonexistent partition keys:
                [<our_partition_key>]
Clearly, Dagster cannot find the partition that we are passing through to Asset2, which is executed on Databricks. Here is what we think is going on, based on what my colleague @Döme Lőrinczy found while debugging over the last few days:
• `DagsterInstance.get_dynamic_partitions` returns an empty list
• This means that the `EventLogStorage` cannot find the appropriate partitions (because an event log file is missing in the Databricks environment?)
We are aware that DynamicPartitions are still experimental - however, it would be amazing if we could brainstorm a solution, as it is rather important for our setup.
This is a minimal code example to replicate this issue:
from pathlib import Path
from dagster import (
    asset, 
    sensor, 
    SensorResult, 
    RunRequest,
    DynamicPartitionsDefinition,
    Definitions,
    AssetSelection,
    define_asset_job
)
from dagster_databricks import databricks_pyspark_step_launcher

partition = DynamicPartitionsDefinition(name='test_partition')

@asset(
    partitions_def=partition,
    required_resource_keys={'pyspark_step_launcher'}
)
def test_asset():
    return 1

test_job = define_asset_job(name='test_job', selection=AssetSelection.keys('test_asset'))

@sensor(
    name='test_sensor',
    job=test_job
)
def test_sensor():
    return SensorResult(
        run_requests=[RunRequest(partition_key='new_partition')],
        dynamic_partitions_requests=[
            partition.build_add_request(['new_partition'])
        ]
    )

defs = Definitions(
    assets=[test_asset],
    sensors=[test_sensor],
    jobs=[test_job],
    resources={
        "pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
            {
                "run_config": {
                    "run_name": "test dagster",
                    "cluster": {
                        "existing": "<our_cluster>" # from the Databricks URL
                    },
                    # "libraries": [
                    #     {"pypi": { "package": "dagster==1.3.2"} },
                    # ]
                },
                'secrets_to_env_variables': [],
                "databricks_host": "<dbx_url>",
                "databricks_token": {
                    "env": "DATABRICKS_TOKEN" # It has to be available in the Databricks env as well
                },
                "local_pipeline_package_path": str(Path(__file__).parent.parent)
            }
        ),
    }
)
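For reference, the keys registered for a dynamic partitions definition can be queried directly from an instance - a minimal sketch, assuming the `test_partition` name from the example above:

from dagster import DagsterInstance

# Load whichever instance this snippet runs against (DAGSTER_HOME locally, or the
# temporary instance that the step launcher sets up on the Databricks side).
instance = DagsterInstance.get()

# Locally this returns the keys added by the sensor (e.g. ["new_partition"]);
# on the Databricks side it apparently comes back empty, hence the error above.
print(instance.get_dynamic_partitions("test_partition"))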
From my understanding, the following could be a solution:
• Dagster creates a zip file of the relevant code that is then sent to Databricks and executed there; however, the Dagster instance on Databricks cannot communicate with your "original" starting point.
• The event file, where Dagster temporarily stores / logs the partition key it works with, must be available to the Dagster environment on Databricks. Meaning, it should be part of the zip that is sent to Databricks.
In the documentation about Dagster <> Databricks (https://docs.dagster.io/_apidocs/libraries/dagster-databricks) it says:
Note that, for the `databricks_pyspark_step_launcher`, either S3 or Azure Data Lake Storage config must be specified for ops to succeed, and the credentials for this storage must also be stored as a Databricks Secret and stored in the resource config so that the Databricks cluster can access storage.
Could this be connected to this issue? Do we have to somehow configure ADLS (as we are on Azure) so that this event file is stored somewhere reachable by Dagster on Databricks?
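If so, I assume it would look roughly like this on our side - a sketch only, using the `adls2_resource` and `adls2_pickle_io_manager` from `dagster-azure` (exact config fields would need to be double-checked against the API docs):

from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

# Sketch: ADLS-backed IO so that the local process and the Databricks step read and
# write materializations in the same place. These entries would be added to the
# `resources` dict of the Definitions above, next to the pyspark_step_launcher.
# (The step launcher config also has a `storage` section for the Databricks-secret
# credentials of the same account - see the dagster-databricks API docs.)
adls2_storage_resources = {
    "adls2": adls2_resource.configured(
        {
            "storage_account": "<our_storage_account>",
            "credential": {"key": "<our_storage_account_key>"},
        }
    ),
    "io_manager": adls2_pickle_io_manager.configured(
        {
            "adls2_file_system": "<our_container>",  # ADLS Gen2 file system (container) name
            "adls2_prefix": "dagster",
        }
    ),
}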
Hi @owen, sorry to tag you out of the blue - but you were able to troubleshoot with us last time in the context of the `pyspark_step_launcher`... Maybe you could have a look?
Hey guys, we are still stuck on this issue... would be great to receive some input... Thanks!
o
Hi @David Weber! What's going on here (basically what you stated) is that the instance that your Databricks step is running against is different from your main Dagster instance (basically, it's just a temporary instance in the local directory). The `StepRunRef` contains all of the information that's sent over to the external instance, and this does not include dynamic partitions (as it predates the concept of dynamic partitions). There are two basic solutions I can imagine here. The first would be something like you're suggesting, where the full set of dynamic partitions is passed through the `StepRunRef` to the external instance. This could get somewhat heavyweight though, as there may be a large number of partitions, and we just need the one that we're trying to execute. Another possibility, which wouldn't require updating the `StepRunRef`, would be to update `run_step_from_ref` (https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/_core/execution/plan/external_step.py?L237) to detect if you're executing a step with a `DynamicPartitionsDefinition`, and if so, to add that partition to the external instance. This one seems a bit lighter-weight overall, and likely would not be a large-volume change. In essence:
1. check `context.partition_key` to see if you're in a partitioned run
2. if so, check `context._partitions_def_for_output(...)` to see if it's a `DynamicPartitionsDefinition`
3. if it is, call `context.instance.add_dynamic_partitions(partitions_def_name, [context.partition_key])`
Would you be interested in prototyping that change on your end? I'd be happy to review, but it might be a bit before I have the bandwidth to make that change myself.
d
Hi @owen! Thanks for your suggestion, we will try it out on our end and report back - thanks!
Hey @owen, I already spent some time on the implementation. When you talk about `context`, do you mean `step_context`? As `step_context` is what is used in the `run_step_from_ref` function. Another question pops up - what do you expect me to feed to `add_dynamic_partitions` as `partitions_def_name`? `partitions_def_name` is the expected first parameter of the function, but I am not really sure what to pass here...
For `partitions_def_name` I would expect to feed the "name" of our dynamic partition. But I am not sure where I could get that from in the context of this function...
o
great! (when I was talking about `context`, that was indeed the `step_context`) For the bit about the `partitions_def_name`: once you get the `PartitionsDefinition` object (step 2), if it's a `DynamicPartitionsDefinition`, it'll have a `name` property which can be passed in for that field
d
Hi @owen, after several hours of not finding out why nothing changed about the error, even though I made changes to the source code locally, I realized something important: the whole `run_step_from_ref` function is executed on the Databricks cluster, not in the local Dagster instance. This brings up the question: how can I make changes to the Dagster source code that is installed on Databricks, rather than to the package I have locally? I already tried creating my own package from the Dagster source code and installing it on the cluster instead of the official version 1.3.10 available on PyPI. The issue is that I cannot create a package with the same name "dagster", because obviously it is registered on PyPI already. So the question is how I could test this. I also didn't find any way to modify the installed "dagster" source code on the Databricks cluster directly with your suggested changes. How do you guys do it? Do you have a suggestion how I could test it?
o
ah sorry for not mentioning that bit! It's been a while since I've made edits to this package, so I forgot that it's somewhat tricky to get working when testing. The basic pattern I used was:
1. go to python_modules/libraries/dagster-databricks
2. run `python setup.py bdist_egg` (you could also do this by creating a wheel)
3. this will create a few directories in the folder that you run it in, but the file you want is under `./dist/`
4. in your cluster's libraries tab, you'll want to install from "Python Egg", dropping in the file from the previous step
you may need to restart the cluster for the new version of dagster-databricks to be available
d
Hi @owen, thanks - my colleague showed me a second way that works quite nicely, happy to share it with you later (it doesn't require cluster restarts). However, with the implementation I still have an issue. This is what I added to the `run_step_from_ref` function:
if step_context.has_partition_key:
    # great, we deal with a partition
    # but what do I have to pass to _partitions_def_for_output?
    partitions_def_name = step_context._partitions_def_for_output(step_context.partition_key) #??

    if isinstance(partitions_def_name, DynamicPartitionsDefinition):
        # nice, it is a DynamicPartition!
        # but, partitions_def_name always returns None, even though I have a DynamicPartition...

        step_context.instance.add_dynamic_partitions(
            partitions_def_name=partitions_def_name,
            partition_keys=[step_context.partition_key])
• `step_context._partitions_def_for_output`: Which parameter does it expect?
◦ apparently a parameter `output_name`, but what is meant by that?
• How do I then access the `partition_name` that it returns?
◦ for me it always returns `None`, and it doesn't have a `name` property
o
ah glad you have a nicer workflow there! Here's what I'm imagining:
if step_context.has_partition_key:
    # find the partitions def for this step
    partitions_def = next(
        step_context._partitions_def_for_output(output_name=output_name)  # noqa
        for output_name in step_context.op_def.output_dict.keys()
    )

    # in the case of dynamic partitions, ensure that the relevant partition key is included on
    # the remote instance
    if (
        isinstance(partitions_def, DynamicPartitionsDefinition)
        and partitions_def.name is not None
    ):
        step_context.instance.add_dynamic_partitions(
            partitions_def_name=partitions_def.name, partition_keys=[step_context.partition_key]
        )
One thing I'm realizing here is that this solution will only work if you're in a situation where you'll only ever work with a single dynamic partition at a time. So if, for example, you had a setup where you wanted some downstream unpartitioned asset to read in all available dynamic partitions, and that asset is executed with the step launcher, then this scheme wouldn't work, as we're only ever adding a single dynamic partition to the short-lived remote instance. If this is fine for your use case, then no big deal, just wanted to call that out.
d
Hi @owen! Here is the corresponding GitHub issue and PR: https://github.com/dagster-io/dagster/issues/14987
Hi @owen, would be amazing if you could have a look at the code and merge the PR if possible 🙂
o
Ah yes! Thanks for putting this together -- I just approved the PR, but I think it could use one extra comment (as noted in the review). I'll get it merged in once that's added
d
Hi @owen - done! 🙂 I added the note