# ask-community
s
Hi community! I'm trying to build a factory function to run scripts on Databricks using graph-backed assets. Essentially, my factory function has 3 ops: 1) upload the script, 2) create a Databricks run, 3) delete the script. The issue I'm facing now is adding upstream dependencies as part of the factory function. See the code attached. The factory works if the input (i.e. upstream) assets passed in are empty, i.e. `asset_ins={}`. But if I pass an upstream asset as `asset_ins={"upstream": AssetIn("an_upstream_asset")}`, I get an error complaining about the arguments of `make_asset_from_graph`: it expects `make_asset_from_graph` to have an `upstream` argument. How can I generate that argument dynamically? I tried these lines:
in_asset_args = list(asset_ins.keys())

@op(...)
def make_asset_from_graph(*in_asset_args):
    ...
But it doesn't work. Can anyone help me create these arguments dynamically?
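A quick plain-Python check of why the attempt above cannot work: in a `def`, `*in_asset_args` does not expand the list into parameter names; it declares a single variadic positional parameter that happens to be called `in_asset_args`. The sketch below involves no Dagster; the example names `upstream`/`other_upstream` and the functions `fn_with_varargs`/`fn_with_kwargs` are hypothetical. It inspects the signature and shows `**kwargs`, which accepts keyword arguments whose names are only known at runtime.

```python
import inspect

# Hypothetical upstream input names, standing in for asset_ins.keys().
asset_ins = {"upstream": None, "other_upstream": None}
in_asset_args = list(asset_ins.keys())


def fn_with_varargs(*in_asset_args):
    # The module-level list is NOT spliced in here: this declares one
    # variadic positional parameter that is merely named in_asset_args.
    return in_asset_args


def fn_with_kwargs(**kwargs):
    # Accepts any keyword arguments; the caller decides their names.
    return sorted(kwargs)


varargs_params = inspect.signature(fn_with_varargs).parameters
print(list(varargs_params))  # ['in_asset_args']
print(varargs_params["in_asset_args"].kind == inspect.Parameter.VAR_POSITIONAL)  # True

# Keyword arguments built from the runtime list of names.
print(fn_with_kwargs(**{name: None for name in in_asset_args}))  # ['other_upstream', 'upstream']
```

This is the same reason the reply below switches to `**kwargs` on the graph function and declares the inputs explicitly rather than relying on the function signature.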
c
Hi Serg. Not sure exactly what error you're getting, but some thoughts:
• All input arguments to a graph must be passed to the underlying ops, so every entry in `asset_ins` must be passed to an op.
• I think the `graph_asset` decorator makes assumptions about the inputs to the underlying graph that don't hold in this case. I think you will need to define `GraphIn` inputs explicitly, like below. I took a stab at making this work:
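from typing import Dict, Optional, Sequence

from dagster import (
    AssetKey,
    AssetsDefinition,
    GraphIn,
    In,
    Nothing,
    OpExecutionContext,
    graph,
    op,
)


def make_asset_for_script(
    asset_name: str, upstream_assets: Optional[Sequence[AssetKey]]
) -> AssetsDefinition:
    # Use one naming scheme ("__"-joined key path) for the op ins, the graph ins
    # and keys_by_input_name so they line up, even for multi-part asset keys.
    keys_by_input_name: Dict[str, AssetKey] = {
        "__".join(asset_key.path): asset_key for asset_key in (upstream_assets or [])
    }

    @op(
        name=f"{asset_name}_upload",
        # Nothing inputs only enforce ordering: the op waits on the upstream
        # inputs but does not receive their values as arguments.
        ins={input_name: In(Nothing) for input_name in keys_by_input_name},
        # ... (other op arguments elided)
    )
    def upload_script(context: OpExecutionContext) -> str:
        ...

    # run_script (the op that creates the Databricks run) is elided here.

    @op(
        name=f"{asset_name}_delete",
        ins={"path": In(str), "start": In(Nothing)},
    )
    def delete_script(context: OpExecutionContext, path):
        ...

    @graph(
        name=asset_name,
        # One explicit GraphIn per upstream asset; they arrive in **kwargs.
        ins={input_name: GraphIn() for input_name in keys_by_input_name},
    )
    def make_asset_from_graph(**kwargs):
        # Forward every graph input to the upload op so none is left unconnected.
        upload = upload_script(**kwargs)
        # "start" is a Nothing input: delete only runs after run_script finishes.
        return delete_script(path=upload, start=run_script(upload))

    return AssetsDefinition.from_graph(
        make_asset_from_graph,
        keys_by_input_name=keys_by_input_name,
    )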
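Input names here are derived from asset key paths, either by joining all segments with `"__"` or by taking only the last segment (`path[-1]`). Whichever scheme is used, the op `ins`, the graph `ins` and `keys_by_input_name` have to use the same one, and it has to give each upstream asset a distinct name: otherwise the dict comprehensions silently keep only one of the colliding keys. The plain-Python sketch below (no Dagster; it models an asset key as a tuple of path segments, and `joined_name`, `last_segment_name` and `build_keys_by_input_name` are hypothetical helpers) shows where the two schemes agree and where they collide.

```python
from typing import Callable, Dict, Sequence, Tuple

# Hypothetical stand-in for an asset key: a tuple of path segments
# (a real dagster AssetKey exposes its segments as `.path`).
KeyPath = Tuple[str, ...]


def joined_name(path: KeyPath) -> str:
    # All segments joined by "__", e.g. ("raw", "events") -> "raw__events".
    return "__".join(path)


def last_segment_name(path: KeyPath) -> str:
    # Only the last segment, e.g. ("raw", "events") -> "events".
    return path[-1]


def build_keys_by_input_name(
    paths: Sequence[KeyPath], naming: Callable[[KeyPath], str]
) -> Dict[str, KeyPath]:
    """Map input names to key paths, failing loudly if two keys share a name."""
    result: Dict[str, KeyPath] = {}
    for path in paths:
        name = naming(path)
        if name in result:
            raise ValueError(f"input name {name!r} used by {result[name]} and {path}")
        result[name] = path
    return result


upstream = [("an_upstream_asset",), ("raw", "events"), ("clean", "events")]

# Joined names stay unique for these keys.
print(build_keys_by_input_name(upstream, joined_name))

# Last-segment names collide on "events".
try:
    build_keys_by_input_name(upstream, last_segment_name)
except ValueError as err:
    print(err)

# For single-segment keys, such as the one in the question, both schemes agree.
print(joined_name(("an_upstream_asset",)) == last_segment_name(("an_upstream_asset",)))  # True
```

Raising on a collision is a design choice for the sketch; a plain dict comprehension would instead drop one of the upstream dependencies without any error.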
s
Thank you! I'll try it this way!