Hey there, I feel like I might be missing something obvious, but is there something extra I have to do to get `asset`s to use cached results upon re-execution (w/o any changes to their input)? 🧵
from dagster import asset, materialize, Definitions, fs_io_manager
@asset
def a():
    print("executing a")
    return 1
if __name__ == "__main__":
    pipeline_results = a()
    pipeline_results2 = a()
This is the code I'm running - I'd expect that if it's caching the result of `a`, the 2nd invocation would leverage that and just give me its output, but instead I'm seeing that it's executing it multiple times:
executing a
executing a
I also notice the same if I do
(or something like that?)
Is there some other layer I need to wrap these in before getting that to happen?
Thanks I'll go through this!
Ok, quickly looking at this first section (which is what I'm doing in the code above) there's this line
We materialize it and look at the entry in the Asset Catalog:
In code, does 'we materialize it' look like
In that guide, I think it's referring to materialization via the UI. You could achieve something similar with `materialize` if you ensure that you're using the same DagsterInstance - just pass in `instance` as a parameter
Ok, let me try that
So that gets closer but I hit
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["a"]' for asset '["b"]' is not produced by any of the provided asset ops and is not one of the provided sources
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
@asset
def a():
    print("executing a")
    return 1
@asset
def b(a):
    print("executing b")
    return a + 2
if __name__ == "__main__":
    instance = DagsterInstance.get()
    pipeline_results = materialize([b], instance=instance)
    pipeline_results2 = materialize([b], instance=instance)
If I specify `[a, b]` it appears to work, but on the second run it runs both again. Also, ideally I wouldn't have to do that, as my impression was that `materialize([b])` would "know" to run the `a` asset to fill its input.
In https://docs.dagster.io/guides/dagster/asset-versioning-and-caching :
You'll notice two hashes labeled code_version and data_version in the "System tags" section of the materialization details. The code version shown is a copy of the run ID for the run that generated this materialization. Because a_number has no user-defined code_version, Dagster assumes a different code version on every run, which it represents with the run ID. The data_version is also generated by Dagster. It is a hash of the code version together with the data versions of any inputs. Since a_number has no inputs, in this case the data version is a hash of the code version only.
You need to actually provide versioning information in order for caching to work. Dagster can't automatically infer what has / hasn't changed about your assets, and therefore cannot cache any results without you providing that information
Dagster assumes a different code version on every run, which it represents with the run ID.
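To make the quoted rule concrete, here is a minimal sketch of how a data version could be derived from a code version plus the data versions of the inputs. It is an illustration only: the `data_version` helper and the choice of SHA-256 are assumptions made for this example, not Dagster's actual hashing scheme.

```python
import hashlib
import uuid


def data_version(code_version, input_data_versions):
    """Hash a code version together with the data versions of all inputs."""
    h = hashlib.sha256()
    h.update(code_version.encode())
    # Sort so the result does not depend on the order inputs are listed in.
    for v in sorted(input_data_versions):
        h.update(b"\x00")
        h.update(v.encode())
    return h.hexdigest()


# With a fixed, user-supplied code version the data version is stable,
# so a second run can be recognized as unchanged.
stable_first = data_version("v1", [])
stable_second = data_version("v1", [])

# Without one, a fresh run ID stands in for the code version on every run,
# so the data version changes each time and nothing looks cached.
unstable_first = data_version(str(uuid.uuid4()), [])
unstable_second = data_version(str(uuid.uuid4()), [])

print(stable_first == stable_second)      # True
print(unstable_first == unstable_second)  # False
```

This is why an asset without a user-defined code version never appears unchanged between runs.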
Ok, I didn't fully get that. I'll try this with specifying the code version.
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
# NOTE: the decorators were lost from this message; "v1" is a placeholder version string.
@asset(code_version="v1")
def a():
    print("executing a")
    return 1
@asset(code_version="v1")
def b(a):
    print("executing b")
    return a + 2
if __name__ == "__main__":
    instance = DagsterInstance.get()
    pipeline_results = materialize([a,b], instance=instance)
    pipeline_results2 = materialize([a,b], instance=instance)
Does this look like it should do what I want? I'm still seeing it execute twice
Mind trying this in the UI? Just want to verify that it works there for you
I couldn't find where
goes, so I added some sleeps - 3 s for a and 6s for b
This is the first materialization
Presumably "Materialize Selected" here would be doing the same thing as running
If so, it looks like materializing it again re-executes
You can see the 2 runs here:
Should the Snapshot ID match?
If you click the drop-down on the rematerialize all button, there should be a re-materialize stale and missing option
I'm more interested in how to do this from the API rather than the UI
But I'm not seeing 're-materialize stale and missing', just 'wipe materializations'
Unless I'm clicking on the wrong button 😅
strange… cc-ing @sean who worked on this - there might be UI intricacies I'm missing.
Fwiw I was able to repro the lack of caching with `materialize`, not sure if that's an API gap or some nuance I'm missing.
Alrighty - this is quickly becoming a monday problem for me, but I might check in here if I have some time tomorrow
Hey - checking in on this - is there anything else I should try? Is there a version of the `materialize` API that replicates 'materialize stale and missing' rather than 'materialize all'?
Hey Zach I'm catching up on this right now
OK, so first to give you a solution, this works:
#!/usr/bin/env python

from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset
@asset
def a():
    print("executing a")
    return 1
@asset
def b(a):
    print("executing b")
    return a + 2
if __name__ == "__main__":
    instance = DagsterInstance.ephemeral()
    # First run materializes a; second run materializes b and stubs a as a source.
    pipeline_results = materialize([a], instance=instance)
    pipeline_results2 = materialize([b, SourceAsset("a")], instance=instance)
Second, to explain a little bit about what's going on:
• Runs need to use the same `DagsterInstance` to have access to results from previous runs.
• `materialize` requires all dependencies to be provided on each call.
• You can use `SourceAsset` to "stub" a dependency that's already been materialized in a previous run if you don't want to materialize it again.
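As a rough analogy for the point about runs sharing storage, the sketch below uses a tiny pickle-backed store in place of a real instance. `ToyStore` and its methods are hypothetical names invented for this illustration, not Dagster APIs; the only point is that a later run sees earlier results when it reads from the same storage location.

```python
import os
import pickle
import tempfile


class ToyStore:
    """Hypothetical stand-in for an instance: persists values under base_dir."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path(self, name):
        return os.path.join(self.base_dir, name)

    def has(self, name):
        return os.path.exists(self._path(name))

    def save(self, name, value):
        with open(self._path(name), "wb") as f:
            pickle.dump(value, f)

    def load(self, name):
        with open(self._path(name), "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as shared, tempfile.TemporaryDirectory() as other:
    ToyStore(shared).save("a", 1)          # "first run" writes a

    same_location = ToyStore(shared)       # "second run", same storage
    different_location = ToyStore(other)   # "second run", fresh storage

    found_in_shared = same_location.has("a")
    found_in_other = different_location.has("a")
    b_value = same_location.load("a") + 2  # downstream step reuses stored a

print(found_in_shared, found_in_other, b_value)  # True False 3
```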
All the stuff with code versions and data versions is really on another layer to this basic memoization functionality. We could certainly do a better job communicating this.
Also, I think it's kind of unintuitive that you need to provide source assets for dependencies in `materialize`, since this doesn't really match the dagit experience
So I'm going to look into that
Gotcha - I really would love an API that looks like `materialize([b])` that would then auto-resolve:
• That `a` needs to be materialized because `b` depends on it
• Whether or not `a` should be loaded from cache or run
It's not really viable to make a code change to use `SourceAsset` depending on whether or not `a` is cached for any given run of the DAG.
I'm also wondering if there's a way to ensure that runs can share the `DagsterInstance`. If I restart whatever defines a `DagsterInstance`, it's not intuitive to me that it would invalidate the caches which are defined by code/data
Also, is `SourceAsset` not meant to be exposed publicly? (If importing from `dagster._core` means what I think it means)
I'm also wondering if there's a way to ensure that runs can share the `DagsterInstance`. If I restart whatever defines a `DagsterInstance`, it's not intuitive to me that it would invalidate the caches which are defined by code/data
Ok, so I was trying to get clever and wrote this:
def source_or_use(asset):
    try:
        asset = SourceAsset(asset.key.to_python_identifier())
        print("Using asset stub from SourceAsset")
        return asset
    except FileNotFoundError as fnf:
        return asset
To wrap the assets passed to `materialize`. I noticed on subsequent runs of this code that it's finding the cache, thus returning the `SourceAsset` object, which is what I want but not what I expected based on
Runs need to use the same `DagsterInstance` to have access to results
produces the same instance between runs of the code
However, the problem is then that I get
ipdb> pipeline_results2.output_for_node("b")
*** dagster._check.CheckError: Invariant failed. Description: __ephemeral_asset_job__ has no op named b.
When trying to get the results
I'm noticing it goes w/the
even if I update the
, so I might be misunderstanding this
So reading more about `SourceAsset`, it's meant to represent an external data dependency (which this isn't, really). It looks like I could hack this together with
but it's not yet clear to me that that's the right path or how I'd compute the Output of that
Hey @Zach Jablons - did you manage to get this figured out?
No - I haven't had a chance to play with it more, but I don't think the workaround suggested works
Here's a PR that adds a selection argument to `materialize`: https://github.com/dagster-io/dagster/pull/13133. If I understand your issue, I think this should make what you're trying to do much easier. Let me know if it would be helpful to find a time to discuss.
Thanks, I'll take a look at this and see if it helps
Took me a while to get around to this, and saw that it's merged into master - when is it likely to be released onto PyPI? I can do a dev installation in the meantime
It will be released on Wednesday
Ok, I wonder if `make dev_install` will finish by then 😅
Ok - I've been trying to get this working now with the `master` branch as of 2023-03-31, and I'm hitting a weird issue. Code:
import tempfile
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset
@asset
def a():
    print("executing a")
    return 1
@asset
def b(a):
    print("executing b")
    return a + 2
if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        print(f"Caching to {tmpdir}")
        instance = DagsterInstance.ephemeral(tempdir=tmpdir)
        pipeline_results = materialize(assets=[a,b], selection=[b], instance=instance)
        pipeline_results2 = materialize(assets=[a,b], selection=[b], instance=instance)
Caching to /tmp/tmp47yf5djt                                                                                                             
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - ENGINE_EVENT - Executing steps in process (pid: 1236033)
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].         
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - LOGS_CAPTURED - Started capturing logs in process (pid: 1236033).
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - STEP_START - Started execution of step "b".
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - b - Loading file from: /tmp/tmp47yf5djt/storage/a
2023-04-03 10:00:50 -0400 - dagster - ERROR - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - STEP_FAILURE - Execution of step "b" failed.
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "a" of step "b"::
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp47yf5djt/storage/a'
I'm not sure why it's insisting on materializing/caching to that directory - it doesn't exist afaict? I tried running this without specifying `tempdir` and got the same result
is the directory that it's materializing to different than the one you're creating with `tempfile.TemporaryDirectory()`?
Interestingly no
But I think for some reason it's not initializing the directory structure it needs with
and all
Yeah - if I print
after creating the ephemeral instance I can see that it's empty - is there an additional step that I'm missing?
did you mean to select "a" in the first `materialize` call?
Well, I want the output of `b` - maybe I'm not fully clear on what `selection` does then
you need to materialize "a" at some point, right? because "b" depends on it
but your example only ever tries to materialize "b"
Right, ok, so what I was hoping to get was an API where dagster would resolve what needs to be materialized given a specific asset that I want materialized
Am I able to do this with `materialize` or do I need to use a different API in dagster?
Ok, thinking about this a bit more, I want to clarify what I'm looking for here:
1. Caching and reuse
2. Dependency resolution
That said, I was hoping that the `selection` argument would help at least with 1, but I'm seeing that if I just run `materialize` multiple times it's still re-executing on the 2nd time. Am I correct in understanding that `materialize` means 'execute if not cached, otherwise load the output from cache'?
import tempfile
import os
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset
@asset
def a():
    print("executing a")
    return 1
#def b(a):
#    print("executing b")
#    return a + 2
if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        print(f"Caching to {tmpdir}")
        instance = DagsterInstance.ephemeral(tempdir=tmpdir)
        pipeline_results = materialize(assets=[a], selection=['a'], instance=instance)
        pipeline_results2 = materialize(assets=[a], selection=['a'], instance=instance)
Caching to /tmp/tmp2cp6yfat
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - ENGINE_EVENT - Executing steps in process (pid: 1249122)
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - LOGS_CAPTURED - Started capturing logs in process (pid: 1249122).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_START - Started execution of step "a".
executing a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - a - Writing file at: /tmp/tmp2cp6yfat/storage/a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - ASSET_MATERIALIZATION - Materialized value a.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_SUCCESS - Finished execution of step "a" in 4.28ms.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - ENGINE_EVENT - Finished steps in process (pid: 1249122) in 9.12ms
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - RUN_SUCCESS - Finished execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - ENGINE_EVENT - Executing steps in process (pid: 1249122)
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - LOGS_CAPTURED - Started capturing logs in process (pid: 1249122).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_START - Started execution of step "a".
executing a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - a - Writing file at: /tmp/tmp2cp6yfat/storage/a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - ASSET_MATERIALIZATION - Materialized value a.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_SUCCESS - Finished execution of step "a" in 3.96ms.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - ENGINE_EVENT - Finished steps in process (pid: 1249122) in 8.48ms
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - RUN_SUCCESS - Finished execution of run for "__ephemeral_asset_job__".
I see. `materialize` will actually execute in all situations - i.e. it won't skip assets that are cached. I filed an issue to track adding what you're looking for: https://github.com/dagster-io/dagster/issues/13333. We have a lot of the underlying tracking, but don't have it exposed yet in a neat way. @sean - is there a workaround that you'd be able to suggest? Potentially using internal APIs?
For dependency resolution, you can do something like:
materialize([a, b], selection=AssetSelection.assets(b).upstream())
which will materialize "b" and all assets upstream of it
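Conceptually, an upstream selection is a transitive walk over the dependency graph. The sketch below shows that idea in plain Python; the `deps` mapping and the `upstream` helper are hypothetical and say nothing about how `AssetSelection` is implemented internally.

```python
def upstream(deps, targets):
    """Return targets plus everything they transitively depend on."""
    selected = set()
    stack = list(targets)
    while stack:
        name = stack.pop()
        if name in selected:
            continue
        selected.add(name)
        # Queue the direct dependencies of this asset.
        stack.extend(deps.get(name, ()))
    return selected


# Hypothetical graph: b depends on a, and c depends on b.
deps = {"a": [], "b": ["a"], "c": ["b"]}

print(sorted(upstream(deps, ["b"])))  # ['a', 'b']
print(sorted(upstream(deps, ["c"])))  # ['a', 'b', 'c']
```

Selecting `b` pulls in `a` but leaves the downstream `c` alone.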
Ah ok, that's handy to know - let me try that and see what happens. Where would I import `AssetSelection` from?
ah yeah -
from dagster import AssetSelection
Ah, ez
Ok, so the good news is that that works, and in multiple DAG configurations - I tested that three nodes (a-b-c) works, as well as a pattern where c depends on both a and b. In the latter case (which, for clarity, looks like the below)
a---+
|   |
b   |
|   |
c---+
I noticed that the result from `a` was in fact reused, which is also good - but my hope is that subsequent invocations (e.g., you can imagine running a script to get a result from b, working on it a bit, and then wanting a result from c later on) would leverage this caching as well
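The behavior hoped for here, "run it only if it is stale or missing, otherwise load it", can be sketched in plain Python. Everything below (`run_cached`, the `ASSETS` table, the on-disk layout) is hypothetical and is not a Dagster API; it only illustrates resolving upstream dependencies and skipping steps whose code version and inputs are unchanged, with the cache kept in a directory.

```python
import hashlib
import json
import os
import pickle
import tempfile

executed = []  # records which functions actually ran


def a():
    executed.append("a")
    return 1


def b(a):
    executed.append("b")
    return a + 2


# Hypothetical asset table: name -> (function, code version, upstream names).
ASSETS = {"a": (a, "v1", []), "b": (b, "v1", ["a"])}


def run_cached(name, cache_dir):
    """Return (value, data_version) for name, executing only when stale or missing."""
    fn, code_version, dep_names = ASSETS[name]
    # Resolve upstream assets first, so asking for "b" also takes care of "a".
    deps = [run_cached(d, cache_dir) for d in dep_names]
    key = json.dumps([code_version, [version for _, version in deps]])
    version = hashlib.sha256(key.encode()).hexdigest()
    path = os.path.join(cache_dir, name + ".pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            cached_version, cached_value = pickle.load(f)
        if cached_version == version:
            return cached_value, version  # up to date: load instead of run
    value = fn(*[dep_value for dep_value, _ in deps])
    with open(path, "wb") as f:
        pickle.dump((version, value), f)
    return value, version


with tempfile.TemporaryDirectory() as cache_dir:
    first, _ = run_cached("b", cache_dir)   # executes a, then b
    second, _ = run_cached("b", cache_dir)  # loads both from the cache

print(first, second, executed)  # 3 3 ['a', 'b']
```

Pointing `cache_dir` at a persistent directory instead of a temporary one would let separate script invocations share results.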