Thibault
06/22/2022, 12:50 PM
# my_ops.py
from dagster import op, String

@op(required_resource_keys={"my_resource"}, config_schema={"my_config": String})
def my_op(context):
    ...

@op(required_resource_keys={"my_other_resource"}, config_schema={"my_other_config": String})
def my_other_op(context):
    ...
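(my_resource and my_other_resource stand in for ordinary resource definitions; a minimal sketch of what they could look like, with all names assumed:)
# my_resources.py -- hypothetical sketch
from dagster import resource

@resource
def my_resource(init_context):
    # e.g. build and return an API client
    ...

@resource
def my_other_resource(init_context):
    ...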
I made a job out of these ops, using a graph because I want op-specific resources:
# my_job.py
@graph
def my_job_graph():
    my_op(my_other_op())

my_job = my_job_graph.to_job(
    name="my_job",
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)
Then I import this into a repository, and I can use it in Dagit by scaffolding the config.
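The repository wiring is roughly this (a sketch; the module path and repository name are made up):
# repository.py
from dagster import repository

from my_job import my_job

@repository
def my_repository():
    return [my_job]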
However, when I want to add a partitioned schedule, things get tricky. I need to access data from an API to create my partitions. I thought that would be a good use of @dynamic_partitioned_config.
# my_schedule.py
def get_data_and_make_partitions(current_time=None) -> List[str]:
    # do stuff and create a list of strings that are my partitions
    ...

@dynamic_partitioned_config(partition_fn=get_data_and_make_partitions)
def make_partition_config(partition: str) -> Dict:
    # do stuff and return a config dictionary like:
    # {"ops": {"my_op": {"config": {"my_config": "some_conf"}},
    #          "my_other_op": {"config": {"my_other_config": "some_other_conf"}}}}
    ...
Then, if I import this function and use it in the job creation, it doesn't work:
my_job = my_job_graph.to_job(
    name="my_job",
    config=make_partition_config,
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)
I get the following error:
Object DynamicPartitionsDefinition(partition_fn=<function make_partition_config at 0x7ff06fe029d0>) is not a TimeWindowPartitionsDefinition.
Am I missing something here? Thanks!
sandy
06/22/2022, 2:24 PM
Thibault
06/22/2022, 2:28 PM
/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/workspace/context.py:558: UserWarning: Error loading repository location etl:dagster.check.CheckError: Object DynamicPartitionsDefinition(partition_fn=<function _get_generic_collection_configs_from_enoapp_api_from_env at 0x7ff06fe029d0>) is not a TimeWindowPartitionsDefinition. Got DynamicPartitionsDefinition(partition_fn=<function _get_generic_collection_configs_from_enoapp_api_from_env at 0x7ff06fe029d0>) with type <class 'dagster.core.definitions.partition.DynamicPartitionsDefinition'>.
Stack Trace:
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/grpc/server.py", line 209, in __init__
self._loaded_repositories = LoadedRepositories(
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/grpc/server.py", line 94, in __init__
loadable_targets = get_loadable_targets(
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/grpc/utils.py", line 53, in get_loadable_targets
else loadable_targets_from_python_package(package_name, working_directory)
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/workspace/autodiscovery.py", line 46, in loadable_targets_from_python_package
module = load_python_module(
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/code_pointer.py", line 136, in load_python_module
return importlib.import_module(module_name)
File "/home/thibault/.pyenv/versions/3.8.13/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/thibault/PycharmProjects/data-pipelines/dagster-repos/etl/etl/__init__.py", line 6, in <module>
from .repository import etl, legacy_etl, etl_test
File "/home/thibault/PycharmProjects/data-pipelines/dagster-repos/etl/etl/repository.py", line 20, in <module>
from etl.schedules.generic_power_pv_etl_schedule import generic_power_pv_etl_daily_schedule
File "/home/thibault/PycharmProjects/data-pipelines/dagster-repos/etl/etl/schedules/generic_power_pv_etl_schedule.py", line 72, in <module>
generic_power_pv_etl_daily_schedule = build_schedule_from_partitioned_job(
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/core/definitions/partitioned_schedule.py", line 50, in build_schedule_from_partitioned_job
check.inst(partitioned_config.partitions_def, TimeWindowPartitionsDefinition)
File "/home/thibault/.local/share/virtualenvs/etl-j0EFwzgq/lib/python3.8/site-packages/dagster/check/__init__.py", line 531, in inst
raise _type_mismatch_error(obj, ttype, desc)
warnings.warn(
sandy
06/22/2022, 3:07 PM
build_schedule_from_partitioned_job only works with jobs that have a TimeWindowPartitionsDefinition, not jobs with dynamic partitions.
You could do something like this:
@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    return my_job.run_request_for_partition("some partition key", run_key=None)
Thibault
06/22/2022, 3:33 PM
Indeed, I was using build_schedule_from_partitioned_job in my initial example.
Elaborating on your answer, I wrote the following schedule:
@schedule(cron_schedule="0 0 * * *", job=my_job)
def my_schedule(context):
    partitions = get_data_and_make_partitions()
    for p in partitions:
        request = my_job.run_request_for_partition(partition_key=p, run_key=p)
        yield request
And now I can indeed find this schedule in the Status > Schedules menu.
I also put back the partitioned config in the my_job_graph.to_job part that ran the job with it; since build_schedule_from_partitioned_job was the issue, I had removed too much code.
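That is, the job definition went back to the earlier version with the partitioned config attached:
my_job = my_job_graph.to_job(
    name="my_job",
    config=make_partition_config,
    resource_defs={"my_resource": my_resource, "my_other_resource": my_other_resource},
)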
I need to double-check that everything works as I want it to, but at least I don't have any errors left! Thanks a lot 🙂
sandy
06/22/2022, 4:01 PM
Thibault
06/22/2022, 4:08 PM