Jeremy
06/21/2022, 1:45 PM
Jeremy
06/21/2022, 2:19 PM
@op(
    out={
        "ip_metadata": Out(
            asset_key=AssetKey(["schema", "table"]),
            asset_partitions=context.partition_key,
            metadata={"partition_expr": "created_on"},
            io_manager_key="snowflake_io_manager",
        )
    }
)
I don't see how to set the asset_partitions, and am not sure that the asset materialization won't be deleted before the new data is inserted.
sandy
06/21/2022, 3:21 PM
from dagster import op, graph, AssetsDefinition, repository, AssetGroup, DailyPartitionsDefinition
@op
def my_op(context):
    context.output_asset_partition_key()

@graph
def my_graph():
    return my_op()

my_asset = AssetsDefinition.from_graph(
    my_graph, partitions_def=DailyPartitionsDefinition(start_date="2022-01-01")
)

@repository
def repo():
    return [AssetGroup([my_asset])]
the only difference is that I added a partitions_def argument to the from_graph invocation
does this work for you?
Jeremy
06/21/2022, 3:36 PM
@graph
def ip_augmentation_graph():
    start, RUN_ID, new_ips_df = start_run()
    return ip_metadata_logic(
        RUN_ID,
        new_ips_df,
        get_ssl_cert_info(start, RUN_ID, new_ips_df),
        get_clearbit_reveal(start, RUN_ID, new_ips_df),
        get_registry_info(start, RUN_ID, new_ips_df),
        get_nslookup_info(start, RUN_ID, new_ips_df),
    )

ip_metadata_dev = ip_augmentation_graph.to_job(
    partitions_def=HourlyPartitionsDefinition(start_date="2022-06-20-00:00"),
    metadata={"partition_expr": "created_on"},
    resource_defs={
        **RESOURCES_LOCAL,
    },
)
i’ve been doing this.
Jeremy
06/21/2022, 3:37 PM
sandy
06/21/2022, 3:37 PM
Jeremy
06/21/2022, 3:38 PM
sandy
06/21/2022, 3:38 PM
sandy
06/21/2022, 3:39 PM
Jeremy
06/21/2022, 3:39 PM
Jeremy
06/21/2022, 3:47 PM
@graph
def ip_augmentation_graph():
    start, RUN_ID, new_ips_df = start_run()
    return ip_metadata_logic(
        RUN_ID,
        new_ips_df,
        get_ssl_cert_info(start, RUN_ID, new_ips_df),
        get_clearbit_reveal(start, RUN_ID, new_ips_df),
        get_registry_info(start, RUN_ID, new_ips_df),
        get_nslookup_info(start, RUN_ID, new_ips_df),
    )

ip_metadata_dev = AssetsDefinition.from_graph(
    ip_augmentation_graph,
    partitions_def=HourlyPartitionsDefinition(start_date="2022-06-20-00:00"),
)

ip_metadata_dev_group = AssetGroup(
    [ip_metadata_dev],
    resource_defs={
        **RESOURCES_LOCAL,
    },
)
# no job is created?
Jeremy
06/21/2022, 3:47 PM
Jeremy
06/21/2022, 3:50 PM
__ASSET_JOB_1 cannot be executed with the provided config. Please fix the following errors:
Missing required config entry "resources" at the root. Sample config for missing entry: {'resources': {'s3_dataframe_staging_client': {'config': {'bucket': '...', 'prefix': '...'}}}}
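The sample config in that error maps directly onto Launchpad run config. A sketch of what that YAML could look like, with hypothetical bucket/prefix values standing in for the elided ones:

```yaml
resources:
  s3_dataframe_staging_client:
    config:
      bucket: my-staging-bucket   # hypothetical value
      prefix: ip_metadata/        # hypothetical value
```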
Jeremy
06/21/2022, 3:50 PM
sandy
06/21/2022, 3:52 PM
no job is created, just the asset (edited)
if you want to create a job as well, you can do something like this:
from dagster import repository, define_asset_job

@repository
def repo():
    return [ip_metadata_dev_group, define_asset_job("all_assets")]

or you can provide a value for the selection argument to select just a subset of the assets
sandy
06/21/2022, 3:54 PM
> If I try to materialize a partition i get an error:
it looks like that error is there because the resources that you're depending on require configuration. you can supply that using .configured, e.g.
RESOURCES_LOCAL = {"s3_dataframe_staging_client": s3_dataframe_staging_client.configured({"bucket": ..., "prefix": ...})}
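As a loose plain-Python analogy (not Dagster's actual implementation), `.configured` pre-binds the config a resource definition needs, much like `functools.partial` pre-binds function arguments; the factory and values below are hypothetical:

```python
from functools import partial

# Hypothetical factory standing in for a resource definition that
# requires "bucket" and "prefix" config before it can be instantiated.
def make_s3_staging_client(bucket, prefix):
    return f"s3://{bucket}/{prefix}"

# Analogous to s3_dataframe_staging_client.configured({...}):
# the config is bound up front, so later callers pass no arguments.
configured_client = partial(make_s3_staging_client, bucket="my-bucket", prefix="staging")

print(configured_client())  # -> s3://my-bucket/staging
```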
Jeremy
06/21/2022, 3:54 PM
Jeremy
06/21/2022, 3:55 PM
Jeremy
06/21/2022, 3:55 PM
Jeremy
06/21/2022, 4:02 PM
Jeremy
06/21/2022, 4:30 PM
resource with key ‘snowflake_query_manager’ required by op ‘start_run’ was not provided.
Jeremy
06/21/2022, 4:33 PM
sandy
06/21/2022, 4:50 PM
> it’s in the AssetGroup but not the AssetsDefinition
I would think that would work. Mind sharing the code that's giving you that error?
Jeremy
06/21/2022, 4:51 PM
sandy
06/21/2022, 4:51 PM