William Grisaitis
05/22/2023, 9:05 PM
i have a question about execute_in_process():
# example.py
from dagster import asset, job
from dagster_gcp_pandas import BigQueryPandasIOManager
import pandas as pd

io_manager = BigQueryPandasIOManager(...)

@asset
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df

@job(resource_defs={"io_manager": io_manager})
def my_job():
    dummy_dataframe()

if __name__ == "__main__":
    result = my_job.execute_in_process()
suppose i run this command twice:

$ DAGSTER_HOME=$(pwd) python example.py

my expectation is: if i run this script more than once, the job would use the persisted result for dummy_dataframe() from the first run. however, i can see in BigQuery that dummy_dataframe() is re-evaluated and updated each time i call the script (evidenced by the current_time field updating each run). why is this? why isn't the job executor seeing that the asset was already computed and persisted, and simply loading that result?
any help much appreciated 🙏

owen
05/22/2023, 10:27 PM
the issue is that you're invoking the asset inside the @job decorator (this decorator is intended for use with ops/graphs, not assets). In your code, the dummy_dataframe asset is being invoked directly when the job is defined (rather than when my_job is being executed), and my_job actually does nothing, as it has zero ops inside of it.
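a minimal sketch of one possible fix (this snippet is not from the thread; it assumes dagster's materialize() helper, with the io manager config elided as in the original):

# sketch: drop @job and materialize the asset directly,
# writing the result via the provided io manager
from dagster import materialize

if __name__ == "__main__":
    result = materialize(
        [dummy_dataframe],
        resources={"io_manager": io_manager},
    )

note that this still executes dummy_dataframe on every invocation; persisting an asset doesn't by itself make later runs skip it.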
William Grisaitis
05/23/2023, 1:58 AM

owen
05/23/2023, 11:44 PM
you can check staleness per asset yourself, then build a job targeting only the stale or missing keys with defs.get_implicit_job_def_for_assets(some_keys).
putting this together, you could have a script that looked something like
from typing import List
from dagster import AssetKey, DagsterInstance, Definitions
# internal import -- the exact module path may vary between dagster versions
from dagster._core.definitions.data_version import CachingStaleStatusResolver, StaleStatus

defs = Definitions(assets=[all, your, assets])  # placeholder: your real assets
instance = DagsterInstance.get()
asset_graph = defs.get_repository_def().asset_graph

# this bit relies on internal APIs
resolver = CachingStaleStatusResolver(instance, asset_graph)

# asset_selection: the AssetKeys to consider, e.g. [AssetKey("dummy_dataframe")]
stale_or_missing_keys: List[AssetKey] = []
for asset_key in asset_selection:
    if resolver.get_status(asset_key) in [StaleStatus.STALE, StaleStatus.MISSING]:
        stale_or_missing_keys.append(asset_key)

job = defs.get_implicit_job_def_for_assets(stale_or_missing_keys)
job.execute_in_process(instance=instance)
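one caveat on the sketch above (a hypothetical guard, not in the original snippet): on a run where every asset is fresh, stale_or_missing_keys will be empty, so you'd likely want to skip execution entirely:

# hypothetical guard: only build and execute the job when there is work to do
if stale_or_missing_keys:
    job = defs.get_implicit_job_def_for_assets(stale_or_missing_keys)
    job.execute_in_process(instance=instance)

also run the script with DAGSTER_HOME set, as in the original command (DAGSTER_HOME=$(pwd) python example.py), so that materialization records persist between invocations and the staleness check has history to read.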