# ask-community
d
I am curious if assets can help in the following case. I have a job with the following ops:
• t1 = create table1 if not exists
• t2 = create table2 if not exists
• m1 = create materialized view based on table1 and table2 if not exists
• f1 = fill table1 (t1)
• f2 = fill table2 (t2)
• refresh the materialized view (m1, f1, f2)
All the ops at this point write directly to the database, but conceivably I could extract that functionality out. Is there a way to do this with assets? The main things I am looking to gain are:
• an easy way for consumers to tell when a table was last updated
• ideally some metadata about the updates
• some way to not forget to refresh the materialized view when the upstream data gets updated
o
hi daniel, the way I would conceptualize this as a group of assets is:
• Asset Table1: do t1 + f1
• Asset Table2: do t2 + f2
• Asset View1: do m1 + refresh, depends on Table1 and Table2
The underlying operations could continue to write directly to the database (the dependency of the view1 asset on the table assets could even be encoded using the `non_argument_deps` parameter, to indicate that no data is being passed through dagster)
You can also add whatever metadata you want at runtime, either by logging AssetObservations or by attaching metadata to the relevant outputs. This will get stored/tracked over time
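for example, something like this (a rough sketch against the dagster API of this thread's era; the table-writing step and `num_rows` are placeholders for your own code):

```python
from dagster import AssetKey, AssetObservation, Output, asset

@asset
def table1(context):
    # t1 + f1: create table1 if it doesn't exist, then fill it (placeholder)
    num_rows = 100  # placeholder: whatever your fill step reports

    # option 1: log an observation event against the asset
    context.log_event(
        AssetObservation(asset_key=AssetKey("table1"), metadata={"num_rows": num_rows})
    )

    # option 2: attach metadata to the output itself
    return Output(None, metadata={"num_rows": num_rows})
```

either way, the metadata should then show up alongside the asset in dagit over time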
d
I am a bit confused by the documentation. Should I be using `@asset`, or `@op` like I currently am and then doing the actual work in the op?
and how much of the metadata should I be trying to generate from a custom `io_manager` vs. the individual assets/ops?
o
you should replace your `@op` decorators with `@asset` (but the functions underneath the decorators would remain basically the same, other than combining together the bodies of a couple of the ops)
d
got it. Can assets call `@op`s, or no?
o
and I think you can skip using a custom `io_manager` (my guess is that it would be more convenient to just include the metadata-generating code in the op body)
d
is there a way to differentiate between asset creation and asset "update"?
or is it all the same?
o
under the hood, `@asset` will create an op for you (and will do some other stuff to help dagster figure out how to connect the generated ops to other ops), so the code underneath the asset decorator has the same constraints as code underneath the op decorator (so no, you shouldn't create ops manually in those functions)
and there's no difference between a creation / update of an asset in the assets model
d
perfect, I'm going to give it a go
thank you for the help
o
no problem 🙂
d
how do I pass an asset as `non_argument_deps`? I see that the param exists in the docs, but not how to use it
(more generally, I think I need `@asset(required_resource_keys=..., <what goes here for upstream assets, whether or not they need data passed around?>)`)
o
oops, that's an oversight. the argument's type is just a set of AssetKeys, so you pass in the set of AssetKeys that you want this asset to depend on (in your case it should be something like `non_argument_deps={AssetKey("table1"), AssetKey("table2")}`)
the asset keys are generated (by default) from the names of the decorated functions
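so in full, the view asset would look something like this (a sketch; the body stands in for your m1 + refresh logic):

```python
from dagster import AssetKey, asset

# depends on table1 and table2, but no data is passed through dagster
@asset(non_argument_deps={AssetKey("table1"), AssetKey("table2")})
def view1():
    ...  # create the materialized view if it doesn't exist, then refresh it
```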
d
and then dagster figures out the graph for all of those automagically? (I currently pass a bunch of `Nothing` around to make it all work)
o
yep exactly 🙂 -- you can toss the assets into an AssetGroup, and dagster will hook up all the op inputs/outputs in a way that satisfies the asset dependencies that are defined
d
🙂
what is the right way to build the assets from a job (and/or otherwise update assets on a schedule)?
and how can I pass the `config_schema` that I had on the ops into the assets?
o
for the first question, once you have an AssetGroup, you can call .build_job() on it to get a job that will update all those assets (and this job can then be put on a schedule just like any other job)
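something like this (a sketch using the AssetGroup API from this era; the asset names are from the earlier sketches and the cron string is a placeholder):

```python
from dagster import AssetGroup, ScheduleDefinition

asset_group = AssetGroup([table1, table2, view1])

# a job that materializes all the assets in the group
update_job = asset_group.build_job(name="update_all_assets")

# schedule it like any other job
update_schedule = ScheduleDefinition(job=update_job, cron_schedule="0 6 * * *")
```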
for the second question, assets do not currently support runtime configuration. depending on what sorts of things you want to configure, you might be able to push that configuration into a resource, or just bake in some values in the python code itself
making assets configurable is a tricky problem, but it is something we want to support in the future
d
I think what I was trying to do might be supported by partitioned assets
but some of the partitions might overlap
(in a way that is not observable to dagster)
o
hm interesting -- can you say more?
d
basically, I had an `op` that took in a list of files to process as part of the config
there might be some entries that are duplicated among the input files
but could I replace the list of files (ordered by which ones should take precedence in the case of conflicts) and use the partition mechanism?
(and then in the job, specify the list of partitions as an `op` output or similar?)
Or is this a bad rabbit hole to go down?
o
with these files, do you combine their contents together into one larger thing in the op, or do you basically do some sequence of actions for each (deduped) file, with these actions spanning multiple ops?
d
Not 100% sure what the difference is. Imagine 10 files, with 100 records each. Some records might appear in multiple files. Process each record in each file, and write them to a single table (order matters for clobbering)
The actual list of files is dynamic, and currently generated by an op/job (I have that part working in a sensor now)
o
could it make sense to model that list of files as an asset itself?
d
not logically. I understand that it might work to get the data passed to an asset though. Do partitions only really support time?
also I am getting `Param "path" is not one of ['Sequence']. Got <dagster.core.asset_defs.asset.AssetsDefinition object at .... which is type <dagster.core.asset_defs.asset.AssetsDefinition>.`
o
partitions definitely support non-time dimensions, so your original suggestion seems like it should work in theory. the tricky part is just that you can generally only launch a job for a single partition at a time, so if you model partitions as 1 file = 1 partition, you run into issues. you could hack around the immediate issue by swapping out op config for resource config, and reading the configuration of the resource inside the asset to determine what files to read, but this might be a bit of a bad rabbit hole to go down, as it doesn't really mesh well with the asset model, which sort of assumes that you're able to just execute the asset in a vacuum (without the need for more configuration)
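to illustrate that workaround (a sketch only, with the caveats above; `file_list` is a hypothetical resource name):

```python
from dagster import asset, resource

@resource(config_schema={"paths": [str]})
def file_list(init_context):
    # the list of files to process, in precedence order, comes from run config
    return init_context.resource_config["paths"]

@asset(required_resource_keys={"file_list"})
def combined_table(context):
    for path in context.resources.file_list:
        ...  # process each file's records in precedence order
```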
d
h'm
even for backfill, it would still be one partition at a time?
o
the most "asset-y" way of doing this would be to include the code to generate the files that you need as part of the asset that combines them together, but I recognize that might not be ideal depending on your exact setup
and yep, even for backfills (although this is definitely a problem that we're interested in tackling)
d
the core issue with that is that it is hard to call ops from other ops
folding all the functions together into one monster function seems wrong at some point, and the difficulty of calling regular functions from jobs and the like means that it matters whether a function is or is not an op
o
if you could create an asset from an @graph (rather than just an @op), would this be sufficient? this is functionality that we are planning to include in the nearish future
d
I think so, because then I can effectively call an `op` from an `op`
o
great
d
now I'm getting an `AssetsDefinition object has no attribute 'asset_key'` error.
Function decoration looks like:
`@asset(ins={'files': list_of_files_asset}, required_resource_keys={'r1','r2'}, non_argument_deps={AssetKey('<name of upstream asset>')})`
any idea what it could be?
o
oh weird -- can I see more of the stack trace?
d
it just goes to `run_code`
I just figured out `@asset(ins={'foo': AssetIn(AssetKey('my_asset_key'))})`
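for reference, the working shape (a sketch; the key and function names are placeholders): `ins` values should be `AssetIn`s wrapping the upstream asset's key, not the `AssetsDefinition` object itself

```python
from dagster import AssetIn, AssetKey, asset

# 'files' receives the output of the asset with key "list_of_files",
# rather than the AssetsDefinition object itself
@asset(ins={"files": AssetIn(AssetKey("list_of_files"))})
def combined_table(files):
    ...
```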