Tobias Macey
# 02/09/2024, 8:29 PM
@sensor(
    default_status=DefaultSensorStatus.RUNNING,
    required_resource_keys={"gcp_gcs"},
)
def gcs_edxorg_archive_sensor(context: SensorEvaluationContext):
    """Detect new edX.org archive bundles in GCS and register them as partitions.

    Lists objects under the ``COLD`` prefix of the ``simeon-mitx-pipeline-main``
    bucket and keeps those named ``COLD/mitx-YYYY-MM-DD.tar.gz`` whose file name
    (with the ``COLD/`` prefix removed) is not already a partition key of
    ``edxorg_archive_partitions``. For each such object it emits an
    ``AssetMaterialization`` of ``edxorg/raw_data_archive`` for that partition.

    Args:
        context: Sensor evaluation context. ``context.resources.gcp_gcs.client``
            is used to list the bucket, and ``context.instance`` is used as the
            dynamic-partitions store.

    Returns:
        A ``SensorResult`` with one materialization event per new archive, plus
        a single request that adds all of the new file names to
        ``edxorg_archive_partitions``.

    Raises:
        ValueError: if a matching object name carries an impossible date
            (for example ``mitx-2024-13-40.tar.gz``); ``strptime`` rejects it.
    """
    bucket_name = "simeon-mitx-pipeline-main"
    bucket_prefix = "COLD"
    storage_client = context.resources.gcp_gcs.client

    # Escape the dots so that only genuine ``.tar.gz`` objects match. Capture the
    # export date here so it does not need to be re-extracted later.
    archive_name_pattern = re.compile(
        r"COLD/mitx-(?P<export_date>\d{4}-\d{2}-\d{2})\.tar\.gz$"
    )

    # Fetch the already-registered partition keys once, rather than once per
    # listed object, and keep them in a set for constant-time membership tests.
    known_partition_keys = set(
        edxorg_archive_partitions.get_partition_keys(
            dynamic_partitions_store=context.instance
        )
    )

    assets = []
    partition_keys = []
    # Walk the listing directly instead of collecting Blob objects into a set,
    # so that processing order follows the bucket listing rather than set order.
    for file_ in storage_client.list_blobs(bucket_name, prefix=bucket_prefix):
        name_match = archive_name_pattern.match(file_.name)
        if name_match is None:
            continue
        partition_key = file_.name.removeprefix("COLD/")
        if partition_key in known_partition_keys:
            continue

        context.log.debug(f"Processing file {file_.name}")
        # Round-trip the date through strptime so that malformed dates are
        # rejected instead of being recorded verbatim.
        creation_date = datetime.strptime(
            name_match.group("export_date"), "%Y-%m-%d"
        ).strftime("%Y-%m-%d")
        assets.append(
            AssetMaterialization(
                asset_key=AssetKey(("edxorg", "raw_data_archive")),
                partition=partition_key,
                description=(
                    "Archive of data exported from edx.org for courses run by MIT. "
                    "Generated by https://github.com/openedx/edx-analytics-exporter/"
                ),
                metadata={
                    "source": "edxorg",
                    "path": MetadataValue.path(f"gs://{bucket_name}/{file_.name}"),
                    "creation_date": creation_date,
                    "size (bytes)": file_.size,
                    "materialization_time": datetime.now(tz=UTC).isoformat(),
                },
            )
        )
        partition_keys.append(partition_key)

    return SensorResult(
        asset_events=assets,
        dynamic_partition_requests=edxorg_archive_partitions.build_add_request(
            partition_keys=partition_keys
        ),
    )


@multi_asset(
    required_resource_keys={"gcp_gcs"},
    deps=[AssetKey(("edxorg", "raw_data_archive"))],
    partitions_def=course_and_source_partitions,
    outs={
        "course_structure": AssetOut(
            key_prefix=["edxorg", "raw_data"], is_required=False, group_name="edxorg"
        ),
        "course_xml": AssetOut(
            key_prefix=["edxorg", "raw_data"], is_required=False, group_name="edxorg"
        ),
        "forum_mongo": AssetOut(
            key_prefix=["edxorg", "raw_data"], is_required=False, group_name="edxorg"
        ),
        "assessment_assessment": AssetOut(
            key_prefix=["edxorg", "raw_data", "db_table"],
            is_required=False,
            group_name="edxorg",
        ),
        # ... the remaining ``db_table`` outputs were elided from the original
        # paste (it contained a literal ``...`` placeholder here) ...
        "workflow_assessmentworkflowstep": AssetOut(
            key_prefix=["edxorg", "raw_data", "db_table"],
            is_required=False,
            group_name="edxorg",
        ),
    },
)
def process_edxorg_archive_bundle(context: AssetExecutionContext):
    """Unpack an edX.org archive bundle and emit one output per recognised member.

    Looks up the most recent materialization of ``edxorg/raw_data_archive`` for
    ``context.partition_key``, downloads the archive referenced by that
    materialization's ``path`` metadata into the working directory, and walks the
    tarball. For each non-directory member that ``parse_archive_path`` recognises,
    it writes the member's bytes to a local file named after the member's base
    name. It then yields an ``Output`` whose value is that local path and whose
    data version is the SHA-256 of the member's contents.

    Raises:
        ValueError: if no materialization of the upstream archive exists for
            this partition, or if that materialization has no ``path`` metadata.
    """
    # Local import: ``Output(data_version=...)`` expects a ``DataVersion``
    # instance, not a plain string.
    from dagster import DataVersion

    dagster_instance = context.instance
    # The upstream key is already declared in ``deps=`` above. Reuse it rather
    # than indexing ``asset_deps.values()``, which raises TypeError because
    # dict views are not subscriptable.
    raw_archive_key = AssetKey(("edxorg", "raw_data_archive"))
    # NOTE(review): this asset is partitioned by ``course_and_source_partitions``,
    # whereas the sensor materializes the archive under archive file-name
    # partitions. ``context.partition_key`` therefore presumably never matches
    # an upstream partition — confirm the intended mapping.
    materialization_records = dagster_instance.get_event_records(
        event_records_filter=EventRecordsFilter(
            asset_key=raw_archive_key,
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_partitions=[context.partition_key],
        ),
        limit=1,
    )
    if not materialization_records:
        raise ValueError(
            f"No materialization of {raw_archive_key.to_user_string()} found "
            f"for partition {context.partition_key!r}"
        )
    materialization = materialization_records[0].event_log_entry.asset_materialization
    if materialization is None or "path" not in materialization.metadata:
        raise ValueError(
            f"Materialization of {raw_archive_key.to_user_string()} for partition "
            f"{context.partition_key!r} has no 'path' metadata"
        )
    # The sensor records the path as ``MetadataValue.path("gs://bucket/object")``.
    archive_uri = materialization.metadata["path"].value

    gcp: storage.Client = context.resources.gcp_gcs.client
    archive_path = Path(archive_uri.rsplit("/", 1)[-1])
    with archive_path.open("wb") as archive_fh:
        gcp.download_blob_to_file(archive_uri, archive_fh)

    # Use a context manager so the tarball is closed even if a consumer stops
    # the generator early or an exception is raised mid-iteration.
    with tarfile.open(archive_path) as archive:
        for member in archive:
            if member.isdir():
                continue
            asset_info = parse_archive_path(member.name)
            if not asset_info:
                continue
            normalized_source_system = (
                "edge" if "edge" in asset_info["source_system"] else "prod"
            )
            # NOTE(review): this registers the source system and the course id
            # as two *separate* partition keys. If ``course_and_source_partitions``
            # is a multi-dimensional definition, this likely needs to target the
            # individual dimension definitions instead — confirm.
            dagster_instance.add_dynamic_partitions(
                course_and_source_partitions.name,
                partition_keys=[normalized_source_system, asset_info["course_id"]],
            )
            output_key = asset_info.get("table_name") or categorize_archive_element(
                member.name
            )
            member_fh = archive.extractfile(member)
            if member_fh is None:
                # ``extractfile`` returns None for members that are neither
                # regular files nor links (e.g. devices, FIFOs).
                continue
            contents = member_fh.read()
            # Only the base name is kept (as in the original), which avoids
            # writing outside the working directory via ``..`` components.
            # NOTE(review): members with the same base name in different
            # directories will overwrite each other.
            extracted_path = Path(member.name.rsplit("/", 1)[-1])
            # ``write_bytes`` returns a byte count, not the path, so the path is
            # kept separately and the hash is taken over the bytes themselves.
            extracted_path.write_bytes(contents)
            yield Output(
                extracted_path,
                output_name=output_key,
                data_version=DataVersion(sha256(contents).hexdigest()),
            )


Tobias Macey
02/09/2024, 8:29 PM

Tobias Macey
02/09/2024, 8:30 PM
[…] `Output` object which partition the output is associated with.

Tobias Macey
02/09/2024, 8:31 PM
[…] `AssetObservation` to record that piece of data? How would that get reflected to the IO Manager?

Tobias Macey
02/09/2024, 8:59 PM
[…] `op` that generates a `DynamicOut` with `AssetMaterialization` events for each artifact?

sandy
02/10/2024, 12:02 AM

sandy
02/10/2024, 12:03 AM
[…] `op` and `AssetMaterialization`s, as you mentioned

Tobias Macey
02/10/2024, 2:53 AM