# ask-community
n
Hello! I’m new to Dagster and I’m working these days on a pretty simple data pipeline to integrate data. I’m trying to separate things properly into assets and ops, so I have something like this:
```python
@op
def clean_bnc_cases(bnc_cases: DataFrame) -> DataFrame:
  """
  Apply cleaning transformation to the BNC cases DataFrame

  Returns the BNC Cases cleaned and normalized
  """
  df = (bnc_cases
        # .pipe(clean_names)  # Clean column names
        .pipe(process_text, column_name="action", string_function="lower")  # Lower case for action column
        .pipe(process_text, column_name="caseType", string_function="title")  # Title case for caseType column
        )
  return df


@op
def validate_bnc_cases(context, bnc_cases_cleaned: DataFrame) -> Tuple[DataFrame, DataFrame]:
  """
  Validate the BNC Cases against the Pandera schema

  Returns a tuple with the valid and invalid cases
  """
  cases = {'valid': DataFrame(), 'invalid': DataFrame()}  # default to empty frames so both keys always exist
  try:
    cases['valid'] = BNCCases.validate(bnc_cases_cleaned)
  except pa.errors.SchemaError as exc:
    context.log.warning(f"Schema validation failed:\n{exc.data}")  # exc.data contains the invalid data
    cases['invalid'] = exc.failure_cases  # dataframe of failure cases
  return cases['valid'], cases['invalid']


@graph_asset
def bnc_valid_cases(context, bnc_cases):
  """Filter the BNC Cases that valid (conforming the schema)"""
  valid_cases, _ = validate_bnc_cases(clean_bnc_cases(bnc_cases))
  context.log_event(
    AssetObservation(asset_key="bnc_valid_cases",
                     metadata={"num_rows": len(valid_cases)})
  )
  return valid_cases


@graph_asset
def bnc_invalid_cases(context, bnc_cases):
  """Filter the BNC Cases that are invalid (not conforming the schema)"""
  _ , invalid_cases = validate_bnc_cases(clean_bnc_cases(bnc_cases))
  context.log_event(
    AssetObservation(asset_key="bnc_invalid_cases",
                     metadata={"num_rows": len(invalid_cases)})
  )
  return invalid_cases
```
o
Hey Nicolas - are you trying to import context from dagster?
n
Hi @Odette Harary, I’m trying to log an AssetObservation, so I’m passing the context into the asset.
This works:
```python
@asset
def simple_asset(context):
    context.log.info("Working")
```
This is not working:
```python
@graph_asset
def graph_asset(context):
    context.log.info("Not working")
```
It looks like we can’t pass the context into a graph asset. Here is the error I get when I try to load the code location:
```
dagster._core.errors.DagsterInvalidDefinitionError: "context" is not a valid name in Dagster. It conflicts with a Dagster or python reserved keyword.
```
c
Hey Nicolas - you’re experiencing an error because graphs aren’t meant to take contexts. Graphs just define the dependencies between ops; you shouldn’t be doing any actual computation within them. Any events being logged should be done within the body of an op- or asset-decorated function.
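As a minimal sketch of that separation (the clean_cases and log_valid_cases ops here are made up for illustration, not taken from your snippet): the event logging lives in an op, which does receive a context, and the graph body only wires ops together.
```python
from pandas import DataFrame

from dagster import AssetObservation, graph_asset, op


@op
def clean_cases(raw_cases: DataFrame) -> DataFrame:
    # Compute happens inside ops...
    return raw_cases.dropna()


@op
def log_valid_cases(context, cleaned: DataFrame) -> DataFrame:
    # ...and so does event logging, because ops receive a context.
    context.log_event(
        AssetObservation(asset_key="bnc_valid_cases",
                         metadata={"num_rows": len(cleaned)})
    )
    return cleaned


@graph_asset
def bnc_valid_cases(bnc_cases):
    # The graph body only wires ops together: no context, no compute.
    return log_valid_cases(clean_cases(bnc_cases))
```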
n
I understand, but it’s not a graph, it’s a graph_asset, so I was expecting to get the same behaviour as an asset.
c
Yea it’s a reasonable confusion honestly, but @graph_asset functions much the same as graph syntax-wise; it just sugars over providing asset dependency information, as opposed to something like AssetsDefinition.from_graph, for example.
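Roughly, the two spellings land in the same place - a sketch with made-up names, just to show the sugar (for the explicit version, the asset key comes from the graph name by default):
```python
from dagster import AssetsDefinition, graph, graph_asset, op


@op
def double(number):
    return number * 2


# Decorator form: the body is graph wiring, and the function's inputs and
# output are interpreted as asset dependencies automatically.
@graph_asset
def doubled_number(number):
    return double(number)


# Explicit form it sugars over: define a plain graph, then wrap it.
@graph
def doubled_number_graph(number):
    return double(number)


doubled_number_asset = AssetsDefinition.from_graph(doubled_number_graph)
```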
does that make sense?
n
Not sure I really understand the difference.
c
Essentially graphs use a custom DSL that builds up dependencies between ops - there’s no compute happening there
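A tiny illustration of that (made-up ops, not from the thread): the graph body runs once, at definition time, and the values flowing through it are output handles describing the wiring, not real values - which is why there is nothing meaningful to compute or log there.
```python
from dagster import graph, op


@op
def add_one(x):
    return x + 1


@op
def times_two(x):
    return x * 2


@graph
def add_then_double(x):
    # This body executes once, when the graph is defined, purely to record
    # that times_two's input is add_one's output. add_one(x) returns an
    # output handle here, not a number.
    return times_two(add_one(x))
```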
n
Yeah, I get that! So does it mean we can’t log in a @graph_asset using context?
c
That’s correct - sorry if that wasn’t clear
n
It’s clearer now with your explanations about how it’s implemented, but it’s definitely not obvious for developers. Maybe improving the documentation would be a good start to prevent that confusion.
c
Yea that’s totally fair - I think graphs in assets are particularly hairy and we don’t do a great job explaining the uniqueness of the syntax. We could definitely do a better job documenting here
n
👍
Going back to my initial question… how would you manage that if I’m not using a graph asset?
c
Taking a closer look at the actual implementation here… you don’t actually need to be logging an AssetObservation event here at all. Since everything is wrapped in a graph-backed asset, an AssetMaterialization will automatically be logged upon successful completion of all steps in the graph.
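If you still want the row counts surfaced, one option (a sketch, not something prescribed in the thread; it reuses the clean_bnc_cases name from your snippet but elides the cleaning steps) is to attach them as output metadata from inside an op, which Dagster records alongside the materialization:
```python
from pandas import DataFrame

from dagster import op


@op
def clean_bnc_cases(context, bnc_cases: DataFrame) -> DataFrame:
    df = bnc_cases  # ...cleaning steps elided, see the earlier snippet...
    # Attach metadata to this op's output instead of logging a separate
    # AssetObservation; it shows up with the asset in the Dagster UI.
    context.add_output_metadata({"num_rows": len(df)})
    return df
```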
n
But does it make sense to use a graph asset considering I want to chain 2 ops (clean then validate)?
Then the validate op should return a tuple with the valid and invalid rows, and I wrap the result in 2 assets (valid, invalid).
c
Not sure what this means:
```
Then the validate op should return a tuple with the valid and invalid rows, and I wrap the result in 2 assets (valid, invalid).
```
n
It means that:
```python
@op(description="Validate the BNC Cases against the Pandera schema")
def validate_bnc_cases(
    context, bnc_cases_cleaned: DataFrame
) -> Tuple[DataFrame, DataFrame]:
    """
    Validate the BNC Cases against the Pandera schema

    Returns a tuple with the valid and invalid cases
    """
    cases = {"valid": DataFrame(), "invalid": DataFrame()}  # default to empty frames so both keys always exist
    try:
        cases["valid"] = BNCCases.validate(bnc_cases_cleaned)
    except pa.errors.SchemaError as exc:
        context.log.warning(f"Schema validation failed:\n{exc.data}")  # exc.data contains the invalid data
        cases["invalid"] = exc.failure_cases  # dataframe of failure cases
    return cases["valid"], cases["invalid"]


@graph_asset(description="Filter the BNC Cases that are valid")
def bnc_valid_cases(bnc_cases) -> DataFrame:
    """Filter the BNC Cases that are valid (conforming the schema)"""
    valid_cases, _ = validate_bnc_cases(clean_bnc_cases(bnc_cases))
    return valid_cases
```
c
I see - yea, I think this is a perfect use case for a graph-backed asset: you have two compute steps, but the actual asset produced is the result of those chained compute steps.
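To round that out, one way to materialize both the valid and invalid cases from the same chained graph (a sketch, not confirmed in the thread; it assumes a recent Dagster version, gives validate_bnc_cases two named outputs so the graph body can unpack them, and reuses clean_bnc_cases and the BNCCases Pandera schema defined earlier) is a graph-backed multi-asset:
```python
from typing import Tuple

import pandera as pa
from pandas import DataFrame

from dagster import AssetOut, Out, graph_multi_asset, op

# clean_bnc_cases and BNCCases are assumed to be the op and Pandera schema
# defined earlier in this thread.


@op(out={"valid": Out(), "invalid": Out()})
def validate_bnc_cases(bnc_cases_cleaned: DataFrame) -> Tuple[DataFrame, DataFrame]:
    valid, invalid = DataFrame(), DataFrame()
    try:
        valid = BNCCases.validate(bnc_cases_cleaned)
    except pa.errors.SchemaError as exc:
        invalid = exc.failure_cases
    # A plain tuple maps positionally onto the two declared outputs.
    return valid, invalid


@graph_multi_asset(
    outs={"bnc_valid_cases": AssetOut(), "bnc_invalid_cases": AssetOut()}
)
def bnc_cases_by_validity(bnc_cases):
    # Two chained compute steps; each named output becomes its own asset.
    valid, invalid = validate_bnc_cases(clean_bnc_cases(bnc_cases))
    return {"bnc_valid_cases": valid, "bnc_invalid_cases": invalid}
```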