# dagster-support

Thibault

06/22/2022, 12:50 PM
Hi folks! I'm trying to configure partitions for a job that uses resources, but I couldn't figure out how from the docs. I have multiple ops that require resources:
Copy code
# my_ops.py

from dagster import op, String

@op(required_resource_keys={"my_resource"}, config_schema={"my_config": String})
def my_op(context, upstream_value):
    # consumes my_other_op's output
    ...

@op(required_resource_keys={"my_other_resource"}, config_schema={"my_other_config": String})
def my_other_op(context):
    ...
I built a job from these ops using a graph, because I want op-specific resources:
Copy code
# my_job.py

from dagster import graph

from my_ops import my_op, my_other_op
from my_resources import my_resource, my_other_resource  # wherever the resources are defined

@graph
def my_job_graph():
    my_op(my_other_op())

my_job = my_job_graph.to_job(
    name="my_job",
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)
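For reference, a minimal sketch of what the two resources might look like; their actual definitions aren't shown in the thread, so the bodies and config schema below are placeholders:
Copy code
# my_resources.py (hypothetical, for illustration)

from dagster import resource

@resource(config_schema={"connection_string": str})
def my_resource(init_context):
    # placeholder: return whatever client/connection my_op needs
    return init_context.resource_config["connection_string"]

@resource
def my_other_resource(init_context):
    # placeholder: a resource that needs no config
    return "some-client"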
Then I import this in a repository and I can use it in Dagit by scaffolding the config. However, when I want to add a partitioned schedule, things get tricky: I need to access data from an API to create my partitions, so I thought that would be a good use of @dynamic_partitioned_config.
Copy code
# my_schedule.py

from typing import Dict, List

from dagster import dynamic_partitioned_config

def get_data_and_make_partitions(_current_time=None) -> List[str]:
    # do stuff and create a list of strings that are my partition keys
    # (Dagster passes the current time to partition_fn; unused here)
    ...

@dynamic_partitioned_config(partition_fn=get_data_and_make_partitions)
def make_partition_config(partition: str) -> Dict:
    # do stuff and return a run config dictionary like:
    # {"ops": {"my_op": {"config": {"my_config": "some_conf"}},
    #          "my_other_op": {"config": {"my_other_config": "some_other_config"}}}}
    ...
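For illustration, a filled-in version of these two stubs might look like this; the partition keys and config values are invented stand-ins for whatever the API returns:
Copy code
from typing import Dict, List

from dagster import dynamic_partitioned_config

def get_data_and_make_partitions(_current_time=None) -> List[str]:
    # stand-in for the API call
    return ["partition_a", "partition_b"]

@dynamic_partitioned_config(partition_fn=get_data_and_make_partitions)
def make_partition_config(partition: str) -> Dict:
    # one run config per partition key
    return {
        "ops": {
            "my_op": {"config": {"my_config": partition}},
            "my_other_op": {"config": {"my_other_config": partition}},
        }
    }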
Then, if I use this function in the job creation, it doesn't work:
Copy code
my_job = my_job_graph.to_job(
    name="my_job",
    config=make_partition_config,
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)
I get the following error:
Copy code
Object DynamicPartitionsDefinition(partition_fn=<function make_partition_config at 0x7ff06fe029d0>) is not a TimeWindowPartitionsDefinition.
Am I missing something here? Thanks!
🤖 1

sandy

06/22/2022, 2:24 PM
Hi @Thibault - that's unexpected. Would you be able to share the full stack trace?

Thibault

06/22/2022, 2:28 PM
Hi Sandy, sure, here it is 🙂 The naming is going to be a little bit different from the example I provided (I tried to simplify it)
Copy code
/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/workspace/context.py:558: UserWarning: Error loading repository location etl:dagster.check.CheckError: Object DynamicPartitionsDefinition(partition_fn=<function _get_generic_collection_configs_from_enoapp_api_from_env at 0x7ff06fe029d0>) is not a TimeWindowPartitionsDefinition. Got DynamicPartitionsDefinition(partition_fn=<function _get_generic_collection_configs_from_enoapp_api_from_env at 0x7ff06fe029d0>) with type <class 'dagster.core.definitions.partition.DynamicPartitionsDefinition'>.

Stack Trace:
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/grpc/server.py", line 209, in __init__
    self._loaded_repositories = LoadedRepositories(
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/grpc/server.py", line 94, in __init__
    loadable_targets = get_loadable_targets(
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/grpc/utils.py", line 53, in get_loadable_targets
    else loadable_targets_from_python_package(package_name, working_directory)
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/workspace/autodiscovery.py", line 46, in loadable_targets_from_python_package
    module = load_python_module(
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/code_pointer.py", line 136, in load_python_module
    return importlib.import_module(module_name)
  File "/home/thibault/.pyenv/versions/3.8.13/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/thibault/PycharmProjects/data-pipelines/dagster-repos/etl/etl/__init__.py", line 6, in <module>
    from .repository import etl, legacy_etl, etl_test
  File "/home/thibault/PycharmProjects/data-pipelines/dagster-repos/etl/etl/repository.py", line 20, in <module>
    from etl.schedules.generic_power_pv_etl_schedule import generic_power_pv_etl_daily_schedule
  File "/home/thibault/PycharmProjects/data-pipelines/dagster-repos/etl/etl/schedules/generic_power_pv_etl_schedule.py", line 72, in <module>
    generic_power_pv_etl_daily_schedule = build_schedule_from_partitioned_job(
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/definitions/partitioned_schedule.py", line 50, in build_schedule_from_partitioned_job
    check.inst(partitioned_config.partitions_def, TimeWindowPartitionsDefinition)
  File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/check/__init__.py", line 531, in inst
    raise _type_mismatch_error(obj, ttype, desc)

  warnings.warn(

sandy

06/22/2022, 3:07 PM
ah, got it. build_schedule_from_partitioned_job only works with jobs that have a TimeWindowPartitionsDefinition, not jobs with dynamic partitions. You could do something like this:
Copy code
@schedule(cron_schedule="0 0 * * *", job=my_job)  # @schedule needs a cron expression and a job target
def my_schedule(context):
    return my_job.run_request_for_partition("some partition key", run_key=None)
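For contrast, here is roughly the time-window case that build_schedule_from_partitioned_job does support; the daily start date and config values below are assumed for illustration, reusing the graph and resources from earlier in the thread:
Copy code
from dagster import build_schedule_from_partitioned_job, daily_partitioned_config

@daily_partitioned_config(start_date="2022-01-01")
def my_daily_config(start, _end):
    # derive the run config from the partition's time window
    day = start.strftime("%Y-%m-%d")
    return {
        "ops": {
            "my_op": {"config": {"my_config": day}},
            "my_other_op": {"config": {"my_other_config": day}},
        }
    }

daily_job = my_job_graph.to_job(
    name="daily_job",
    config=my_daily_config,
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)
daily_schedule = build_schedule_from_partitioned_job(daily_job)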

Thibault

06/22/2022, 3:33 PM
Thanks for helping out, Sandy! And sorry, I forgot the part with build_schedule_from_partitioned_job in my initial example. Elaborating on your answer, I wrote the following schedule:
Copy code
@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    partitions = get_data_and_make_partitions()
    for p in partitions:
        request = my_job.run_request_for_partition(partition_key=p, run_key=p)
        yield request
And now I can indeed find this schedule in the Status > Schedules menu. Is there a way to turn that schedule into partitions in order to get the backfill tab? Edit: I had commented out the my_job_graph.to_job part that ran the job with the partitioned config; since build_schedule_from_partitioned_job was the issue, I had removed too much code. I need to double-check that everything works as I want it to, but at least I don't have any errors left! Thanks a lot 🙂

sandy

06/22/2022, 4:01 PM
As long as the job has partitions on it, the partitions tab should show up on the job page. Is that not occurring for you? Or are you looking for something different?

Thibault

06/22/2022, 4:08 PM
I was editing my previous post, hoping that you wouldn't read it first... That didn't go as planned 😅 The issue was that I had removed the partitioned job, thinking that the scheduling solution you suggested replaced that part as well.
👍 1
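Putting the resolution together: the job keeps its dynamic partitioned config (which is what gives it the partitions and backfill tabs in Dagit), and the schedule fires one run per partition. A sketch under the same assumptions as the snippets above:
Copy code
from dagster import repository, schedule

my_job = my_job_graph.to_job(
    name="my_job",
    config=make_partition_config,  # keeps the partitions (and backfill tab) on the job
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)

@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    # one run request per dynamic partition; run_key=p means each
    # partition is launched at most once by this schedule
    for p in get_data_and_make_partitions():
        yield my_job.run_request_for_partition(partition_key=p, run_key=p)

@repository
def my_repo():
    return [my_job, my_schedule]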