Danny Steffy
02/02/2023, 7:19 PM
I'm getting an error about `InputDefinition`
not being iterable. What am I doing wrong here? Code in thread:
@graph
def full_scored_data_set(
ids_to_score: pd.DataFrame,
trained_model: xgb.XGBClassifier,
) -> pd.DataFrame:
"""Chunk the recruiters to score and score them in batches.

NOTE(review): this is the version that triggers the "InputDefinition not
being iterable" error reported in this thread. Inside a @graph body the
parameters are presumably graph-construction placeholders rather than a real
DataFrame / model, so the array_split and for-loop below cannot run while the
graph is being built. The later chunking_op (DynamicOut) +
.map(...).collect() version fans out at run time instead.
"""
# Split the incoming rows into 10 roughly equal chunks (presumably the line
# that raises here; it is quoted on its own right after this snippet).
chunked_ids = np.array_split(ids_to_score, 10)
# Collects one generate_score_factory(...) result per chunk.
fan_outs = []
for chunk in chunked_ids:
# Re-wrap each chunk as a DataFrame before handing it to the op factory.
chunk_df = pd.DataFrame.from_records(chunk)
fan_outs.append(
generate_score_factory(
ins={
"ids_to_score": chunk_df,
"trained_model": trained_model,
},
)
)
# Combine the per-chunk results (append_scores is defined elsewhere).
return append_scores(fan_outs)
chunked_ids = np.array_split(ids_to_score, 10)
Zach
02/02/2023, 7:38 PM
Danny Steffy
02/02/2023, 7:50 PM
Zach
02/02/2023, 7:55 PM@op(out=DynamicOut())
def chunking_op(ids_to_score, trained_model):
    """Split ``ids_to_score`` into 10 row chunks and fan each one out.

    Yields one ``DynamicOutput`` per chunk. Each payload is a dict holding the
    chunk (``"ids_to_score"``) together with ``trained_model``, so the mapped
    downstream op receives both through a single input.
    """
    num_chunks = 10
    # Normalise to a DataFrame once; record-style inputs are still accepted.
    if isinstance(ids_to_score, pd.DataFrame):
        frame = ids_to_score
    else:
        frame = pd.DataFrame.from_records(ids_to_score)
    # Split positional row indices rather than the frame itself, then slice
    # with .iloc: every chunk stays a DataFrame without re-wrapping it through
    # DataFrame.from_records (deprecated for DataFrame input in pandas 2.1).
    row_groups = np.array_split(np.arange(len(frame)), num_chunks)
    for chunk_number, row_positions in enumerate(row_groups):
        yield DynamicOutput(
            {
                "ids_to_score": frame.iloc[row_positions],
                "trained_model": trained_model,
            },
            # Dagster requires every dynamic output to carry a unique mapping_key.
            mapping_key=str(chunk_number),
        )
@graph
def full_scored_data_set() -> pd.DataFrame:
"""Chunk the recruiters to score and score them in batches.

NOTE(review): as written, `chunking_op.map(...)` fails with
AttributeError: 'OpDefinition' object has no attribute 'map'. `.map` is
available on the op's dynamic output, so the op has to be invoked first:
`chunking_op().map(...)`. chunking_op's inputs (ids_to_score,
trained_model) are also not passed in here.
"""
# Fan out over chunking_op's dynamic outputs, run generate_score_factory on
# each, then gather all mapped results into one list.
result = chunking_op.map(generate_score_factory).collect()
return append_scores(result)
Danny Steffy
02/02/2023, 7:56 PM
Zach
02/02/2023, 7:57 PM
Danny Steffy
02/02/2023, 7:59 PM
AttributeError: 'OpDefinition' object has no attribute 'map'
Am I missing some sort of import, or does `map` not work for this case?
Zach
02/02/2023, 8:00 PM
Danny Steffy
02/02/2023, 8:08 PM
Zach
02/02/2023, 8:11 PMresult = chunking_op().map(generate_score_factory).collect()  # invoke the op first; .map lives on its dynamic output, not on the OpDefinition
Danny Steffy
02/02/2023, 8:13 PM
Zach
02/02/2023, 8:17 PM
Danny Steffy
02/02/2023, 8:18 PM@op(out=DynamicOut())
def chunking_op(recruiter_team_to_score, trained_model):
    """Split ``recruiter_team_to_score`` into 10 row chunks and fan each out.

    Yields one ``DynamicOutput`` per chunk. Each payload is a dict with keys
    ``"ids_to_score"`` (the chunk) and ``"trained_model"``, so the mapped
    scoring op receives both through a single input.
    """
    num_chunks = 10
    # Normalise to a DataFrame once; record-style inputs are still accepted.
    if isinstance(recruiter_team_to_score, pd.DataFrame):
        frame = recruiter_team_to_score
    else:
        frame = pd.DataFrame.from_records(recruiter_team_to_score)
    # Split positional row indices rather than the frame itself, then slice
    # with .iloc: every chunk stays a DataFrame without re-wrapping it through
    # DataFrame.from_records (deprecated for DataFrame input in pandas 2.1).
    row_groups = np.array_split(np.arange(len(frame)), num_chunks)
    for chunk_number, row_positions in enumerate(row_groups):
        yield DynamicOutput(
            {
                "ids_to_score": frame.iloc[row_positions],
                "trained_model": trained_model,
            },
            # Dagster requires every dynamic output to carry a unique mapping_key.
            mapping_key=str(chunk_number),
        )
@graph
def full_scored_data_set() -> pd.DataFrame:
"""Chunk the recruiters to score, score each chunk in a mapped op, and combine.

NOTE(review): generate_score_for_ids is referenced here but defined after
this graph. That ordering produced "generate_score_for_ids is not defined",
presumably because the @graph body is evaluated when the decorator runs;
define the op above the graph.
NOTE(review): chunking_op() is called without its inputs
(recruiter_team_to_score, trained_model), which is why materializing asks for
them through run config ('Missing required config entry "ops"'). Wire
upstream outputs into chunking_op(...) instead.
"""
# Fan out over chunking_op's dynamic outputs, score each chunk, then gather
# the mapped results into one list for append_scores.
result = chunking_op().map(generate_score_for_ids).collect()
return append_scores(result)
@op(
    required_resource_keys={"sql_04_resource"},
)
def generate_score_for_ids(context, ins: dict):
    """Score one fanned-out chunk of recruiters/teams.

    Args:
        context: Dagster op context; supplies the ``sql_04_resource`` connection.
        ins: The payload yielded by ``chunking_op`` for this mapping key, with
            keys ``"ids_to_score"`` (the chunk to score) and ``"trained_model"``.

    Returns:
        Whatever ``scored_data`` returns for this chunk.
    """
    # ``ins`` carries runtime values (a DataFrame and a model), not ``In``
    # definitions. Dagster infers the input's type check from this annotation,
    # so annotate it as a plain dict rather than ``Dict[str, In]``.
    scoring_df = data_set_to_score_batch(
        recruiters_teams_to_score=ins["ids_to_score"],
        sql_conn=context.resources.sql_04_resource,
    )
    return scored_data(scoring_df, ins["trained_model"])
Zach
02/02/2023, 8:21 PM@op
def scoring_op(context, fanned_out_input):
    """Debug op: log both values carried by one fanned-out chunk.

    ``fanned_out_input`` is the dict yielded by the chunking op, with keys
    ``"ids_to_score"`` and ``"trained_model"``.
    """
    # Plain attribute access; the pasted version contained Slack autolink
    # markup (<http://...|...>), which is not valid Python.
    context.log.info(f"this is the ids_to_score: {fanned_out_input['ids_to_score']}")
    # ``trained_model`` is not a local name in this op; it lives inside the
    # fanned-out dict alongside the ids.
    context.log.info(f"this is the trained_model: {fanned_out_input['trained_model']}")
Danny Steffy
02/02/2023, 8:23 PM
`generate_score_for_ids` is not defined
Zach
02/02/2023, 8:24 PM
Danny Steffy
02/02/2023, 8:26 PM
Zach
02/02/2023, 8:28 PM
Danny Steffy
02/02/2023, 8:34 PMgraph_asset = AssetsDefinition.from_graph(full_scored_data_set)
to turn that into an asset, right? When I try to materialize that, I get this error:
_ASSET_JOB cannot be executed with the provided config. Please fix the following errors:
Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'full_scored_data_set': {'ops': {'chunking_op': {'inputs': {'recruiter_team_to_score': '<selector>', 'trained_model': '<selector>'}}}}}}
Do I need to specify those values in `chunking_op` as specific assets we have defined elsewhere?
Zach
02/02/2023, 8:41 PM
Danny Steffy
02/02/2023, 8:52 PM
Zach
02/02/2023, 8:52 PM
Danny Steffy
02/02/2023, 8:55 PM
Zach
02/02/2023, 9:01 PM
Danny Steffy
02/02/2023, 9:35 PM
Zach
02/02/2023, 9:38 PM
Danny Steffy
02/02/2023, 9:38 PM
I'm using `define_asset_job`
to materialize that asset. It looks like the entire job is run inside a single Docker container. How can I specify that each step (particularly the fan-out) should be done in its own Docker container?
`define_asset_job`
takes in a config, which I'm presuming is where I can specify the executor? But I'm not sure how to go about doing that.
`executor_def`
facepalmchunking_op(recruiter_teams_to_score)
.map(lambda batch: generate_score_for_ids(batch, trained_model))
.collect()
def score_data_set_batch(
context, recruiters_teams_to_score: pd.DataFrame, sql_conn, trained_model
):
score_df = pd.DataFrame()
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, "sql/recruiter_user_recruitable_athlete.sql")
count = 0
tic = dt.datetime.now()
logging_tic = dt.datetime.now()
scoring_metadata = []
for index, row in recruiters_teams_to_score.iterrows():
sql = open(filename).read()
sql = sql.replace("@RecruiterUserid", str(row["userid"]))
sql = sql.replace("@RecruiterTeamid", str(row["teamid"]))
start_time = time.time()
succeeded = False
retry_count = 0
scoring_batch = None
while not succeeded and retry_count < 5:
try:
scoring_batch = pd.read_sql(sql, sql_conn)