# Pasted from Slack thread (Alex Hunsberger, 09/06/2023, 7:17 AM / 2:58 PM);
# chat metadata preserved here as a comment so the file parses.
import os
from datetime import datetime, timedelta
from typing import Dict
from minio import Minio
from dagster import (asset, AssetIn, FilesystemIOManager, Definitions, ConfigurableResource,
sensor, RunRequest, Config,
AutoMaterializePolicy, AssetExecutionContext,
TimeWindowPartitionsDefinition, AssetSelection, job, op)
from dagster_graphql import DagsterGraphQLClient
from pydantic import PrivateAttr
# Object-storage bucket every generated asset reads from / writes to.
BUCKET_NAME = "files"
# First day for which a per-day asset group is generated below.
START_DATE = datetime(2023, 9, 1)
# Exclusive upper bound of the generation loop; now + 1 day so that
# today's group is included. NOTE(review): naive local time — confirm
# the scheduler host's timezone matches what the groups should cover.
END_DATE = datetime.now() + timedelta(days=1)
class S3ClientResource(ConfigurableResource):
    """Dagster resource holding a lazily-initialized MinIO client.

    The client is created once per run in ``setup_for_execution`` and
    handed to assets via :meth:`client`. Kept in a pydantic
    ``PrivateAttr`` so Dagster's config schema ignores the raw client.
    """

    _client: Minio = PrivateAttr()

    def setup_for_execution(self, context) -> None:
        # Connection parameters are elided (`...`) in this example snippet.
        self._client = Minio(...)

    def client(self) -> Minio:
        """Return the MinIO client created during run setup."""
        return self._client
assets = []
sensors = []

# Generate one asset group per day in [START_DATE, END_DATE):
#   bucket asset -> per-minute raw-file asset -> hourly combined asset,
# plus a sensor that requests runs for the partitioned raw-file asset.
# Each dagster object gets an explicit date-suffixed `name=`, so reusing
# the same Python function name on every iteration is safe.
cur_date = START_DATE
while cur_date < END_DATE:
    dt_suffix = cur_date.strftime("%Y%m%d")
    group_name = f"FILE_{dt_suffix}"

    s3_bucket_asset_name = f"s3_bucket_{dt_suffix}"

    @asset(
        group_name=group_name,
        auto_materialize_policy=AutoMaterializePolicy.eager(),
        name=s3_bucket_asset_name,
    )
    def s3_bucket_asset(context: AssetExecutionContext, s3: S3ClientResource) -> str:
        # Placeholder body (`...`); hands the bucket name downstream.
        ...
        return BUCKET_NAME

    assets.append(s3_bucket_asset)

    raw_minute_files_asset_name = f"raw_minute_files_{dt_suffix}"

    @asset(
        group_name=group_name,
        # One partition per minute over the first hour of the day.
        partitions_def=TimeWindowPartitionsDefinition(
            cron_schedule="* * * * *",
            start=cur_date,
            end=cur_date + timedelta(hours=1),
            fmt="%Y-%m-%d %H:%M:%S",
        ),
        ins={"bucket": AssetIn(s3_bucket_asset_name)},
        name=raw_minute_files_asset_name,
    )
    def raw_minute_files_asset(
        context: AssetExecutionContext,
        bucket: str,
        s3: S3ClientResource,
    ) -> str:
        dt_range = context.partition_key_range
        # Fixed: Slack paste had mangled this call into a <http://...|...> link.
        context.log.info(f"minute files, bucket: '{bucket}', date: '{dt_range.start}'")
        # Guard: this implementation only handles single-partition runs.
        if dt_range.start != dt_range.end:
            raise NotImplementedError("Dagster doesn't support multi-partition runs yet")
        context.add_output_metadata(metadata={"partition": dt_range.start})
        return "..."

    assets.append(raw_minute_files_asset)

    combined_hour_file_asset_name = f"combined_hour_file_{dt_suffix}"

    @asset(
        group_name=group_name,
        auto_materialize_policy=AutoMaterializePolicy.eager(),
        ins={
            "bucket": AssetIn(s3_bucket_asset_name),
            "minute_files": AssetIn(raw_minute_files_asset_name),
        },
        name=combined_hour_file_asset_name,
    )
    def combined_hour_file_asset(
        context: AssetExecutionContext,
        bucket: str,
        minute_files: Dict[str, str],
        s3: S3ClientResource,
    ) -> str:
        # Fixed: Slack paste had mangled this call into a <http://...|...> link.
        context.log.info(f"Combined asset, bucket: '{bucket}', minute_files: '{minute_files}'")
        return "..."

    assets.append(combined_hour_file_asset)

    raw_file_sensor_name = f"raw_file_{dt_suffix}"

    @sensor(
        asset_selection=AssetSelection.keys(raw_minute_files_asset_name),
        minimum_interval_seconds=30,
        name=raw_file_sensor_name,
    )
    def raw_file_sensor(context):
        # Generator sensor (the original's `-> None` annotation was wrong for
        # a yielding function). Partition discovery is a placeholder here.
        partition = "..."
        yield RunRequest(
            run_key=partition,
            partition_key=partition,
        )

    sensors.append(raw_file_sensor)

    cur_date = cur_date + timedelta(days=1)
@asset(group_name=datetime.now().strftime("FOO_%Y%m%d_%H%M%S"))
def dummy_asset(context):
    """Placeholder asset whose group name is stamped with the code-load time."""
    return ""


assets.append(dummy_asset)
@op
def dummy_op(context):
    """No-op op; exists only so dummy_job has a body to execute."""
    return None
@job
def dummy_job():
    """Minimal job wrapping dummy_op; serves as reload_code_sensor's target."""
    dummy_op()
@sensor(job=dummy_job, minimum_interval_seconds=30)
def reload_code_sensor(context):
    """Every 30s, ask the Dagster webserver (GraphQL) to reload this code
    location, so the date-driven asset generation above picks up new days.
    """
    client = DagsterGraphQLClient("webserver", port_number=3000)
    status = client.reload_repository_location("example_user_code")
    # Fixed: Slack paste had mangled this call into a <http://...|...> link.
    context.log.info(str(status))


sensors.append(reload_code_sensor)
# Single Definitions object loaded by the Dagster code server: every
# generated per-day asset, both resources, all sensors, and the reload job.
defs = Definitions(
    assets=assets,
    resources={
        # Local-filesystem IO manager; base_dir elided ("...") in this example.
        "io_manager": FilesystemIOManager(base_dir="..."),
        "s3": S3ClientResource(),
    },
    sensors=sensors,
    jobs=[dummy_job],
)
# End of Slack-pasted snippet (thread metadata: Alex Hunsberger,
# 09/06/2023, 3:02 PM / 3:20 PM).