Is it possible to define an asset without an input...
# ask-community
d
Is it possible to define an asset without an input? I know you can do so with ops in the form of:
@op(ins={"start": In(Nothing)})
but start is not a key that is available in the ins of an asset. I have a pipeline that is like:
ingest_dataset1 -> ingest_dataset2 -> start_processing_data
in the form of:
asset -> asset -> op.
The assets do not depend on each other; I just want one to run after the other. I could also use:
op -> op -> op
but it is my understanding that an op should be used when performing some kind of transformation or logic, whereas an asset signifies something that is stored in persistent storage.
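For reference, the op-only version I have in mind would look something like this (a sketch; the Nothing inputs carry no data and only enforce ordering):
Copy code
from dagster import In, Nothing, job, op


@op
def ingest_dataset1():
    ...


@op(ins={"start": In(Nothing)})
def ingest_dataset2():
    ...


@op(ins={"start": In(Nothing)})
def start_processing_data():
    ...


@job
def pipeline():
    # passing the upstream invocation to a Nothing input establishes
    # execution order without passing any data
    start_processing_data(start=ingest_dataset2(start=ingest_dataset1()))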
v
This minimal example works:
Copy code
from dagster import asset, AssetIn


@asset
def asset_one():
    pass


@asset(ins={"start": AssetIn("asset_one")})
def asset_two(start):
    pass
Alternatives: if you’re chaining ops whose outputs you don’t want to persist, you can use graph-backed assets, though that doesn’t sound like it applies here since you’re ingesting datasets. If you have expensive compute to spin up for the processing, you can always define a multi_asset that materializes them together, one after the other, without the explicit dependency (which you may or may not want; I like that the asset graph serves as a mental model for all dependencies in the data platform). The latter would look something like this:
Copy code
from dagster import AssetOut, Output, multi_asset


@multi_asset(
    outs={
        "asset_one": AssetOut(is_required=False),
        "asset_two": AssetOut(is_required=False),
    },
    # optional, in case sometimes not all outputs are/can be materialized;
    # subsettable outs should be is_required=False, and the body should
    # only yield the outputs that were selected
    can_subset=True,
)
def my_assets(context):
    # process_asset_one / process_asset_two are placeholder ingestion functions
    if "asset_one" in context.selected_output_names:
        yield Output(value=process_asset_one(), output_name="asset_one")
    if "asset_two" in context.selected_output_names:
        yield Output(value=process_asset_two(), output_name="asset_two")
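For completeness, the graph-backed asset alternative would look roughly like this (a sketch; the ops are placeholders, and only the graph’s final output gets persisted):
Copy code
from dagster import AssetsDefinition, graph, op


@op
def fetch_raw():
    ...


@op
def clean(raw):
    ...


@graph
def dataset_graph():
    return clean(fetch_raw())


# the intermediate op results are not stored as assets;
# only the graph's output is
dataset_asset = AssetsDefinition.from_graph(dataset_graph)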
d
Hey Vinnie, I tried out multi-assets and it works, sort of. My datasets are now ingested and pushed to S3 in a single op, so they no longer depend on each other. However, the next node in the graph (an op) launches a cluster so I can run PySpark jobs, and I want the cluster to start only after the datasets have been ingested. The op now complains that it receives multiple outputs for a single input. Is it possible to make the op not expect any inputs, no matter how many outputs are generated upstream? The error I get:
dagster._core.errors.DagsterInvalidDefinitionError: In @graph ingestion_pipeline, received a tuple of multiple outputs for input "start" (at position 0) in op invocation launch_emr_cluster. Must pass individual output, available from tuple: ('customer_transactions', 'customer_information')
My op:
@op(
    ins={"start": In(Nothing)},
)
def launch_emr_cluster(context: OpExecutionContext) -> str:
    ...
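Going by the error's hint ("Must pass individual output, available from tuple"), one workaround might be to unpack the tuple and give the op one Nothing input per upstream output. A sketch, where ingest_datasets stands in for my multi-output node:
Copy code
from dagster import In, Nothing, OpExecutionContext, graph, op


@op(
    ins={
        "customer_transactions": In(Nothing),
        "customer_information": In(Nothing),
    },
)
def launch_emr_cluster(context: OpExecutionContext) -> str:
    ...


@graph
def ingestion_pipeline():
    # unpack the tuple of outputs and wire each one to its own
    # Nothing input; no data is passed, only execution order
    customer_transactions, customer_information = ingest_datasets()
    launch_emr_cluster(
        customer_transactions=customer_transactions,
        customer_information=customer_information,
    )
Ideally, though, the op wouldn't have to reference the upstream outputs at all.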
v
It’s not a case I’ve encountered yet, but I imagine the solution would be analogous to my first example, something along the lines of:
Copy code
@op(
    ins={
        "customer_transactions": AssetIn("customer_transactions"),
        "customer_information": AssetIn("customer_information"),
    }
)
def my_op(context, customer_transactions, customer_information):
    ...
I also have to say, I’m not entirely sure if ops can depend on assets. The Dagster team is probably waking up around now and I’m sure they can help though 🙂
d
Yes, I'd imagine that would work, but in my case I want the op that launches the cluster to be asset-independent while still executing after my multi_asset function has finished. Thanks for the suggestion! 🙂 I'll let you know if I figure out a way to solve this
z
You might also think about using an AssetSensor to trigger the op once the assets have been materialized. This would break up the graph, but it can be helpful for managing this kind of dependency, where you only need an order-based dependency between assets and ops.
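A sketch of what that could look like, assuming launch_emr_cluster is wrapped in a job of its own (launch_cluster_job here is hypothetical):
Copy code
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor


@asset_sensor(asset_key=AssetKey("customer_transactions"), job=launch_cluster_job)
def ingestion_done_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # launch_cluster_job is assumed to be a job wrapping launch_emr_cluster;
    # this requests a run each time the monitored asset is materialized
    yield RunRequest(run_key=context.cursor)
To watch several assets at once, there's also multi_asset_sensor.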
❤️ 1
j
You can also use non_argument_deps (https://docs.dagster.io/concepts/assets/software-defined-assets#non-argument-dependencies) to define order dependencies without data dependencies between assets.
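For the pipeline above, that would look something like this (a sketch):
Copy code
from dagster import asset


@asset
def ingest_dataset1():
    ...


# order-only dependency: ingest_dataset2 runs after ingest_dataset1
# but does not receive its output
@asset(non_argument_deps={"ingest_dataset1"})
def ingest_dataset2():
    ...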
❤️ 2
d
Thank you! I will look into both of them 🙂