# ask-community
d
I'm trying to split up an asset in a graph into equal parts and then run an op on each of those parts. However this is currently failing to load due to InputDefinition not being iterable. What am I doing wrong here? Code in thread
Copy code
@graph
def full_scored_data_set(
    ids_to_score: pd.DataFrame,
    trained_model: xgb.XGBClassifier,
) -> pd.DataFrame:
    """chunk the recruiters to score and score them in batches"""
    chunked_ids = np.array_split(ids_to_score, 10)
    fan_outs = []
    for chunk in chunked_ids:
        chunk_df = pd.DataFrame.from_records(chunk)
        fan_outs.append(
            generate_score_factory(
                ins={
                    "ids_to_score": chunk_df,
                    "trained_model": trained_model,
                },
            )
        )

    return append_scores(fan_outs)
failing on this part
chunked_ids = np.array_split(ids_to_score, 10)
do I need to partition my input asset by an integer somehow?
z
you can't really run arbitrary code in a dagster graph definition. the code inside a graph definition is compiled into a DAG when Dagster initially loads your code, so it doesn't run the way you would expect when you actually run a job. to do a fan-out you'll want to encapsulate that logic into an op that yields a DynamicOutput for each chunk you want to analyze in your fan-out ops
d
would changing the graph to a job allow it to compile the way I expect? or I would need to create an op factory that would generate a dynamicoutput for each chunk?
z
a job is just a configured and executable graph, so that won't help you (although you'll need to turn it into a job to execute it, which you can do simply with full_scored_data_set.to_job()). you shouldn't need an op factory, you just need an upstream op that encapsulates all that chunking logic. something like
Copy code
@op(out=DynamicOut())
def chunking_op(ids_to_score, trained_model):
    chunked_ids = np.array_split(ids_to_score, 10)
    fan_outs = []
    for chunk in chunked_ids:
        chunk_df = pd.DataFrame.from_records(chunk)
        yield DynamicOutput({
            "ids_to_score": chunk_df,
            "trained_model": trained_model,
        })

@graph
def full_scored_data_set() -> pd.DataFrame:
    """chunk the recruiters to score and score them in batches"""
    result = chunking_op.map(generate_score_factory).collect()
    return append_scores(result)
d
ah! I see
thank you!
z
no problem, hope that helps!
d
I'm getting this error now:
AttributeError: 'OpDefinition' object has no attribute 'map'
Am I missing some sort of import or does map not work for this case?
z
how are you executing this code?
d
I'm just trying to load it into dagster as a code location
z
ah looks like I was missing the () to actually call the chunking_op -
Copy code
result = chunking_op().map(generate_score_factory).collect()
d
ah I see! So then I would need to get rid of my generate_score_factory, which is an op generating factory, and instead change that to an op that uses a Dynamic Input?
actually, it looks like there aren't dynamic inputs? I need the scoring op to also take in another input asset besides the chunking_op
ah looks like I can use a lambda in the map call for that
I see that in the doc now
👍 1
oh wait, I see the chunking op was already passing that through
z
yeah you shouldn't need to do the op factory thing
d
how do I access those inputs in my called scoring_op though?
Copy code
@op(out=DynamicOut())
def chunking_op(recruiter_team_to_score, trained_model):
    chunked_ids = np.array_split(recruiter_team_to_score, 10)
    for chunk in chunked_ids:
        chunk_df = pd.DataFrame.from_records(chunk)
        yield DynamicOutput(
            {
                "ids_to_score": chunk_df,
                "trained_model": trained_model,
            }
        )


@graph
def full_scored_data_set() -> pd.DataFrame:
    """chunk the recruiters to score and score them in batches"""
    result = chunking_op().map(generate_score_for_ids).collect()
    return append_scores(result)


@op(
    required_resource_keys={"sql_04_resource"},
)
def generate_score_for_ids(context, ins: Dict[str, In]):
    scoring_df = data_set_to_score_batch(
        recruiters_teams_to_score=ins["ids_to_score"],
        sql_conn=context.resources.sql_04_resource,
    )
    return scored_data(scoring_df, ins["trained_model"])
z
same way you would any input:
Copy code
@op
def scoring_op(context, fanned_out_input):
    context.log.info(f"this is the ids_to_score: {fanned_out_input['ids_to_score']}")
    context.log.info(f"this is the trained_model: {fanned_out_input['trained_model']}")
yeah that looks right
d
hm, I get an error saying
generate_score_for_ids is not defined
z
yeah you need to define it above your graph
python executes from top-down
so it's not defined when python reads your graph definition
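i.e. just move the op definition above the graph, skeleton-wise (the op body here is elided):
Copy code
@op
def generate_score_for_ids(context, fanned_out_input):
    ...


@graph
def full_scored_data_set() -> pd.DataFrame:
    result = chunking_op().map(generate_score_for_ids).collect()
    return append_scores(result)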
d
🤦 thanks, still pretty new to python
z
no worries! I still do that stuff and I've been coding for like 15 years lol
d
so at the very end of the file I put in
graph_asset = AssetsDefinition.from_graph(full_scored_data_set)
to turn that into an asset, right? when I try to materialize that, I get this error:
Copy code
_ASSET_JOB cannot be executed with the provided config. Please fix the following errors:

Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'full_scored_data_set': {'ops': {'chunking_op': {'inputs': {'recruiter_team_to_score': '<selector>', 'trained_model': '<selector>'}}}}}}
Do I need to specify those values in the chunking_op as specific assets we have defined elsewhere?
z
nice, so the graph is compiling. I'm not sure how well assets and ops interoperate right now, so to keep it simple I'd start with loading these inputs through an op. the job needs direction on where your inputs are stored, i.e. where your dataframe and model live - are these stored in a filesystem or on cloud storage somewhere?
you'll basically want an initial op that can explicitly load those objects from wherever they're stored (possibly with the paths configurable via op config), then pass those objects from that op to your chunking_op
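a rough sketch of that loader-op idea - the paths, file formats, and op names here are just placeholders, adjust to however your dataframe and model are actually persisted:
Copy code
import pandas as pd
import xgboost as xgb
from dagster import graph, op


@op(config_schema={"path": str})
def load_ids_to_score(context) -> pd.DataFrame:
    # placeholder: read the dataframe from wherever it actually lives
    return pd.read_parquet(context.op_config["path"])


@op(config_schema={"path": str})
def load_trained_model(context) -> xgb.XGBClassifier:
    # placeholder: load the persisted model (xgboost's own load_model shown here)
    model = xgb.XGBClassifier()
    model.load_model(context.op_config["path"])
    return model


@graph
def full_scored_data_set() -> pd.DataFrame:
    result = (
        chunking_op(load_ids_to_score(), load_trained_model())
        .map(generate_score_for_ids)
        .collect()
    )
    return append_scores(result)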
if those are assets somewhere already defined in Dagster and can be imported into the module (maybe this is more where you were headed since you created an asset from the graph), then you can make your graph-backed asset dependent on them, like in this example
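roughly this shape (sketch - the upstream asset key names here are made up, swap in whatever your real assets are called):
Copy code
from dagster import AssetKey, AssetsDefinition

# the graph itself would declare these as inputs, e.g.
# def full_scored_data_set(recruiter_team_to_score, trained_model): ...
graph_asset = AssetsDefinition.from_graph(
    full_scored_data_set,
    keys_by_input_name={
        "recruiter_team_to_score": AssetKey("recruiter_team_to_score"),
        "trained_model": AssetKey("trained_model"),
    },
)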
d
ah it looks like adding the keys_by_input_name and passing those through the graph worked and it was able to attempt to run!
🎉 1
looks like the DynamicOutput is missing a mapping_key though
z
ah yeah good catch
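each DynamicOutput just needs a unique mapping_key (alphanumeric/underscore), so the loop inside chunking_op becomes something like:
Copy code
    for i, chunk in enumerate(chunked_ids):
        chunk_df = pd.DataFrame.from_records(chunk)
        yield DynamicOutput(
            {
                "ids_to_score": chunk_df,
                "trained_model": trained_model,
            },
            mapping_key=f"chunk_{i}",
        )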
d
it's running! thanks for all your help!
z
awesome, glad we got it going!
🎉 1
d
so just want to double check: this will parallelize these, and we can have each generate_score_for_ids op run in its own container in production? We're just starting out with dagster
z
it depends on what executor you're using. if you're using the multiprocess executor, which is the default, it'll launch each one in its own process. you can use the docker executor to launch each step in its own container
d
excellent, thank you!
just to follow up on this using the docker executor... I have this graph defined as an asset, and then have a sensor that uses a define_asset_job to materialize that asset. It looks like the entire job is run inside a single docker container. How can I specify that each step (particularly the fan out) should be done in its own docker container?
it looks like define_asset_job takes in a config, which I'm presuming is where I can specify the executor? But I'm not sure how to go about doing that
oh wait there's executor_def
facepalm
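so something like this looks like the shape (rough sketch on my end - the job name, asset selection, and whether the executor needs extra image config are all placeholders):
Copy code
from dagster import AssetSelection, define_asset_job
from dagster_docker import docker_executor

# each step of this job (including the fanned-out ones) should get its own container
score_asset_job = define_asset_job(
    name="score_asset_job",
    selection=AssetSelection.keys("full_scored_data_set"),
    executor_def=docker_executor,
)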
to come back to this much much later... I wrote most of our scoring processing in plain python instead of defining ops for each step, as I'm not sure of the best way to write them as ops. We now have the keys we want chunked as their own ops (so basically batches of 300+ keys in a single op). I now want to use those keys to make another database dip, get the data, run that data through our scoring system, and then output that data somewhere. My thought is the database dip, the scoring, and the output could all be their own op, with different IO managers and storing the data in memory temporarily until the op is finished. But I'm not sure how to piece those together in the code. I'm currently just using a for-loop and going over each key in plain python. How would it look using ops instead?
Would it be as simple as changing the map call here to run multiple ops? How could I tell the mapped op that it needs to run the process on each row?
Copy code
chunking_op(recruiter_teams_to_score)
        .map(lambda batch: generate_score_for_ids(batch, trained_model))
        .collect()
I'm just not sure how to change this up to be more in line with an op. This is where I'm doing my for-loop over the keys in the batch and reading the data in, which we immediately score against a model instead of writing out as an asset or anything
Copy code
def score_data_set_batch(
    context, recruiters_teams_to_score: pd.DataFrame, sql_conn, trained_model
):
    score_df = pd.DataFrame()
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "sql/recruiter_user_recruitable_athlete.sql")
    count = 0
    tic = dt.datetime.now()
    logging_tic = dt.datetime.now()
    scoring_metadata = []
    for index, row in recruiters_teams_to_score.iterrows():
        sql = open(filename).read()
        sql = sql.replace("@RecruiterUserid", str(row["userid"]))
        sql = sql.replace("@RecruiterTeamid", str(row["teamid"]))
        start_time = time.time()
        succeeded = False
        retry_count = 0
        scoring_batch = None
        while not succeeded and retry_count < 5:
            try:
                scoring_batch = pd.read_sql(sql, sql_conn)
would I maybe need to fan out twice?
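maybe something along these lines, keeping the single fan-out over batches and chaining the db dip / scoring / output as their own ops inside the mapped branch? (very rough sketch - all of these op names and bodies are placeholders, not real code from our repo)
Copy code
import pandas as pd
from dagster import graph, op


@op(required_resource_keys={"sql_04_resource"})
def fetch_batch_data(context, batch: pd.DataFrame) -> pd.DataFrame:
    # placeholder: the "database dip" for one batch of keys, kept in memory for the next op
    ...


@op
def score_batch(batch_data: pd.DataFrame, trained_model) -> pd.DataFrame:
    # placeholder: run the fetched rows through the trained model
    ...


@op
def write_scores(scored_batches) -> None:
    # placeholder: scored_batches is the collect()'ed list of per-batch results; write it out here
    ...


@graph
def score_everything(recruiter_teams_to_score, trained_model):
    scored = chunking_op(recruiter_teams_to_score).map(
        lambda batch: score_batch(fetch_batch_data(batch), trained_model)
    )
    write_scores(scored.collect())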