# ask-community
w
hi 🙂 hopefully a simple question: how do i avoid re-evaluating assets when re-running jobs via the python API? for example, suppose the following example script, which defines an asset, defines a job using the asset, and runs the job with execute_in_process():
# example.py

from dagster import asset, job
from dagster_gcp_pandas import BigQueryPandasIOManager
import pandas as pd

io_manager = BigQueryPandasIOManager(...)

@asset
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df

@job(resource_defs={"io_manager": io_manager})
def my_job():
    dummy_dataframe()

if __name__ == "__main__":
    result = my_job.execute_in_process()
suppose i run this command twice:
$ DAGSTER_HOME=$(pwd) python example.py
my expectation is: if i run this script more than once, the job would use the persisted result for dummy_dataframe() from the first run. however, i can see in BigQuery that dummy_dataframe() is re-evaluated and updated each time i call the script (evidenced by the current_time field updating each run). why is this? why isn’t the job executor seeing that the asset was already computed and persisted, and simply loading that result? any help much appreciated 🙏
o
hi @William Grisaitis! a couple things to note here. The first is that in order to make a job that updates assets, you'll want to use define_asset_job, not the @job decorator (that's intended for use with ops/graphs, not assets). In your code, the dummy_dataframe asset is being invoked directly when the job is defined (rather than when my_job is being executed), and my_job actually does nothing, as it has zero ops inside of it.
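for reference, a rough sketch (untested) of what an asset-based version of example.py could look like; the BigQuery resource config is elided, as in your original:
```python
# example.py, rewritten around assets (rough sketch, untested)
from dagster import Definitions, asset, define_asset_job
import pandas as pd


@asset
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df


# an asset job materializes the selected assets when it runs
my_asset_job = define_asset_job("my_asset_job", selection="dummy_dataframe")

defs = Definitions(
    assets=[dummy_dataframe],
    jobs=[my_asset_job],
    # resources={"io_manager": BigQueryPandasIOManager(...)},  # elided, as in the original
)

if __name__ == "__main__":
    result = defs.get_job_def("my_asset_job").execute_in_process()
```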
The second is that, currently, Dagster does not assume that any given asset is idempotent (there may be random processes happening inside the asset), so any request to re-execute a set of assets is taken literally. In the UI, assets that have "meaningful changes" upstream of them are indicated visually, and there is a button to execute only the assets that would be expected to change on execution.
That said, automatic memoization of assets that are confirmed to be idempotent (likely via the user setting a code_version on the asset) is something we're looking into!
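to show where that would hook in, here's a quick sketch of setting a code_version on your asset (the version string itself is arbitrary; the automatic skip-if-unchanged behavior is the part that's still being explored):
```python
import pandas as pd
from dagster import asset


# bump code_version when the asset's logic meaningfully changes; the staleness
# machinery compares it against the version recorded at the last materialization
@asset(code_version="1")
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df
```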
w
Thank you so much @owen! I really appreciate your reply and setting me straight on the different job types.
Re automatic memoization - is there a GitHub issue where memoization of assets is being discussed? I would love to contribute, as such functionality would be immensely useful to me. (Edit - I will search for this soon, just have bad internet right now)
And secondly, you mentioned that in the dagit UI there's a way of only re-executing stale (i.e. "meaningfully changed") assets. Is there a way from the python API to initiate that re-materialization? E.g., maybe from a DagsterInstance, get a list of assets that have meaningful changes, and create and run define_asset_jobs on them? (I apologize if I'm using the wrong words or misunderstanding some concept vocabulary here.)
o
No problem! To dive more into the programmatic invocation side, you'll need to start touching some non-public APIs. I don't anticipate any major changes to happen in this realm, but just keep in mind that things can shift around and get renamed without warning between minor releases. The way we calculate which assets are stale is with the CachingStaleStatusResolver. You can programmatically materialize a certain set of assets using defs.get_implicit_job_def_for_assets(some_keys). Putting this together, you could have a script that looked something like:
from typing import List

from dagster import AssetKey, DagsterInstance, Definitions
# internal API: the import path has moved between releases, so adjust as needed
from dagster._core.definitions.data_version import CachingStaleStatusResolver, StaleStatus

defs = Definitions(assets=[all, your, assets])

instance = DagsterInstance.get()
asset_graph = defs.get_repository_def().asset_graph

# the asset keys you want to check for staleness (fill in your own)
asset_selection: List[AssetKey] = [...]

# this bit relies on internal APIs
resolver = CachingStaleStatusResolver(instance, asset_graph)
stale_or_missing_keys: List[AssetKey] = []
for asset_key in asset_selection:
    if resolver.get_status(asset_key) in [StaleStatus.STALE, StaleStatus.MISSING]:
        stale_or_missing_keys.append(asset_key)

# build a job targeting only the stale/missing assets, then run it
job = defs.get_implicit_job_def_for_assets(stale_or_missing_keys)

job.execute_in_process(instance=instance)
that's the basic idea at least (I haven't personally tested that), but this mimics the process that happens when you do this through the UI. the DagsterInstance is what stores the history of all your runs (so the CachingStaleStatusResolver is using it to look up which things have executed and all that)
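to make the instance's role concrete, here's a tiny (untested) check against the materialization history it stores:
```python
from dagster import AssetKey, DagsterInstance

# requires DAGSTER_HOME to point at the directory containing your dagster.yaml
instance = DagsterInstance.get()

# the most recent materialization event for the asset, or None if it has
# never been materialized on this instance
event = instance.get_latest_materialization_event(AssetKey("dummy_dataframe"))
print(event)
```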
b
Hi @owen, I am working on something similar and this is an interesting solution that you have proposed! Will job.execute_in_process(instance=instance) update the asset status? E.g. will running this script update the asset on the dagster-webserver? It seems like it doesn't, and I'm not sure how to make it reflect on the UI
For some context, I am working on creating a backfill runner script. In this runner script, I am executing individual partition jobs, and would like to have them reflected on the UI
o
Hi @Brenda Thng! Invoking a job like that would be expected to update the UI, provided you have your $DAGSTER_HOME environment variable set to wherever your dagster.yaml file lives (usually ~/.dagster). It might also be worth looking into the GraphQL API for launching jobs (as executing jobs in-process means that you can't have multiple jobs running in parallel). There are also instance methods to kick off explicit backfills, rather than manually submitting runs one by one, but I'm not sure if that goes against what you're trying to achieve here
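if you go the GraphQL route, a rough sketch using the dagster-graphql Python client (untested; the host, port, and job name are placeholders for your deployment):
```python
from dagster_graphql import DagsterGraphQLClient

# point this at your Dagster webserver; host and port are placeholders
client = DagsterGraphQLClient("localhost", port_number=3000)

# submits the run through the webserver, so it goes through your configured
# run launcher and shows up in the UI like any other run
run_id = client.submit_job_execution("my_asset_job")
print(run_id)
```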
b
What if I'm not using dagster.yaml, but kubernetes helm charts instead? I did consider the graphql API as well, but I was hoping to be able to use a pipeline to invoke other dagster pipeline runs, so that I can track the progress, and also be able to invoke backfill runs using the dagster UI instead of writing custom scripts. As for "instance methods to kick off explicit backfills", I've seen those. I am trying to perform a large backfill of a few thousand jobs, and whenever I do that, I notice that it will lag the webserver, but it puts everything in a queue. Hence the workaround I'm doing is to submit each job individually, one after another
o
If you're going to invoke your script from the body of an op (rather than just running locally), the op's context.instance is what you'd want to pass in as the instance argument
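something along these lines (untested sketch; it assumes the stale-key logic and defs from the earlier script are importable):
```python
from dagster import OpExecutionContext, op


# assumes `defs` and the stale-key logic from the earlier script are available here
@op
def rematerialize_stale_assets(context: OpExecutionContext):
    # the op's own instance, so the runs it kicks off are recorded on the same
    # deployment and therefore show up in the UI
    instance = context.instance

    stale_or_missing_keys = ...  # compute with CachingStaleStatusResolver, as above

    target_job = defs.get_implicit_job_def_for_assets(stale_or_missing_keys)
    target_job.execute_in_process(instance=instance)
```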
b
I see okay thank you