How do I schedule a partitioned asset?
# ask-ai
p
@Scout How do i schedule a partitioned asset?
s
To schedule a partitioned asset in Dagster, you can use the `build_schedule_from_partitioned_job` function to construct a schedule whose interval matches the spacing of partitions in your job. Here's a step-by-step guide based on the documentation:
1. Define your partitioned asset with a `partitions_def` argument that specifies how the asset is partitioned. For example, you can use `DailyPartitionsDefinition` or `HourlyPartitionsDefinition` to partition your asset by day or hour, respectively.
2. Define an asset job using `define_asset_job`, passing in the partitioned asset and the `partitions_def`.
3. Use `build_schedule_from_partitioned_job` to create a schedule from the partitioned job. This function will automatically create a schedule that triggers at the interval defined by your partitions.
Here is an example code snippet from the documentation:
python
from dagster import (
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    DailyPartitionsDefinition,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")

@asset(partitions_def=partitions_def)
def my_daily_asset():
    ...

partitioned_asset_job = define_asset_job(
    name="my_daily_asset_job",
    selection=[my_daily_asset],
    partitions_def=partitions_def,
)

my_daily_asset_schedule = build_schedule_from_partitioned_job(
    partitioned_asset_job,
)
In this example, `my_daily_asset` is a daily-partitioned asset, and `my_daily_asset_schedule` is the schedule that will trigger the job to materialize the asset according to the daily partitions. Remember that the schedule created by `build_schedule_from_partitioned_job` will fill in the latest partition in the partition set that exists as of the tick time. If you need to adjust which partition is considered the latest, you can use the `end_offset` parameter in your partitions definition. For more specific use cases or configurations, you should refer to the Dagster documentation for additional details and examples.
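For illustration, here is a minimal sketch of how `end_offset` might be used with the example above (the date and names are placeholders; `end_offset=1` extends the partition set by one partition past the current time, so the schedule can fill in the current day's partition instead of the previous day's):
python
from dagster import (
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    DailyPartitionsDefinition,
)

# end_offset=1 makes the partition for the current (still in-progress) day
# part of the partition set, so it becomes the "latest" partition at tick time.
partitions_def = DailyPartitionsDefinition(start_date="2023-01-21", end_offset=1)

@asset(partitions_def=partitions_def)
def my_daily_asset():
    ...

my_daily_asset_job = define_asset_job(
    name="my_daily_asset_job",
    selection=[my_daily_asset],
    partitions_def=partitions_def,
)

my_daily_asset_schedule = build_schedule_from_partitioned_job(my_daily_asset_job)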
p
@Scout My Partition looks like this, the ids are integers:
project_ids = [
    job_traveler_id,
    hello_jobs_id
]
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)
s
Given your static partition definition with project IDs, you can create a schedule for a job that uses this partition definition. Since you have a static partition, you won't use `build_schedule_from_partitioned_job`, which is typically used for time-based partitions. Instead, you will manually create a schedule that specifies which partition to run. Here's a step-by-step guide on how to create a schedule for your static partitioned job:
1. Define your static partitioned asset or job with the `StaticPartitionsDefinition` you've provided.
2. Create a schedule that targets the specific partition you want to run. You can do this by defining a function that returns a `RunRequest` with the desired `partition_key`.
Here's an example based on the documentation:
python
from dagster import ScheduleDefinition, RunRequest, StaticPartitionsDefinition, job, op, static_partitioned_config

# Assuming you have a list of project IDs
project_ids = [job_traveler_id, hello_jobs_id]

# Create your static partition definition
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)

# Define your job with the static partitioned config
@static_partitioned_config(partition_keys=[str(project_id) for project_id in project_ids])
def my_partitioned_config(partition_key: str):
    # Return the configuration for the given partition key
    return {"ops": {"my_op": {"config": {"project_id": partition_key}}}}

# Define an op that will use the project_id from the config
@op(config_schema={"project_id": str})
def my_op(context):
    project_id = context.op_config["project_id"]
    context.log.info(f"Processing project_id: {project_id}")

# Define the job with the partitioned config
@job(config=my_partitioned_config)
def my_partitioned_job():
    my_op()

# Create a schedule for the static partitioned job
def my_schedule(context):
    # You can determine which partition to run based on some logic or a fixed value
    partition_to_run = "some_logic_to_determine_partition_key"
    run_config = my_partitioned_config(partition_to_run)
    return RunRequest(run_key=None, run_config=run_config)

# Define the schedule
my_static_partition_schedule = ScheduleDefinition(
    job=my_partitioned_job,
    cron_schedule="0 0 * * *",  # This is a daily schedule at midnight, adjust as needed
    execution_fn=my_schedule,
)
In this example, `my_partitioned_job` is the job that uses the static partitioned config. The `my_schedule` function is where you define the logic to select which partition to run. The `my_static_partition_schedule` is the schedule definition that will trigger the job according to the cron schedule you specify. Please adjust the `cron_schedule` to match the frequency at which you want to run the job, and the logic inside `my_schedule` to select the appropriate partition key based on your requirements.
p
currently my setup looks like this @Scout :
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id
)
accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="0 6 * * *",
    execution_timezone="Europe/Vienna",
    default_status=DefaultScheduleStatus.RUNNING
)
is this correct?
s
Based on the information you've provided, your setup for scheduling a partitioned asset job in Dagster looks mostly correct. You have defined an asset job named `accuranker_job` that targets assets in the "Accuranker" group and uses a static partition definition based on project IDs. You've also created a schedule named `accuranker_schedule` that is set to run daily at 6:00 AM in the "Europe/Vienna" timezone, and the schedule is set to be running by default. However, there is one aspect that needs attention. The `ScheduleDefinition` does not directly accept a `partitions_def` argument. The partition definition is associated with the assets themselves, or with the job when you define it using `define_asset_job`. The schedule will trigger the job, and the job will know which partitions to materialize based on its own definition. Here's a simplified version of your setup:
python
from dagster import define_asset_job, AssetSelection, ScheduleDefinition, DefaultScheduleStatus

# Assuming accuranker_partition_defintion_by_project_id is defined elsewhere as you showed before

# Define the asset job
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id  # This line is not needed if your assets are already partitioned
)

# Define the schedule
accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="0 6 * * *",
    execution_timezone="Europe/Vienna",
    default_status=DefaultScheduleStatus.RUNNING
)
Please note that if your assets within the "Accuranker" group are already defined with the `accuranker_partition_defintion_by_project_id`, you do not need to specify the `partitions_def` again in the `define_asset_job` call. The job will inherit the partitioning information from the assets it is meant to materialize. If your assets are not already partitioned, you would need to define the partitioning at the asset level, not at the job level. The job will then materialize the correct partitions based on the schedule's execution time and the partition mapping logic. Make sure that your assets are correctly partitioned and that the job is set up to materialize those partitions. The schedule will then take care of running the job at the specified times.
p
i got this error when trying to run it without partitions_def in accuranker_job earlier today: dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run @Scout
s
The error message you encountered, `dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run`, suggests that there was an attempt to access a `partition_key` in a context where the run was not associated with a partition. This can happen if the job expects to be run with a partition but was triggered without specifying one. To resolve this issue, ensure that the job is correctly associated with the partition definition and that when the schedule triggers the job, it provides the necessary partition information. Here are a few things to check:
1. Asset Partitioning: Verify that the assets you are trying to materialize with the job are correctly partitioned. The partitioning should be defined at the asset level.
2. Job Definition: When defining the job with `define_asset_job`, you should not need to pass the `partitions_def` if the assets are already partitioned. The job will use the partitioning information from the assets.
3. Schedule Triggering: When the schedule triggers the job, it needs to specify which partition to materialize. This is typically done by providing a `run_config` that includes the `partition_key` for the job to run. However, since you are using a static partition scheme, you may need to implement custom logic to determine which partition key to use for each scheduled run.
Here's an example of how you might modify the schedule to include the partition key in the `run_config`:
python
from dagster import ScheduleDefinition, RunRequest

def my_partition_selector(context):
    # Custom logic to select the appropriate partition key
    # For example, you might select based on the current date
    partition_key = "some_logic_to_determine_partition_key"
    return RunRequest(
        run_key=None,  # Unique run key for this schedule tick, or None to always run
        run_config={"solids": {"my_solid": {"config": {"partition_key": partition_key}}}},
        tags={"partition": partition_key},  # Optional: helpful for filtering runs in the UI
    )

accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="0 6 * * *",
    execution_timezone="Europe/Vienna",
    default_status=DefaultScheduleStatus.RUNNING,
    execution_fn=my_partition_selector  # Custom execution function to select partition
)
In this example, `my_partition_selector` is a custom function that determines which partition key to use for each scheduled run. You would need to replace `"some_logic_to_determine_partition_key"` with the actual logic for selecting the appropriate partition key. Please adjust the `run_config` in the `RunRequest` to match the structure expected by your job. The key point is that the `partition_key` must be included in the `run_config` if your job expects it. If you continue to encounter issues, please ensure that the assets and job are correctly configured for partitioning, and consider reaching out to the Dagster community or support for further assistance.
p
my partition looks like this, the logic is: loop through each project_id:
# define project IDs which are then added to partition definition
job_traveler_id = 123  # Project DE
hello_jobs_id = 456    # Project EN
project_ids = [
    job_traveler_id,
    hello_jobs_id
]
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)
@Scout
s
Given your static partition definition with project IDs, you can create a schedule that iterates over each project ID and triggers a run for the corresponding partition. To do this, you can define a schedule that generates a `RunRequest` for each partition key. Here's an example of how you might set up a schedule that loops through each project ID and triggers a run for each partition:
python
from dagster import ScheduleDefinition, RunRequest, DefaultScheduleStatus, StaticPartitionsDefinition, define_asset_job, AssetSelection

# Define your project IDs
job_traveler_id = 123  # Project DE
hello_jobs_id = 456    # Project EN

project_ids = [
    job_traveler_id,
    hello_jobs_id
]

# Create your static partition definition
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)

# Define the asset job (assuming this is already done correctly)
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id
)

# Define a schedule that generates a RunRequest for each partition key
def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()

    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",  # Unique identifier for the run
            run_config={
                # Include the partition key in the run config
                # Adjust this to match the structure expected by your job
                "solids": {
                    "my_solid": {
                        "config": {
                            "project_id": partition_key
                        }
                    }
                }
            },
            tags={"partition": partition_key}  # Optional: helpful for filtering runs in the UI
        )

# Create the schedule definition
accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="0 6 * * *",  # Runs daily at 6:00 AM
    execution_timezone="Europe/Vienna",
    default_status=DefaultScheduleStatus.RUNNING,
    execution_fn=my_partition_schedule
)
In this example, the `my_partition_schedule` function is a generator that yields a `RunRequest` for each project ID. Each `RunRequest` includes a unique `run_key` and the `run_config` necessary to run the job for the specified partition. Please adjust the `run_config` in the `RunRequest` to match the structure expected by your job. The key point is that the `partition_key` must be included in the `run_config` if your job expects it. This setup assumes that your job and assets are correctly configured to handle partition keys. If you encounter any issues, please ensure that the assets and job are correctly set up for partitioning.
p
i have partition in a different file. adapt your example so i can put "my_partition_schedule" also in this file and load it to init.py @Scout
s
If you have your partition definition and job in a different file, you can still define the schedule in that file and then import it into your `__init__.py` or any other module where you want to load your schedules. Here's how you can organize your code. Let's assume you have a file named `partitions.py` where your partition definition and job are defined:
python
# partitions.py

from dagster import define_asset_job, AssetSelection, StaticPartitionsDefinition

# Define your project IDs
job_traveler_id = 123  # Project DE
hello_jobs_id = 456    # Project EN

project_ids = [
    job_traveler_id,
    hello_jobs_id
]

# Create your static partition definition
accuranker_partition_defintion_by_project_id = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)

# Define the asset job
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id
)
Now, in the same file or a different one, you can define your schedule function and the schedule itself:
python
# schedules.py

from dagster import ScheduleDefinition, RunRequest, DefaultScheduleStatus
from .partitions import accuranker_job, project_ids  # Import from your partitions file

# Define a schedule that generates a RunRequest for each partition key
def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()

    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",
            run_config={
                # Include the partition key in the run config
                # Adjust this to match the structure expected by your job
            },
            tags={"partition": partition_key}
        )

# Create the schedule definition
accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="0 6 * * *",
    execution_timezone="Europe/Vienna",
    default_status=DefaultScheduleStatus.RUNNING,
    execution_fn=my_partition_schedule
)
Finally, in your `__init__.py` or the module where you initialize your Dagster repository, you can import the schedule:
python
# __init__.py

from .schedules import accuranker_schedule

# ... your repository definition that includes accuranker_schedule
Make sure to adjust the import paths based on your actual file structure. The `.` in the import statement represents a relative import from the current package. If your files are in different packages, you'll need to adjust the import accordingly. By organizing your code this way, you can keep your partition definitions, jobs, and schedules modular and maintainable.
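For completeness, here is a minimal sketch of what that repository definition could look like using a `Definitions` object; the asset import path is an illustrative assumption, and your partitioned assets would need to be included for the "Accuranker" group selection to resolve:
python
# __init__.py

from dagster import Definitions

from .partitions import accuranker_job
from .schedules import accuranker_schedule
# from .assets import api_all_ranking_data  # illustrative path: import your partitioned assets too

defs = Definitions(
    # assets=[api_all_ranking_data],
    jobs=[accuranker_job],
    schedules=[accuranker_schedule],
)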
p
schedule is not loading now @Scout
s
If your schedule is not loading, there are a few steps you can take to troubleshoot the issue:
1. Check for Syntax Errors: Ensure that there are no syntax errors in your code. Syntax errors can prevent the schedule from being recognized and loaded properly.
2. Verify Import Paths: Make sure that the import paths for your schedule are correct. If the paths are incorrect, the schedule will not be found and loaded.
3. Reload Code Locations: If you're using Dagit, you may need to reload your code locations to ensure that the latest code is being used. You can do this by clicking Deployment in the top navigation of Dagit, then in the Code locations tab, click Reload (local webserver) or Redeploy (Dagster Cloud).
4. Check the Repository Definition: Verify that the schedule is included in a `Definitions` object within your repository definition. The repository should be aware of the schedule for it to be loaded.
5. Review the Schedule Definition: Double-check the `ScheduleDefinition` to ensure that all required fields are provided and correctly configured, including `job`, `cron_schedule`, `execution_timezone`, and `execution_fn`.
6. Check dagster-daemon: If you're running Dagster locally, ensure that `dagster-daemon` is running. The daemon process is responsible for executing scheduled runs.
7. Review Error Messages: If there are any error messages in Dagit or in the console where you're running Dagster, review them for clues about what might be going wrong.
8. Consult the Logs: Check the logs for any error messages related to the schedule. The logs can provide detailed information about issues that are preventing the schedule from loading.
9. Check for Updates: If you recently updated Dagster, ensure that all your code is compatible with the new version. Sometimes, updates can introduce changes that require modifications to your existing code.
If you've gone through these steps and the schedule is still not loading, you may want to reach out to the Dagster community or support for further assistance. Provide as much detail as possible, including the steps you've taken to troubleshoot, any error messages you've encountered, and the relevant sections of your code.
p
how to find out which structure of config is needed? @Scout
s
To determine the structure of the config needed for a Dagster job, asset, or resource, you should refer to the config schema defined for each component. The config schema specifies the shape and type of configuration data that Dagster expects when executing these components. Here's how you can find out the required config structure:
1. Ops and Solids: If you have ops or solids, look at the `config_schema` parameter in their definitions. This will tell you what config they expect.
python
   @op(config_schema={"param": str})
   def my_op(context):
       param = context.op_config["param"]
       # ...
2. Resources: For resources, the `config_schema` is defined in the resource decorator or function.
python
   @resource(config_schema={"connection_string": str})
   def my_resource(context):
       connection_string = context.resource_config["connection_string"]
       # ...
3. Assets: Assets can also have a `config_schema` if they require configuration.
python
   @asset(config_schema={"file_path": str})
   def my_asset(context):
       file_path = context.op_config["file_path"]
       # ...
4. Jobs: For jobs, especially those that materialize assets or execute ops, you will need to provide a `run_config` that satisfies the combined config schemas of all the ops and resources used within the job.
python
   @job
   def my_job():
       my_op()
       # ...

   # When executing the job, provide the run_config
   result = my_job.execute_in_process(
       run_config={
           "ops": {
               "my_op": {
                   "config": {
                       "param": "value"
                   }
               }
           },
           "resources": {
               "my_resource": {
                   "config": {
                       "connection_string": "<postgresql://localhost:5432>"
                   }
               }
           }
       }
   )
5. Schedules and Sensors: For schedules and sensors that trigger jobs, you will need to provide a `run_config` that matches the job's requirements.
python
   @schedule(job=my_job, cron_schedule="0 * * * *", execution_timezone="UTC")
   def my_schedule(context):
       return RunRequest(
           run_config={
               # Include the necessary config for the job
           }
       )
6. Documentation and Examples: The documentation provides detailed information on how to define and use config schemas. It also includes examples that can help you understand the expected structure.
7. Dagit: When using Dagit, you can use the playground to construct the `run_config`. Dagit provides a form-based interface to fill in the required config based on the schema, and it will validate the config for you.
8. Error Messages: If you run a job and the config is incorrect or incomplete, Dagster will provide error messages indicating what is missing or malformed. These messages can guide you in constructing the correct config.
Remember that the config schema is a contract that specifies what configuration is expected. It's important to provide all required fields and to match the expected types when supplying the `run_config` for a job or when configuring resources and ops.
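As an additional way to check a config structure programmatically, here is a minimal sketch using Dagster's `validate_run_config` helper; the job and config values are illustrative only:
python
from dagster import job, op, validate_run_config

@op(config_schema={"param": str})
def my_op(context):
    context.log.info(context.op_config["param"])

@job
def my_job():
    my_op()

# Raises DagsterInvalidConfigError if the run_config does not match the
# combined config schema of the job's ops and resources.
validated = validate_run_config(
    my_job,
    run_config={"ops": {"my_op": {"config": {"param": "value"}}}},
)
print(validated)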
p
when i open launchpad for my partitioned asset i see this here @Scout :
resources:
  bigquery:
    config:
      location: europe-west3
      project: adroit-medium-11111
  google_cloud_storage:
    config:
      project: adroit-medium-11111
  io_manager:
    config:
      gcs_bucket: adroit-medium-11111-accuranker-orchestration-test
      gcs_prefix: data_folder
how should i update my config now?
def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()
    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        if partition_key == "494891":
            project_name = "Job Traveler"
        elif partition_key == "495588":
            project_name = "Hello Jobs"
        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",  # Unique identifier for the run
            run_config={
                # Include the partition key in the run config
                # Adjust this to match the structure expected by your job
                "solids": {
                    "my_solid": {
                        "config": {
                            "project_id": partition_key
                        }
                    }
                }
            },
            tags={"partition": partition_key, "project_name": project_name}  # Optional: helpful for filtering runs in the UI
        )
s
Based on the launchpad configuration you've provided, it looks like you have a set of resources configured for BigQuery and Google Cloud Storage, as well as an `io_manager` configured for GCS. To update your `run_config` in the `my_partition_schedule` function to include these resource configurations, you'll need to include the `resources` block with the appropriate config for each resource. Here's how you can update your `my_partition_schedule` function to include the necessary resource configurations:
python
def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()

    # Define the base resource config that will be used for all runs
    base_resource_config = {
        "resources": {
            "bigquery": {
                "config": {
                    "location": "europe-west3",
                    "project": "adroit-medium-11111"
                }
            },
            "google_cloud_storage": {
                "config": {
                    "project": "adroit-medium-11111"
                }
            },
            "io_manager": {
                "config": {
                    "gcs_bucket": "adroit-medium-11111-accuranker-orchestration-test",
                    "gcs_prefix": "data_folder"
                }
            }
        }
    }

    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        if partition_key == "494891":
            project_name = "Job Traveler"
        elif partition_key == "495588":
            project_name = "Hello Jobs"
        else:
            project_name = "Unknown Project"  # Default name if project_id doesn't match

        # Merge the base resource config with the specific run config for the partition
        run_config = {
            **base_resource_config,
            "solids": {
                "my_solid": {
                    "config": {
                        "project_id": partition_key
                    }
                }
            }
        }

        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",  # Unique identifier for the run
            run_config=run_config,
            tags={
                "partition": partition_key,
                "project_name": project_name  # Optional: helpful for filtering runs in the UI
            }
        )
In this updated function, `base_resource_config` contains the resource configurations that you saw in the launchpad. For each `RunRequest`, we merge this base config with the specific config for the solid that requires the `project_id`. This way, each run will have the necessary resource configurations as well as the specific solid config. Please note that the `solids` key in the `run_config` should match the structure expected by your job. If your job uses ops instead of solids, or if the structure of the config is different, you will need to adjust the `run_config` accordingly. The `project_name` tag is also included for each run, which can be useful for identifying runs in the UI.
p
my job looks like this @Scout , how do i update the config file for this?
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id
)
and one asset looks like this:
@asset(partitions_def=accuranker_partition_defintion_by_project_id, description="Fetching All Ranking Data from Accuranker.")
def api_all_ranking_data(context: AssetExecutionContext, accuranker_api_client: AccurankerAPI) -> List:
    accuranker_id = context.partition_key
    params_kw = {
        "fields": "id,keyword,tags,search_volume.search_volume,ranks.rank,ranks.share_of_voice,ranks.search_intent"
    }
    response_as_list = accuranker_api_client.accuranker_api_call(payload=params_kw, account_id=accuranker_id, endpoint_path="keywords")
    context.add_output_metadata(
        metadata={
            "num_records": len(response_as_list),
            "payload": MetadataValue.json(params_kw),
            "account_id": accuranker_id,
            "response_preview": MetadataValue.json(response_as_list)
        }
    )
    return response_as_list
s
Given the structure of your job and asset, you need to ensure that the `run_config` you provide when scheduling the job includes the necessary configuration for the resources and any other required config specific to the asset's execution. Since your asset `api_all_ranking_data` requires a `partition_key` to determine the `accuranker_id`, you need to ensure that the `run_config` for each scheduled run includes the partition key. The partition key is typically passed as part of the context when the asset is executed within a partitioned job. Here's how you can update the `my_partition_schedule` function to include the necessary configuration:
python
from dagster import RunRequest

def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()

    # Define the base resource config that will be used for all runs
    base_resource_config = {
        "resources": {
            "bigquery": {
                "config": {
                    "location": "europe-west3",
                    "project": "adroit-medium-11111"
                }
            },
            "google_cloud_storage": {
                "config": {
                    "project": "adroit-medium-11111"
                }
            },
            "io_manager": {
                "config": {
                    "gcs_bucket": "adroit-medium-11111-accuranker-orchestration-test",
                    "gcs_prefix": "data_folder"
                }
            }
        }
    }

    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        # Construct the run config for the partitioned run
        run_config = {
            **base_resource_config,
            "ops": {
                # The op name here should match the name of the op within your job
                # If your asset is used directly, you may not need to specify this
                "api_all_ranking_data": {
                    "config": {
                        # Include any op-specific config here
                    }
                }
            },
            "solids": {
                # If your job uses solids instead of ops, use this key
            }
        }

        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",
            run_config=run_config,
            tags={
                "partition": partition_key,
                # Include any additional tags you want to attach to the run
            }
        )
In this updated function, we're assuming that the
api_all_ranking_data
asset is represented as an op within the job. If your job uses the older
solids
terminology, you should replace
"ops"
with
"solids"
in the `run
_config`. The
run_config
should include the base resource configuration and any op-specific configuration that is required. Please note that the actual structure of the
run_config
may vary depending on the specifics of your job and assets. You should adjust the
"api_all_ranking_data"
key to match the actual name of the op or solid within your job if it's different. If there are additional configurations required for the op or solid, you should include them within the
config
dictionary. Lastly, ensure that the
partition_key
is correctly passed to the asset during execution. In a partitioned job, Dagster will automatically provide the
partition_key
to the asset's context, so you typically do not need to include it explicitly in the
run_config
. However, if you have custom logic that requires it, you should include it as needed.
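To verify locally that the asset picks up its partition key from the context, a small sketch using `materialize` may help; it assumes the asset and a resource instance are importable, and the resource shown is a placeholder:
python
from dagster import materialize

# Materialize a single partition in-process; Dagster supplies
# context.partition_key = "494891" to the asset during this run.
result = materialize(
    [api_all_ranking_data],
    partition_key="494891",
    resources={"accuranker_api_client": accuranker_api_client},  # placeholder resource instance
)
assert result.success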
p
cant i just provide the project_ids for all assets loaded? @Scout
s
In Dagster, when you define a partitioned asset job, the job is aware of the partitions through the `partitions_def` you provide. If your assets are partitioned by project IDs, as in your case, the job will expect to receive a `partition_key` that corresponds to one of those project IDs when it runs. When you schedule a partitioned job, you don't need to provide the `partition_key` for each asset directly in the `run_config`. Instead, Dagster handles this automatically based on the partition that is being materialized. The `partition_key` is made available in the asset's execution context, which your asset function can access. If you want to run the job for all partitions (project IDs) at the scheduled time, you would typically create a separate run for each partition. Here's an example of how you might adjust the schedule to create a run for each project ID:
python
from dagster import RunRequest

def my_partition_schedule(context):
    # Define the base resource config that will be used for all runs
    base_resource_config = {
        "resources": {
            "bigquery": {
                "config": {
                    "location": "europe-west3",
                    "project": "adroit-medium-11111"
                }
            },
            "google_cloud_storage": {
                "config": {
                    "project": "adroit-medium-11111"
                }
            },
            "io_manager": {
                "config": {
                    "gcs_bucket": "adroit-medium-11111-accuranker-orchestration-test",
                    "gcs_prefix": "data_folder"
                }
            }
        }
    }

    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()

    # Generate a RunRequest for each project ID (partition)
    for project_id in project_ids:
        partition_key = str(project_id)
        run_config = {
            **base_resource_config,
            # No need to specify the partition_key here; Dagster will provide it
        }

        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",
            run_config=run_config,
            tags={
                "partition": partition_key,
                # Include any additional tags you want to attach to the run
            }
        )
In this schedule, a `RunRequest` is created for each `project_id` in the `project_ids` list. Dagster will automatically use the `partition_key` to provide the correct context to your asset when the job runs. Remember to ensure that your asset functions are designed to use the `partition_key` from the context, as shown in your `api_all_ranking_data` asset example. Dagster will handle the rest, ensuring that each asset is materialized with the correct `partition_key`.
p
i still get this error "Cannot access partition_key for a non-partitioned run", my code looks like this: @Scout
in accuranker_partitions.py:
def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()
    # Define the base resource config that will be used for all runs
    base_resource_config = {
        "resources": {
            "bigquery": {
                "config": {
                    "location": "europe-west3",
                    "project": "adroit-medium-379911"
                }
            },
            "google_cloud_storage": {
                "config": {
                    "project": "adroit-medium-379911"
                }
            },
            "io_manager": {
                "config": {
                    "gcs_bucket": "adroit-medium-379911-accuranker-orchestration-test",
                    "gcs_prefix": "data_folder"
                }
            }
        }
    }
    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        if partition_key == "494891":
            project_name = "JobTraveler"
        elif partition_key == "495588":
            project_name = "HelloJobs"
        run_config = {
            **base_resource_config
        }
        yield RunRequest(
            run_key=f"{current_date}-{project_name}",  # Unique identifier for the run
            run_config=run_config,
            tags={"partition": partition_key, "project_name": project_name}  # Optional: helpful for filtering runs in the UI
        )
in init.py:
from data_pipelines.partitions.accuranker_partitions import accuranker_partition_defintion_by_project_id, my_partition_schedule
accuranker_job = define_asset_job(
    "accuranker_job",
    description="Accuranker updates data at 05:55-ish every day.",
    selection=AssetSelection.groups("Accuranker"),
    partitions_def=accuranker_partition_defintion_by_project_id
)
accuranker_schedule = ScheduleDefinition(
    job=accuranker_job,
    cron_schedule="27 9 * * *",
    execution_timezone="Europe/Vienna",
    execution_fn=my_partition_schedule,
    default_status=DefaultScheduleStatus.RUNNING
)
s
The error message "Cannot access partition_key for a non-partitioned run" indicates that the job is attempting to access a
partition_key
during execution, but the run was not initiated with a partition key. This can happen if the job is expected to be partitioned, but the schedule does not provide the partition information correctly. In your
my_partition_schedule
function, you are yielding
RunRequest
objects for each project ID, but you are not specifying which partition each run corresponds to. In Dagster, when you have a partitioned job, you need to specify the
partition_key
for each run so that the job knows which partition to execute. To resolve this issue, you should include the
partition_key
in the tags of the
RunRequest
and ensure that the job is configured to use the partition key from the context during execution. However, since you are using a
StaticPartitionsDefinition
, you might need to explicitly pass the
partition_key
in the
run_config
for the job to recognize it. Here's how you might modify the
my_partition_schedule
function to include the `partition_key`:
python
# accuranker_partitions.py

from dagster import RunRequest

def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()
    # Define the base resource config that will be used for all runs
    base_resource_config = {
        # ... your resource config
    }

    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        # Determine project_name based on partition_key
        # ...

        run_config = {
            **base_resource_config,
            "solids": {
                # Assuming 'my_solid' is the name of the solid that requires the partition_key
                # You need to replace 'my_solid' with the actual name of your solid or op
                "my_solid": {
                    "config": {
                        "partition_key": partition_key  # Pass the partition_key in the run_config
                    }
                }
            }
        }

        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",
            run_config=run_config,
            tags={
                "partition": partition_key,  # Include the partition_key in the tags
                "project_name": project_name
            }
        )
In this updated function, we are including the `partition_key` in the `run_config` under the `solids` key. You need to replace `"my_solid"` with the actual name of the solid or op in your job that requires the `partition_key`. Additionally, the `partition_key` is included in the tags for the run. Please ensure that your job and assets are set up to use the `partition_key` from the context during execution. If your assets are expecting the `partition_key` in a different way, you will need to adjust the `run_config` accordingly. If you continue to face issues, it may be helpful to review the asset definitions to ensure they are correctly set up to use the `partition_key` from the execution context. If necessary, reach out to the Dagster community or support for further assistance.
p
i dont have solids, i only have assets. like this here: @Scout
@asset(partitions_def=accuranker_partition_defintion_by_project_id, description="Fetching All Ranking Data from Accuranker.")
def api_all_ranking_data(context: AssetExecutionContext, accuranker_api_client: AccurankerAPI) -> List:
    accuranker_id = context.partition_key
    params_kw = {
        "fields": "id,keyword,tags,search_volume.search_volume,ranks.rank,ranks.share_of_voice,ranks.search_intent"
    }
    response_as_list = accuranker_api_client.accuranker_api_call(payload=params_kw, account_id=accuranker_id, endpoint_path="keywords")
    context.add_output_metadata(
        metadata={
            "num_records": len(response_as_list),
            "payload": MetadataValue.json(params_kw),
            "account_id": accuranker_id,
            "response_preview": MetadataValue.json(response_as_list)
        }
    )
    return response_as_list

@asset(partitions_def=accuranker_partition_defintion_by_project_id, description="Fetching Google for Job Data from Accuranker (API Filter: 6067).")
def api_google_for_jobs(context: AssetExecutionContext, accuranker_api_client: AccurankerAPI) -> List:
    accuranker_id = context.partition_key
    params_kw = {
        "fields": "id,keyword,tags,search_volume.search_volume,ranks.rank,ranks.share_of_voice,ranks.search_intent",
        "filter": 6067
    }
    response_as_list = accuranker_api_client.accuranker_api_call(payload=params_kw, account_id=accuranker_id, endpoint_path="keywords")
    context.add_output_metadata(
        metadata={
            "num_records": len(response_as_list),
            "payload": MetadataValue.json(params_kw),
            "account_id": accuranker_id,
            "response_preview": MetadataValue.json(response_as_list)
        }
    )
    return response_as_list
how to adapt the config ? do i need to provide the function name of each asset? like "api_google_for_jobs"?
s
When you have partitioned assets and you're using `define_asset_job` to create a job that materializes those assets, Dagster will automatically handle the partitioning for you. Each asset function that you've defined with a `partitions_def` will have access to the `partition_key` through its context, as you've shown in your asset functions (`context.partition_key`). For your scheduled job, you do not need to specify the function name of each asset in the `run_config`. Instead, you should ensure that the schedule is correctly set up to trigger the job with the appropriate partition key for each partition. Here's how you can adapt your schedule to work with partitioned assets:
python
# accuranker_partitions.py

from dagster import RunRequest

def my_partition_schedule(context):
    # Get the current date to determine which partition to run
    current_date = context.scheduled_execution_time.date()
    # Define the base resource config that will be used for all runs
    base_resource_config = {
        "resources": {
            "bigquery": {
                "config": {
                    "location": "europe-west3",
                    "project": "adroit-medium-379911"
                }
            },
            "google_cloud_storage": {
                "config": {
                    "project": "adroit-medium-379911"
                }
            },
            "io_manager": {
                "config": {
                    "gcs_bucket": "adroit-medium-379911-accuranker-orchestration-test",
                    "gcs_prefix": "data_folder"
                }
            }
        }
    }

    # Generate a RunRequest for each project ID
    for project_id in project_ids:
        partition_key = str(project_id)
        # Determine project_name based on partition_key
        # ...

        run_config = {
            **base_resource_config,
            # No need to specify the function name of each asset
        }

        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",
            run_config=run_config,
            tags={
                "dagster/partition": partition_key,  # Use the standard Dagster tag for partition
                "project_name": project_name
            }
        )
In this schedule, we're using the standard Dagster tag `dagster/partition` to specify the partition key. When the job runs, Dagster will use this tag to determine the partition key and provide it to the assets' execution context. Please ensure that your `project_ids` list is defined and accessible within this file, and that `project_name` is determined based on the `partition_key` as needed. If you continue to encounter issues, it may be helpful to check the following:
- Ensure that `dagster-daemon` is running, as it is responsible for executing scheduled runs.
- Verify that the `partitions_def` is correctly associated with your assets and that the partition keys match the expected values.
- Confirm that the schedule is included in your repository definition and that it is active.
If the problem persists, consider reaching out to the Dagster community or support with details about your setup and the issue you're facing.
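As a further option, recent Dagster releases let a schedule target a specific partition directly via `RunRequest(partition_key=...)`, which avoids hand-building tags or run config. A minimal sketch under that assumption, reusing the names from your existing setup (not a drop-in replacement; it would take the place of your current `ScheduleDefinition`):
python
# accuranker_partitions.py (sketch; assumes your Dagster version supports RunRequest(partition_key=...))

from dagster import RunRequest, ScheduleEvaluationContext, schedule

# accuranker_job and project_ids are assumed to be importable in this module.

@schedule(job=accuranker_job, cron_schedule="0 6 * * *", execution_timezone="Europe/Vienna")
def accuranker_partitioned_schedule(context: ScheduleEvaluationContext):
    current_date = context.scheduled_execution_time.date()
    for project_id in project_ids:
        partition_key = str(project_id)
        # Dagster derives the run's partition from partition_key, so the
        # assets can read context.partition_key without any extra run_config.
        yield RunRequest(
            run_key=f"{current_date}-{partition_key}",
            partition_key=partition_key,
        )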