Nicolas Huray
03/07/2023, 7:12 PMassets
and ops
so I have something like that:
@op
def clean_bnc_cases(bnc_cases: DataFrame) -> DataFrame:
    """
    Normalize the text columns of the BNC cases DataFrame.

    The frame is piped through ``process_text`` twice: once for the
    ``action`` column (``string_function="lower"``) and once for the
    ``caseType`` column (``string_function="title"``).

    Returns the cleaned and normalized BNC cases.
    """
    # Column-name cleanup (``clean_names``) is currently disabled.
    lowered_action = bnc_cases.pipe(
        process_text, column_name="action", string_function="lower"
    )
    titled_case_type = lowered_action.pipe(
        process_text, column_name="caseType", string_function="title"
    )
    return titled_case_type
@op
def validate_bnc_cases(context, bnc_cases_cleaned: DataFrame) -> Tuple[DataFrame, DataFrame]:
    """
    Validate the BNC cases against the Pandera schema.

    Returns a ``(valid, invalid)`` tuple:

    * validation succeeds: ``valid`` is the frame returned by
      ``BNCCases.validate`` and ``invalid`` is a zero-row slice of the input;
    * ``SchemaError`` is raised: ``valid`` is a zero-row slice of the input
      and ``invalid`` is the exception's ``failure_cases``.

    Both members are always defined, so the return statement can no longer
    fail with ``KeyError`` when only one side was populated.
    """
    # Placeholder for whichever side has nothing to report.
    # Assumes a pandas DataFrame (``.iloc``) - TODO confirm.
    no_rows = bnc_cases_cleaned.iloc[0:0]

    try:
        valid = BNCCases.validate(bnc_cases_cleaned)
    except pa.errors.SchemaError as exc:
        # ``context.log_event`` only accepts Dagster event objects, so the
        # failure is reported through the context logger instead.
        context.log.warning(f"BNC cases failed schema validation: {exc}")
        return no_rows, exc.failure_cases  # failure cases reported by Pandera

    # NOTE(review): callers unpack this result into two values inside a graph;
    # confirm the op declares two outputs so that unpacking is possible.
    return valid, no_rows
@graph_asset
def bnc_valid_cases(context, bnc_cases):
    """Keep the BNC cases that are valid (conforming to the schema)."""
    # NOTE(review): the error quoted later in this thread says "context" is
    # not a valid name here - confirm whether this parameter can stay.
    cleaned = clean_bnc_cases(bnc_cases)
    conforming, _ = validate_bnc_cases(cleaned)

    # Record the number of valid rows as an observation on this asset.
    row_count = {"num_rows": len(conforming)}
    observation = AssetObservation(asset_key="bnc_valid_cases", metadata=row_count)
    context.log_event(observation)

    return conforming
@graph_asset
def bnc_invalid_cases(context, bnc_cases):
    """Keep the BNC cases that are invalid (not conforming to the schema)."""
    # NOTE(review): the error quoted later in this thread says "context" is
    # not a valid name here - confirm whether this parameter can stay.
    cleaned = clean_bnc_cases(bnc_cases)
    _, rejected = validate_bnc_cases(cleaned)

    # Record the number of invalid rows as an observation on this asset.
    row_count = {"num_rows": len(rejected)}
    observation = AssetObservation(asset_key="bnc_invalid_cases", metadata=row_count)
    context.log_event(observation)

    return rejected
Odette Harary
03/08/2023, 3:15 PMNicolas Huray
03/08/2023, 6:44 PMNicolas Huray
03/08/2023, 6:44 PM@asset
def simple_asset(context):
    """Log "Working" at info level through the context logger."""
    # Restored from chat link markup that had wrapped the call target.
    context.log.info("Working")
Nicolas Huray
03/08/2023, 6:45 PM@graph_asset
def graph_asset(context):
    """Log "Not working" at info level through the context logger."""
    # Restored from chat link markup that had wrapped the call target.
    # NOTE(review): the error quoted right after this snippet reports that
    # "context" is rejected as a parameter name for this kind of definition.
    context.log.info("Not working")
Nicolas Huray
03/08/2023, 6:46 PMdagster._core.errors.DagsterInvalidDefinitionError: "context" is not a valid name in Dagster. It conflicts with a Dagster or python reserved keyword.
chris
03/08/2023, 7:31 PMNicolas Huray
03/08/2023, 7:33 PMgraph
it’s a graph_asset
, so I was expecting to get the same behaviour of an asset
chris
03/08/2023, 7:34 PMgraph
syntax wise, it just sugars over providing asset dependency information as opposed to something like AssetsDefinition.from_graph
for examplechris
03/08/2023, 7:34 PMNicolas Huray
03/08/2023, 7:36 PMchris
03/08/2023, 7:39 PMNicolas Huray
03/08/2023, 7:40 PM@graph_asset
using context ?chris
03/08/2023, 7:41 PMNicolas Huray
03/08/2023, 8:13 PMchris
03/08/2023, 9:12 PMNicolas Huray
03/08/2023, 9:12 PMNicolas Huray
03/08/2023, 9:13 PMchris
03/08/2023, 9:19 PMAssetObservation
event here at all actually. Since everything is wrapped in a graph-backed asset, there will be an AssetMaterialization
automatically logged upon successful completion of all steps in the graphNicolas Huray
03/08/2023, 10:13 PMNicolas Huray
03/08/2023, 10:14 PMchris
03/08/2023, 10:15 PMThen the validate should return a tuple with the valid rows and invalid rows and I wrap the result in 2 assets (valid, invalid)
Nicolas Huray
03/08/2023, 10:35 PM@op(description="Valid the BNC Cases against the Pandera schema")
def validate_bnc_cases(
    context, bnc_cases_cleaned: DataFrame
) -> Tuple[DataFrame, DataFrame]:
    """
    Validate the BNC cases against the Pandera schema.

    Returns a ``(valid, invalid)`` tuple:

    * validation succeeds: ``valid`` is the frame returned by
      ``BNCCases.validate`` and ``invalid`` is a zero-row slice of the input;
    * ``SchemaError`` is raised: ``valid`` is a zero-row slice of the input
      and ``invalid`` is the exception's ``failure_cases``.

    Both members are always defined, so the return statement can no longer
    fail with ``KeyError`` when only one side was populated.
    """
    # Placeholder for whichever side has nothing to report.
    # Assumes a pandas DataFrame (``.iloc``) - TODO confirm.
    no_rows = bnc_cases_cleaned.iloc[0:0]

    try:
        valid = BNCCases.validate(bnc_cases_cleaned)
    except pa.errors.SchemaError as exc:
        # ``context.log_event`` only accepts Dagster event objects, so the
        # failure is reported through the context logger instead.
        context.log.warning(f"BNC cases failed schema validation: {exc}")
        return no_rows, exc.failure_cases  # failure cases reported by Pandera

    # NOTE(review): callers unpack this result into two values inside a graph;
    # confirm the op declares two outputs so that unpacking is possible.
    return valid, no_rows
@graph_asset(description="Filter the BNC Cases that are valid")
def bnc_valid_cases(bnc_cases) -> DataFrame:
    """Keep the BNC cases that are valid (conforming to the schema)."""
    # Clean first, then validate; only the first member of the result is kept.
    cleaned = clean_bnc_cases(bnc_cases)
    conforming, _ = validate_bnc_cases(cleaned)
    return conforming
chris
03/08/2023, 10:47 PM