Divyesh Chudasama
@op(out=Out(dagster_type=list[pd.DataFrame]))
def fetch_mySQL_raw_data(context) -> list[pd.DataFrame]:
    """Fetch every CSV object under ``raw/nestify/mySQL/`` in the
    ``cobbleweb-ds`` S3 bucket and return one DataFrame per file.

    Args:
        context: Dagster op execution context (injected by Dagster);
            used to log an AssetMaterialization per file read.

    Returns:
        list[pd.DataFrame]: parsed CSV contents, one frame per S3 object.
    """
    logging.basicConfig(level=logging.DEBUG)
    boto3.set_stream_logger('botocore', level='DEBUG')
    s3_client = boto3.client('s3')
    bucket_name = 'cobbleweb-ds'
    prefix = 'raw/nestify/mySQL/'
    result = []
    # Fetch data from the S3 bucket and return the result.
    # NOTE(review): list_objects returns at most 1000 keys per call — switch
    # to a paginator if this prefix can ever hold more objects.
    response = s3_client.list_objects(Bucket=bucket_name, Prefix=prefix)
    for obj in response.get('Contents', []):
        filename = obj['Key']
        # The prefix itself appears as a zero-byte "folder" key; skip it.
        if filename == prefix:
            continue
        file = s3_client.get_object(Bucket=bucket_name, Key=filename)
        df = pd.read_csv(file['Body'])
        result.append(df)
        # BUG FIX: the original both yielded AssetMaterialization events AND
        # used a plain `return result`. The yield turned the op body into a
        # generator, so the return value was swallowed (StopIteration value)
        # and the declared Out was never produced. Logging the event via
        # context.log_event keeps this a normal function, so the return works.
        context.log_event(
            AssetMaterialization(
                asset_key=filename,
                description="Raw Nestify mySQL data from S3 bucket",
            )
        )
    return result
# FUNCTION THAT WORKS
@op(out=Out(dagster_type=pd.DataFrame))
def load_test_dataframe() -> pd.DataFrame:
    """Load the smoke-test CSV from S3, print it, and return it as a DataFrame."""
    bucket = 'cobbleweb-ds'
    # Pull the fixed test object and parse its streaming body directly.
    obj = boto3.client('s3').get_object(Bucket=bucket, Key='test/mySql_test.csv')
    frame = pd.read_csv(obj['Body'])
    print(frame)
    return frame
@job()
def fetch_data_pipeline():
    """Dagster job wiring: runs the raw mySQL S3 fetch op.

    The test-dataframe op is intentionally left disabled below.
    """
    fetch_mySQL_raw_data()
    # load_test_dataframe()
chris
07/13/2023, 4:32 PM — Try the `context.log_event(…)` syntax to log that asset materialization instead of yielding it directly. I think the iterator coercion that we do might be getting messy in this case. — Divyesh Chudasama
07/13/2023, 4:33 PMDivyesh Chudasama
07/13/2023, 4:36 PMDivyesh Chudasama
07/13/2023, 4:36 PMchris
07/13/2023, 4:36 PM