Samuel Stütz
03/21/2022, 10:38 AM
partitions_def=DailyPartitionsDefinition(start_date=datetime(2022, 3, 13))
but then, using AssetGroup/build_job, there is no config parameter, yet "Materialize selected" always complains that it needs configuration.
build_assets_job can be given a PartitionedConfig, but I do not see what exactly needs to be configured there.
Are there any useful examples?
jamie
03/21/2022, 3:06 PM
assets/id_range_for_time.py
)
Samuel Stütz
03/21/2022, 4:13 PM
assets_job = build_assets_job(
    "mat_assets",
    source_assets=[source_asset],
    assets=[my_table, report_table],
    resource_defs={
        "bq": bigquery_resource,
        "gcs": gcs_resource,
        "io_manager": gcs_pickle_io_manager,
        # "mlflow": mlflow_tracking,
    },
    config={
        "resources": {
            "bq": BQ_CONFIG,
            "io_manager": IO_CONFIG_GCS,
        }
    },
)
when working with partitioned assets when using AssetGroup and build_job.
I did find a very basic example.
@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-02-01"))
def dummy_asset_partitioned(context) -> DataFrame:
    """Creates a mini dummy asset which is partitioned"""
    partition_key = context.output_asset_partition_key()
    ...
    yield Output(df, metadata={...})
partitioned_asset_dummy_ag = AssetGroup(
    assets=[dummy_asset_partitioned],
    source_assets=[],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(PandasCsvIOManagerWithOutputAssetPartitions()),
        # "partition_bounds": ResourceDefinition.none_resource()
    },
)
partitioned_asset_dummy_pipeline = partitioned_asset_dummy_ag.build_job("partitioned_asset_dummy")
which builds and works, but when I go to Launchpad
ops:
  order_history:
    config:
      assets:
        input_partitions: {}
        output_partitions:
          result:
            end: '2022-03-14'
            start: '2022-03-14'
this config is generated. In my actual use case, it is missing the
resources:
  bq:
    config:
      project: ....
  io_manager:
    config:
      gcs_bucket: ....
      gcs_prefix: ...
Now, if I append these resources in Launchpad, it does work.
Where should I put the configuration so that clicking Materialize or Backfill, or just adding a new Launchpad config, fills in the correct default config for the required resources?
One solution I found is to do the following.
@daily_partitioned_config(start_date=datetime(2022, 3, 15))
def daily_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "order_history": {
                "config": {
                    "assets": {
                        "input_partitions": {},
                        "output_partitions": {
                            "result": {
                                "start": start.strftime("%Y-%m-%d"),
                                "end": _end.strftime("%Y-%m-%d"),
                            }
                        },
                    }
                }
            }
        },
        "resources": {"bq": BQ_CONFIG, "io_manager": IO_CONFIG_GCS},
    }
order_history_job = build_assets_job(
    "order_history_job",
    assets=[
        # inventory, purchase,
        order_history,
    ],
    config=daily_config,
    resource_defs={
        "bq": bigquery_resource,
        "gcs": gcs_resource,
        "io_manager": gcs_pickle_io_manager,
    },
)
Which works but is
In the partitioned asset function I do
start, end = context.output_asset_partitions_time_window("result")
partition_date: date = start
Is this a sensible solution, or am I being too cumbersome with this?
Samuel Stütz
03/21/2022, 4:19 PM
ops.{asset_op_name}.config.assets.input_partitions…
configuration.
Also, it seems that AssetGroup + build_job is missing any equivalent way of supplying config, similar to build_assets_job.
jamie
03/21/2022, 4:30 PM
configured
function. For example, when you make your asset group, you can do the following:
my_asset_group = AssetGroup(
    assets=[asset_1, asset_2],
    source_assets=[],
    resource_defs={
        "gcs": gcs_resource.configured({"project": "<project_name>"})
    },
)
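A rough sketch of why binding resource config up front helps, written in plain Python so it runs without Dagster. The idea, as far as I understand it, is that once a resource has its config bound ahead of time, the run config only has to carry the per-partition "ops" section, which is the part Launchpad already generates. The helper name partition_run_config and its resources parameter are hypothetical and not part of Dagster; the "order_history" and "result" keys simply mirror the config shown earlier in this thread.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def partition_run_config(
    op_name: str,
    start: datetime,
    end: datetime,
    resources: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a run config dict for one partition window (hypothetical helper)."""
    fmt = "%Y-%m-%d"
    run_config: Dict[str, Any] = {
        "ops": {
            op_name: {
                "config": {
                    "assets": {
                        "input_partitions": {},
                        "output_partitions": {
                            "result": {
                                "start": start.strftime(fmt),
                                "end": end.strftime(fmt),
                            }
                        },
                    }
                }
            }
        }
    }
    # A "resources" section is only needed when resource config
    # was NOT already bound when the resources were defined.
    if resources:
        run_config["resources"] = resources
    return run_config


start, end = datetime(2022, 3, 14), datetime(2022, 3, 15)

# Resource config bound up front: only the "ops" section is required.
ops_only = partition_run_config("order_history", start, end)

# Resource config supplied at run time: it has to be merged in by hand.
with_resources = partition_run_config(
    "order_history",
    start,
    end,
    resources={"bq": {"config": {"project": "<project_name>"}}},
)
```

The second call corresponds to what the daily_config function earlier in the thread does by returning both "ops" and "resources"; the first is the shape you are left with once the resources no longer need run-time config.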
Samuel Stütz
03/21/2022, 4:31 PM
Samuel Stütz
03/21/2022, 4:32 PM