#ask-community

Josh Kutsko

05/12/2023, 12:41 AM
Hi - I’m trying to set up software-defined assets for my hightouch syncs and running into an issue that’s probably simple. I have the following set-up:
hightouch_instance = ht_resource.configured(
    {"api_key": os.getenv("HIGHTOUCH_API_KEY")}
)

run_ht_deal_sync_op = hightouch_sync_op.configured(
    {"sync_id": HT_DEAL_DATA}, name="platform_deal_sync"
)


@asset(io_manager_key="io_manager", required_resource_keys={"hightouch"})
def hightouch_deal_sync():
    run_ht_deal_sync_op()
But I am getting the following error when running the asset through dagster locally: “Compute function of op ‘platform_deal_sync’ has context argument, but no context was provided when invoking.” If I’m understanding correctly, the hightouch sync op needs the configuration argument to execute, but I thought that the ht_resource.configured() call should set that up already.
Ah - I was able to get this working by creating a context using build_op_context(). However, I am now running into a new issue where the resources I set up in the main file’s Definitions aren’t included when I materialize this asset.
Hmm, according to the docs, manually building an op context seems to be mostly used for testing, so there’s probably a better way to pipe the resources I defined into the op execution.

Le Yang

05/12/2023, 1:26 AM
I think you are missing some fundamental Dagster concepts. You should review the docs again, especially the core concepts.

Josh Kutsko

05/12/2023, 1:28 AM
Can you elaborate more? A lot of my issue here is that the hightouch docs have nothing for asset configuration so I’m guessing with most of the setup for hightouch specifically. I haven’t had an issue with airbyte integration at all.

Le Yang

05/12/2023, 1:30 AM
I mean the code you provided was not really complete. You want to use the required resource, but nowhere is the resource configured for the asset.

Josh Kutsko

05/12/2023, 1:32 AM
Oh - sorry that wasn’t clear, the resource definitions are in another file here:
defs = Definitions(
    assets=load_assets_from_package_module(assets),
    resources={
        "io_manager": FilesystemIOManager(),
        "hightouch": hightouch_instance,
        "airbyte": airbyte_instance,
    },
)

Zach

05/12/2023, 4:06 PM
To be a bit more helpful here, the main thing you're missing is that you're trying to call an op from within another op/asset. Ops and assets cannot be called from one another, so if you have a function you want to share between ops / assets you need to encapsulate it as a vanilla python function. Dependencies between ops are defined in graphs, whereas assets infer their dependencies, and from them the execution graph, based on the asset function's inputs or explicit asset key references.
it's likely in your example you'll just want to take the logic from your `hightouch_sync_op` and put it directly in the `hightouch_deal_sync` asset
scratch that, this is coming from an integration

Josh Kutsko

05/12/2023, 4:08 PM
Ahh, ok that makes sense, thanks. In this case, the hightouch-dagster library only exposes an op; in the source code it just has the `hightouch_sync_op` method as an op.
(this does explain the error I was getting though)

Zach

05/12/2023, 4:10 PM
so if you want to make it into an asset you could use the graph_asset decorator to call your op from a graph and create a graph-backed asset

Josh Kutsko

05/12/2023, 4:15 PM
Awesome, that should work, I’ll try that. I’m a bit confused about the docs here still - it explicitly says a software-defined asset should include an op. What is the proper way to configure that for my own understanding?
Happy to just look at an example if that’s easier 🙂

Zach

05/12/2023, 4:25 PM
yeah that's some very confusing text. a better description is that an asset IS an op under the hood, with additional metadata that Dagster tracks and a bigger emphasis on IO management through IOManagers. the code within the `@asset` definition is just vanilla python code to carry out whatever task / transformation you're modeling, the same way an op is defined. so a simple asset would be like
from dagster import asset
from pyspark.sql import DataFrame

@asset
def unique_companies(industry_df: DataFrame):
    return [r.company for r in industry_df.select("company").distinct().collect()]
you could do the same exact thing with an op, but dagster treats them slightly differently in that it doesn't have first-class mechanisms for tracking state associated with an execution for ops. assets are a way to essentially execute logic that also executes in a graph, but for which you want to track some external state

Josh Kutsko

05/12/2023, 4:27 PM
Ahhhh ok that makes complete sense. So in this case because the hightouch integration only exposes an op method, I can create an op graph from that and create my asset from there.

Zach

05/12/2023, 4:28 PM
exactly, graph-backed assets were exposed as an additional way to create assets for situations where the logic for creating the state being produced by the asset is dynamic in nature or better modeled as a number of steps. in your case, because the framework exposes the functionality you want as an op, graph-backed assets make an easy way to transform that into an asset

Josh Kutsko

05/12/2023, 4:29 PM
nice, makes sense now, thanks for all the help!

Dan Meyer

06/21/2023, 6:06 PM
@Josh Kutsko, did you ever figure out how to splice the Hightouch integration stuff together? @Bashir Jaji and I are struggling with the same issue. I think we may be misunderstanding how to work with GraphAssets and/or `AssetsDefinition.from_op`

Josh Kutsko

06/21/2023, 6:14 PM
Hi Dan, sort of, I basically implemented a fake asset that represents the hightouch sync output, using a graph asset definition. The core issue here is that the hightouch library only provides an API to create an op from a sync. I think using `from_op` should technically work, but my approach was to use the `@graph` decorator. The code is pretty hacky, really just a way to consolidate my syncs into assets so that the rest of the asset model can be clean.
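from dagster import AssetIn, AssetKey, graph_asset

# run_ht_first_sync_op / run_ht_second_sync_op are hightouch_sync_op.configured(...) instances defined elsewhere
@graph_asset(
    ins={
        "platform_tables": AssetIn(
            key=AssetKey(["platform_tables"]),
        )
    },
)
def platform_export_live(platform_tables):
    # start_after only orders the ops; no data is passed between them
    first_sync = run_ht_first_sync_op(start_after=platform_tables)
    return run_ht_second_sync_op(start_after=first_sync)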
that’s pretty much what I did (names anonymized). Basically just giving it an input asset (not that it can use anything from it) and then manually setting the ops to run after each other.
if you’re able to create assets directly from ops, and still have the proper dependencies, that’s likely a cleaner solution.

Dan Meyer

06/21/2023, 6:19 PM
“really just a way to consolidate my syncs into assets so that the rest of the asset model can be clean.”
Awesome - that's really all we need, too 🙂

Bashir Jaji

06/21/2023, 6:21 PM
Thank you @Josh Kutsko!

Josh Kutsko

06/21/2023, 6:21 PM
No problem!