# announcements
Quick question about conditional execution with composite_solids:
Dagster’s mechanism for conditional execution is non-required outputs. For any solid output definition, we can set the is_required argument to False. If any of the inputs to a solid come from non-required outputs, and any of those non-required outputs are not yielded by the upstream solid, then the solid won’t run.
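Here is a toy model of that rule in plain Python. It is not Dagster's executor, and the step names are hypothetical. Each step is a generator that yields `(output_name, value)` pairs. A step is skipped when an input it needs was never yielded upstream. A skipped step yields nothing, so everything downstream of it is skipped as well.

```python
from typing import Callable, Dict, Iterator, List, Tuple

# A step receives its resolved inputs and yields (output_name, value) pairs.
StepFn = Callable[..., Iterator[Tuple[str, object]]]


def run_linear(steps: List[Tuple[str, StepFn, List[Tuple[str, str]]]]) -> Dict[str, str]:
    """Run steps in order. Each entry is (step_name, fn, inputs), where inputs
    lists the (upstream_step, output_name) pairs the step consumes."""
    produced: Dict[Tuple[str, str], object] = {}
    status: Dict[str, str] = {}
    for name, fn, inputs in steps:
        # Skip if any upstream output this step depends on was never yielded;
        # a skipped step produces nothing, so the skip cascades downstream.
        if any(key not in produced for key in inputs):
            status[name] = "skipped"
            continue
        args = [produced[key] for key in inputs]
        for output_name, value in fn(*args):
            produced[(name, output_name)] = value
        status[name] = "ran"
    return status


def list_changed(changed: List[str]) -> StepFn:
    def step() -> Iterator[Tuple[str, object]]:
        # Only yield the optional output when there is something to pass on.
        if changed:
            yield ("properties", changed)
    return step


def merge(properties: List[str]) -> Iterator[Tuple[str, object]]:
    yield ("delta_coordinate", "merged {} properties".format(len(properties)))


def pipeline_status(changed: List[str]) -> Dict[str, str]:
    return run_linear([
        ("list_changed", list_changed(changed), []),
        ("merge", merge, [("list_changed", "properties")]),
    ])


print(pipeline_status(["a", "b"]))  # {'list_changed': 'ran', 'merge': 'ran'}
print(pipeline_status([]))          # {'list_changed': 'ran', 'merge': 'skipped'}
```

A skip is triggered by the *absence* of an output, not by its value.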
This doesn’t account for composite solids, correct? I’m struggling to achieve the same thing with composites. How would you do it there?
I have these two composites, and `merge_staging_to_delta_table` should be skipped if `properties` is not set. How can I achieve that? I must be missing something.
```python
from dagster import OutputDefinition, composite_solid, pipeline


@composite_solid(
    output_defs=[
        OutputDefinition(name="properties", dagster_type=PropertyDataFrame, is_required=False),
    ]
)
def list_changed_properties():
    properties = get_changed_or_new_properties(list_props_immo24())
    return {"properties": properties}


@composite_solid(
    output_defs=[
        OutputDefinition(name="delta_coordinta", dagster_type=DeltaCoordinate, is_required=False)
    ]
)
def merge_staging_to_delta_table(properties: PropertyDataFrame) -> DeltaCoordinate:
    prop_s3_coordinate = upload_to_s3(cache_properies_from_rest_api(properties))
    # return assets for property
    return merge_property_delta(input_dataframe=flatten_json(s3_to_df(prop_s3_coordinate)))


@pipeline  # mode/resource definitions are not shown in the post
def scrape_realestate():
    properties = list_changed_properties.alias('list_SO_buy_flat')()
    # should be skipped when "properties" is not yielded upstream
    merge_staging_to_delta_table(properties)
```
Am I missing something, or does conditional execution not work with composite_solids? 🤔
Yeah, we don’t handle this interaction well: `is_required` on the composite’s `OutputDefinition` will effectively be ignored. The only thing that matters is what’s set on the underlying solid.
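Here is a toy model of that behavior in plain Python. It is not Dagster's implementation, and all names are hypothetical. The composite has no compute logic of its own. It only re-exports what its inner step yields under the names in a mapping. Whether the composite's output exists is therefore decided entirely inside the inner step.

```python
from typing import Callable, Dict, Iterator, List, Tuple

Outputs = Iterator[Tuple[str, object]]


def make_composite(inner: Callable[[], Outputs], mapping: Dict[str, str]) -> Callable[[], Outputs]:
    """Toy 'composite': re-exports the inner step's outputs under the names in
    `mapping` (inner name -> composite name). It computes nothing itself."""

    def composite() -> Outputs:
        for name, value in inner():
            if name in mapping:
                yield (mapping[name], value)

    return composite


def inner_step(changed: List[str]) -> Callable[[], Outputs]:
    """Toy inner solid: yields "properties" only when something changed."""

    def step() -> Outputs:
        if changed:
            yield ("properties", changed)

    return step


def composite_outputs(changed: List[str]) -> Dict[str, object]:
    composite = make_composite(inner_step(changed), {"properties": "properties"})
    return dict(composite())


print(composite_outputs(["x"]))  # {'properties': ['x']}
print(composite_outputs([]))     # {}
```

In this model, a flag on the composite's own output definition has nothing to act on. The output is present exactly when the inner step yields it.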
How is `get_changed_or_new_properties` set up? Is it a solid that doesn’t yield the output in some cases?
From what I checked, it works with plain solids. Do you think there’s a workaround that keeps the composite? Here is the code. I also experimented with that setting, with no luck:
```python
import pandasql as ps  # assumed source of ps.sqldf
from dagster import InputDefinition, OutputDefinition, solid


@solid(
    input_defs=[InputDefinition("properties", PropertyDataFrame)],
    output_defs=[
        OutputDefinition(name="properties", dagster_type=PropertyDataFrame, is_required=False),
    ],
    required_resource_keys={'pyspark', 's3'},
    description="""This will check if property is already downloaded. If so, check if price or other
    columns have changed in the meantime, or if date is very old, download again""",
)
def get_changed_or_new_properties(context, properties):

    # prepare ids and fingerprints from fetched properties
    ids: list = [p['id'] for p in properties]
    ids: str = ', '.join(ids)

    # doing my stuff here ...
    # select new or changed ones
    df_changed = ps.sqldf(
        """
        SELECT p.id, p.fingerprint, p.is_prefix, p.rentOrBuy, p.city, p.propertyType, p.radius, p.last_normalized_price
        FROM pd_properties p LEFT OUTER JOIN pd_existing_props e
            ON p.id = e.propertyDetails_id
            WHERE p.fingerprint != e.fingerprint
                OR e.fingerprint IS NULL
        """
    )
    if df_changed.empty:
        context.log.info("No property of [{}] changed".format(ids))
    else:
        changed_properties = []
        for index, row in df_changed.iterrows():
            ...  # loop body not shown in the post

        ids_changed = ', '.join(str(e) for e in df_changed['id'].tolist())

        context.log.info("changed properties: {}".format(ids_changed))
        return changed_properties
```
The `PropertyDataFrame` type was just a list, but I also tried allowing `None`, thinking I could work around the problem that way. That didn’t work either:
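```python
from dagster import DagsterType
PropertyDataFrame = DagsterType(
    name="PropertyDataFrame",  # assumed; the name argument is not shown in the post
    type_check_fn=lambda _, value: isinstance(value, list) or value is None,
    description="A list of scraped properties with id, last_normalized_price and the search criteria with which they were found.",
)
```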
> What I checked, with solids it works

I’m not sure what you did that worked as you expected. For conditional execution you need to explicitly `yield Output(properties, "properties")` when you want execution to occur, and not yield it when you want it to skip.
If you use a regular function, we capture whatever value is returned and assume it is the output value. That includes the implicit `None` returned when you exit without an explicit `return`. We could do better here by erroring out in this case with a useful error message.
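This is ordinary Python behavior and does not depend on Dagster. A plain function that falls off the end still hands its caller a value (`None`). A generator that never reaches `yield` produces nothing at all. Only in the generator case can a caller tell "no output" apart from "the output is `None`". Here is a minimal illustration; the function names are hypothetical.

```python
from typing import Iterator, List, Optional


def returns_when_changed(changed: List[str]) -> Optional[List[str]]:
    # Plain function: if the branch is not taken, execution falls off the end
    # and the caller still receives a value, the implicit None.
    if changed:
        return changed


def yields_when_changed(changed: List[str]) -> Iterator[List[str]]:
    # Generator: if the branch is not taken, nothing is produced at all.
    if changed:
        yield changed


print(returns_when_changed([]))       # None
print(list(yields_when_changed([])))  # []
```

This also explains why widening the `PropertyDataFrame` type check to accept `None` could not help. The check then passes for the implicit `None`, so that value is treated as a valid output rather than a missing one.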
But I think all you need to do is change `return changed_properties` to `yield Output(changed_properties, "properties")`, and it will work, even through the composites.
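Here is a rough sketch of that change applied to the solid posted above. It makes several assumptions:

- The signature is a hypothetical simplification that takes a precomputed list of changed ids instead of `context` and the pandasql query.
- It prints instead of calling `context.log.info`.
- It uses a minimal stand-in for `Output` so it runs without Dagster installed.

In the real solid you would import `Output` from `dagster` and keep the `@solid` decorator.

```python
from typing import Any, Iterator, List, NamedTuple


class Output(NamedTuple):
    """Minimal stand-in for dagster.Output so this sketch runs without Dagster;
    the argument order (value, output_name) matches dagster.Output."""
    value: Any
    output_name: str


def get_changed_or_new_properties(changed_ids: List[str]) -> Iterator[Output]:
    # Hypothetical simplification: the pandasql diff is replaced by a
    # precomputed list of changed ids.
    if not changed_ids:
        # Yield nothing, so consumers of "properties" are skipped.
        print("No property changed")
        return
    changed_properties = [{"id": i} for i in changed_ids]
    print("changed properties: {}".format(", ".join(changed_ids)))
    yield Output(changed_properties, "properties")
```

Because the function body contains `yield`, the whole function is a generator. The bare `return` in the empty case ends it without producing any output.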
Amazing, that did the trick! I think I had that before, but I didn’t have the `df_changed.empty:` case and only returned when there was something. With both in place, it now works with the composite as well! My CDC pipeline is working now, thanks so much Alex!
Thanks for the report. I’m working on a diff that adds a warning to help people catch this more easily.
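Here is a rough idea of what such a warning could check. This is a hypothetical helper, not the actual Dagster diff. If a function backs optional outputs but is not a generator, it can never skip them, because a plain function always returns a value.

```python
import inspect
import warnings
from typing import Callable, Sequence


def warn_if_optional_outputs_cannot_skip(fn: Callable, optional_outputs: Sequence[str]) -> bool:
    """Hypothetical check: warn and return True when `fn` declares optional
    outputs but is a plain function, so it always produces a value."""
    if optional_outputs and not inspect.isgeneratorfunction(fn):
        warnings.warn(
            "{} has optional outputs {} but is not a generator; its return value "
            "(including an implicit None) will always be treated as an output.".format(
                fn.__name__, list(optional_outputs)
            )
        )
        return True
    return False
```

Detecting generator functions with `inspect.isgeneratorfunction` works before the function is ever called. That means a check like this could run when the definition is created rather than at execution time.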
That would be awesome. I tried for quite some time and couldn’t figure it out. Though more advanced Python users would probably have caught it anyway 😅