# ask-community
l
I'm trying to follow the docs for creating a partitioned asset job (and eventually want to schedule partitioned assets to materialize daily). My asset works, and I can manually materialize and backfill it, but when I try to add a job (so that I can eventually add a schedule) the code deployment fails with `dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server`. I keep going over the docs and am not sure where I am going wrong. If someone could point me to a good example or help me with the following, I'd appreciate it: 1. understand whether this is the correct way to add schedules for materializing partitioned assets daily (the plan is to get the job working and then schedule it) 2. understand why I am unable to load this job. Please see the code below.
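import os

from dagster import (
    AssetSelection,
    DailyPartitionsDefinition,
    asset,
    define_asset_job,
    repository,
    with_resources,
)

# resources_by_env is defined elsewhere in the project (not shown in this post).

daily_partitions_def = DailyPartitionsDefinition(start_date="2022-01-19")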

@asset(
    group_name="my_table_asset",
    partitions_def=daily_partitions_def,
    required_resource_keys={"some_api_client"}
    )
def my_table_asset(context):
    date = context.asset_partition_key_for_output()
    context.resources.some_api_client.fetch_data(from_date_str=date)
    ...


partitioned_asset_job = define_asset_job(
    name="my_table_asset_job",
    selection=AssetSelection.assets(my_table_asset),
    partitions_def=daily_partitions_def,
)

@repository
def extract_load():
    return [
        partitioned_asset_job, # when I remove this the code doesn't fail
        with_resources(
            [my_table_asset], resource_defs = resources_by_env[os.getenv("DAGSTER_DEPLOYMENT")]
            ),
    ]
r
And you’re running into this during local development? Do you have any stack trace to work with here?
This minimal example works for me, running `dagit -f ./minimal_example.py`:
# ./minimal_example.py

from dagster import (
    AssetSelection,
    DailyPartitionsDefinition,
    ScheduleDefinition,
    asset,
    define_asset_job,
    repository,
)

partitions_def = DailyPartitionsDefinition(start_date="2022-11-01")


@asset(
    group_name="staging",
    partitions_def=partitions_def,
)
def my_asset():
    pass


# Job that materializes only my_asset, using the same daily partitions
partitioned_asset_job = define_asset_job(
    "partitioned_asset_job",
    AssetSelection.assets(my_asset),
    partitions_def=partitions_def,
)


@repository
def minimal_example():
    return [
        ScheduleDefinition(
            job=partitioned_asset_job,
            cron_schedule="@weekly",
        ),
        my_asset,
    ]
l
hm... yeah, I think there must just be an error somewhere... I'll look for it and grab the stack trace if I can't identify it. Thanks for this.
I wonder if I got turned around somewhere because the docs seem to have shifted to the new `Definitions` API and I am still working with repositories. Not sure why that would change much, though...
r
The introduction of the `Definitions` API should not break existing uses of `@repository`; it was a backwards-compatible change.
l
oh yeah, I meant it was more likely user error on my part while reading through the docs. Sorry for the confusion.
r
Here’s the equivalent minimal example using `Definitions`, just for reference:
from dagster import (
    AssetSelection,
    DailyPartitionsDefinition,
    Definitions,
    ScheduleDefinition,
    asset,
    define_asset_job,
)

partitions_def = DailyPartitionsDefinition(start_date="2022-11-01")


@asset(
    group_name="staging",
    partitions_def=partitions_def,
)
def my_asset():
    pass


partitioned_asset_job = define_asset_job(
    "partitioned_asset_job",
    AssetSelection.assets(my_asset),
    partitions_def=partitions_def,
)


defs = Definitions(
    assets=[my_asset],
    schedules=[
        ScheduleDefinition(
            job=partitioned_asset_job,
            cron_schedule="@weekly",
        )
    ],
)
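A note on the schedule in this example: a `ScheduleDefinition` given only a cron string does not, by itself, say which partition each tick should materialize. For time-window partitions Dagster provides `build_schedule_from_partitioned_job`, which derives the schedule from the job's partitions. The plain-Python sketch below is not Dagster code. It only illustrates the mapping such a schedule has to perform, assuming the convention that a daily tick targets the most recent complete day. `partition_key_for_tick` is a hypothetical helper invented for this illustration.

```python
from datetime import date, datetime, timedelta
from typing import Optional


def partition_key_for_tick(tick: datetime, start_date: date) -> Optional[str]:
    """Hypothetical helper (not part of Dagster).

    Returns the key of the most recent complete daily partition for a
    schedule tick, or None if no complete partition exists yet.
    """
    # Assumption: a tick on day D targets the partition for day D - 1.
    target = tick.date() - timedelta(days=1)
    if target < start_date:
        return None
    return target.isoformat()


start = date(2022, 11, 1)
print(partition_key_for_tick(datetime(2022, 11, 15, 0, 0), start))  # 2022-11-14
```

A tick that fires on the partition start date itself yields `None` here, since the first day has not finished yet.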
m
@rex My code is very similar to what you put here, with the exception that I am using static partitions. The job runs great when I trigger it manually, and I can backfill partitions fine. However, when the schedule runs, it fails with `dagster._check.CheckError: Failure condition: Tried to access partition_key_range for a non-partitioned run`. Any ideas?
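One plausible reading of this error (an assumption, since the thread has no stack trace) is that the scheduled run was launched without a partition key, so the asset had no partition to read. With static partitions, each tick would then need to request one run per partition key. In Dagster that is typically done by returning `RunRequest` objects with `partition_key` set from a `@schedule` function. The sketch below is plain Python, not Dagster code: `RunRequestSketch`, `requests_for_tick`, and the partition names are all hypothetical and only show the fan-out.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class RunRequestSketch:
    """Hypothetical stand-in for a run request; not Dagster's RunRequest."""

    run_key: str
    partition_key: str


def requests_for_tick(tick_id: str, partition_keys: List[str]) -> List[RunRequestSketch]:
    """Build one request per static partition so every run carries a partition key."""
    return [
        RunRequestSketch(run_key=f"{tick_id}:{key}", partition_key=key)
        for key in partition_keys
    ]


# Made-up partition names, for illustration only.
requests = requests_for_tick("2023-01-01", ["us", "eu", "apac"])
for request in requests:
    print(request.run_key, request.partition_key)
```

Including the tick identifier in each run key keeps the keys unique per tick and per partition.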