# ask-community
t
I've got a situation that I'm trying to work through for which I haven't found a direct analogy in the docs or discussions. I've got a set of dynamic partitions in the form of files landing in GCS; I can handle that with a sensor and dynamic partitions, so that part is solved. The tricky part is that, for each of those files, I need to unpack it and generate a set of assets from it, and each of those assets needs to be partitioned along a dynamic axis. How can I assign the yielded output of a multi-asset to a dynamic partition that is defined at run-time? The code that I have so far is:
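(For reference, the two partition definitions used in the code below are dynamic partitions; a minimal sketch of how they'd presumably be defined, with only the variable names taken from the code and everything else assumed:)

```python
from dagster import DynamicPartitionsDefinition

# Assumed definitions; the snippet below references these but doesn't show them.
# One partition per archive file that lands in GCS:
edxorg_archive_partitions = DynamicPartitionsDefinition(name="edxorg_archive")

# One partition per course/source combination discovered while unpacking an archive:
course_and_source_partitions = DynamicPartitionsDefinition(name="course_and_source")
```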
```python
@sensor(
    default_status=DefaultSensorStatus.RUNNING,
    required_resource_keys={"gcp_gcs"},
)
def gcs_edxorg_archive_sensor(context: SensorEvaluationContext):
    dagster_instance = context.instance
    storage_client = context.resources.gcp_gcs.client
    bucket_name = "simeon-mitx-pipeline-main"
    bucket_prefix = "COLD"
    # Find archive files in the bucket that don't already have a dynamic partition
    bucket_files = {
        file_
        for file_ in storage_client.list_blobs(bucket_name, prefix=bucket_prefix)
        if re.match(r"COLD/mitx-\d{4}-\d{2}-\d{2}.tar.gz$", file_.name)
        and file_.name.removeprefix("COLD/")
        not in edxorg_archive_partitions.get_partition_keys(
            dynamic_partitions_store=dagster_instance
        )
    }

    assets = []
    partition_keys = []
    for file_ in bucket_files:
        context.log.debug(f"Processing file {file_.name}")
        assets.append(
            AssetMaterialization(
                asset_key=AssetKey(("edxorg", "raw_data_archive")),
                partition=file_.name.removeprefix("COLD/"),
                description=(
                    "Archive of data exported from edx.org for courses run by MIT. "
                    "Generated by https://github.com/openedx/edx-analytics-exporter/"
                ),
                metadata={
                    "source": "edxorg",
                    "path": MetadataValue.path(f"gs://{bucket_name}/{file_.name}"),
                    "creation_date": datetime.strptime(
                        re.search(r"(\d{4}-\d{2}-\d{2})", file_.name).groups()[0],
                        "%Y-%m-%d",
                    ).strftime("%Y-%m-%d"),
                    "size (bytes)": file_.size,
                    "materialization_time": datetime.now(tz=UTC).isoformat(),
                },
            )
        )
        partition_keys.append(file_.name.removeprefix("COLD/"))

    # Record a materialization for each new archive and register its partition key
    return SensorResult(
        asset_events=assets,
        dynamic_partitions_requests=[
            edxorg_archive_partitions.build_add_request(partition_keys=partition_keys)
        ],
    )


@multi_asset(
    required_resource_keys={"gcp_gcs"},
    deps=[AssetKey(("edxorg", "raw_data_archive"))],
    partitions_def=course_and_source_partitions,
    outs={
        "course_structure": AssetOut(
            key_prefix=["edxorg", "raw_data"], is_required=False, group_name="edxorg"
        ),
        "course_xml": AssetOut(
            key_prefix=["edxorg", "raw_data"], is_required=False, group_name="edxorg"
        ),
        "forum_mongo": AssetOut(
            key_prefix=["edxorg", "raw_data"], is_required=False, group_name="edxorg"
        ),
        "assessment_assessment": AssetOut(
            key_prefix=["edxorg", "raw_data", "db_table"],
            is_required=False,
            group_name="edxorg",
        ),
        # ... additional db_table outs elided ...
        "workflow_assessmentworkflowstep": AssetOut(
            key_prefix=["edxorg", "raw_data", "db_table"],
            is_required=False,
            group_name="edxorg",
        ),
    },
)
def process_edxorg_archive_bundle(context: AssetExecutionContext):
    dagster_instance = context.instance
    # Look up the latest materialization of the upstream archive asset for this partition
    dep = AssetKey(("edxorg", "raw_data_archive"))
    event_records = context.instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            asset_key=dep,
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_partitions=[context.partition_key],
        ),
        limit=1,
    )
    # Pull the GCS URI out of the materialization's "path" metadata and download the archive
    materialization = event_records[0].event_log_entry.asset_materialization
    archive_uri = materialization.metadata["path"].value
    gcp: storage.Client = context.resources.gcp_gcs.client
    archive_path = Path(archive_uri.split("/")[-1])
    with archive_path.open("wb") as archive_file:
        gcp.download_blob_to_file(archive_uri, archive_file)

    archive = tarfile.open(archive_path)
    while tinfo := archive.next():
        if not tinfo.isdir():
            asset_info = parse_archive_path(tinfo.name)
            if not asset_info:
                continue
            normalized_source_system = (
                "edge" if "edge" in asset_info["source_system"] else "prod"
            )
            dagster_instance.add_dynamic_partitions(
                course_and_source_partitions.name,
                partition_keys=[normalized_source_system, asset_info["course_id"]],
            )
            output_key = asset_info.get("table_name") or categorize_archive_element(
                tinfo.name
            )
            # Extract the member to a local file and hash its contents for the data version
            file_contents = archive.extractfile(tinfo).read()
            extracted_path = Path(tinfo.name.split("/")[-1])
            extracted_path.write_bytes(file_contents)
            data_version = sha256(file_contents).hexdigest()
            yield Output(
                extracted_path,
                output_name=output_key,
                data_version=DataVersion(data_version),
            )
```
@sandy do you happen to have any thoughts?
I'm not seeing a way to specify in the `Output` object which partition the output is associated with. Would I use an `AssetObservation` to record that piece of data? How would that get reflected to the IO Manager? Or would this be better off as an `op` that generates a `DynamicOut` with `AssetMaterialization` events for each artifact?
s
Hey Tobias - alas, there's not currently a great way to do this in Dagster. Here's the issue where we're tracking this functionality: https://github.com/dagster-io/dagster/issues/9559
You might be able to find a way to do this with an `op` and `AssetMaterialization`s, as you mentioned.
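(For reference, a minimal sketch of that workaround, reusing `course_and_source_partitions` from above; the op name, its `archive_elements` input, and the use of only the course id as the partition key are all assumptions for illustration. The point is that `AssetMaterialization` takes an explicit `partition` argument, so the partition can be chosen at run time:)

```python
from hashlib import sha256

from dagster import AssetKey, AssetMaterialization, MetadataValue, OpExecutionContext, op


@op
def unpack_archive_elements(context: OpExecutionContext, archive_elements):
    # archive_elements is a hypothetical list of (course_id, contents) pairs produced
    # by whatever upstream op downloads and untars the archive.
    for course_id, contents in archive_elements:
        # Register the partition key discovered at run time...
        context.instance.add_dynamic_partitions(
            course_and_source_partitions.name, partition_keys=[course_id]
        )
        # ...then record the materialization against that partition explicitly.
        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(("edxorg", "raw_data", "course_xml")),
                partition=course_id,
                metadata={
                    "data_version": MetadataValue.text(sha256(contents).hexdigest())
                },
            )
        )
```

The trade-off is that these are logged events rather than `Output`s, so nothing flows through an IO manager; the op has to persist the extracted files itself.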
t
Thanks, I think that's what I'll end up doing for now.