David Weber
06/19/2023, 2:45 PM
We have a question regarding the `pyspark_step_launcher` and DynamicPartitions / partitions overall.
Our setup is as follows:
• File partitions based on `DynamicPartitionsDefinition`
• Asset1: executed locally, uses the partition via `partitions_def` in the `@asset` decorator
• Asset2: executed on our Databricks instance using the `pyspark_step_launcher`, uses the partition via `partitions_def` in the `@asset` decorator
    ◦ this asset depends on Asset1, by having it as an input parameter in the function
• Dagster version: 1.3.4
I will post a minimal example in the comments; it should make it easy to replicate the error.
This is the error message we end up with:
```
dagster._core.errors.DagsterInvalidInvocationError: Partition range <our_partition_key> to <our_partition_key> is not a valid range. Nonexistent partition keys: [<our_partition_key>]
```
Clearly, Dagster cannot find the partition that we are passing through to Asset2, which is executed on Databricks.
What we think is going on, based on what my colleague @Döme Lőrinczy found while debugging over the last few days:
• `DagsterInstance.get_dynamic_partitions` returns an empty list (see the sketch below)
• This means that the `EventLogStorage` cannot find the appropriate partitions (as an event log file is missing in the Databricks environment?)
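Roughly, the check that comes back empty looks like this (a minimal sketch, run in the Databricks environment; `test_partition` is the partitions def name from the minimal example below):
```python
from dagster import DagsterInstance

# Minimal sketch of the debugging check. "test_partition" is the name from
# our repro example; DAGSTER_HOME must be set for DagsterInstance.get().
instance = DagsterInstance.get()
# On the Databricks side this comes back as [] instead of ["new_partition"].
print(instance.get_dynamic_partitions("test_partition"))
```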
We are aware that DynamicPartitions are still experimental; however, it would be amazing if we could brainstorm a solution, as it is rather important for our setup.
David Weber
06/19/2023, 2:47 PM
```python
from pathlib import Path

from dagster import (
    asset,
    sensor,
    SensorResult,
    RunRequest,
    DynamicPartitionsDefinition,
    Definitions,
    AssetSelection,
    define_asset_job,
)
from dagster_databricks import databricks_pyspark_step_launcher

partition = DynamicPartitionsDefinition(name='test_partition')


@asset(
    partitions_def=partition,
    required_resource_keys={'pyspark_step_launcher'},
)
def test_asset():
    return 1


test_job = define_asset_job(name='test_job', selection=AssetSelection.keys('test_asset'))


@sensor(
    name='test_sensor',
    job=test_job,
)
def test_sensor():
    return SensorResult(
        run_requests=[RunRequest(partition_key='new_partition')],
        dynamic_partitions_requests=[
            partition.build_add_request(['new_partition'])
        ],
    )


defs = Definitions(
    assets=[test_asset],
    sensors=[test_sensor],
    jobs=[test_job],
    resources={
        "pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
            {
                "run_config": {
                    "run_name": "test dagster",
                    "cluster": {
                        "existing": "<our_cluster>"  # from the Databricks URL
                    },
                    # "libraries": [
                    #     {"pypi": {"package": "dagster==1.3.2"}},
                    # ],
                },
                'secrets_to_env_variables': [],
                "databricks_host": "<dbx_url>",
                "databricks_token": {
                    "env": "DATABRICKS_TOKEN"  # It has to be available in the Databricks env as well
                },
                "local_pipeline_package_path": str(Path(__file__).parent.parent),
            }
        ),
    },
)
```
David Weber
06/19/2023, 3:07 PM
I found this note in the `databricks_pyspark_step_launcher` docs:
> Note that, for the `databricks_pyspark_step_launcher`, either S3 or Azure Data Lake Storage config must be specified for ops to succeed, and the credentials for this storage must also be stored as a Databricks Secret and stored in the resource config so that the Databricks cluster can access storage.
Could this be connected to this issue? Do we have to somehow define ADLS (as we are on Azure) so that this event file is stored, reachable by Dagster on Databricks?
David Weber
06/20/2023, 9:29 AM
pyspark_step_launcher ...
Maybe you could have a look?
owen
06/21/2023, 6:03 PM
One option would be to modify `run_step_from_ref` (https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/_core/execution/plan/external_step.py?L237) to detect if you're executing a step with a DynamicPartitionsDefinition and, if so, to add that partition to the external instance. This one seems a bit lighter-weight overall, and likely would not be a large volume change. In essence:
1. check `context.partition_key` to see if you're in a partitioned run
2. if so, check `context._partitions_def_for_output(...)` to see if it's a DynamicPartitionsDefinition
3. if it is, call `context.instance.add_dynamic_partitions(partitions_def_name, [context.partition_key])`
Would you be interested in prototyping that change on your end? I'd be happy to review, but it might be a bit before I have bandwidth to make that change myself.
David Weber
06/22/2023, 1:15 PM
By `context`, do you mean `step_context`? `step_context` is what is used in the `run_step_from_ref` function.
Another question pops up: what do you expect me to feed to `add_dynamic_partitions` as `partitions_def_name`? `partitions_def_name` is the expected first parameter of the function, but I am not really sure what to pass here...
David Weber
06/22/2023, 1:16 PM
For `partitions_def_name` I would expect to feed the "name" of our dynamic partition. But I am not sure where I could get that from in the context of this function...
owen
06/22/2023, 10:23 PM
(re: `context`, that was indeed the `step_context`)
For the bit about the `partitions_def_name`: once you get the `PartitionsDefinition` object (step 2), if it's a DynamicPartitionsDefinition, it'll have a `name` property which can be passed in for that field.
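For example (a minimal sketch, using the partition name from your repro above):
```python
from dagster import DynamicPartitionsDefinition

# The string passed as `name` is exactly what add_dynamic_partitions
# expects as its `partitions_def_name` argument.
partitions_def = DynamicPartitionsDefinition(name="test_partition")
assert partitions_def.name == "test_partition"
```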
David Weber
06/23/2023, 2:31 PM
`run_step_from_ref` is executed on the Databricks cluster, not in the local Dagster instance. This brings up the question: how can I make changes to the dagster source code that is installed on Databricks, rather than to the package I have locally?
I already tried creating my own package from the dagster source code and installing it on the cluster instead of the official version 1.3.10 available on PyPI. The issue here is that I cannot create a package with the same name "dagster", because obviously it is registered on PyPI already.
So the question is how I could test this. I also didn't find any way to modify the installed "dagster" source code on the Databricks cluster directly with your suggested changes.
How do you guys do it? Do you have a suggestion for how I could test it?
owen
06/23/2023, 8:57 PM
1. make your changes in a local clone of the dagster repo
2. from the package directory (python_modules/dagster), run `python setup.py bdist_egg` (you could also do this by creating a wheel)
3. this will create a few directories in the folder that you run it in, but the file you want is under ./dist/
4. in your cluster's "Libraries" tab, you'll want to install from "Python Egg", dropping in the file from the previous step
David Weber
06/26/2023, 3:19 PM
This is what I currently have in the `run_step_from_ref` function:
```python
if step_context.has_partition_key:
    # great, we deal with a partition
    # but what do I have to pass to _partitions_def_for_output?
    partitions_def_name = step_context._partitions_def_for_output(step_context.partition_key)  # ??
    if isinstance(partitions_def_name, DynamicPartitionsDefinition):
        # nice, it is a DynamicPartition!
        # but partitions_def_name always returns None, even though I have a DynamicPartition...
        step_context.instance.add_dynamic_partitions(
            partitions_def_name=partitions_def_name,
            partition_keys=[step_context.partition_key],
        )
```
• `step_context._partitions_def_for_output`: which parameter does it expect?
    ◦ apparently a parameter `output_name`, but what is meant by that?
• How do I then access the `name` that it returns?
    ◦ for me it always returns `None`, and that doesn't have a `name` property
owen
06/26/2023, 10:08 PM
```python
if step_context.has_partition_key:
    # find the partitions def for this step
    partitions_def = next(
        step_context._partitions_def_for_output(output_name=output_name)  # noqa
        for output_name in step_context.op_def.output_dict.keys()
    )
    # in the case of dynamic partitions, ensure that the relevant partition key is
    # included on the remote instance
    if (
        isinstance(partitions_def, DynamicPartitionsDefinition)
        and partitions_def.name is not None
    ):
        step_context.instance.add_dynamic_partitions(
            partitions_def_name=partitions_def.name, partition_keys=[step_context.partition_key]
        )
```
One thing I'm realizing here is that this solution will only work if you're in a situation where you'll only ever work with a single dynamic partition at a time. So if, for example, you had a setup where some downstream unpartitioned asset wanted to read in all available dynamic partitions and was executed with the step launcher, then this scheme wouldn't work, as we're only ever adding a single dynamic partition to the short-lived remote instance.
If this is fine for your use case, then no big deal, just wanted to call that out.