# ask-community
d
Hello Dagster Community, I need some help with one of the ops I have written. I am getting the error 'dagster._core.errors.DagsterStepOutputNotFoundError: Core compute for op "fetch_mySQL_raw_data" did not return an output for non-optional output "result"' when I try to return a list of pandas DataFrames. I am reading files from my AWS S3 bucket into pandas DataFrames and appending them to a list, which I then return. Maybe I am misunderstanding something, but I wrote a test function that reads a single file from my S3 bucket and returns it as a DataFrame, and that works perfectly fine. The code is below:
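import logging

import boto3
import pandas as pd
from dagster import AssetMaterialization, Out, job, op


@op(out=Out(dagster_type=list[pd.DataFrame]))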
def fetch_mySQL_raw_data() -> list[pd.DataFrame]:
    logging.basicConfig(level=logging.DEBUG)
    boto3.set_stream_logger('botocore', level='DEBUG')

    s3_client = boto3.client('s3')
    bucket_name = 'cobbleweb-ds'

    result = []
    filenames = {}
    prefix = 'raw/nestify/mySQL/'

    # Fetch data from the S3 bucket and return the result
    response = s3_client.list_objects(Bucket=bucket_name, Prefix=prefix)

    for obj in response.get('Contents', []):
        filename = obj['Key']
        # Skipping the prefix value
        if filename == prefix:
            continue

        file = s3_client.get_object(Bucket=bucket_name, Key=filename)
        df = pd.read_csv(file['Body'])
        result.append(df)

        yield AssetMaterialization(
            asset_key=filename,
            description="Raw Nestify mySQL data from S3 bucket"
        )

    return result

# FUNCTION THAT WORKS
@op(out=Out(dagster_type=pd.DataFrame))
def load_test_dataframe() -> pd.DataFrame:
    
    s3_client = boto3.client('s3')
    bucket_name = 'cobbleweb-ds'

    # Fetch data from the S3 bucket and return the result
    response = s3_client.get_object(Bucket=bucket_name, Key='test/mySql_test.csv')
 
    df = pd.read_csv(response['Body'])
    print(df)
    return df


@job()
def fetch_data_pipeline():
    fetch_mySQL_raw_data()
    # load_test_dataframe()
c
I bet it’s the combination of return and yield being used here. You should be using the context.log_event(…) syntax to log that asset materialization instead of yielding it directly. I think the iterator coercion that we do might be getting messy in this case.
d
Yeah, I am finding the same. I think it could well be the yield that is screwing it up.
@chris that's exactly what it is. I just commented out the yield statement, ran it, and it worked just fine. Interesting. Thanks man
Spent all day on it lol, wondering where it could possibly be going wrong.
c
Yeah, we perform some shenanigans under the hood to coerce both regular functions and iterators to work, but when you start to mix the return syntax and the yield syntax, I think the coercion behavior gets a bit more unpredictable.
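For context, here is a small sketch of what plain Python does when a function mixes the two. It says nothing about Dagster's internals; it only shows that a single yield turns the whole function into a generator, and that the returned value is not produced by iteration but carried on StopIteration. The function name mixed and its placeholder values are made up for illustration:

```python
import inspect


def mixed():
    # One yield anywhere in the body makes this a generator function.
    yield "event"
    # This value is never yielded; it rides along on StopIteration.
    return ["df1", "df2"]


is_generator = inspect.isgeneratorfunction(mixed)

gen = mixed()  # runs none of the body yet, just builds a generator object
events = []
returned = None
try:
    while True:
        events.append(next(gen))
except StopIteration as stop:
    # The only place the "returned" list shows up.
    returned = stop.value
```

Anything that just iterates over the generator sees only the yielded event and never the list, which fits the "did not return an output" error in the question.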