George Pearse
06/24/2022, 10:17 AM
# <https://dagster.io/blog/software-defined-assets>
@asset(
    config_schema={
        'batch_size': int,
        'size_of_batch': int,
        'qdrant_collection_name': str,
        'source_s3_directory': str,
        's3_bucket_name': str,
        'parallelism': int,
    },
    non_argument_deps={'s3_cxr_embeddings'},
)
def cxr_vector_collection(context, ml_cxr_collection: pd.core.frame.DataFrame):
    """Inserts the data in batches to QDrant
    Args:
        context (_type_): _description_
        ml_cxr_collection (pd.core.frame.DataFrame): _description_
    """
    qdrant_collection_name = context.op_config['qdrant_collection_name']
    source_s3_directory = context.op_config['source_s3_directory']
    s3_bucket_name = context.op_config['s3_bucket_name']
    batch_size = context.op_config['batch_size']
    size_of_batch = context.op_config['size_of_batch']
    parallelism = context.op_config['parallelism']
to
# <https://dagster.io/blog/software-defined-assets>
@asset(
    non_argument_deps={'s3_cxr_embeddings'},
)
def cxr_vector_collection(context, ml_cxr_collection: pd.core.frame.DataFrame):
    """Inserts the data in batches to QDrant
    Args:
        context (_type_): _description_
        ml_cxr_collection (pd.core.frame.DataFrame): _description_
    """
    qdrant_collection_name: str = context.op_config['qdrant_collection_name']
    source_s3_directory: str = context.op_config['source_s3_directory']
    s3_bucket_name: str = context.op_config['s3_bucket_name']
    batch_size: int = context.op_config['batch_size']
    size_of_batch: int = context.op_config['size_of_batch']
    parallelism: int = context.op_config['parallelism']
Or similar, e.g.
parallelism = context.op_config('parallelism', int)
In such a way that setting the template and the extraction were one and the same. Though I'm not sure if such an API would extend to more complex templates.
sandy
06/24/2022, 3:35 PM
def cxr_vector_collection(context, ml_cxr_collection: pd.core.frame.DataFrame, batch_size: Config[int]):
    ...
Though might be kind of awkward for other reasons. Thoughts?
George Pearse
06/24/2022, 4:04 PM
sandy
06/24/2022, 4:41 PM
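For anyone reading this thread later, here is a rough plain-Python sketch of the annotation idea above, where declaring a parameter as `Config[int]` both defines the schema entry and delivers the value. Nothing here is Dagster API: the `Config` marker, `config_schema_from_signature`, and `call_with_config` are all hypothetical names made up for illustration, and a plain list stands in for the DataFrame so the snippet runs on its own.

```python
from typing import Any, Callable, Dict, Generic, TypeVar, get_args, get_origin, get_type_hints

T = TypeVar("T")


class Config(Generic[T]):
    """Hypothetical marker: annotating a parameter as Config[int] tags it as config."""


def config_schema_from_signature(fn: Callable[..., Any]) -> Dict[str, type]:
    """Build {parameter name: expected type} from the Config[...] annotations on fn."""
    schema: Dict[str, type] = {}
    for name, hint in get_type_hints(fn).items():
        if get_origin(hint) is Config:
            (inner,) = get_args(hint)
            schema[name] = inner
    return schema


def call_with_config(fn: Callable[..., Any], config: Dict[str, Any], **inputs: Any) -> Any:
    """Check config against the derived schema, then pass the values in as arguments."""
    schema = config_schema_from_signature(fn)
    for name, expected in schema.items():
        if name not in config:
            raise KeyError(f"missing config field: {name}")
        if not isinstance(config[name], expected):
            raise TypeError(f"config field {name!r} must be {expected.__name__}")
    return fn(**inputs, **{name: config[name] for name in schema})


def cxr_vector_collection(
    ml_cxr_collection: list,
    batch_size: Config[int],
    qdrant_collection_name: Config[str],
) -> list:
    # Stand-in body: split the rows into batches instead of writing to Qdrant.
    return [
        ml_cxr_collection[i:i + batch_size]
        for i in range(0, len(ml_cxr_collection), batch_size)
    ]


schema = config_schema_from_signature(cxr_vector_collection)
batches = call_with_config(
    cxr_vector_collection,
    {"batch_size": 2, "qdrant_collection_name": "cxr"},
    ml_cxr_collection=[1, 2, 3, 4, 5],
)
```

The schema and the extraction come from the same annotation, so there is no second place to keep in sync. Nested or optional config would need more than a bare type per field, which is probably where the "more complex templates" concern would bite.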