Zach Jablons
03/10/2023, 6:13 PM
Zach Jablons
03/10/2023, 6:13 PM
from dagster import asset, materialize, Definitions, fs_io_manager

@asset
def a():
    print("executing a")
    return 1

if __name__ == "__main__":
    pipeline_results = a()
    print(pipeline_results)
    pipeline_results2 = a()
    print(pipeline_results2)
Zach Jablons
03/10/2023, 6:14 PM
a
that the 2nd invocation would leverage that and just give me its output, but instead I'm seeing that it's executing it multiple times
Zach Jablons
03/10/2023, 6:14 PM
executing a
1
executing a
1
Zach Jablons
03/10/2023, 6:15 PM
materialize([a]) and get_output_for_node (or something like that?)
Zach Jablons
03/10/2023, 6:15 PM
Shantanu
03/10/2023, 6:26 PM
Zach Jablons
03/10/2023, 6:26 PM
Zach Jablons
03/10/2023, 6:29 PM
"We materialize it and look at the entry in the Asset Catalog:"
In code, does 'we materialize it' look like materialize([a])?
chris
03/10/2023, 7:51 PM
materialize() if you ensure that you're using the same DagsterInstance - just pass in instance=DagsterInstance.get() as a parameter
Zach Jablons
03/10/2023, 7:52 PM
Zach Jablons
03/10/2023, 10:24 PM
dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["a"]' for asset '["b"]' is not produced by any of the provided asset ops and is not one of the provided sources
Zach Jablons
03/10/2023, 10:24 PM
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance

@asset
def a():
    print("executing a")
    return 1

@asset
def b(a):
    print("executing b")
    return a + 2

if __name__ == "__main__":
    instance = DagsterInstance.get()
    pipeline_results = materialize([b], instance=instance)
    print(pipeline_results)
    pipeline_results2 = materialize([b], instance=instance)
    print(pipeline_results2)
Zach Jablons
03/10/2023, 10:25 PM
materialize([a,b]) it appears to work but on the second run it runs both again
Also ideally I wouldn't have to do that, as my impression was that b would "know" to run the a asset to fill its a argument
chris
03/10/2023, 11:38 PM
"You'll notice two hashes labeled code_version and data_version in the "System tags" section of the materialization details. The code version shown is a copy of the run ID for the run that generated this materialization. Because a_number has no user-defined code_version, Dagster assumes a different code version on every run, which it represents with the run ID. The data_version is also generated by Dagster. It is a hash of the code version together with the data versions of any inputs. Since a_number has no inputs, in this case the data version is a hash of the code version only."
You need to actually provide versioning information in order for caching to work. Dagster can't automatically infer what has / hasn't changed about your assets, and therefore cannot cache any results without you providing that information
Zach Jablons
03/11/2023, 12:08 AM
Ok, I didn't fully get that. I'll try this with specifying the code version.
"Dagster assumes a different code version on every run, which it represents with the run ID."
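Editor's note: the versioning rule quoted above can be illustrated with a small stand-alone model. This is only a sketch of the idea (data version = hash of the code version plus the input data versions, with the run ID standing in when no code_version is given). The function names effective_code_version, data_version and simulate_run are hypothetical, and the hashing scheme is an assumption, not Dagster's actual implementation.

```python
import hashlib
import uuid


def effective_code_version(user_code_version, run_id):
    # Assumption for illustration: with no user-supplied code_version,
    # the run ID is used instead, so the value differs on every run.
    return user_code_version if user_code_version is not None else run_id


def data_version(code_version, input_data_versions=()):
    # Toy hash of the code version together with the input data versions.
    digest = hashlib.sha256(code_version.encode("utf-8"))
    for version in input_data_versions:
        digest.update(b"\x00" + version.encode("utf-8"))
    return digest.hexdigest()[:12]


def simulate_run(code_version_a, code_version_b):
    # Models one run over two assets, where b takes a as its input.
    run_id = uuid.uuid4().hex  # fresh run ID for each simulated run
    a_version = data_version(effective_code_version(code_version_a, run_id))
    b_version = data_version(
        effective_code_version(code_version_b, run_id), [a_version]
    )
    return a_version, b_version


if __name__ == "__main__":
    # Unversioned assets: data versions differ between runs.
    print(simulate_run(None, None) == simulate_run(None, None))
    # Versioned assets: data versions are stable between runs.
    print(simulate_run("1", "1") == simulate_run("1", "1"))
```

In this model, two runs of unversioned assets never agree on a data version, so nothing could be recognized as unchanged, while assets with an explicit code version produce the same data versions on every run.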
Zach Jablons
03/11/2023, 12:10 AM
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance

@asset(code_version='1')
def a():
    print("executing a")
    return 1

@asset(code_version='1')
def b(a):
    print("executing b")
    return a + 2

if __name__ == "__main__":
    instance = DagsterInstance.get()
    pipeline_results = materialize([a,b], instance=instance)
    print(pipeline_results)
    pipeline_results2 = materialize([a,b], instance=instance)
    print(pipeline_results2)
Zach Jablons
03/11/2023, 12:10 AM
chris
03/11/2023, 12:51 AM
Zach Jablons
03/11/2023, 12:57 AM
stdout goes, so I added some sleeps - 3s for a and 6s for b
Zach Jablons
03/11/2023, 12:57 AM
Zach Jablons
03/11/2023, 12:58 AM
materialize([a,b])?
Zach Jablons
03/11/2023, 12:58 AM
Zach Jablons
03/11/2023, 12:59 AM
Zach Jablons
03/11/2023, 12:59 AM
chris
03/11/2023, 1:00 AM
chris
03/11/2023, 1:01 AM
Zach Jablons
03/11/2023, 1:02 AM
Zach Jablons
03/11/2023, 1:02 AM
Zach Jablons
03/11/2023, 1:02 AM
Zach Jablons
03/11/2023, 1:03 AM
chris
03/11/2023, 1:04 AM
chris
03/11/2023, 1:05 AM
materialize, not sure if that's an API gap or some nuance I'm missing.
Zach Jablons
03/11/2023, 1:06 AM
Zach Jablons
03/13/2023, 3:05 PM
materialize API that replicates more 'materialize stale and missing' than 'materialize all'?
sean
03/13/2023, 4:23 PM
sean
03/13/2023, 4:32 PM
#!/usr/bin/env python
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset

@asset
def a():
    print("executing a")
    return 1

@asset
def b(a):
    print("executing b")
    return a + 2

if __name__ == "__main__":
    instance = DagsterInstance.ephemeral()
    pipeline_results = materialize([a], instance=instance)
    print(pipeline_results)
    pipeline_results2 = materialize([b, SourceAsset("a")], instance=instance)
    print(pipeline_results2.output_for_node("b"))
sean
03/13/2023, 4:34 PM
• Runs need to use the same DagsterInstance to have access to results from previous runs.
• materialize requires all dependencies to be provided on each call
• you can use SourceAsset to "stub" a dependency that's already been materialized in a previous run if you don't want to materialize it again
sean
03/13/2023, 4:35 PM
sean
03/13/2023, 4:36 PM
materialize, since this doesn't really match the dagit experience
sean
03/13/2023, 4:36 PM
Zach Jablons
03/13/2023, 6:40 PM
materialize([b]) that would then auto-resolve:
• That a needs to be materialized because b depends on it
• Whether or not a should be loaded from cache or run
It's not really viable to make a code change to use SourceAsset depending on whether or not a is cached for any given run of the DAG
I'm also wondering if there's a way to ensure that runs can share the DagsterInstance. If I restart whatever defines a DagsterInstance, it's not intuitive to me that it would invalidate the caches which are defined by code/data
Zach Jablons
03/13/2023, 6:41 PM
SourceAsset not meant to be exposed publicly? (If importing from _core means what I think it means)
Zach Jablons
03/13/2023, 7:47 PM
"I'm also wondering if there's a way to ensure that runs can share the DagsterInstance. If I restart whatever defines a DagsterInstance, it's not intuitive to me that it would invalidate the caches which are defined by code/data"
Ok, so I was trying to get clever and wrote this:
def source_or_use(asset):
    try:
        asset = SourceAsset(asset.key.to_python_identifier())
        print("Using asset stub from SourceAsset")
        return asset
    except FileNotFoundError as fnf:
        print(fnf)
        return asset
To wrap the assets passed as input to materialize.
I noticed on subsequent runs of this code that it's finding the cache, thus returning the SourceAsset object, which is what I want but not what I expected based on
"Runs need to use the same DagsterInstance to have access to results"
Unless DagsterInstance.ephemeral() produces the same instance between runs of the code
Zach Jablons
03/13/2023, 7:47 PM
ipdb> pipeline_results2.output_for_node("b")
*** dagster._check.CheckError: Invariant failed. Description: __ephemeral_asset_job__ has no op named b.
When trying to get the results
Zach Jablons
03/13/2023, 8:03 PM
SourceAsset even if I update the code_version, so I might be misunderstanding this
Zach Jablons
03/13/2023, 10:42 PM
SourceAsset, it's meant to represent an external data dependency (which this isn't, really). It looks like I could hack this together with observable_source_asset but it's not yet clear to me that that's the right path or how I'd compute the Output of that
sandy
03/22/2023, 11:54 PM
Zach Jablons
03/24/2023, 3:50 PM
sandy
03/24/2023, 4:27 PM
materialize: https://github.com/dagster-io/dagster/pull/13133. If I understand your issue, I think this should make what you're trying to do much easier.
Let me know if it would be helpful to find a time to discuss.
Zach Jablons
03/27/2023, 2:57 PM
Zach Jablons
03/31/2023, 5:40 PM
sandy
03/31/2023, 8:24 PM
Zach Jablons
03/31/2023, 8:25 PM
make dev_install will finish by then
Zach Jablons
04/03/2023, 2:02 PM
main branch as of 2023-03-31, and I'm hitting a weird issue:
Code:
import tempfile
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset

@asset(code_version='v1')
def a():
    print("executing a")
    return 1

@asset(code_version='v2')
def b(a):
    print("executing b")
    return a + 2

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        print(f"Caching to {tmpdir}")
        instance = DagsterInstance.ephemeral(tempdir=tmpdir)
        pipeline_results = materialize(assets=[a,b], selection=[b], instance=instance)
        print(pipeline_results.output_for_node("b"))
        pipeline_results2 = materialize(assets=[a,b], selection=[b], instance=instance)
        print(pipeline_results2.output_for_node("b"))
Output:
Caching to /tmp/tmp47yf5djt
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - ENGINE_EVENT - Executing steps in process (pid: 1236033)
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - LOGS_CAPTURED - Started capturing logs in process (pid: 1236033).
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - STEP_START - Started execution of step "b".
2023-04-03 10:00:50 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - b - Loading file from: /tmp/tmp47yf5djt/storage/a
2023-04-03 10:00:50 -0400 - dagster - ERROR - __ephemeral_asset_job__ - 424defe8-f7bc-4ca5-8e71-014c9696bf1a - 1236033 - b - STEP_FAILURE - Execution of step "b" failed.
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "a" of step "b"::
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp47yf5djt/storage/a'
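Editor's note: one plausible reading of the log above is that only step "b" was selected, so "a" never ran and its stored output did not exist when "b" tried to load it. The stand-alone sketch below models that behaviour with a toy file-based store. GRAPH, upstream_closure and run_selected are hypothetical names for illustration, and this is not how Dagster is implemented internally.

```python
import os
import pickle
import tempfile

# Hypothetical two-asset graph: name -> (upstream names, compute function).
GRAPH = {
    "a": ((), lambda: 1),
    "b": (("a",), lambda a: a + 2),
}


def upstream_closure(name):
    # The asset itself plus everything it depends on, dependencies first.
    ordered = []
    for dep in GRAPH[name][0]:
        for item in upstream_closure(dep):
            if item not in ordered:
                ordered.append(item)
    ordered.append(name)
    return ordered


def run_selected(selected, storage_dir):
    # Execute only the selected assets, in the order given. Inputs are
    # always read back from storage, never computed on demand.
    results = {}
    for name in selected:
        deps, fn = GRAPH[name]
        inputs = []
        for dep in deps:
            # Raises FileNotFoundError if the upstream asset was never stored.
            with open(os.path.join(storage_dir, dep), "rb") as f:
                inputs.append(pickle.load(f))
        value = fn(*inputs)
        with open(os.path.join(storage_dir, name), "wb") as f:
            pickle.dump(value, f)
        results[name] = value
    return results


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            run_selected(["b"], tmpdir)
        except FileNotFoundError as err:
            print("selecting only b fails:", err)
        print(run_selected(upstream_closure("b"), tmpdir)["b"])
```

In this model, selecting "b" alone fails on an empty store, while selecting "b" together with its upstream assets succeeds. That matches the later suggestion in the thread to select b and everything upstream of it.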
Zach Jablons
04/03/2023, 2:06 PM
tempdir to DagsterInstance.ephemeral and got the same result
sandy
04/03/2023, 3:40 PM
tempfile.TemporaryDirectory()?
Zach Jablons
04/03/2023, 3:41 PM
Zach Jablons
04/03/2023, 3:41 PM
storage and all
Zach Jablons
04/03/2023, 3:45 PM
os.listdir(tmpdir) after creating the ephemeral instance I can see that it's empty - is there an additional step that I'm missing?
sandy
04/03/2023, 3:48 PM
materialize statement?
Zach Jablons
04/03/2023, 3:49 PM
b - maybe I'm not fully clear on what selection does then
sandy
04/03/2023, 3:50 PM
sandy
04/03/2023, 3:50 PM
Zach Jablons
04/03/2023, 3:51 PM
Zach Jablons
04/03/2023, 3:52 PM
materialize or do I need to use a different API in dagster?
Zach Jablons
04/03/2023, 4:39 PM
selection argument would help at least with 1, but I'm seeing that if I just run a multiple times it's still re-executing on the 2nd time. Am I correct in understanding that materialize means 'execute if not cached, otherwise load the output from cache'?
import tempfile
import os
from dagster import asset, materialize, Definitions, fs_io_manager, DagsterInstance
from dagster._core.definitions.source_asset import SourceAsset

@asset(code_version='v1')
def a():
    print("executing a")
    return 1

#@asset(code_version='v2')
#def b(a):
#    print("executing b")
#    return a + 2

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        print(f"Caching to {tmpdir}")
        instance = DagsterInstance.ephemeral(tempdir=tmpdir)
        print(os.listdir(tmpdir))
        pipeline_results = materialize(assets=[a], selection=['a'], instance=instance)
        print(pipeline_results.output_for_node("a"))
        pipeline_results2 = materialize(assets=[a], selection=['a'], instance=instance)
        print(pipeline_results2.output_for_node("a"))
Caching to /tmp/tmp2cp6yfat
[]
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - ENGINE_EVENT - Executing steps in process (pid: 1249122)
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - LOGS_CAPTURED - Started capturing logs in process (pid: 1249122).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_START - Started execution of step "a".
executing a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - a - Writing file at: /tmp/tmp2cp6yfat/storage/a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - ASSET_MATERIALIZATION - Materialized value a.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - a - STEP_SUCCESS - Finished execution of step "a" in 4.28ms.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - ENGINE_EVENT - Finished steps in process (pid: 1249122) in 9.12ms
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 23c4dc30-2856-4719-9a55-7da19a31c33d - 1249122 - RUN_SUCCESS - Finished execution of run for "__ephemeral_asset_job__".
1
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - ENGINE_EVENT - Executing steps in process (pid: 1249122)
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - LOGS_CAPTURED - Started capturing logs in process (pid: 1249122).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_START - Started execution of step "a".
executing a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - a - Writing file at: /tmp/tmp2cp6yfat/storage/a
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - ASSET_MATERIALIZATION - Materialized value a.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - a - STEP_SUCCESS - Finished execution of step "a" in 3.96ms.
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - ENGINE_EVENT - Finished steps in process (pid: 1249122) in 8.48ms
2023-04-03 12:17:22 -0400 - dagster - DEBUG - __ephemeral_asset_job__ - 25604f5d-168e-45ad-a636-24bf3f16f28e - 1249122 - RUN_SUCCESS - Finished execution of run for "__ephemeral_asset_job__".
1
sandy
04/03/2023, 4:57 PM
materialize will actually execute in all situations - i.e. it won't skip assets that are cached. I filed an issue to track adding what you're looking for: https://github.com/dagster-io/dagster/issues/13333.
We have a lot of the underlying tracking, but don't have it exposed yet in a neat way.
@sean - is there a workaround that you'd be able to suggest? Potentially using internal APIs?
sandy
04/03/2023, 4:58 PM
materialize([a, b], selection=AssetSelection.assets(b).upstream()) which will materialize "b" and all assets upstream of it
Zach Jablons
04/03/2023, 4:59 PM
AssetSelection from?
sandy
04/03/2023, 4:59 PM
from dagster import AssetSelection
Zach Jablons
04/03/2023, 5:00 PM
Zach Jablons
04/03/2023, 5:34 PM
a---|
|   |
b   |
|---|
c
I noticed that the result from a was in fact reused, which is also good - but my hope is that subsequent invocations (e.g., you can imagine running a script to get a result from b, working on it a bit, and then wanting a result from c later on) would leverage this caching as well