# ask-community
j
is this actually fixed? I think I’m bumping up against it…
```python
@op(
    out={
        "ip_metadata": Out(
            asset_key=AssetKey(["schema", "table"]),
            asset_partitions=context.partition_key,
            metadata={"partition_expr": "created_on"},
            io_manager_key="snowflake_io_manager",
        )
    }
)
```
I don't see how to set the asset_partitions, and am not sure that the asset materialization won't be deleted before the new data is inserted.
s
hey Jeremy - I just tried the following on 0.15.0 and it worked:
```python
from dagster import op, graph, AssetsDefinition, repository, AssetGroup, DailyPartitionsDefinition


@op
def my_op(context):
    context.output_asset_partition_key()


@graph
def my_graph():
    return my_op()


my_asset = AssetsDefinition.from_graph(
    my_graph, partitions_def=DailyPartitionsDefinition(start_date="2022-01-01")
)


@repository
def repo():
    return [AssetGroup([my_asset])]
```
the only difference is that I added a `partitions_def` argument to the `from_graph` invocation. Does this work for you?
j
```python
@graph
def ip_augmentation_graph():
    start, RUN_ID, new_ips_df = start_run()
    return ip_metadata_logic(
        RUN_ID,
        new_ips_df,
        get_ssl_cert_info(start, RUN_ID, new_ips_df),
        get_clearbit_reveal(start, RUN_ID, new_ips_df),
        get_registry_info(start, RUN_ID, new_ips_df),
        get_nslookup_info(start, RUN_ID, new_ips_df),
    )

ip_metadata_dev = ip_augmentation_graph.to_job(
    partitions_def=HourlyPartitionsDefinition(start_date="2022-06-20-00:00"),
    metadata={"partition_expr": "created_on"},
    resource_defs={
        **RESOURCES_LOCAL,
    },
)
```
I've been doing this.
I wasn't able to get the AssetsDefinition to compile.
s
does the example I sent you above not compile?
j
I'll try it again. It complained about assets not having the right inputs (if memory serves).
s
(I mean the one in this thread - not the one from yesterday)
(the one in this thread is simpler, so might be a better place to start)
j
I’ll rewrite mine to match your pattern and see what happens
```python
@graph
def ip_augmentation_graph():
    start, RUN_ID, new_ips_df = start_run()
    return ip_metadata_logic(
        RUN_ID,
        new_ips_df,
        get_ssl_cert_info(start, RUN_ID, new_ips_df),
        get_clearbit_reveal(start, RUN_ID, new_ips_df),
        get_registry_info(start, RUN_ID, new_ips_df),
        get_nslookup_info(start, RUN_ID, new_ips_df),
    )

ip_metadata_dev = AssetsDefinition.from_graph(
    ip_augmentation_graph,
    partitions_def=HourlyPartitionsDefinition(start_date="2022-06-20-00:00"),
)

ip_metadata_dev_group = AssetGroup(
    [ip_metadata_dev],
    resource_defs={
        **RESOURCES_LOCAL,
    },
)

# no job is created?
```
This compiles.
If I try to materialize a partition I get an error:
```
__ASSET_JOB_1 cannot be executed with the provided config. Please fix the following errors:

Missing required config entry "resources" at the root. Sample config for missing entry: {'resources': {'s3_dataframe_staging_client': {'config': {'bucket': '...', 'prefix': '...'}}}}
```
no job is created, just the asset
s
> no job is created, just the asset
if you want to create a job as well, you can do something like this:
```python
from dagster import repository, define_asset_job

@repository
def repo():
    return [ip_metadata_dev_group, define_asset_job("all_assets")]
```
or you can provide a value for the `selection` argument to select just a subset of the assets
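For concreteness, a minimal sketch of the selection variant. The asset key string and job name here are placeholders, not the actual keys from this thread:

```python
from dagster import define_asset_job, repository

# "ip_metadata" is a placeholder for the actual asset key produced by the graph;
# selection limits the job to just that asset instead of every asset in the repo.
ip_metadata_job = define_asset_job("ip_metadata_job", selection=["ip_metadata"])

@repository
def repo():
    return [ip_metadata_dev_group, ip_metadata_job]
```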
> If I try to materialize a partition I get an error:
it looks like that error is there because the resources that you're depending on require configuration. You can supply that using `.configured`, e.g. `RESOURCES_LOCAL = {"s3_dataframe_staging_client": s3_dataframe_staging_client.configured({"bucket": ..., "prefix": ...})}`
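Put together, the configured-resource approach might look roughly like this. The bucket/prefix values are placeholders, and the resource definition is a stub standing in for the real `s3_dataframe_staging_client` named in the error above:

```python
from dagster import AssetGroup, resource

@resource(config_schema={"bucket": str, "prefix": str})
def s3_dataframe_staging_client(init_context):
    # stub standing in for the real staging-client resource
    return init_context.resource_config

# .configured bakes the config into the resource definition,
# so no run config is needed at materialization time.
RESOURCES_LOCAL = {
    "s3_dataframe_staging_client": s3_dataframe_staging_client.configured(
        {"bucket": "my-staging-bucket", "prefix": "ip_metadata"}
    ),
}

ip_metadata_dev_group = AssetGroup([ip_metadata_dev], resource_defs=RESOURCES_LOCAL)
```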
j
I was configuring in the job def, not the source code.
I could do that though. I'll make the job and see if the partitioning works.
thanks!
Where do I put `required_resource_keys` now?
```
resource with key 'snowflake_query_manager' required by op 'start_run' was not provided.
```
it’s in the AssetGroup but not the AssetsDefinition
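As an illustration of the layout being described (a sketch with stub implementations; the resource key comes from the error above): `required_resource_keys` stays on the op, and the resource definition itself is supplied through the AssetGroup's `resource_defs`.

```python
from dagster import AssetGroup, op, resource

@resource
def snowflake_query_manager(_):
    # stub standing in for the real query-manager resource
    return object()

@op(required_resource_keys={"snowflake_query_manager"})
def start_run(context):
    # the op only names the key; the AssetGroup below supplies the definition
    context.resources.snowflake_query_manager

ip_metadata_dev_group = AssetGroup(
    [ip_metadata_dev],
    resource_defs={"snowflake_query_manager": snowflake_query_manager},
)
```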
s
> it's in the AssetGroup but not the AssetsDefinition
I would think that would work. Mind sharing the code that's giving you that error?
j
Can I DM it to you?
s
that works