Agon Shabi
07/16/2021, 11:28 AM
() = solid
[] = OUTPUT
   (load_training_data)             (load_serving_data)
             |                               |
      [TRAINING_DATA]                 [SERVING_DATA]
             |                               |
(create_training_features)     (create_serving_features)
             |                               |
    [TRAINING_FEATURES]             [SERVING_FEATURES]
             |                               |
       (train_model)                         |
             |                               |
      [TRAINED_MODEL]                        |
             |                               |
             +--------------+ +--------------+
                            | |
                       (serve_model)
                             |
                       [PREDICTIONS]
                             |
                   (publish_predictions)
Due to restrictions in my work environment, I'm unable to use any of dagster's scheduling capabilities - let's say all we have is cron jobs.
I would like to run this pipeline on two separate schedules in different ways:
1. Annual model train, i.e.
execute_pipeline(
    instance=my_instance,
    pipeline=my_pipeline,
    step_selection=["*train_model"],
    run_config={
        "resources": {"io_manager": {"config": SOME_PERSISTENT_IO_MANAGER_CONFIG}},
        "solids": {
            "load_training_data": {"config": "path_to_training_data"},
        },
    },
)
2. Monthly model "serve" (make predictions), i.e.
reexecute_pipeline(
    instance=instance,
    pipeline=my_pipeline,
    parent_run_id=find_most_recent_training_pipeline_run(instance, ...).run_id,  # Implementation not important
    step_selection=["load_serving_data*"],
    run_config={
        "resources": {"io_manager": {"config": SOME_PERSISTENT_IO_MANAGER_CONFIG}},
        "solids": {
            "load_serving_data": {"config": "path_to_serving_data"},
        },
    },
)
I think this pattern should allow me to have a single view, in dagit, of this entire model's pipeline, including
• every execution of the "training" subset
• every execution of the "serving" subset, along with which execution of the "training" subset was its "parent"
This should also allow me to:
• retrain the model on an ad-hoc basis, where subsequent "serves" use the most recently trained model
• easily answer the question for any predictions: "which data trained the model that produced this output"?
The problems I'm having are:
1. This second ("serving") execution fails due to missing configuration for solids that I'm not trying to run (e.g. "load_training_data").
Q1: Is it a bug that re-execution with a specific step_selection requires configuration values for solids that won't be executed?
2. When reexecuting via "reexecute_pipeline", there is no indication in the dagit UI that this second ("serving") execution is a derivative of the parent ("training") run.
• When re-executing in dagit, the UI indicates this by showing a 1-dimensional 'graph' of runs, with one marked as the 'root'
• I've been able to work around this by explicitly assigning the run tags "dagster/parent_run_id" and "dagster/root_run_id" (sketched below), but this feels brittle.
Q2: Is it a bug that these tags are automatically applied when running from dagit, but not when using "reexecute_pipeline"?
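A minimal sketch of the tag workaround mentioned above, assuming this dagster version's reexecute_pipeline accepts a tags= argument and that the run record exposes root_run_id; everything else reuses the placeholders from example 2:

from dagster import reexecute_pipeline

# Placeholders from the message above: my_pipeline, instance,
# find_most_recent_training_pipeline_run, and serving_run_config
# (the run_config shown in example 2).
parent_run = find_most_recent_training_pipeline_run(instance, ...)

reexecute_pipeline(
    instance=instance,
    pipeline=my_pipeline,
    parent_run_id=parent_run.run_id,
    step_selection=["load_serving_data*"],
    run_config=serving_run_config,
    tags={
        # explicit lineage tags, as described in the workaround above
        "dagster/parent_run_id": parent_run.run_id,
        "dagster/root_run_id": parent_run.root_run_id or parent_run.run_id,  # assumes the run exposes root_run_id
    },
)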
Apologies for the wall of text, and I have to say I'm loving dagster since introducing it at work!
Jessica Franks
07/16/2021, 2:45 PM
Josh Lloyd
07/16/2021, 8:42 PM
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline collective_pipeline
Error 1: Received unexpected config entries "['s3_bucket', 's3_prefix']" at path root:resources:io_manager:config. Expected: "['base_dir']."
My pipeline's ModeDefinition has a line in the resource_defs that reads "io_manager": s3_pickle_io_manager, and I have the run config in the playground set as follows:
resources:
  io_manager:
    config:
      s3_bucket: <bucket>
      s3_prefix: <prefix>
I don't understand where I'm messing up here…
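For comparison, a minimal sketch of how s3_pickle_io_manager is typically wired to a mode. The error's mention of base_dir matches the default fs_io_manager's config, which suggests the mode selected in the playground may not be the one using s3_pickle_io_manager. The solid here is illustrative, and the required "s3" resource is an assumption based on dagster_aws:

from dagster import ModeDefinition, pipeline, solid
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

s3_mode = ModeDefinition(
    name="s3",
    resource_defs={
        "io_manager": s3_pickle_io_manager,  # configured with s3_bucket / s3_prefix
        "s3": s3_resource,                   # s3_pickle_io_manager reads from an "s3" resource
    },
)

@solid
def my_solid():
    return 1

@pipeline(mode_defs=[s3_mode])
def collective_pipeline():
    my_solid()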
Daniel Kim
07/17/2021, 9:54 AM
Daniel Kim
07/17/2021, 11:16 AM
my_job.to_job(resource_defs={'io_manager': fs_io_manager})
and my dagster.yaml just has:
telemetry:
  enabled: false
So I'm not sure why only the multiprocess executor is available, or perhaps it is being specified somehow that I am not aware of.
EDIT: Ahhh, never mind. I see there is an executor_def= parameter in the to_job() method. 😆
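A minimal sketch of that executor_def= parameter, assuming the in_process_executor from dagster; the op, graph, and resource choice are placeholders:

from dagster import fs_io_manager, graph, in_process_executor, op

@op
def do_something():
    return 1

@graph
def my_graph():
    do_something()

my_job = my_graph.to_job(
    resource_defs={"io_manager": fs_io_manager},
    executor_def=in_process_executor,  # opt out of the multiprocess default
)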
Daniel Kim
07/17/2021, 3:54 PM
KeyError: my_graph1
Example code:
import time
from dagster import graph, op, in_process_executor, repository, ScheduleDefinition

@op
def expensive_task1(context):
    context.log.info('Performing expensive task1...')
    time.sleep(3)

@op
def expensive_task2(context, my_input):
    context.log.info('Performing expensive task2...')
    time.sleep(5)

@op
def expensive_task3(context, my_input):
    context.log.info('Performing expensive task3...')
    time.sleep(5)

@graph
def my_graph1():
    task1_done = expensive_task1()
    task2_done = expensive_task2(task1_done)

@graph
def my_graph2():
    task3_done = expensive_task3()

@graph
def my_graph_final():
    my_graph1()
    my_graph2()

my_job = my_graph_final.to_job()

@repository
def dev_repo():
    return [my_job]
Daniel Kim
07/17/2021, 5:47 PM
config_schema= is still available for the @op decorator. Will the word "solid" be renamed to "op" in the context object, and will "solids" be renamed to "ops" in to_job()'s config=?
For example: context.solid_config['some_value'] will change to context.op_config['some_value'], and then
my_job = my_graph.to_job(
    name='My_Job',
    config={
        'solids': {
            'expensive_task1': {'config': {'plant_code': 'Plant_Code_123'}},
            'expensive_task2': {'config': {'plant_code': 'Plant_Code_123'}}
        }
    }
)
will be
my_job = my_graph.to_job(
    name='My_Job',
    config={
        'ops': {
            'expensive_task1': {'config': {'plant_code': 'Plant_Code_123'}},
            'expensive_task2': {'config': {'plant_code': 'Plant_Code_123'}}
        }
    }
)
Correct? Or are we keeping the solid/solids references for some reason?
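For reference, a small sketch of the current spelling being asked about: config_schema on an @op, read back through context.solid_config, and configured under the "solids" key (names follow the examples above):

from dagster import graph, op

@op(config_schema={"plant_code": str})
def expensive_task1(context):
    # today this is still spelled solid_config, per the question above
    context.log.info(f"plant_code: {context.solid_config['plant_code']}")

@graph
def my_graph():
    expensive_task1()

my_job = my_graph.to_job(
    name="My_Job",
    config={"solids": {"expensive_task1": {"config": {"plant_code": "Plant_Code_123"}}}},
)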
madhurt
07/17/2021, 9:06 PM
I have a dagster-postgresql-0 pod which refers to an EBS volume (say A) through a StatefulSet: default/dagster-postgresql. Now, when I delete the cluster (i.e. all the resources are cleared up using eksctl delete cluster) and then create a new one + deploy dagster again using helm, it creates a new EBS volume (say B) for me. So even though A still exists, dagster now uses B, and thus my previous run history isn't preserved.
So, to make the run history persistent across deletion and creation of the cluster, what do you guys recommend?
1. Is there any way I can tell helm to use the existing EBS volume (A) instead of the newly created one (B)?
2. Or do you recommend that I use RDS instead of this postgres thing? In the case of RDS, there are 2 approaches I could find:
a. First, https://docs.dagster.io/deployment/guides/aws#using-rds-for-run-and-event-log-storage, i.e. having all of this set in a dagster.yaml file which I could then supply inside my Docker image.
b. Or secondly, using https://docs.dagster.io/deployment/guides/kubernetes/customizing-your-deployment#configuring-an-external-database in the values.yaml file.
So, can it be done through 1)? If not, then for RDS, do I need to do a) or b) or both?
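A sketch of what option (b) typically looks like in values.yaml: disabling the chart's bundled postgresql and pointing it at an external RDS host. The key names follow the dagster Helm chart's postgresql section but are an assumption here; check the linked docs for the exact fields:

postgresql:
  enabled: false                  # don't create the in-cluster dagster-postgresql StatefulSet
  postgresqlHost: my-rds-instance.abc123.us-east-1.rds.amazonaws.com   # hypothetical RDS endpoint
  postgresqlUsername: dagster
  postgresqlPassword: ""          # usually injected via a secret rather than written here
  postgresqlDatabase: dagster
  service:
    port: 5432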
Arun Kumar
07/19/2021, 4:48 AM
I am using events_for_asset_key, but I am still not sure how to get the partition info from the asset key to pass it in the run request. The returned EventLogEntry does not have the partition info. Is there any other method that I can use?
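A rough sketch of one way to dig the partition out of such an entry, assuming the EventLogEntry wraps an asset-materialization DagsterEvent whose event_specific_data carries the AssetMaterialization; the attribute names are an assumption for this dagster version, so verify them against the installed release:

def partition_for_materialization(event_log_entry):
    # event_log_entry: an EventLogEntry returned by events_for_asset_key
    dagster_event = event_log_entry.dagster_event
    if dagster_event is None or not dagster_event.is_step_materialization:
        return None
    materialization = dagster_event.event_specific_data.materialization
    return materialization.partition  # None for unpartitioned materializations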
Daniel Kim
07/19/2021, 11:09 AM
There doesn't seem to be a description= parameter for the to_job() method. I tried it regardless, but the description didn't change. I also added a description to the @graph decorator, and that didn't do it either.
George Pearse
07/19/2021, 11:39 AM
Solid get_data did not fire outputs {'result'}
Noah Sanor
07/19/2021, 3:40 PM
Is there a way to set dagit_base_url for the slack hook config without hardcoding it? It would be nice to be able to set it as a metadata property per mode and then dynamically grab it from that.
Michel Rouly
07/19/2021, 6:25 PM
Jeff Hulbert
07/19/2021, 7:11 PM
Ritasha Verma
07/19/2021, 8:10 PM
Suraj Narwade
07/20/2021, 8:05 AM
Suraj Narwade
07/20/2021, 8:05 AM
George Pearse
07/20/2021, 8:11 AM
szalai1
07/20/2021, 9:40 AM
graph.to_job could help, I believe. Is there any way to keep run_configs and schedules close to the graphs/pipelines, and resources and tags close to repos, like: [g.to_job(resources, tags..) for g in graphs]? What's the intended way to organise multiple pipelines + run_configs?
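A sketch of the pattern suggested above: building the jobs inside the repository so shared resources and tags live in one place. The graphs and shared dicts are placeholders:

from dagster import fs_io_manager, graph, op, repository

@op
def noop():
    return None

@graph
def graph_a():
    noop()

@graph
def graph_b():
    noop()

SHARED_RESOURCES = {"io_manager": fs_io_manager}
SHARED_TAGS = {"team": "data-eng"}

@repository
def my_repo():
    # one place where every graph becomes a job with the shared resources/tags
    return [g.to_job(resource_defs=SHARED_RESOURCES, tags=SHARED_TAGS) for g in [graph_a, graph_b]]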
Jessica Franks
07/20/2021, 1:40 PM
George Pearse
07/20/2021, 2:34 PM
Xu Zhang
07/20/2021, 2:41 PM
Uttasarga Singh
07/20/2021, 4:27 PM
Breandán Kerin
07/20/2021, 4:34 PM
Darren Haken
07/20/2021, 5:12 PM
Rob Meng
07/20/2021, 8:43 PM
# Generated from path.to.my.module.__my_config_schema()
@dataclass
class MyConfig:
    field1: str

@solid(config_schema=__my_config_schema())
def my_solid(context: SolidExecutionContext) -> MyConfig:
    return MyConfig(**context.solid_config)
Charles Lariviere
07/20/2021, 8:53 PM
Hebo Yang
07/20/2021, 10:22 PM
Tim Zhou
07/21/2021, 2:26 AM
For the io_manager, which db table are the intermediate result outputs of ops/solids stored in? (I am guessing that whatever mechanism allows me to rerun a pipeline/job from the latest successful intermediate step in a given run is pulling that stored data from somewhere.) Or is it stored as a file/blob in whatever storage backend I configured?
George Pearse
07/21/2021, 7:26 AM