Simon Späti
11/15/2020, 9:41 AM
> Dagster’s mechanism for conditional execution is non-required outputs. For any solid output definition, we can set the is_required argument to False. If any of the inputs to a solid come from non-required outputs, and any of those non-required outputs are not yielded by the upstream solid, then the solid won’t run.

This doesn’t account for `composite_solids`, correct? I’m struggling to achieve the same with composites. How would you do it there? `merge_staging_to_delta_table` should be skipped if `properties` is not set. How can I achieve that? I must be missing something.
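The rule quoted above can be sketched as a toy executor. This is not Dagster's API: each step is a plain generator yielding `(output_name, value)` pairs, and `run`, `list_changed` and `merge` are hypothetical names. A step runs only if every input it needs was actually yielded upstream; otherwise it is skipped, and the skip propagates to anything downstream of it.

```python
from typing import Callable, Dict, Iterator, List, Tuple

# Hypothetical toy types, not Dagster's.
Step = Callable[..., Iterator[Tuple[str, object]]]
# argument name -> (upstream step name, upstream output name)
Inputs = Dict[str, Tuple[str, str]]


def run(steps: List[Tuple[str, Step, Inputs]]) -> Dict[str, Dict[str, object]]:
    """Run steps in order; skip any step whose inputs were not all yielded."""
    produced: Dict[str, Dict[str, object]] = {}
    for name, fn, inputs in steps:
        # a skipped upstream step has no entry, so its dependents skip too
        if any(out not in produced.get(up, {}) for up, out in inputs.values()):
            continue  # an upstream non-required output was never yielded
        kwargs = {arg: produced[up][out] for arg, (up, out) in inputs.items()}
        produced[name] = dict(fn(**kwargs))
    return produced


def list_changed(changed):
    # yields the non-required output only when there is something to process
    def step():
        if changed:
            yield "properties", changed
    return step


def merge(properties):
    yield "result", len(properties)


with_changes = run([
    ("list", list_changed([{"id": 1}]), {}),
    ("merge", merge, {"properties": ("list", "properties")}),
])
no_changes = run([
    ("list", list_changed([]), {}),
    ("merge", merge, {"properties": ("list", "properties")}),
])
print(sorted(with_changes))  # ['list', 'merge']
print(sorted(no_changes))    # ['list']
```

When `list` yields nothing, `merge` never runs, which is the behaviour wanted for `merge_staging_to_delta_table`.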
```python
from dagster import OutputDefinition, composite_solid, pipeline

# PropertyDataFrame, DeltaCoordinate and the solids used below are defined elsewhere


@composite_solid(
    output_defs=[
        OutputDefinition(name="properties", dagster_type=PropertyDataFrame, is_required=False),
    ],
)
def list_changed_properties():
    properties = get_changed_or_new_properties(list_props_immo24())
    return {"properties": properties}


@composite_solid(
    output_defs=[
        OutputDefinition(name="delta_coordinta", dagster_type=DeltaCoordinate, is_required=False)
    ],
)
def merge_staging_to_delta_table(properties: PropertyDataFrame) -> DeltaCoordinate:
    prop_s3_coordinate = upload_to_s3(cache_properies_from_rest_api(properties))
    # return assets for property
    return merge_property_delta(input_dataframe=flatten_json(s3_to_df(prop_s3_coordinate)))


@pipeline
def scrape_realestate():
    properties = list_changed_properties.alias('list_SO_buy_flat')()
    merge_staging_to_delta_table.alias('merge_SO_buy')(properties)
```
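alex's reply in this thread says the `is_required` on a composite's `OutputDefinition` is effectively ignored, and only what is set on the inner `@solid`s matters. A toy model of why, under the assumption that a composite has no compute of its own and only re-exports inner outputs (not Dagster's code; `get_changed` and `composite_outputs` are hypothetical):

```python
from typing import Dict, Iterator, Tuple


def get_changed(changed: list) -> Iterator[Tuple[str, object]]:
    # inner solid: yields its non-required output only when something changed
    if changed:
        yield "properties", changed


def composite_outputs(inner_outputs: Dict[str, object], mapping: Dict[str, str]) -> Dict[str, object]:
    # a composite output is just an alias for an inner output,
    # so it exists only if the inner solid actually yielded it
    return {outer: inner_outputs[inner] for outer, inner in mapping.items() if inner in inner_outputs}


mapping = {"properties": "properties"}  # composite output -> inner output
print(composite_outputs(dict(get_changed([{"id": 1}])), mapping))  # {'properties': [{'id': 1}]}
print(composite_outputs(dict(get_changed([])), mapping))           # {}
```

In this model the decision to skip is made entirely by the inner solid; whatever flag sits on the composite has nothing to act on.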
alex
11/18/2020, 8:27 PM
On the `@composite_solid`, the `is_required` on the `OutputDefinition` will effectively be ignored; the only thing that matters is what’s set on the `@solid`s. How is `get_changed_or_new_properties` set up? Is it `is_required=False`, and does it not yield the output in some cases?
Simon Späti
11/18/2020, 8:37 PM
I tried `dagster_type=Optional[PropertyDataFrame]` as well, with no luck:
```python
import pandasql as ps
from dagster import InputDefinition, OutputDefinition, solid


@solid(
    input_defs=[InputDefinition("properties", PropertyDataFrame)],
    output_defs=[
        OutputDefinition(name="properties", dagster_type=PropertyDataFrame, is_required=False),
    ],
    required_resource_keys={'pyspark', 's3'},
    description="""This will check if property is already downloaded. If so, check if price or other
    columns have changed in the meantime, or if date is very old, download again""",
)
def get_changed_or_new_properties(context, properties):
    # prepare ids and fingerprints from fetched properties
    ids: list = [p['id'] for p in properties]
    ids: str = ', '.join(ids)
    # doing my stuff here ...

    # select new or changed ones
    df_changed = ps.sqldf(
        """
        SELECT p.id, p.fingerprint, p.is_prefix, p.rentOrBuy, p.city, p.propertyType, p.radius, p.last_normalized_price
        FROM pd_properties p LEFT OUTER JOIN pd_existing_props e
        ON p.id = e.propertyDetails_id
        WHERE p.fingerprint != e.fingerprint
        OR e.fingerprint IS NULL
        """
    )
    if df_changed.empty:
        context.log.info("No property of [{}] changed".format(ids))
    else:
        changed_properties = []
        for index, row in df_changed.iterrows():
            changed_properties.append(row.to_dict())
        ids_changed = ', '.join(str(e) for e in df_changed['id'].tolist())
        context.log.info("changed properties: {}".format(ids_changed))
        return changed_properties
```
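The solid above ends with `return changed_properties` and exits without any return when nothing changed. A simplified model of why that matters, assuming the legacy `@solid` wraps a plain return value (including an implicit `None`) as its single output, while a generator emits only what it explicitly yields. This is an assumption-based sketch, not Dagster's source; `Output` here is a hypothetical stand-in for `dagster.Output`, and `collect_outputs`, `with_return` and `with_yield` are hypothetical.

```python
import inspect
from typing import Any, Callable, List, NamedTuple


class Output(NamedTuple):  # hypothetical stand-in for dagster.Output
    value: Any
    output_name: str


def collect_outputs(fn: Callable[[], Any], output_name: str) -> List[Output]:
    result = fn()
    if inspect.isgenerator(result):
        # generator: only explicitly yielded outputs are emitted
        return list(result)
    # plain function: whatever is returned (even an implicit None)
    # becomes the single output, so downstream always runs
    return [Output(result, output_name)]


def with_return():
    changed = []
    if changed:
        return changed
    # falls off the end and implicitly returns None


def with_yield():
    changed = []
    if changed:
        yield Output(changed, "properties")


print(collect_outputs(with_return, "properties"))  # [Output(value=None, output_name='properties')]
print(collect_outputs(with_yield, "properties"))   # []
```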
The PropertyDataFrame type was just a list, but I also tried allowing None. I thought I could work around it that way, but that didn’t work either:
```python
from dagster import DagsterType

PropertyDataFrame = DagsterType(
    name="PropertyDataFrame",
    # accept a list of property dicts, or None
    type_check_fn=lambda _, value: isinstance(value, list) or value is None,
    description="A list of scraped properties with id, last_normalized_price and the search criteria with which they were found.",
)
```
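Allowing `None` in the type check (or using `Optional[...]`) only lets an already-emitted `None` pass validation; it does not stop the output from being emitted, so the downstream step still runs. A toy model of that, with hypothetical helper names (`property_check`, `deliver`), where the predicate mirrors the lambda above:

```python
from typing import Any, Callable


def property_check(_context: Any, value: Any) -> bool:
    # same predicate as the PropertyDataFrame type_check_fn above
    return isinstance(value, list) or value is None


def deliver(value: Any, check: Callable[[Any, Any], bool], downstream: Callable[[Any], Any]) -> Any:
    # the type check only validates a value that was already emitted;
    # if it passes, the downstream step runs with that value
    if not check(None, value):
        raise TypeError(f"type check failed for {value!r}")
    return downstream(value)


calls = []
deliver(None, property_check, calls.append)
print(calls)  # [None]: downstream ran with None instead of being skipped
```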
alex
11/18/2020, 8:41 PM
> What I checked, with solids it works

I’m not sure what you did that worked as you expected, since for conditional execution you need to explicitly `yield Output(properties, "properties")` when you want execution to occur, and not yield it when you want it to skip. A plain `return` still produces the output, even the `None` that is returned if you exit without an explicit return. Change `return changed_properties` to `yield Output(changed_properties, "properties")` and it will work, even through the composites.
Simon Späti
11/18/2020, 10:15 PM
alex
11/18/2020, 10:17 PM
Simon Späti
11/18/2020, 10:27 PM