William Grisaitis

05/22/2023, 9:05 PM
hi 🙂 hopefully a simple question: how do i avoid re-evaluating assets when re-running jobs via the python API? for example, suppose the following example script, which defines an asset, defines a job using the asset, and runs the job with execute_in_process:

from dagster import asset, job
from dagster_gcp_pandas import BigQueryPandasIOManager
import pandas as pd

io_manager = BigQueryPandasIOManager(...)

@asset
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df

@job(resource_defs={"io_manager": io_manager})
def my_job():
    dummy_dataframe()

if __name__ == "__main__":
    result = my_job.execute_in_process()
suppose i run this command twice:
$ DAGSTER_HOME=$(pwd) python
my expectation is: if i run this script more than once, the job would use the persisted result for dummy_dataframe from the first run. however, i can see in BigQuery that dummy_dataframe is re-evaluated and updated each time i call the script (evidenced by the current_time field updating each run). why is this? why isn't the job executor seeing that the asset was already computed and persisted, and simply loading that result? any help much appreciated 🙏


owen

05/22/2023, 10:27 PM
hi @William Grisaitis! a couple things to note here. The first is that in order to make a job that updates assets, you'll want to use define_asset_job, not the @job decorator (which is intended for use with ops/graphs, not assets). In your code, the dummy_dataframe asset is being invoked directly when the job is defined (rather than when my_job is being executed), and my_job actually does nothing, as it has zero ops inside of it.
The second is that Dagster does not currently assume that any given asset is idempotent (there may be random processes that happen within the asset), so any request to re-execute a set of assets is taken literally. In the UI, assets with "meaningful changes" upstream of them are indicated visually, and there is a button available to execute only the assets that would be expected to change on execution.
that said, automatic memoization of assets that are confirmed to be idempotent (likely via the user setting a code_version on the asset) is something we're looking into!

William Grisaitis

05/23/2023, 1:58 AM
Thank you so much @owen! I really appreciate your reply and setting me straight on the different job types.
Re automatic memoization - is there a GitHub issue where memoization of assets is being discussed? I would love to contribute, as such a functionality would be immensely useful to me. (Edit - I will search for this soon, just have bad internet right now)
And secondly, you mentioned that in the dagit UI there's a way of only re-executing stale (ie "meaningfully changed") assets. Is there a way from the python API to initiate that re-materialization? Eg, maybe from a DagsterInstance, get a list of assets that have meaningful changes, and create and run define_asset_job's on them? (I apologize if I'm using the wrong words or misunderstanding some concept vocabulary here.)


owen

05/23/2023, 11:44 PM
No problem! To dive more into the programmatic invocation side, you'll need to start touching some non-public APIs. I don't anticipate any major changes to happen in this realm, but just keep in mind that things can shift around and get renamed without warning between minor releases. The way we calculate which assets are stale is with the CachingStaleStatusResolver. You can then programmatically materialize a set of assets by building a job with get_implicit_job_def_for_assets and executing it. Putting this together, you could have a script that looks something like:
defs = Definitions(assets=[all, your, assets])

instance = DagsterInstance.get()
asset_graph = defs.get_repository_def().asset_graph

# this bit relies on internal APIs
resolver = CachingStaleStatusResolver(instance, asset_graph)
stale_or_missing_keys: List[AssetKey] = []
for asset_key in asset_selection:  # asset_selection: the asset keys you care about
    if resolver.get_status(asset_key) in [StaleStatus.STALE, StaleStatus.MISSING]:
        stale_or_missing_keys.append(asset_key)

job = defs.get_implicit_job_def_for_assets(stale_or_missing_keys)
job.execute_in_process(instance=instance, asset_selection=stale_or_missing_keys)

that's the basic idea at least (I haven't personally tested it), but it mimics the process that happens when you do this through the UI. the DagsterInstance is what stores the history of all your runs, so the CachingStaleStatusResolver uses it to look up which things have executed and all that