Stefan Adelbert
02/22/2023, 12:09 AM
failure_hook and success_hook to access the metadata and create a report. But the issue with that is that the hooks apply at the op level and I only want to create the report when the job is complete.
So then I thought I could use a run status sensor to detect when a run is complete and generate the report, but these sensors don't have access to the job's resources (they will already have been destroyed by that stage).
Does anyone have any ideas that would allow me to collect detailed metadata from ops and build a report at the end of a job run?
Zach
02/22/2023, 3:05 AM
Zach
02/22/2023, 3:20 AM
import json

import boto3
from dagster import InitResourceContext, OpExecutionContext, op, resource


@resource
def recorder(context: InitResourceContext):
    class Recorder:
        def __init__(self, step_key: str):
            self.bucket = "my-bucket"
            self.run_id = context.run_id
            self.client = boto3.client("s3")
            # One object per step, grouped under the run ID.
            self.prefix = f"{self.run_id}/{step_key}"

        def record(self, data: dict):
            self.client.put_object(Bucket=self.bucket,
                                   Key=f"{self.prefix}.json",
                                   Body=json.dumps(data).encode())

    # Return the class so each op can build a recorder for its own step key.
    return Recorder


@op(required_resource_keys={"recorder"})
def my_op(context: OpExecutionContext):
    recorder = context.resources.recorder(step_key=context.get_step_execution_context().step.key)
    # record() takes only the data; the step key is already part of the object key.
    recorder.record({"some": "data"})
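
Once every op has written its record, something that runs after the job (for example the run status sensor mentioned earlier) could gather everything stored under the run's prefix and assemble the report. Below is a minimal sketch of that collection step. It assumes a local directory standing in for the S3 bucket so that it runs without AWS, and collect_step_records and build_report are hypothetical names, not Dagster or boto3 APIs. Against S3, the listing would presumably go through the client's list_objects_v2 call with Prefix set to the run ID.

```python
import json
import tempfile
from pathlib import Path


def collect_step_records(root: Path, run_id: str) -> dict:
    """Gather every <run_id>/<step_key>.json file into {step_key: data}.

    `root` is a local stand-in for the bucket (an assumption of this sketch).
    """
    records = {}
    for path in sorted((root / run_id).glob("*.json")):
        # The file name without ".json" is the step key used when recording.
        records[path.stem] = json.loads(path.read_text())
    return records


def build_report(run_id: str, records: dict) -> str:
    """Render one line per step; the report format itself is hypothetical."""
    lines = [f"Report for run {run_id}"]
    for step_key, data in records.items():
        lines.append(f"- {step_key}: {json.dumps(data, sort_keys=True)}")
    return "\n".join(lines)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        run_dir = Path(tmp) / "run-1"
        run_dir.mkdir()
        (run_dir / "my_op.json").write_text(json.dumps({"some": "data"}))
        print(build_report("run-1", collect_step_records(Path(tmp), "run-1")))
```

Because the records live outside the run, this step does not need the job's resources, only the run ID.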
Zach
02/22/2023, 3:21 AM
Stefan Adelbert
02/22/2023, 3:29 AM
Nicolas Huray
03/13/2023, 2:29 PM
op and asset, then, considering that the log events are stored in a database (e.g. Postgres), query it to build a report. Do you see any drawbacks to doing that?
Zach
03/13/2023, 4:49 PM
Stefan Adelbert
03/15/2023, 3:23 AM
run_dir and this allows me to fetch that JSONL file outside of the scope of a run. Provided I know the run_id (and I'm using the same dagster instance, on the same machine), I can calculate where to find the JSONL file and read it in.
from pathlib import Path

import dagster
import pydantic


class StepProgress(pydantic.BaseModel):
    ...


class StepProgressRecorder:
    def __init__(self, path: Path):
        self.path = path
        # Make the directory for the file.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Create the empty file, but error if it already exists.
        self.path.touch(exist_ok=False)

    def record_step_progress(self, step_progress: StepProgress):
        # Append one JSON document per line (JSONL).
        with open(self.path, "a") as f:
            f.write(step_progress.json() + "\n")


class StepProgressReader:
    def __init__(self, path: Path):
        self.path = path
        # Fail early if the file is missing.
        assert self.path.exists(), f"{self.path} doesn't exist"

    def get_step_progress(self):
        with open(self.path, "r") as f:
            return [StepProgress.parse_raw(line) for line in f.readlines()]


# A dagster resource which appends StepProgress records to a file-based store.
@dagster.resource
def step_progress_recorder(context: dagster.InitResourceContext):
    assert context.instance, "instance is invalid"
    assert context.dagster_run, "dagster_run is invalid"
    path = Path(context.instance.storage_directory()) / context.dagster_run.run_id / "step_progress.jsonl"
    return StepProgressRecorder(path)
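
Reading the file back outside of a run comes down to rebuilding the same path from the storage directory and the run_id. The sketch below shows that lookup with the standard library only. It parses each line into a plain dict rather than StepProgress, since the model's fields are not shown here, and step_progress_path and read_step_progress are hypothetical helper names. In a run status sensor, the storage directory would presumably come from context.instance.storage_directory() and the run ID from context.dagster_run.run_id; that wiring is an assumption and is not shown.

```python
import json
import tempfile
from pathlib import Path


def step_progress_path(storage_directory: str, run_id: str) -> Path:
    # Same layout the step_progress_recorder resource uses when writing.
    return Path(storage_directory) / run_id / "step_progress.jsonl"


def read_step_progress(path: Path) -> list:
    """Parse each non-empty JSONL line into a dict."""
    with open(path, "r") as f:
        return [json.loads(line) for line in f if line.strip()]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = step_progress_path(tmp, "run-1")
        path.parent.mkdir(parents=True)
        # Hypothetical records; the real fields are defined by StepProgress.
        path.write_text('{"step": "a"}\n{"step": "b"}\n')
        print(read_step_progress(path))
```

This only works under the conditions already stated: the same dagster instance, on the same machine, with a known run_id.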