# ask-community
Hello! I’m new to Dagster and these days I’m working on a pretty simple data pipeline to integrate data. I’m trying to separate things properly in
so I have something like this:
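```python
from typing import Tuple

import pandera as pa
from dagster import AssetObservation, Out, graph_asset, op
from pandas import DataFrame

# BNCCases (the Pandera schema) and process_text are defined elsewhere in the project.


@op
def clean_bnc_cases(bnc_cases: DataFrame) -> DataFrame:
    """Apply cleaning transformations to the BNC cases DataFrame.

    Returns the BNC cases cleaned and normalized.
    """
    df = (
        bnc_cases
        # .pipe(clean_names)  # Clean column names
        .pipe(process_text, column_name="action", string_function="lower")  # Lower case for action column
        .pipe(process_text, column_name="caseType", string_function="title")  # Title case for caseType column
    )
    return df


# Two named outputs, so the returned tuple can be unpacked inside a graph
@op(out={"valid": Out(), "invalid": Out()})
def validate_bnc_cases(context, bnc_cases_cleaned: DataFrame) -> Tuple[DataFrame, DataFrame]:
    """Validate the BNC cases against the Pandera schema.

    Returns a tuple with the valid and invalid cases.
    """
    # Start from empty frames so both keys exist whichever branch runs
    cases = {"valid": bnc_cases_cleaned.iloc[0:0], "invalid": bnc_cases_cleaned.iloc[0:0]}
    try:
        cases["valid"] = BNCCases.validate(bnc_cases_cleaned)
    except pa.errors.SchemaError as exc:
        context.log.warning(str(exc))  # log_event() expects a Dagster event, not a DataFrame
        cases["invalid"] = exc.failure_cases  # contains a dataframe of failure cases
    return cases["valid"], cases["invalid"]


@graph_asset
def bnc_valid_cases(context, bnc_cases):
    """Filter the BNC cases that are valid (conforming to the schema)"""
    valid_cases, _ = validate_bnc_cases(clean_bnc_cases(bnc_cases))
    context.log_event(
        AssetObservation(asset_key="bnc_valid_cases", metadata={"num_rows": len(valid_cases)})
    )
    return valid_cases


@graph_asset
def bnc_invalid_cases(context, bnc_cases):
    """Filter the BNC cases that are invalid (not conforming to the schema)"""
    _, invalid_cases = validate_bnc_cases(clean_bnc_cases(bnc_cases))
    context.log_event(
        AssetObservation(asset_key="bnc_invalid_cases", metadata={"num_rows": len(invalid_cases)})
    )
    return invalid_cases
```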
Hey Nicolas - are you trying to import context from dagster?
Hi @Odette Harary, I’m trying to log an AssetObservation, so I’m passing the context in the asset.
This works:
```python
from dagster import asset


@asset
def simple_asset(context):
    context.log.info("Working")
```
This is not working:
```python
from dagster import graph_asset


@graph_asset
def graph_asset(context):
    context.log.info("Not working")
```
It looks like we can’t pass the context to a graph asset. Here is the error I get when I try to load the code location:
```
dagster._core.errors.DagsterInvalidDefinitionError: "context" is not a valid name in Dagster. It conflicts with a Dagster or python reserved keyword.
```
Hey Nicolas - you’re experiencing an error because graphs aren’t meant to take contexts - graphs just define the dependencies between ops; you shouldn’t be doing any actual computation within them. Any events being logged should be done within the context of an op- or asset-decorated function.
I understand but it’s not a
it’s a
, so I was expecting to get the same behaviour as an
Yea it’s a reasonable confusion honestly, but @graph_asset functions much the same as
syntax wise, it just sugars over providing asset dependency information as opposed to something like
for example
does that make sense?
Not sure I really understand the difference.
Essentially graphs use a custom DSL that builds up dependencies between ops - there’s no compute happening there
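To picture what "no compute happening there" means, here is a toy model in plain Python, not Dagster's actual implementation: calling an "op" inside a "graph" body returns a placeholder handle and records a dependency edge. That is why there is no DataFrame to call `len()` on, and nothing for a `context` to do, until the ops actually run. All names (`Handle`, `GraphBuilder`, `call`) are hypothetical:

```python
from dataclasses import dataclass, field


@dataclass
class Handle:
    """Placeholder for a value that will only exist at run time."""

    node: str


@dataclass
class GraphBuilder:
    edges: list = field(default_factory=list)

    def call(self, op_name: str, *inputs: Handle) -> Handle:
        # Record the dependency instead of running anything
        for upstream in inputs:
            self.edges.append((upstream.node, op_name))
        return Handle(op_name)


builder = GraphBuilder()
cleaned = builder.call("clean_bnc_cases", Handle("bnc_cases"))
validated = builder.call("validate_bnc_cases", cleaned)
# validated is a Handle, not data: len(validated) raises TypeError
```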
Yeah, I get that! So does it mean we can’t log into a
using context?
That’s correct - sorry if that wasn’t clear
It’s clearer now with your explanations about how it’s implemented, but it’s definitely not obvious for developers. Maybe improving the documentation would be a good start to prevent that confusion.
Yea that’s totally fair - I think graphs in assets are particularly hairy and we don’t do a great job explaining the uniqueness of the syntax. We could definitely do a better job documenting here
Going back to my initial question… how would you manage that if I’m not using a graph asset?
Taking a closer look at the actual implementation here… You don’t need to be logging an
event here at all actually. Since everything is wrapped in a graph-backed asset, there will be an
automatically logged upon successful completion of all steps in the graph
But does it make sense to use a graph asset, considering I want to chain 2 ops (clean, then validate)?
Then the validate should return a tuple with the valid rows and invalid rows and I wrap the result in 2 assets (valid, invalid)
Not sure what this means:
> Then the validate should return a tuple with the valid rows and invalid rows and I wrap the result in 2 assets (valid, invalid)
It means this:
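```python
from typing import Tuple

import pandera as pa
from dagster import Out, graph_asset, op
from pandas import DataFrame

# BNCCases and clean_bnc_cases as defined in the first snippet

@op(
    description="Validate the BNC Cases against the Pandera schema",
    # Two named outputs so the tuple can be unpacked inside a graph
    out={"valid": Out(), "invalid": Out()},
)
def validate_bnc_cases(
    context, bnc_cases_cleaned: DataFrame
) -> Tuple[DataFrame, DataFrame]:
    """Validate the BNC Cases against the Pandera schema.

    Returns a tuple with the valid and invalid cases.
    """
    # Start from empty frames so both keys exist whichever branch runs
    cases = {"valid": bnc_cases_cleaned.iloc[0:0], "invalid": bnc_cases_cleaned.iloc[0:0]}
    try:
        cases["valid"] = BNCCases.validate(bnc_cases_cleaned)
    except pa.errors.SchemaError as exc:
        context.log.warning(str(exc))  # log_event() expects a Dagster event, not a DataFrame
        cases["invalid"] = exc.failure_cases  # contains a dataframe of failure cases
    return cases["valid"], cases["invalid"]

@graph_asset(description="Filter the BNC Cases that are valid")
def bnc_valid_cases(bnc_cases) -> DataFrame:
    """Filter the BNC Cases that are valid (conforming to the schema)"""
    valid_cases, _ = validate_bnc_cases(clean_bnc_cases(bnc_cases))
    return valid_cases
```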
I see - yea, I think this is a perfect use case for a graph-backed asset: you have two compute steps, but the actual asset produced is the result of those chained compute steps.