Simon Späti

11/15/2020, 9:41 AM
Quick question about conditional execution with `composite_solids`:
Dagster’s mechanism for conditional execution is non-required outputs. For any solid output definition, we can set the is_required argument to False. If any of the inputs to a solid come from non-required outputs, and any of those non-required outputs are not yielded by the upstream solid, then the solid won’t run.
This doesn’t account for `composite_solids`, correct? Because I’m struggling to achieve the same with composites. How would you do it there?
I have these two composites, and `merge_staging_to_delta_table` should be skipped if `properties` is not set. How can I achieve that? I must be missing something.
@composite_solid(
    output_defs=[
        OutputDefinition(name="properties", dagster_type=PropertyDataFrame, is_required=False),
    ],
)
def list_changed_properties():
    properties = get_changed_or_new_properties(list_props_immo24())
    return {"properties": properties}


@composite_solid(
    output_defs=[
        OutputDefinition(name="delta_coordinta", dagster_type=DeltaCoordinate, is_required=False)
    ],
)
def merge_staging_to_delta_table(properties: PropertyDataFrame) -> DeltaCoordinate:

    prop_s3_coordinate = upload_to_s3(cache_properies_from_rest_api(properties))
    # return assets for property
    return merge_property_delta(input_dataframe=flatten_json(s3_to_df(prop_s3_coordinate)))

@pipeline
def scrape_realestate():

    properties = list_changed_properties.alias('list_SO_buy_flat')()
    merge_staging_to_delta_table.alias('merge_SO_buy')(properties)
Am I missing something or is conditional execution with composite_solids not working? 🤔

alex

11/18/2020, 8:27 PM
yea we don’t handle this interaction well - on the `@composite_solid` the `is_required` on the `OutputDefinition` will effectively be ignored, the only thing that matters is what's set on the `@solid`s
how is `get_changed_or_new_properties` set up? is it `is_required=False`, and does it not yield the output in some cases?

Simon Späti

11/18/2020, 8:37 PM
From what I checked, it works with plain solids. Do you think there might be a workaround that keeps the composite? The code is below. I also played with `dagster_type=Optional[PropertyDataFrame]`, with no luck:
@solid(
    input_defs=[InputDefinition("properties", PropertyDataFrame)],
    output_defs=[
        OutputDefinition(name="properties", dagster_type=PropertyDataFrame, is_required=False),
    ],
    required_resource_keys={'pyspark', 's3'},
    description="""This will check if property is already downloaded. If so, check if price or other
    columns have changed in the meantime, or if date is very old, download again""",
)
def get_changed_or_new_properties(context, properties):

    # prepare ids and fingerprints from fetched properties
    ids: list = [p['id'] for p in properties]
    ids: str = ', '.join(ids)

    #doing my stuff here ...
    
    # select new or changed once
    df_changed = ps.sqldf(
        """
        SELECT p.id, p.fingerprint, p.is_prefix, p.rentOrBuy, p.city, p.propertyType, p.radius, p.last_normalized_price
        FROM pd_properties p LEFT OUTER JOIN pd_existing_props e
            ON p.id = e.propertyDetails_id
            WHERE p.fingerprint != e.fingerprint
                OR e.fingerprint IS NULL
        """
    )
    if df_changed.empty:
        context.log.info("No property of [{}] changed".format(ids))
    else:
        changed_properties = []
        for index, row in df_changed.iterrows():
            changed_properties.append(row.to_dict())

        ids_changed = ', '.join(str(e) for e in df_changed['id'].tolist())

        context.log.info("changed properties: {}".format(ids_changed))
        return changed_properties
The PropertyDataFrame data type was just a list, but I also tried allowing None, thinking I could work around it that way. That didn’t work either:
PropertyDataFrame = DagsterType(
    name="PropertyDataFrame",
    type_check_fn=lambda _, value: isinstance(value, list) or value is None,
    description="A List with scraped Properties with id, last_normalized_price and the search criteria with which it was found.",
)

alex

11/18/2020, 8:41 PM
> What I checked, with solids it works
I'm not sure what you did that worked as you expected, since for conditional execution you need to explicitly `yield Output(properties, "properties")` when you want execution to occur and not yield it when you want it to skip
if you use a regular function we will capture whatever value is returned and assume that was the output value, including the implicit `None` that is returned if you exit without an explicit `return`
we could do better to error out in this condition and provide a useful error message
but i think all you need to do is change `return changed_properties` to `yield Output(changed_properties, "properties")` and it will work - even through the composites
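For readers skimming the thread, here is a minimal plain-Python sketch of the difference alex describes. It does not import Dagster: the `Output` namedtuple is only a stand-in for Dagster's class, and `collect_outputs` and `should_run_downstream` are hypothetical helpers that loosely mimic how an executor might gather outputs and decide whether to skip a downstream step. It is an illustration of the return-versus-yield behavior, not Dagster's actual implementation.

```python
from collections import namedtuple

# Stand-in for illustration only; this is NOT dagster.Output.
Output = namedtuple("Output", ["value", "output_name"])


def returns_implicitly(changed):
    """Regular function: falls off the end when nothing changed."""
    if changed:
        return changed
    # No explicit return on this path, so Python returns None. A framework
    # that captures the return value would treat None as the output value.


def yields_conditionally(changed):
    """Generator: emits an Output only when there is something to emit."""
    if changed:
        yield Output(changed, "properties")
    # Nothing is yielded on this path, so a consumer sees no output at all.


def collect_outputs(generator_fn, changed):
    """Hypothetical helper: gather yielded outputs by name."""
    return {out.output_name: out.value for out in generator_fn(changed)}


def should_run_downstream(outputs, input_name="properties"):
    """Hypothetical skip rule: run only if the upstream output was emitted."""
    return input_name in outputs
```

With an empty input, `returns_implicitly([])` still hands back a value (`None`), whereas `collect_outputs(yields_conditionally, [])` is an empty dict, so the hypothetical skip rule has something to act on. That mirrors why swapping `return` for `yield Output(...)` fixed the pipeline in this thread.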

Simon Späti

11/18/2020, 10:15 PM
amazing, that did the trick! I think I had that before, but I didn’t have the `df_changed.empty` case and only returned if there was something. In combination it now works with composites as well! Now my CDC pipeline is working, thanks so much Alex!

alex

11/18/2020, 10:17 PM
thanks for the report - working on a diff to add a warning to help people catch this more easily

Simon Späti

11/18/2020, 10:27 PM
That would be awesome. I tried quite some time and couldn’t figure it out. But for sure, more advanced python users would have caught it anyway 😅