Paul Herzog
02/26/2024, 7:26 AM
Scout
02/26/2024, 7:26 AM
Use the `build_schedule_from_partitioned_job` function to construct a schedule whose interval matches the spacing of partitions in your job. Here's a step-by-step guide based on the documentation:
1. Define your partitioned asset with a `partitions_def` argument that specifies how the asset is partitioned. For example, you can use `DailyPartitionsDefinition` or `HourlyPartitionsDefinition` to partition your asset by day or hour, respectively.
2. Define an asset job using `define_asset_job`, passing in the partitioned asset and the `partitions_def`.
3. Use `build_schedule_from_partitioned_job` to create a schedule from the partitioned job. This function will automatically create a schedule that triggers at the interval defined by your partitions.
Here is an example code snippet from the documentation:
python
from dagster import (
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    DailyPartitionsDefinition,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")

@asset(partitions_def=partitions_def)
def my_daily_asset():
    ...

partitioned_asset_job = define_asset_job(
    name="my_daily_asset_job",
    selection=[my_daily_asset],
    partitions_def=partitions_def,
)

my_daily_asset_schedule = build_schedule_from_partitioned_job(
    partitioned_asset_job,
)
In this example, `my_daily_asset` is a daily-partitioned asset, and `my_daily_asset_schedule` is the schedule that will trigger the job to materialize the asset according to the daily partitions. Remember that the schedule created by `build_schedule_from_partitioned_job` will fill in the latest partition in the partition set that exists as of the tick time. If you need to adjust which partition is considered the latest, you can use the `end_offset` parameter in your partitions definition.
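For illustration, here is a minimal sketch of `end_offset` on the daily partitions definition shown above (the value of 1 is an assumption for the example, not something from your setup):
python
from dagster import DailyPartitionsDefinition

# end_offset=1 extends the partition set by one partition, so the schedule's
# tick fills in the current (in-progress) day instead of the previous one.
partitions_def = DailyPartitionsDefinition(
    start_date="2023-01-21",
    end_offset=1,
)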
For more specific use cases or configurations, you should refer to the Dagster documentation for additional details and examples.
Paul Herzog
02/26/2024, 7:29 AM
Scout
02/26/2024, 7:29 AM
Since you are working with a static partitions definition, you won't use `build_schedule_from_partitioned_job`, which is typically used for time-based partitions. Instead, you will manually create a schedule that specifies which partition to run.
Here's a step-by-step guide on how to create a schedule for your static partitioned job:
1. Define your static partitioned asset or job with the `StaticPartitionsDefinition` you've provided.
2. Create a schedule that targets the specific partition you want to run. You can do this by defining a function that returns a `RunRequest` with the desired `partition_key`.
Here's an example based on the documentation:
python
from dagster import (
    RunRequest,
    ScheduleDefinition,
    StaticPartitionsDefinition,
    job,
    op,
    static_partitioned_config,
)

# Assuming you have a list of project IDs
project_ids = [job_traveler_id, hello_jobs_id]

# Create your static partition definition
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)

# Define your job with the static partitioned config
@static_partitioned_config(partition_keys=[str(project_id) for project_id in project_ids])
def my_partitioned_config(partition_key: str):
    # Return the configuration for the given partition key
    return {"ops": {"my_op": {"config": {"project_id": partition_key}}}}

# Define an op that will use the project_id from the config
@op(config_schema={"project_id": str})
def my_op(context):
    project_id = context.op_config["project_id"]
    context.log.info(f"Processing project_id: {project_id}")

# Define the job with the partitioned config
@job(config=my_partitioned_config)
def my_partitioned_job():
    my_op()

# Create a schedule for the static partitioned job
def my_schedule(context):
    # You can determine which partition to run based on some logic or a fixed value
    partition_to_run = "some_logic_to_determine_partition_key"
    run_config = my_partitioned_config(partition_to_run)
    return RunRequest(run_key=None, run_config=run_config)

# Define the schedule
my_static_partition_schedule = ScheduleDefinition(
    job=my_partitioned_job,
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight, adjust as needed
    execution_fn=my_schedule,
)
In this example, `my_partitioned_job` is the job that uses the static partitioned config. The `my_schedule` function is where you define the logic to select which partition to run. The `my_static_partition_schedule` is the schedule definition that will trigger the job according to the cron schedule you specify.
Please adjust the `cron_schedule` to match the frequency at which you want to run the job, and the logic inside `my_schedule` to select which partition to run.
Scout
02/26/2024, 7:29 AM
Paul Herzog
02/26/2024, 7:31 AM
Scout
02/26/2024, 7:31 AM
It looks like you've defined a job named `accuranker_job` that targets assets in the "Accuranker" group and uses a static partition definition based on project IDs. You've also created a schedule named `accuranker_schedule` that is set to run daily at 6:00 AM in the "Europe/Vienna" timezone, and the schedule is set to be running by default. However, there is one aspect that needs attention. The `ScheduleDefinition` does not directly accept a `partitions_def` argument. The partition definition is associated with the assets themselves or the job when you define it using `define_asset_job`. The schedule will trigger the job, and the job will know which partitions to materialize based on its own definition. Here's a simplified version of your setup:
python
from dagster import define_asset_job, AssetSelection, ScheduleDefinition, DefaultScheduleStatus

# Assuming accuranker_partition_defintion_by_project_id is defined elsewhere as you showed before

# Define the asset job
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id,  # This line is not needed if your assets are already partitioned
)

# Define the schedule
accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="0 6 * * *",
    execution_timezone="Europe/Vienna",
    default_status=DefaultScheduleStatus.RUNNING,
)
Please note that if your assets within the "Accuranker" group are already defined with the `accuranker_partition_defintion_by_project_id`, you do not need to specify the `partitions_def` again in the `define_asset_job` call. The job will inherit the partitioning information from the assets it is meant to materialize. If your assets are not already partitioned, you would need to define the partitioning at the asset level, not at the job level. The job will then materialize the correct partitions based on the schedule's execution time and the partition mapping logic. Make sure that your assets are correctly partitioned and that the job is set up to materialize those partitions. The schedule will then take care of running the job at the specified times.
Paul Herzog
02/26/2024, 7:35 AM
Scout
02/26/2024, 7:35 AM
The error `dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run` suggests that there was an attempt to access a `partition_key` in a context where the run was not associated with a partition. This can happen if the job expects to be run with a partition but was triggered without specifying one.
To resolve this issue, ensure that the job is correctly associated with the partition definition and that when the schedule triggers the job, it provides the necessary partition information. Here are a few things to check:
1. Asset Partitioning: Verify that the assets you are trying to materialize with the job are correctly partitioned. The partitioning should be defined at the asset level.
2. Job Definition: When defining the job with `define_asset_job`, you should not need to pass the `partitions_def` if the assets are already partitioned. The job will use the partitioning information from the assets.
3. Schedule Triggering: When the schedule triggers the job, it needs to specify which partition to materialize. This is typically done by providing a `run_config` that includes the `partition_key` for the job to run. However, since you are using a static partition scheme, you may need to implement custom logic to determine which partition key to use for each scheduled run.
Here's an example of how you might modify the schedule to include the partition key in the `run_config`:
python
from dagster import ScheduleDefinition, RunRequest, DefaultScheduleStatus
def my_partition_selector(context):
# Custom logic to select the appropriate partition key
# For example, you might select based on the current date
partition_key = "some_logic_to_determine_partition_key"
return RunRequest(
run_key=None, # Unique run key for this schedule tick, or None to always run
run_config={"solids": {"my_solid": {"config": {"partition_key": partition_key}}}},
tags={"partition": partition_key}, # Optional: helpful for filtering runs in the UI
)
accuranker_schedule = ScheduleDefinition(
job=accuranker_job,
cron_schedule="0 6 * * *",
execution_timezone="Europe/Vienna",
default_status=DefaultScheduleStatus.RUNNING,
execution_fn=my_partition_selector # Custom execution function to select partition
)
In this example, `my_partition_selector` is a custom function that determines which partition key to use for each scheduled run. You would need to replace `"some_logic_to_determine_partition_key"` with the actual logic for selecting the appropriate partition key.
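As a side note that goes beyond the snippet above: on recent Dagster releases, `RunRequest` also accepts a `partition_key` argument, which lets Dagster resolve the partitioned config and tags for you instead of hand-building the `run_config`. A rough sketch, reusing `accuranker_job` and `project_ids` from earlier (the schedule name here is made up):
python
from dagster import RunRequest, schedule

@schedule(job=accuranker_job, cron_schedule="0 6 * * *", execution_timezone="Europe/Vienna")
def accuranker_partition_key_schedule(context):
    # One run per static partition; Dagster attaches the partition to the run,
    # so assets can read context.partition_key directly.
    for project_id in project_ids:
        yield RunRequest(
            run_key=f"{context.scheduled_execution_time.date()}-{project_id}",
            partition_key=str(project_id),
        )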
Please adjust the `run_config` in the `RunRequest` to match the structure expected by your job. The key point is that the `partition_key` must be included in the `run_config` if your job expects it.
Scout
02/26/2024, 7:35 AM
Paul Herzog
02/26/2024, 7:37 AM
Scout
02/26/2024, 7:38 AM
To run the job for every project ID at the scheduled time, you can yield a `RunRequest` for each partition key.
Here's an example of how you might set up a schedule that loops through each project ID and triggers a run for each partition:
python
from dagster import (
    AssetSelection,
    DefaultScheduleStatus,
    RunRequest,
    ScheduleDefinition,
    StaticPartitionsDefinition,
    define_asset_job,
)
# Define your project IDs
job_traveler_id = 123 # Project DE
hello_jobs_id = 456 # Project EN
project_ids = [
job_traveler_id,
hello_jobs_id
]
# Create your static partition definition
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
[str(project_id) for project_id in project_ids]
)
# Define the asset job (assuming this is already done correctly)
accuranker_job = define_asset_job(
"accuranker_job",
description="Accuranker updates data at 05:55-ish every day.",
selection=AssetSelection.groups("Accuranker"),
partitions_def=accuranker_partition_defintion_by_project_id
)
# Define a schedule that generates a RunRequest for each partition key
def my_partition_schedule(context):
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Generate a RunRequest for each project ID
for project_id in project_ids:
partition_key = str(project_id)
yield RunRequest(
run_key=f"{current_date}-{partition_key}", # Unique identifier for the run
run_config={
# Include the partition key in the run config
# Adjust this to match the structure expected by your job
"solids": {
"my_solid": {
"config": {
"project_id": partition_key
}
}
}
},
tags={"partition": partition_key} # Optional: helpful for filtering runs in the UI
)
# Create the schedule definition
accuranker_schedule = ScheduleDefinition(
job=accuranker_job,
cron_schedule="0 6 * * *", # Runs daily at 6:00 AM
execution_timezone="Europe/Vienna",
default_status=DefaultScheduleStatus.RUNNING,
execution_fn=my_partition_schedule
)
In this example, the `my_partition_schedule` function is a generator that yields a `RunRequest` for each project ID. Each `RunRequest` includes a unique `run_key` and the `run_config` necessary to run the job for the specified partition.
Please adjust the `run_config` in the `RunRequest` to match the structure expected by your job. The key point is that the `partition_key` must be included in the `run_config` if your job expects it.
Scout
02/26/2024, 7:38 AM
Paul Herzog
02/26/2024, 7:41 AM
Scout
02/26/2024, 7:41 AM
You can keep the schedule in your `__init__.py` or any other module where you want to load your schedules. Here's how you can organize your code:
Let's assume you have a file named `partitions.py` where your partition definition and job are defined:
python
# partitions.py
from dagster import define_asset_job, AssetSelection, StaticPartitionsDefinition
# Define your project IDs
job_traveler_id = 123 # Project DE
hello_jobs_id = 456 # Project EN
project_ids = [
job_traveler_id,
hello_jobs_id
]
# Create your static partition definition
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
[str(project_id) for project_id in project_ids]
)
# Define the asset job
accuranker_job = define_asset_job(
"accuranker_job",
description="Accuranker updates data at 05:55-ish every day.",
selection=AssetSelection.groups("Accuranker"),
partitions_def=accuranker_partition_defintion_by_project_id
)
Now, in the same file or a different one, you can define your schedule function and the schedule itself:
python
# schedules.py
from dagster import ScheduleDefinition, RunRequest, DefaultScheduleStatus
from .partitions import accuranker_job, project_ids # Import from your partitions file
# Define a schedule that generates a RunRequest for each partition key
def my_partition_schedule(context):
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Generate a RunRequest for each project ID
for project_id in project_ids:
partition_key = str(project_id)
yield RunRequest(
run_key=f"{current_date}-{partition_key}",
run_config={
# Include the partition key in the run config
# Adjust this to match the structure expected by your job
},
tags={"partition": partition_key}
)
# Create the schedule definition
accuranker_schedule = ScheduleDefinition(
job=accuranker_job,
cron_schedule="0 6 * * *",
execution_timezone="Europe/Vienna",
default_status=DefaultScheduleStatus.RUNNING,
execution_fn=my_partition_schedule
)
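If you want to sanity-check the schedule function before wiring it into a repository, one option (a suggestion on my part, not something Dagster requires) is to call it directly with a built context, for example at the bottom of schedules.py or in a test:
python
import datetime

from dagster import build_schedule_context

# Evaluate the schedule function locally and inspect the RunRequests it yields
context = build_schedule_context(
    scheduled_execution_time=datetime.datetime(2024, 2, 26, 6, 0)
)
for request in my_partition_schedule(context):
    print(request.run_key, request.tags)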
Finally, in your `__init__.py` or the module where you initialize your Dagster repository, you can import the schedule:
python
# __init__.py
from .schedules import accuranker_schedule
# ... your repository definition that includes accuranker_schedule
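For example, with the Definitions API the wiring might look roughly like this (a sketch; the asset list and module paths are assumptions based on the files above):
python
# __init__.py
from dagster import Definitions

from .partitions import accuranker_job
from .schedules import accuranker_schedule

defs = Definitions(
    jobs=[accuranker_job],
    schedules=[accuranker_schedule],
    # assets=[...],  # include your Accuranker assets here as well
)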
Make sure to adjust the import paths based on your actual file structure. The `.` in the import statement represents a relative import from the current package. If your files are in different packages, you'll need to adjust the import accordingly.
Scout
02/26/2024, 7:41 AM
Paul Herzog
02/26/2024, 8:02 AM
Scout
02/26/2024, 8:02 AM
Make sure the schedule is included in the `Definitions` object within your repository definition. The repository should be aware of the schedule for it to be loaded.
5. Review the Schedule Definition: Double-check the `ScheduleDefinition` to ensure that all required fields are provided and correctly configured, including `job`, `cron_schedule`, `execution_timezone`, and `execution_fn`.
6. Check dagster-daemon: If you're running Dagster locally, ensure that `dagster-daemon` is running. The daemon process is responsible for executing scheduled runs.
7. Review Error Messages: If there are any error messages in Dagit or in the console where you're running Dagster, review them for clues about what might be going wrong.
8. Consult the Logs: Check the logs for any error messages related to the schedule. The logs can provide detailed information about issues that are preventing the schedule from loading.
9. Check for Updates: If you recently updated Dagster, ensure that all your code is compatible with the new version. Sometimes, updates can introduce changes that require modifications to your existing code.
If you've gone through these steps and the schedule is still not loading, you may want to reach out to the Dagster community or support for further assistance. Provide as much detail as possible, including the steps you've taken to troubleshoot, any error messages you've encountered, and the relevant sections of your code.
Paul Herzog
02/26/2024, 8:04 AM
Scout
02/26/2024, 8:04 AM
1. Ops: Check the `config_schema` parameter in their definitions. This will tell you what config they expect.
python
@op(config_schema={"param": str})
def my_op(context):
param = context.op_config["param"]
# ...
2. Resources: For resources, the `config_schema` is defined in the resource decorator or function.
python
@resource(config_schema={"connection_string": str})
def my_resource(context):
connection_string = context.resource_config["connection_string"]
# ...
3. Assets: Assets can also have a `config_schema` if they require configuration.
python
@asset(config_schema={"file_path": str})
def my_asset(context):
file_path = context.op_config["file_path"]
# ...
4. Jobs: For jobs, especially those that materialize assets or execute ops, you will need to provide a `run_config` that satisfies the combined config schemas of all the ops and resources used within the job.
python
@job
def my_job():
my_op()
# ...
# When executing the job, provide the run_config
result = my_job.execute_in_process(
run_config={
"ops": {
"my_op": {
"config": {
"param": "value"
}
}
},
"resources": {
"my_resource": {
"config": {
"connection_string": "<postgresql://localhost:5432>"
}
}
}
}
)
5. Schedules and Sensors: For schedules and sensors that trigger jobs, you will need to provide a `run_config` that matches the job's requirements.
python
@schedule(job=my_job, cron_schedule="0 * * * *", execution_timezone="UTC")
def my_schedule(context):
return RunRequest(
run_config={
# Include the necessary config for the job
}
)
6. Documentation and Examples: The documentation provides detailed information on how to define and use config schemas. It also includes examples that can help you understand the expected structure.
7. Dagit: When using Dagit, you can use the playground to construct the `run_config`. Dagit provides a form-based interface to fill in the required config based on the schema, and it will validate the config for you.
8. **
Scout
02/26/2024, 8:05 AM
`run_config` for a job or when configuring resources and ops.
Paul Herzog
02/26/2024, 8:14 AM
Scout
02/26/2024, 8:14 AM
Based on your launchpad configuration, your job uses `bigquery` and `google_cloud_storage` resources as well as an `io_manager` configured for GCS. To update your `run_config` in the `my_partition_schedule` function to include these resource configurations, you'll need to include the `resources` block with the appropriate config for each resource.
Here's how you can update your `my_partition_schedule` function to include the necessary resource configurations:
python
def my_partition_schedule(context):
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Define the base resource config that will be used for all runs
base_resource_config = {
"resources": {
"bigquery": {
"config": {
"location": "europe-west3",
"project": "adroit-medium-11111"
}
},
"google_cloud_storage": {
"config": {
"project": "adroit-medium-11111"
}
},
"io_manager": {
"config": {
"gcs_bucket": "adroit-medium-11111-accuranker-orchestration-test",
"gcs_prefix": "data_folder"
}
}
}
}
# Generate a RunRequest for each project ID
for project_id in project_ids:
partition_key = str(project_id)
if partition_key == "494891":
project_name = "Job Traveler"
elif partition_key == "495588":
project_name = "Hello Jobs"
else:
project_name = "Unknown Project" # Default name if project_id doesn't match
# Merge the base resource config with the specific run config for the partition
run_config = {
**base_resource_config,
"solids": {
"my_solid": {
"config": {
"project_id": partition_key
}
}
}
}
yield RunRequest(
run_key=f"{current_date}-{partition_key}", # Unique identifier for the run
run_config=run_config,
tags={
"partition": partition_key,
"project_name": project_name # Optional: helpful for filtering runs in the UI
}
)
In this updated function, `base_resource_config` contains the resource configurations that you saw in the launchpad. For each `RunRequest`, we merge this base config with the specific config for the solid that requires the `project_id`. This way, each run will have the necessary resource configurations as well as the specific solid config.
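If you want to catch config mismatches before the schedule ever fires, one option (my suggestion, not part of the original answer) is to validate a generated `run_config` against the resolved job definition, assuming `defs` is your Definitions object:
python
from dagster import validate_run_config

# get_job_def resolves the asset job so its config schema can be checked;
# validate_run_config raises DagsterInvalidConfigError if the config doesn't match
job_def = defs.get_job_def("accuranker_job")
validate_run_config(
    job_def,
    run_config={
        **base_resource_config,
        "solids": {"my_solid": {"config": {"project_id": "494891"}}},
    },
)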
Ple
Scout
02/26/2024, 8:14 AM
The `solids` key in the `run_config` should match the structure expected by your job. If your job uses ops instead of solids, or if the structure of the config is different, you will need to adjust the `run_config` accordingly. The `project_name` tag is also included for each run, which can be useful for identifying runs in the UI.
Paul Herzog
02/26/2024, 8:17 AM
Scout
02/26/2024, 8:17 AM
You'll want to make sure that the `run_config` you provide when scheduling the job includes the necessary configuration for the resources and any other required config specific to the asset's execution.
Since your asset `api_all_ranking_data` requires a `partition_key` to determine the `accuranker_id`, you need to ensure that the `run_config` for each scheduled run includes the partition key. The partition key is typically passed as part of the context when the asset is executed within a partitioned job.
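For reference, a partitioned asset that reads the key from its context might look roughly like this (a sketch only; the body and logging are placeholders, not your actual implementation):
python
from dagster import asset

@asset(partitions_def=accuranker_partition_defintion_by_project_id)
def api_all_ranking_data(context):
    # The partition key for this run is the project ID string
    accuranker_id = context.partition_key
    context.log.info(f"Fetching ranking data for project {accuranker_id}")
    # ... call the Accuranker API and return the data for this project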
Here's how you can update the `my_partition_schedule` function to include the necessary configuration:
python
from dagster import RunRequest
def my_partition_schedule(context):
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Define the base resource config that will be used for all runs
base_resource_config = {
"resources": {
"bigquery": {
"config": {
"location": "europe-west3",
"project": "adroit-medium-11111"
}
},
"google_cloud_storage": {
"config": {
"project": "adroit-medium-11111"
}
},
"io_manager": {
"config": {
"gcs_bucket": "adroit-medium-11111-accuranker-orchestration-test",
"gcs_prefix": "data_folder"
}
}
}
}
# Generate a RunRequest for each project ID
for project_id in project_ids:
partition_key = str(project_id)
# Construct the run config for the partitioned run
run_config = {
**base_resource_config,
"ops": {
# The op name here should match the name of the op within your job
# If your asset is used directly, you may not need to specify this
"api_all_ranking_data": {
"config": {
# Include any op-specific config here
}
}
},
"solids": {
# If your job uses solids instead of ops, use this key
}
}
yield RunRequest(
run_key=f"{current_date}-{partition_key}",
run_config=run_config,
tags={
"partition": partition_key,
# Include any additional tags you want to attach to the run
}
)
In this updated function, we're assuming that the `api_all_ranking_data` asset is represented as an op within the job. If your job uses the older solids terminology, you should replace `"ops"` with `"solids"` in the `run_config`.
Scout
02/26/2024, 8:18 AM
The `run_config` should include the base resource configuration and any op-specific configuration that is required. Please note that the actual structure of the `run_config` may vary depending on the specifics of your job and assets. You should adjust the `"api_all_ranking_data"` key to match the actual name of the op or solid within your job if it's different. If there are additional configurations required for the op or solid, you should include them within the `config` dictionary. Lastly, ensure that the `partition_key` is correctly passed to the asset during execution. In a partitioned job, Dagster will automatically provide the `partition_key` to the asset's context, so you typically do not need to include it explicitly in the `run_config`. However, if you have custom logic that requires it, you should include it as needed.
Paul Herzog
02/26/2024, 8:20 AM
Scout
02/26/2024, 8:20 AM
A partitioned job is partitioned according to the `partitions_def` you provide. If your assets are partitioned by project IDs, as in your case, the job will expect to receive a `partition_key` that corresponds to one of those project IDs when it runs.
When you schedule a partitioned job, you don't need to provide the `partition_key` for each asset directly in the `run_config`. Instead, Dagster handles this automatically based on the partition that is being materialized. The `partition_key` is made available in the asset's execution context, which your asset function can access.
If you want to run the job for all partitions (project IDs) at the scheduled time, you would typically create a separate run for each partition. Here's an example of how you might adjust the schedule to create a run for each project ID:
python
from dagster import RunRequest
def my_partition_schedule(context):
# Define the base resource config that will be used for all runs
base_resource_config = {
"resources": {
"bigquery": {
"config": {
"location": "europe-west3",
"project": "adroit-medium-11111"
}
},
"google_cloud_storage": {
"config": {
"project": "adroit-medium-11111"
}
},
"io_manager": {
"config": {
"gcs_bucket": "adroit-medium-11111-accuranker-orchestration-test",
"gcs_prefix": "data_folder"
}
}
}
}
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Generate a RunRequest for each project ID (partition)
for project_id in project_ids:
partition_key = str(project_id)
run_config = {
**base_resource_config,
# No need to specify the partition_key here; Dagster will provide it
}
yield RunRequest(
run_key=f"{current_date}-{partition_key}",
run_config=run_config,
tags={
"partition": partition_key,
# Include any additional tags you want to attach to the run
}
)
In this schedule, a `RunRequest` is created for each `project_id` in the `project_ids` list. Dagster will automatically use the `partition_key` to provide the correct context to your asset when the job runs.
Remember to ensure that your asset functions are designed to use the `partition_key` from the context, as shown in your `api_all_ranking_data` asset example. Dagster will handle the rest, ensuring that each asset is materialized with the correct
Scout
02/26/2024, 8:20 AM
`partition_key`.
Paul Herzog
02/26/2024, 8:28 AM
Scout
02/26/2024, 8:28 AM
This error indicates that an asset tried to access a `partition_key` during execution, but the run was not initiated with a partition key. This can happen if the job is expected to be partitioned, but the schedule does not provide the partition information correctly.
In your `my_partition_schedule` function, you are yielding `RunRequest` objects for each project ID, but you are not specifying which partition each run corresponds to. In Dagster, when you have a partitioned job, you need to specify the `partition_key` for each run so that the job knows which partition to execute.
To resolve this issue, you should include the `partition_key` in the tags of the `RunRequest` and ensure that the job is configured to use the partition key from the context during execution. However, since you are using a `StaticPartitionsDefinition`, you might need to explicitly pass the `partition_key` in the `run_config` for the job to recognize it.
Here's how you might modify the `my_partition_schedule` function to include the `partition_key`:
python
# accuranker_partitions.py
from dagster import RunRequest
def my_partition_schedule(context):
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Define the base resource config that will be used for all runs
base_resource_config = {
# ... your resource config
}
# Generate a RunRequest for each project ID
for project_id in project_ids:
partition_key = str(project_id)
# Determine project_name based on partition_key
# ...
run_config = {
**base_resource_config,
"solids": {
# Assuming 'my_solid' is the name of the solid that requires the partition_key
# You need to replace 'my_solid' with the actual name of your solid or op
"my_solid": {
"config": {
"partition_key": partition_key # Pass the partition_key in the run_config
}
}
}
}
yield RunRequest(
run_key=f"{current_date}-{partition_key}",
run_config=run_config,
tags={
"partition": partition_key, # Include the partition_key in the tags
"project_name": project_name
}
)
In this updated function, we are including the `partition_key` in the `run_config` under the `solids` key. You need to replace `"my_solid"` with the actual name of the solid or op in your job that requires the `partition_key`. Additionally, the `partition_key` is included in the tags for the run.
Please ensure that your job and assets
Scout
02/26/2024, 8:29 AM
are set up to use the `partition_key` from the context during execution. If your assets are expecting the `partition_key` in a different way, you will need to adjust the `run_config` accordingly. If you continue to face issues, it may be helpful to review the asset definitions to ensure they are correctly set up to use the `partition_key` from the execution context. If necessary, reach out to the Dagster community or support for further assistance.
Paul Herzog
02/26/2024, 8:31 AM
Scout
02/26/2024, 8:31 AM
Since your assets are already partitioned and you're using `define_asset_job` to create a job that materializes those assets, Dagster will automatically handle the partitioning for you. Each asset function that you've defined with a `partitions_def` will have access to the `partition_key` through its context, as you've shown in your asset functions (`context.partition_key`).
For your scheduled job, you do not need to specify the function name of each asset in the `run_config`. Instead, you should ensure that the schedule is correctly set up to trigger the job with the appropriate partition key for each partition.
Here's how you can adapt your schedule to work with partitioned assets:
python
# accuranker_partitions.py
from dagster import RunRequest
def my_partition_schedule(context):
# Get the current date to determine which partition to run
current_date = context.scheduled_execution_time.date()
# Define the base resource config that will be used for all runs
base_resource_config = {
"resources": {
"bigquery": {
"config": {
"location": "europe-west3",
"project": "adroit-medium-379911"
}
},
"google_cloud_storage": {
"config": {
"project": "adroit-medium-379911"
}
},
"io_manager": {
"config": {
"gcs_bucket": "adroit-medium-379911-accuranker-orchestration-test",
"gcs_prefix": "data_folder"
}
}
}
}
# Generate a RunRequest for each project ID
for project_id in project_ids:
partition_key = str(project_id)
# Determine project_name based on partition_key
# ...
run_config = {
**base_resource_config,
# No need to specify the function name of each asset
}
yield RunRequest(
run_key=f"{current_date}-{partition_key}",
run_config=run_config,
tags={
"dagster/partition": partition_key, # Use the standard Dagster tag for partition
"project_name": project_name
}
)
In this schedule, we're using the standard Dagster tag `dagster/partition` to specify the partition key. When the job runs, Dagster will use this tag to determine the partition key and provide it to the assets' execution context.
Please ensure that your `project_ids` list is defined and accessible within this file, and that `project_name` is determined based on the `partition_key` as needed.
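For the elided `project_name` lookup, a minimal sketch inside the loop could be the following (the IDs and names are the ones mentioned earlier in this thread; adjust to your real mapping):
python
# Map partition keys (Accuranker project IDs) to human-readable project names
PROJECT_NAMES = {
    "494891": "Job Traveler",
    "495588": "Hello Jobs",
}
project_name = PROJECT_NAMES.get(partition_key, "Unknown Project")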
If you continue to encounter issues, it may be helpful to check the following:
- Ensure that `dagster-daemon` is running, as it is responsible for executing scheduled runs.
Scout
02/26/2024, 8:33 AM
- Verify that the `partitions_def` is correctly associated with your assets and that the partition keys match the expected values.
- Confirm that the schedule is included in your repository definition and that it is active.
If the problem persists, consider reaching out to the Dagster community or support with details about your setup and the issue you're facing.