#ask-community

Zach Jablons

03/10/2023, 6:13 PM
Hey there, I feel like I might be missing something obvious, but is there something extra I have to do to get `asset`s to use cached results upon re-execution (w/o any changes to their input) 🧵
Copy code
from dagster import asset, materialize, Definitions, fs_io_manager

@asset
def a():
    print("executing a")
    return 1

if __name__ == "__main__":
    pipeline_results = a()
    print(pipeline_results)

    pipeline_results2 = a()
    print(pipeline_results2)
This is the code I'm running - I'd expect, if it's caching the result of `a`, that the 2nd invocation would leverage that and just give me its output, but instead I'm seeing that it's executing it multiple times
Copy code
executing a
1
executing a
1
I also notice the same if I do `materialize([a])` and `get_output_for_node` (or something like that?)
Is there some other layer I need to wrap these in before getting that to happen?

Zach Jablons

03/10/2023, 6:26 PM
Thanks I'll go through this!
Ok, quickly looking at this first section (which is what I'm doing in the code above) there's this line
We materialize it and look at the entry in the Asset Catalog:
In code, does 'we materialize it' look like `materialize([a])`?

chris

03/10/2023, 7:51 PM
In that guide, I think it’s referring to materialization via the UI. You could achieve something similar with `materialize()` if you ensure that you’re using the same `DagsterInstance` - just pass in `instance=DagsterInstance.get()` as a parameter

Zach Jablons

03/10/2023, 7:52 PM
Ok, let me try that
So that gets closer but I hit
Copy code
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["a"]' for asset '["b"]' is not produced by any of the provided asset ops and is not one of the provided sources
Copy code
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance                                   
                                                                                                                      
@asset                                                                                                                
def a():                                                                                                              
    print("executing a")                                                                                              
    return 1                                                                                                          
                                                                                                                      
@asset                                                                                                                
def b(a):                                                                                                             
    print("executing b")                                                                                              
    return a + 2                                                                                                      
                                                                                                                      
if __name__ == "__main__":                                                                                            
    instance = DagsterInstance.get()                                                                                  
                                                                                                                      
    pipeline_results = materialize([b], instance=instance)                                                            
    print(pipeline_results)                                                                                           
                                                                                                                      
    pipeline_results2 = materialize([b], instance=instance)                                                           
    print(pipeline_results2)
If I specify `materialize([a,b])` it appears to work, but on the second run it runs both again. Also, ideally I wouldn't have to do that, as my impression was that `b` would "know" to run the `a` asset to fill its `a` argument

chris

03/10/2023, 11:38 PM
In https://docs.dagster.io/guides/dagster/asset-versioning-and-caching :
Copy code
You'll notice two hashes labeled code_version and data_version in the "System tags" section of the materialization details. The code version shown is a copy of the run ID for the run that generated this materialization. Because a_number has no user-defined code_version, Dagster assumes a different code version on every run, which it represents with the run ID. The data_version is also generated by Dagster. It is a hash of the code version together with the data versions of any inputs. Since a_number has no inputs, in this case the data version is a hash of the code version only.
You need to actually provide versioning information in order for caching to work. Dagster can’t automatically infer what has / hasn’t changed about your assets, and therefore cannot cache any results without you providing that information
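To make the quoted paragraph concrete, here is a minimal plain-Python sketch of the rule it describes: the data version is a hash of the code version together with the data versions of any inputs, and an asset with no user-defined code version gets a fresh run ID on every run. The helper names (`data_version`, `effective_code_version`), the choice of SHA-256, and the sorting of inputs are assumptions made for illustration; this is not Dagster's implementation.

```python
import hashlib
import uuid
from typing import Optional, Sequence


def data_version(code_version: str, input_data_versions: Sequence[str] = ()) -> str:
    """Hash the code version together with the data versions of all inputs."""
    digest = hashlib.sha256()
    digest.update(code_version.encode())
    # Sorting is an assumption: it makes the hash independent of input order.
    for version in sorted(input_data_versions):
        digest.update(b"\x00" + version.encode())
    return digest.hexdigest()


def effective_code_version(user_code_version: Optional[str]) -> str:
    """Fall back to a fresh run ID when the user supplied no code version."""
    if user_code_version is not None:
        return user_code_version
    return uuid.uuid4().hex  # stands in for the run ID; differs on every run


# Unversioned asset: every run looks like new code, so nothing can be reused.
unversioned_run1 = data_version(effective_code_version(None))
unversioned_run2 = data_version(effective_code_version(None))

# Versioned asset: the data version is stable across runs.
versioned_run1 = data_version(effective_code_version("1"))
versioned_run2 = data_version(effective_code_version("1"))

# A downstream asset's data version also depends on its input's data version.
downstream_of_versioned = data_version("1", [versioned_run1])
downstream_of_unversioned = data_version("1", [unversioned_run1])
```

In this model, only the versioned asset (and anything downstream of it) ever sees the same data version twice, which is the precondition for treating a previous result as still valid.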

Zach Jablons

03/11/2023, 12:08 AM
Copy code
Dagster assumes a different code version on every run, which it represents with the run ID.
Ok, I didn't fully get that. I'll try this with specifying the code version.
Copy code
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance          
                                                                                             
@asset(code_version='1')                                                                     
def a():                                                                                     
    print("executing a")                                                                     
    return 1                                                                                 
                                                                                             
@asset(code_version='1')                                                                     
def b(a):                                                                                    
    print("executing b")                                                                     
    return a + 2                                                                             
                                                                                             
if __name__ == "__main__":                                                                   
    instance = DagsterInstance.get()                                                         
                                                                                             
    pipeline_results = materialize([a,b], instance=instance)                                 
    print(pipeline_results)                                                                  
                                                                                             
    pipeline_results2 = materialize([a,b], instance=instance)                                
    print(pipeline_results2)
Does this look like it should do what I want? I'm still seeing it execute twice

chris

03/11/2023, 12:51 AM
Mind trying this in the UI? Just want to verify that it works there for you

Zach Jablons

03/11/2023, 12:57 AM
I couldn't find where `stdout` goes, so I added some sleeps - 3 s for `a` and 6 s for `b`
This is the first materialization
Presumably "Materialize Selected" here would be doing the same thing as running `materialize([a,b])`?
If so, it looks like materializing it again re-executes
You can see the 2 runs here:
Should the Snapshot ID match?

chris

03/11/2023, 1:00 AM
If you click the drop-down on the rematerialize all button, there should be a re-materialize stale and missing option

Zach Jablons

03/11/2023, 1:02 AM
I'm more interested in how to do this from the API rather than the UI
But I'm not seeing 're-materialize stale and missing', just 'wipe materializations'
Unless I'm clicking on the wrong button 😅

chris

03/11/2023, 1:04 AM
strange… cc-ing @sean who worked on this - there might be UI intricacies I’m missing.
Fwiw I was able to repro the lack of caching with `materialize`, not sure if that’s an API gap or some nuance I’m missing.

Zach Jablons

03/11/2023, 1:06 AM
Alrighty - this is quickly becoming a Monday problem for me, but I might check in here if I have some time tomorrow
Hey - checking in on this - is there anything else I should try? Is there a version of the `materialize` API that behaves more like 'materialize stale and missing' than 'materialize all'?

sean

03/13/2023, 4:23 PM
Hey Zach I’m catching up on this right now
OK, so first to give you a solution, this works:
Copy code
#!/usr/bin/env python

from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset                                   
                                                                                                                      
@asset                                                                                                                
def a():                                                                                                              
    print("executing a")                                                                                              
    return 1                                                                                                          
                                                                                                                      
@asset                                                                                                                
def b(a):                                                                                                             
    print("executing b")                                                                                              
    return a + 2                                                                                                      
                                                                                                                      
if __name__ == "__main__":                                                                                            
    instance = DagsterInstance.ephemeral()                                                                                  
                                                                                                                      
    pipeline_results = materialize([a], instance=instance)                                                            
    print(pipeline_results)                                                                                           
                                                                                                                      
    pipeline_results2 = materialize([b, SourceAsset("a")], instance=instance)                                                           
    print(pipeline_results2.output_for_node("b"))
Second, to explain a little bit about what’s going on:
• Runs need to use the same `DagsterInstance` to have access to results from previous runs.
• `materialize` requires all dependencies to be provided on each call.
• You can use `SourceAsset` to “stub” a dependency that’s already been materialized in a previous run if you don’t want to materialize it again.
All the stuff with code versions and data versions is really on another layer to this basic memoization functionality. We could certainly do a better job communicating this.
Also, I think it’s kind of unintuitive that you need to provide source assets for dependencies in `materialize`, since this doesn’t really match the dagit experience
So I’m going to look into that
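The three points in this message can be modeled with a toy in plain Python. This is only a rough sketch of the described behavior under stated assumptions: `Asset`, `Stub`, `Instance`, and `toy_materialize` are hypothetical names invented here, not Dagster classes or functions, and the sketch assumes items are passed in dependency order.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Union


@dataclass
class Asset:
    name: str
    deps: List[str]
    fn: Callable[..., Any]


@dataclass
class Stub:
    """Stand-in for a dependency that an earlier run already materialized."""
    name: str


@dataclass
class Instance:
    """Toy stand-in for the storage shared between runs."""
    storage: Dict[str, Any] = field(default_factory=dict)


def toy_materialize(items: List[Union[Asset, Stub]], instance: Instance) -> Dict[str, Any]:
    provided = {item.name for item in items}
    results: Dict[str, Any] = {}
    for item in items:
        if isinstance(item, Stub):
            continue  # stubs are never executed, only read from storage
        missing = [dep for dep in item.deps if dep not in provided]
        if missing:
            # Every dependency must be provided on each call.
            raise ValueError(f"inputs {missing} for {item.name!r} were not provided")
        # Inputs are loaded from the instance; a fresh instance has nothing stored.
        args = [instance.storage[dep] for dep in item.deps]
        value = item.fn(*args)  # every non-stub asset passed in is executed
        instance.storage[item.name] = value
        results[item.name] = value
    return results


a = Asset("a", [], lambda: 1)
b = Asset("b", ["a"], lambda a_value: a_value + 2)

shared = Instance()
first = toy_materialize([a], shared)
second = toy_materialize([b, Stub("a")], shared)  # only b executes
```

In this model, calling `toy_materialize([b], shared)` raises because `a` was not provided, and calling it with `Stub("a")` against a fresh `Instance()` raises a `KeyError` because nothing was stored there by an earlier run.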

Zach Jablons

03/13/2023, 6:40 PM
Gotcha - I really would love an API that looks like `materialize([b])` that would then auto-resolve:
• That `a` needs to be materialized because `b` depends on it
• Whether or not `a` should be loaded from cache or run
It's not really viable to make a code change to use `SourceAsset` depending on whether or not `a` is cached for any given run of the DAG.
I'm also wondering if there's a way to ensure that runs can share the `DagsterInstance`. If I restart whatever defines a `DagsterInstance`, it's not intuitive to me that it would invalidate the caches which are defined by code/data
Also is `SourceAsset` not meant to be exposed publicly? (If importing from `_core` means what I think it means)
Ok, so I was trying to get clever and wrote this:
Copy code
def source_or_use(asset):                                        
    try:                                                         
        asset = SourceAsset(asset.key.to_python_identifier())    
        print("Using asset stub from SourceAsset")               
        return asset                                             
    except FileNotFoundError as fnf:                             
        print(fnf)                                               
        return asset
This is meant to wrap the assets passed to `materialize`. I noticed on subsequent runs of this code that it's finding the cache and thus returning the `SourceAsset` object, which is what I want, but not what I expected based on
Runs need to use the same `DagsterInstance` to have access to results
Unless `DagsterInstance.ephemeral()` produces the same instance between runs of the code
However, the problem is then that I get
Copy code
ipdb> pipeline_results2.output_for_node("b")
*** dagster._check.CheckError: Invariant failed. Description: __ephemeral_asset_job__ has no op named b.
When trying to get the results
I'm noticing it goes w/the `SourceAsset` even if I update the `code_version`, so I might be misunderstanding this
So reading more about `SourceAsset`, it's meant to represent an external data dependency (which this isn't, really). It looks like I could hack this together with `observable_source_asset`, but it's not yet clear to me that that's the right path or how I'd compute the Output of that

sandy

03/22/2023, 11:54 PM
Hey @Zach Jablons - did you manage to get this figured out?

Zach Jablons

03/24/2023, 3:50 PM
No - I haven't had a chance to play with it more, but I don't think the workaround suggested works

sandy

03/24/2023, 4:27 PM
Here's a PR that adds a selection argument to `materialize`: https://github.com/dagster-io/dagster/pull/13133. If I understand your issue, I think this should make what you're trying to do much easier. Let me know if it would be helpful to find a time to discuss.

Zach Jablons

03/27/2023, 2:57 PM
Thanks, I'll take a look at this and see if it helps
Took me a while to get around to this, and saw that it's merged into master - when is it likely to be released onto PyPI? I can do a dev installation in the meantime

sandy

03/31/2023, 8:24 PM
It will be released on Wednesday

Zach Jablons

03/31/2023, 8:25 PM
Ok, I wonder if `make dev_install` will finish by then 😅
Ok - I've been trying to get this working now with the `main` branch as of 2023-03-31, and I'm hitting a weird issue:
Code:
Copy code
import tempfile                                                                                             
                                                                                                            
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance                         
from dagster._core.definitions.source_asset import SourceAsset                                              
                                                                                                            
@asset(code_version='v1')                                                                                   
def a():                                                                                                    
    print("executing a")                                                                                    
    return 1                                                                                                
                                                                                                            
@asset(code_version='v2')                                                                                   
def b(a):                                                                                                   
    print("executing b")                                                                                    
    return a + 2                                                                                            
                                                                                                            
if __name__ == "__main__":                                                                                  
    with tempfile.TemporaryDirectory() as tmpdir:                                                           
        print(f"Caching to {tmpdir}")                                                                       
        instance = DagsterInstance.ephemeral(tempdir=tmpdir)                                                
                                                                                                            
        pipeline_results = materialize(assets=[a,b], selection=[b], instance=instance)                      
        print(pipeline_results.output_for_node("b"))                                                        
                                                                                                            
        pipeline_results2 = materialize(assets=[a,b], selection=[b], instance=instance)                     
        print(pipeline_results2.output_for_node("b"))
Output:
Copy code
Caching to /tmp/tmp47yf5djt                                                                                                             
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - ENGINE_EVENT - Executing steps in process (pid: 1236033)
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].         
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - LOGS_CAPTURED - Started capturing logs in process (pid: 1236033).
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - STEP_START - Started execution of step "b".
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - b - Loading file from: /tmp/tmp47yf5djt/storage/a
2023-04-03 10:00:50 -0400 - dagster - ERROR - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - STEP_FAILURE - Execution of step "b" failed.
                                                                                                                                        
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "a" of step "b"::
                                                                                                                                        
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp47yf5djt/storage/a'
I'm not sure why it's insisting on materializing/caching to that directory - it doesn't exist afaict? I tried running this without specifying `tempdir` to `DagsterInstance.ephemeral` and got the same result

sandy

04/03/2023, 3:40 PM
Is the directory that it's materializing to different than the one you're creating with `tempfile.TemporaryDirectory()`?

Zach Jablons

04/03/2023, 3:41 PM
Interestingly no
But I think for some reason it's not initializing the directory structure it needs with `storage` and all
Yeah - if I print `os.listdir(tmpdir)` after creating the ephemeral instance I can see that it's empty - is there an additional step that I'm missing?

sandy

04/03/2023, 3:48 PM
Did you mean to select "a" in the first `materialize` statement?

Zach Jablons

04/03/2023, 3:49 PM
Well, I want the output of `b` - maybe I'm not fully clear on what `selection` does then

sandy

04/03/2023, 3:50 PM
you need to materialize "a" at some point, right? because "b" depends on it
but your example only ever tries to materialize "b"

Zach Jablons

04/03/2023, 3:51 PM
Right, ok, so what I was hoping to get was an API where dagster would resolve what needs to be materialized given a specific asset that I want materialized
Am I able to do this with `materialize` or do I need to use a different API in dagster?
Ok, thinking about this a bit more, I want to clarify what I'm looking for here:
1. Caching and reuse
2. Dependency resolution
That said, I was hoping that the `selection` argument would help at least with 1, but I'm seeing that if I just run `a` multiple times it's still re-executing on the 2nd time. Am I correct in understanding that `materialize` means 'execute if not cached, otherwise load the output from cache'?
Copy code
import tempfile                                                                                       
import os                                                                                             
                                                                                                      
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance                   
from dagster._core.definitions.source_asset import SourceAsset                                        
                                                                                                      
@asset(code_version='v1')                                                                             
def a():                                                                                              
    print("executing a")                                                                              
    return 1                                                                                          
                                                                                                      
#@asset(code_version='v2')                                                                            
#def b(a):                                                                                            
#    print("executing b")                                                                             
#    return a + 2                                                                                     
                                                                                                      
if __name__ == "__main__":                                                                            
    with tempfile.TemporaryDirectory() as tmpdir:                                                     
        print(f"Caching to {tmpdir}")                                                                 
        instance = DagsterInstance.ephemeral(tempdir=tmpdir)                                          
                                                                                                      
        print(os.listdir(tmpdir))                                                                     
                                                                                                      
        pipeline_results = materialize(assets=[a], selection=['a'], instance=instance)                
        print(pipeline_results.output_for_node("a"))                                                  
                                                                                                      
        pipeline_results2 = materialize(assets=[a], selection=['a'], instance=instance)               
        print(pipeline_results2.output_for_node("a"))
Copy code
Caching to /tmp/tmp2cp6yfat
[]
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - ENGINE_EVENT - Executing steps in process (pid: 1249122)
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - LOGS_CAPTURED - Started capturing logs in process (pid: 1249122).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_START - Started execution of step "a".
executing a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - a - Writing file at: /tmp/tmp2cp6yfat/storage/a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - ASSET_MATERIALIZATION - Materialized value a.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_SUCCESS - Finished execution of step "a" in 4.28ms.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - ENGINE_EVENT - Finished steps in process (pid: 1249122) in 9.12ms
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - RUN_SUCCESS - Finished execution of run for "__ephemeral_asset_job__".
1
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - ENGINE_EVENT - Executing steps in process (pid: 1249122)
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - LOGS_CAPTURED - Started capturing logs in process (pid: 1249122).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_START - Started execution of step "a".
executing a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - a - Writing file at: /tmp/tmp2cp6yfat/storage/a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - ASSET_MATERIALIZATION - Materialized value a.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_SUCCESS - Finished execution of step "a" in 3.96ms.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - ENGINE_EVENT - Finished steps in process (pid: 1249122) in 8.48ms
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - RUN_SUCCESS - Finished execution of run for "__ephemeral_asset_job__".
1

sandy

04/03/2023, 4:57 PM
I see. `materialize` will actually execute in all situations - i.e. it won't skip assets that are cached. I filed an issue to track adding what you're looking for: https://github.com/dagster-io/dagster/issues/13333. We have a lot of the underlying tracking, but don't have it exposed yet in a neat way. @sean - is there a workaround that you'd be able to suggest? Potentially using internal APIs?
For dependency resolution, you can do something like:
Copy code
materialize([a, b], selection=AssetSelection.assets(b).upstream())
which will materialize "b" and all assets upstream of it
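As a rough illustration of what an "upstream" selection has to compute, here is a plain-Python sketch of the idea: collect the target plus everything it transitively depends on, then order that set so each asset comes after its inputs. The dependency map `DEPS` and the helpers `upstream_closure` and `execution_order` are hypothetical names for this sketch, not part of Dagster.

```python
from typing import Dict, List, Set

# Hypothetical dependency map: asset name -> names of the assets it reads.
DEPS: Dict[str, List[str]] = {"a": [], "b": ["a"], "c": ["a", "b"]}


def upstream_closure(target: str, deps: Dict[str, List[str]]) -> Set[str]:
    """Return the target plus every asset it transitively depends on."""
    seen: Set[str] = set()
    stack = [target]
    while stack:
        node = stack.pop()
        if node in seen:
            continue  # reached through another path already
        seen.add(node)
        stack.extend(deps[node])
    return seen


def execution_order(selected: Set[str], deps: Dict[str, List[str]]) -> List[str]:
    """Order the selected assets so each one comes after its selected inputs."""
    order: List[str] = []

    def visit(name: str) -> None:
        if name in order:
            return
        for dep in deps[name]:
            if dep in selected:
                visit(dep)
        order.append(name)

    for name in sorted(selected):
        visit(name)
    return order


selection_for_b = upstream_closure("b", DEPS)
selection_for_c = upstream_closure("c", DEPS)
plan_for_c = execution_order(selection_for_c, DEPS)
```

Each asset appears once in the plan even when it is reachable through more than one path, so within a single run a shared upstream asset is computed once and its result is passed to every consumer.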

Zach Jablons

04/03/2023, 4:59 PM
Ah ok, that's handy to know - let me try that and see what happens. Where would I import `AssetSelection` from?

sandy

04/03/2023, 4:59 PM
ah yeah - `from dagster import AssetSelection`

Zach Jablons

04/03/2023, 5:00 PM
Ah, ez
Ok, so the good news is that that works, and in multiple DAG configurations - I tested that three nodes (a-b-c) work, as well as a pattern where c depends on both a and b. In that latter case (which, for clarity, looks like the below)
Copy code
a---|
|   |
b   |
|---|
c
I noticed that the result from `a` was in fact reused, which is also good - but my hope is that subsequent invocations (e.g., you can imagine running a script to get a result from b, working on it a bit, and then wanting a result from c later on) would leverage this caching as well
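The behavior asked for in this thread ("execute if stale or missing, otherwise load from cache", surviving across separate invocations) can be sketched in plain Python. This is a minimal model under stated assumptions, not a Dagster API or workaround: `ASSETS`, `resolve`, and `materialize_stale_and_missing` are hypothetical names, the cache is a JSON file (so values must be JSON-serializable), and the version rule is the hash-of-code-version-and-inputs idea from the docs quoted earlier.

```python
import hashlib
import json
import os
import tempfile
from typing import Any, Callable, Dict, List, Tuple

executed: List[str] = []  # records which compute functions actually ran


def compute_a() -> int:
    executed.append("a")
    return 1


def compute_b(a: int) -> int:
    executed.append("b")
    return a + 2


# Hypothetical registry: name -> (code_version, upstream names, compute function).
ASSETS: Dict[str, Tuple[str, List[str], Callable[..., Any]]] = {
    "a": ("v1", [], compute_a),
    "b": ("v1", ["a"], compute_b),
}


def resolve(name: str, cache: Dict[str, Dict[str, Any]]) -> Any:
    """Return the asset's value, recomputing only if it is stale or missing."""
    code_version, upstream, fn = ASSETS[name]
    inputs = [resolve(dep, cache) for dep in upstream]  # dependency resolution
    input_versions = [cache[dep]["version"] for dep in upstream]
    version = hashlib.sha256("|".join([code_version, *input_versions]).encode()).hexdigest()
    entry = cache.get(name)
    if entry is not None and entry["version"] == version:
        return entry["value"]  # cached and still valid: skip execution
    value = fn(*inputs)
    cache[name] = {"version": version, "value": value}
    return value


def materialize_stale_and_missing(target: str, cache_path: str) -> Any:
    """Load the on-disk cache, resolve the target, and persist the cache again."""
    cache: Dict[str, Dict[str, Any]] = {}
    if os.path.exists(cache_path):
        with open(cache_path) as f:
            cache = json.load(f)
    value = resolve(target, cache)
    with open(cache_path, "w") as f:
        json.dump(cache, f)
    return value


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "cache.json")
    first = materialize_stale_and_missing("b", path)   # runs a, then b
    second = materialize_stale_and_missing("b", path)  # nothing re-executes
```

Because the cache is reloaded from disk on every call, the second call behaves the same whether it happens in the same process or in a later script run that points at the same file. Changing an asset's code version changes its hash, which invalidates that asset and everything downstream of it.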