Danny Steffy
03/03/2023, 7:48 PM
daniel
03/03/2023, 7:49 PM
Danny Steffy
03/03/2023, 7:52 PM
`Engine_Event` saying `Starting execution with step handler DockerStepHandler.`
Does that launch a new container for each step? How do I configure it not to do that for each step?
daniel
03/03/2023, 7:53 PM
Danny Steffy
03/03/2023, 7:53 PM
daniel
03/03/2023, 7:54 PM
Danny Steffy
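03/03/2023, 7:55 PM
```python
from dagster import (
    AssetKey,
    AssetSelection,
    AssetsDefinition,
    define_asset_job,
    graph,
    mem_io_manager,
)
from dagster_docker import docker_executor

# key_to_score, data_to_score_for_key, score_data_set, append_scores and
# distributed_scoring_sensor are defined elsewhere in the project.


@graph
def full_scored_data_set(recruiter_teams_to_score, trained_model):
    """Chunk the recruiters to score and score them in batches."""
    # result = (
    #     chunking_op(recruiter_teams_to_score)
    #     .map(lambda batch: generate_score_for_ids(batch, trained_model))
    #     .collect()
    # )
    # Fan out one dynamic step per key, load and score that key's data,
    # then fan the per-key results back in with collect().
    result = (
        key_to_score(recruiter_teams_to_score)
        .map(
            lambda key: score_data_set(
                data_to_score_for_key(recruiter_in_batch=key), trained_model
            )
        )
        .collect()
    )
    return append_scores(result)


graph_asset = AssetsDefinition.from_graph(
    full_scored_data_set,
    keys_by_input_name={
        "recruiter_teams_to_score": AssetKey("recruiter_teams_to_score"),
        "trained_model": AssetKey("trained_model"),
    },
    # Only used by ops that set io_manager_key="mem". In-memory values are not
    # shared between the separate containers docker_executor starts per step.
    resource_defs={"mem": mem_io_manager},
)

# Presumably one entry in a sensors=[...] list passed to Definitions.
distributed_scoring_sensor(
    define_asset_job(
        "distributed_scoring_job",
        selection=AssetSelection.groups("distributed_scoring"),
        executor_def=docker_executor,
    )
),
```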
daniel
03/03/2023, 7:56 PM
Danny Steffy
03/03/2023, 7:56 PM
daniel
03/03/2023, 7:57 PM
Danny Steffy
03/03/2023, 7:57 PM
`full_scored_data_set` asset
daniel
03/03/2023, 7:58 PM
Danny Steffy
03/03/2023, 7:58 PM
`data_to_score_for_key` and `score_data_set` be run in-process so we can use the memory IO manager (we don't want to persist the output of `data_to_score_for_key`, as it's a lot of data to write to disk). If that's not possible, I'm guessing I would need a custom IO manager that stores the data in a file and then cleans up the file after the downstream op ingests it?
daniel
03/03/2023, 8:09 PM
Danny Steffy
03/03/2023, 8:10 PM
daniel
03/03/2023, 8:10 PM
Danny Steffy
03/03/2023, 8:11 PM
daniel
03/03/2023, 8:11 PM
Danny Steffy
03/03/2023, 8:11 PM
daniel
03/03/2023, 8:18 PM
Danny Steffy
03/03/2023, 8:21 PM
daniel
03/03/2023, 8:23 PM
```python
from dagster import Definitions, multiprocess_executor

Definitions(
    ...,
    # Run each step in its own subprocess, at most 8 at a time.
    executor=multiprocess_executor.configured({"max_concurrent": 8}),
)
```
Danny Steffy
03/03/2023, 8:24 PM
`data_to_score_for_key` op use a multiprocess executor, but have the `score_data_set` op use an in_process executor? I was hoping to do that so that the output of `data_to_score_for_key` could be kept only in memory until the `score_data_set` op is finished.
daniel
03/03/2023, 8:32 PM
Danny Steffy
03/03/2023, 8:35 PM
daniel
03/03/2023, 8:37 PM
Danny Steffy
03/03/2023, 8:38 PM