Nathan Skone
05/01/2023, 4:34 PM
We are migrating from @repository to the new Definitions style jobs. So far the migration is going great, but one thing we noticed is that the Launchpad no longer populates with config when shift-clicking the “Materialize all” button. Instead we now get an error, “Missing required config entries”, and need to click the “Scaffold missing config” button. This feels like a step backwards in terms of our user experience. Is there a specific way that config needs to be provided to a Definition so that it will populate in the Launchpad? Note that the config is passed correctly when normally clicking the materialize button (not shift-clicking). We are on Dagster 1.2.7.
yuhan
05/01/2023, 4:54 PM
Nathan Skone
05/01/2023, 4:56 PM
Before:
job = build_assets_job(
    name=self.name,
    assets=self.assets,
    source_assets=self.assets,
    resource_defs=self.get_resource_defs(),
    tags=self.tags,
    config=self.get_config(),
    partitions_def=self.partitions_def,
)

@repository
def repo():
    return {"jobs": {"job_name": job}}
After:
job = define_asset_job(
    name="job_name",
    config=configuration.load(),
    selection=[datadog_metrics_asset],
)

Definitions(
    assets=[datadog_metrics_asset],
    jobs=[job],
    schedules=[build_schedule_from_partitioned_job(job)],
    sensors=get_datadog_sensors(job),
)
Before, we were using from dagster._core.definitions.assets_job import build_assets_job and @repository. After, we are using from dagster import define_asset_job and from dagster import Definitions.
In both cases the config is being passed into the job.
yuhan
05/01/2023, 10:37 PM
from dagster import define_asset_job, repository, asset, Definitions

@asset
def asset1():
    return 1

@asset(config_schema={"path": str})
def asset2(context, asset1):
    print(context.op_config["path"])
    return 2

my_job = define_asset_job(
    "boo", [asset1, asset2], config={"ops": {"asset2": {"config": {"path": "foo"}}}}
)
same asset and job code
@repository
def repo():
    return [asset1, asset2, my_job]
after using Definitions:
defs = Definitions(assets=[asset1, asset2], jobs=[my_job])
in both cases, the config that i specified on the define_asset_job populates in the UI as below
Nathan Skone
05/01/2023, 11:00 PM
yuhan
05/01/2023, 11:02 PM
partitions_def arg to the define_asset_job. could that be the issue?
Nathan Skone
05/01/2023, 11:04 PM
I left the @asset code out of my block, and that is the only place we are using a partitions_def. Should a partitions_def be applied to the job or the definition as well?
partitions_def on the define_asset_job call?
I added partitions_def to the define_asset_job call and it did not have any apparent effect
yuhan
05/01/2023, 11:11 PM
Nathan Skone
05/01/2023, 11:12 PM
PARTITION_DEFINITION = HourlyPartitionsDefinition(
    start_date=datetime(2022, 12, 28), timezone="Etc/UTC"
)

@asset(
    name="datadog_metrics_asset",
    partitions_def=PARTITION_DEFINITION,
    required_resource_keys={"warehouse"},
    config_schema={
        "bucket": str,
        "redshift_role": str,
        "datadog_api_key": Any,
        "datadog_app_key": Any,
    },
    retry_policy=RetryPolicy(  # delay is in seconds
        max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL
    ),
)
def datadog_metrics_asset(context) -> None:
    start, end = context.output_asset_partitions_time_window()
    context.log.info(
        f"We will be querying DataDog for metric points starting at {start} and ending at {end}"
    )
yuhan
05/01/2023, 11:12 PM
config_schema={
    "bucket": str,
    "redshift_role": str,
    "datadog_api_key": Any,
    "datadog_app_key": Any,
},
Nathan Skone
05/01/2023, 11:14 PM
yuhan
05/01/2023, 11:22 PM
from dagster import (
    define_asset_job,
    repository,
    asset,
    HourlyPartitionsDefinition,
    Definitions,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")

@asset(partitions_def=hourly_partitions_def)
def asset1():
    return 1

@asset(partitions_def=hourly_partitions_def, config_schema={"path": str})
def asset2(context, asset1):
    print(context.op_config["path"])
    return 2

my_job = define_asset_job(
    "boo",
    [asset1, asset2],
    partitions_def=hourly_partitions_def,
    config={"ops": {"asset2": {"config": {"path": "foo"}}}},
)

@repository
def repo():
    return [asset1, asset2, my_job]
the UI prompts “missing config” and when i select the partition key, it ends up populating the full config.
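A rough way to picture this is the plain-Python toy model below. It is not Dagster's actual implementation, and make_partitioned_config and launchpad_prefill are hypothetical names invented for the sketch. It assumes only that a partitioned job's config is looked up per partition key, so nothing can be prefilled until a key is chosen.

```python
from typing import Callable, Optional

RunConfig = dict


def make_partitioned_config(static_config: RunConfig) -> Callable[[str], RunConfig]:
    """Hypothetical helper: expose a job's config as a function of the partition key."""

    def config_for(partition_key: str) -> RunConfig:
        # A real partitioned config could add partition-specific values here;
        # this sketch just returns a copy of the static config.
        return dict(static_config)

    return config_for


def launchpad_prefill(
    config_fn: Callable[[str], RunConfig], partition_key: Optional[str]
) -> Optional[RunConfig]:
    """Hypothetical model of the prefill step: no partition key, no config."""
    if partition_key is None:
        # Nothing to evaluate yet, so the editor stays empty ("missing config").
        return None
    return config_fn(partition_key)


config_fn = make_partitioned_config({"ops": {"asset2": {"config": {"path": "foo"}}}})

no_partition = launchpad_prefill(config_fn, None)
with_partition = launchpad_prefill(config_fn, "2022-05-31-00:00")
```

In this model, no_partition is None while with_partition holds the full config, which mirrors what the UI shows before and after a partition key is selected.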
i believe this is expected behavior, as partition_key is an input to the config blob. in other words, for a partitioned job, when a partition key isn’t specified, dagster doesn’t know the full config values, and therefore won’t populate them.
Nathan Skone
05/01/2023, 11:23 PM
yuhan
05/01/2023, 11:24 PM
Nathan Skone
05/01/2023, 11:24 PM