William Grisaitis
05/22/2023, 9:05 PM
A question about `execute_in_process()`:
```python
# example.py
from dagster import asset, job
from dagster_gcp_pandas import BigQueryPandasIOManager
import pandas as pd

io_manager = BigQueryPandasIOManager(...)  # config elided

@asset
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df

@job(resource_defs={"io_manager": io_manager})
def my_job():
    dummy_dataframe()

if __name__ == "__main__":
    result = my_job.execute_in_process()
```
Suppose I run this command twice:
```
$ DAGSTER_HOME=$(pwd) python example.py
```
My expectation is: if I run this script more than once, the job would use the persisted result for `dummy_dataframe()` from the first run.
However, I can see in BigQuery that `dummy_dataframe()` is re-evaluated and updated each time I call the script (evidenced by the `current_time` field updating each run). Why is this? Why isn't the job executor seeing that the asset was already computed and persisted, and simply loading that result?
Any help much appreciated 🙏
owen
05/22/2023, 10:27 PM
The issue is with the `@job` decorator (this is intended for use with ops/graphs, not assets). In your code, the `dummy_dataframe` asset is being invoked directly when the job is defined (rather than when `my_job` is executed), and `my_job` actually does nothing, as it has zero ops inside of it.
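For reference, a minimal sketch of an asset-based version of the original script (this code is not from the thread; it uses dagster's `materialize` helper, and the BigQuery `project`/`dataset` values are placeholders):
```python
# example.py, reworked to materialize the asset directly
import pandas as pd

from dagster import asset, materialize
from dagster_gcp_pandas import BigQueryPandasIOManager

@asset
def dummy_dataframe() -> pd.DataFrame:
    df = pd.DataFrame({"A": range(10)})
    df["current_time"] = pd.Timestamp.now()
    return df

if __name__ == "__main__":
    result = materialize(
        [dummy_dataframe],
        resources={
            "io_manager": BigQueryPandasIOManager(
                project="my-gcp-project",  # placeholder
                dataset="my_dataset",  # placeholder
            )
        },
    )
```
Note that `materialize` runs the asset every time it is called; it does not skip assets that are already up to date, which is the gap the stale-status script later in this thread addresses.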
owen
05/23/2023, 11:44 PM
You can get a job that targets a specific set of assets with `defs.get_implicit_job_def_for_assets(some_keys)`.
Putting this together, you could have a script that looked something like:
```python
from typing import List

from dagster import AssetKey, DagsterInstance, Definitions
# internal APIs: this module path may differ across dagster versions
from dagster._core.definitions.data_version import (
    CachingStaleStatusResolver,
    StaleStatus,
)

defs = Definitions(assets=[all, your, assets])
instance = DagsterInstance.get()
asset_graph = defs.get_repository_def().asset_graph

# this bit relies on internal APIs
resolver = CachingStaleStatusResolver(instance, asset_graph)
stale_or_missing_keys: List[AssetKey] = []
for asset_key in asset_selection:  # asset_selection: the AssetKeys to check
    if resolver.get_status(asset_key) in [StaleStatus.STALE, StaleStatus.MISSING]:
        stale_or_missing_keys.append(asset_key)

job = defs.get_implicit_job_def_for_assets(stale_or_missing_keys)
job.execute_in_process(instance=instance)
```
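As a side note (not from the thread): a similar job can be built with public APIs via `define_asset_job`, avoiding the implicit-job internals. A sketch, assuming `stale_or_missing_keys` has already been computed as above; the job name is illustrative:
```python
from dagster import AssetSelection, Definitions, define_asset_job

# "refresh_stale_assets" is an illustrative name; in practice you would
# compute stale_or_missing_keys first, then build this Definitions object
refresh_job = define_asset_job(
    "refresh_stale_assets",
    selection=AssetSelection.keys(*stale_or_missing_keys),
)
defs = Definitions(assets=[all, your, assets], jobs=[refresh_job])
defs.get_job_def("refresh_stale_assets").execute_in_process(instance=instance)
```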
Brenda Thng
08/06/2023, 2:00 PM
Does `job.execute_in_process(instance=instance)` update the asset status? E.g., will running this script update the asset on the dagster-webserver? It seems like it doesn't, and I'm not sure how to make it reflect on the UI.
owen
08/07/2023, 10:59 PM
Make sure you have the `$DAGSTER_HOME` environment variable set to wherever your dagster.yaml file lives (usually `~/.dagster`). It might also be worth looking into the GraphQL API for launching jobs (as executing jobs in process means that you can't have multiple jobs running in parallel). There are also instance methods to kick off explicit backfills, rather than manually submitting runs one by one, but I'm not sure if that goes against what you're trying to achieve here.
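A minimal sketch of that setup, assuming the script gets its instance via `DagsterInstance.get()` (the path below is illustrative):
```python
import os

from dagster import DagsterInstance

# DagsterInstance.get() reads dagster.yaml from $DAGSTER_HOME, so runs
# launched by the script are recorded in the same instance that the
# webserver reads from; point the path at your real DAGSTER_HOME
os.environ.setdefault("DAGSTER_HOME", os.path.expanduser("~/.dagster"))
instance = DagsterInstance.get()
```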
Brenda Thng
08/08/2023, 5:34 AM
What if we're not using a dagster.yaml file, but kubernetes helm charts instead?
I did consider the GraphQL API as well, but I was hoping to be able to use a pipeline to invoke other dagster pipeline runs, so that I can track the progress, and also be able to invoke backfill runs using the dagster UI instead of writing custom scripts.
For the "instance methods to kick off explicit backfills", I've seen those. I am trying to perform a large backfill of a few thousand jobs, and whenever I do that, I notice that it lags the webserver as it puts everything in a queue at once. Hence the workaround I'm doing is to submit each job individually, one after another.
owen
08/08/2023, 6:19 PM
If you're kicking off runs from inside another job, `context.instance` is what you'd want to pass in as the `instance` argument.
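A sketch of that pattern (the op, job, and import names are illustrative, not from the thread):
```python
from dagster import OpExecutionContext, job, op

from my_project.jobs import downstream_job  # hypothetical import

@op
def launch_downstream(context: OpExecutionContext):
    # pass the enclosing run's instance so the launched run is recorded
    # in the same Dagster instance (and therefore shows up in the UI)
    downstream_job.execute_in_process(instance=context.instance)

@job
def launcher_job():
    launch_downstream()
```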