# random
s
Collecting Metadata from Ops During a Job Run

I'd like to allow ops to record metadata (their granular progress, with comments and notes) when they run as part of a job. The idea is to collect the metadata from all the ops and run a reporting step once the job has completed, or at the end of the job. This reporting step would probably take the form of an email or a Slack message and would include details about each step (op) and, most importantly, give some details about why a step might not have succeeded. The report is intended to go to business process owners and will need to be human readable. You could think of the "metadata" as a simple data structure, e.g. a list of dicts which each op would append to.

I had thought about the ops receiving the metadata as input (from previous ops), adding to it and then returning it as an output (to the next op), but that confuses the flow of the "real" data between the ops. I then thought to have the metadata as a resource available to the ops, which would allow each op to write to the metadata without cluttering inputs and outputs. I could then use failure_hook and success_hook to access the metadata and create a report. But the issue with that is that the hooks apply at the op level, and I only want to create the report when the job is complete. So then I thought I could use a run status sensor to detect when a run is complete and generate the report, but these sensors don't have access to the job's resources (they will already have been destroyed by that stage). Does anyone have any ideas that would allow me to collect detailed metadata from ops and build a report at the end of a job run?
z
One relatively simple idea would be to have a resource that just exposes a function which writes metadata for an op under a run_id prefix in object storage. Then you could have a run status sensor that reads the JSON files that were dumped under the run_id prefix for a failed/successful run and constructs your report from them.
❤️ 1
something like
import json

import boto3
from dagster import InitResourceContext, OpExecutionContext, op, resource

@resource
def recorder(context: InitResourceContext):
    # Return the class itself so each op can instantiate it with its own step key.
    class Recorder:
        def __init__(self, step_key: str):
            self.bucket = "my-bucket"
            self.run_id = context.run_id
            self.client = boto3.client("s3")
            self.prefix = f"{self.run_id}/{step_key}"

        def record(self, data: dict):
            # Write the metadata for this step as a JSON object under the run_id prefix.
            self.client.put_object(Bucket=self.bucket,
                                   Key=f"{self.prefix}.json",
                                   Body=json.dumps(data).encode())

    return Recorder

@op(required_resource_keys={"recorder"})
def my_op(context: OpExecutionContext):
    recorder = context.resources.recorder(step_key=context.get_step_execution_context().step.key)
    recorder.record({"some": "data"})
there are some edge cases in there around handling retries, I think, and you might want to inject the boto3 client into the resource, but that's the gist. Then you should be able to get the step keys associated with a given run from your run status sensor and reconstruct the prefix, so you can read in the metadata and construct the report
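For illustration, a minimal sketch of what that reporting sensor could look like, assuming the "my-bucket" name and the <run_id>/<step_key>.json layout from the snippet above; it simply lists everything under the run_id/ prefix rather than reconstructing each step key, and report_to_slack is a hypothetical delivery helper:

import json

import boto3
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def report_on_success(context: RunStatusSensorContext):
    run_id = context.dagster_run.run_id
    client = boto3.client("s3")
    # List every <run_id>/<step_key>.json object the Recorder wrote during this run.
    response = client.list_objects_v2(Bucket="my-bucket", Prefix=f"{run_id}/")
    records = []
    for obj in response.get("Contents", []):
        body = client.get_object(Bucket="my-bucket", Key=obj["Key"])["Body"].read()
        records.append({"step": obj["Key"], "metadata": json.loads(body)})
    # report_to_slack is a placeholder for whatever email / Slack delivery you use.
    report_to_slack(run_id, records)

A second sensor decorated with run_status=DagsterRunStatus.FAILURE could reuse the same body to cover failed runs.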
s
@Zach That's a nice idea. It has the advantage of working with a multi-process job too, rather than relying on the same instance of a resource being available to all ops in a job. Thanks for the suggestion.
🎉 1
n
@Stan Potums @Zach I also have this use case, but I imagined a different approach just leveraging Asset Observations. The idea is to log asset observations in the op and asset bodies and then, since the log events are stored in a database (e.g. Postgres), query that to build a report. Do you see any drawbacks to doing that?
z
Kinda depends on where you want to access the metadata from, and what you want to store metadata about. If you want to store metadata about jobs / ops that aren't tied to an asset then it feels quite weird to use AssetObservations. You can also kinda do it with Outputs, but that also feels like kind of a weird mechanism to use for just logging metadata, at least to me. I also personally wouldn't want to couple an external system to Dagster's backing postgres instance directly. At a minimum I'd use the DagsterInstance event log filter to query the asset observations. Asset Observations are also a bit of a constrained interface to store metadata through, depending on what you want to track. But I could also imagine having an AssetMaterializationSensor that copies asset observations out to external storage if external systems needed access. If you're just trying to access metadata recorded from within the Dagster ecosystem and are only trying to log metadata from assets then asset observations seem like they'd work just fine.
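For reference, a minimal sketch of that pattern: logging an observation from inside an op and then reading it back through the DagsterInstance event log rather than the backing database. The asset key "my_asset", the metadata fields, and the fetch_observations helper are all illustrative:

from dagster import (
    AssetKey,
    AssetObservation,
    DagsterEventType,
    DagsterInstance,
    EventRecordsFilter,
    op,
)

@op
def my_op(context):
    # Record progress as an observation event instead of passing it between ops.
    context.log_event(
        AssetObservation(asset_key="my_asset", metadata={"rows_processed": 123})
    )

# Later (e.g. in a sensor), query the event log through the DagsterInstance API
# rather than hitting the backing Postgres database directly.
def fetch_observations(instance: DagsterInstance):
    return instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey("my_asset"),
        )
    )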
s
I've settled on a resource which writes a JSONL file into the run_dir, and this allows me to fetch that JSONL file outside of the scope of a run: provided I know the run_id (and I'm using the same Dagster instance, on the same machine), I can calculate where to find the JSONL file and read it in.
from pathlib import Path

import dagster
import pydantic

class StepProgress(pydantic.BaseModel):
    ...

class StepProgressRecorder:
    def __init__(self, path: Path):
        self.path = path
        # Make the directory for the file.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Create the empty file, but error if it already exists.
        self.path.touch(exist_ok=False)

    def record_step_progress(self, step_progress: StepProgress):
        # Append one JSON line per step-progress record.
        with open(self.path, "a") as f:
            f.write(step_progress.json() + "\n")

class StepProgressReader:
    def __init__(self, path: Path):
        self.path = path
        assert self.path.exists(), f"{self.path} doesn't exist"

    def get_step_progress(self):
        with open(self.path, "r") as f:
            return [StepProgress.parse_raw(line) for line in f.readlines()]


# A Dagster resource which appends step-progress records to a file-based store.
@dagster.resource
def step_progress_recorder(context: dagster.InitResourceContext):
    assert context.instance, "instance is invalid"
    assert context.dagster_run, "dagster_run is invalid"
    path = Path(context.instance.storage_directory()) / context.dagster_run.run_id / "step_progress.jsonl"
    return StepProgressRecorder(path)
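To close the loop, a minimal sketch of the reporting side, which recomputes the same path from the run_id inside a run status sensor and reuses StepProgressReader from above; send_report is a hypothetical placeholder for the email / Slack delivery step:

from pathlib import Path

from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def step_progress_report(context: RunStatusSensorContext):
    # Recompute the same path the resource used, from the run_id alone.
    path = (
        Path(context.instance.storage_directory())
        / context.dagster_run.run_id
        / "step_progress.jsonl"
    )
    records = StepProgressReader(path).get_step_progress()
    # send_report is a placeholder for the human-readable report delivery.
    send_report(context.dagster_run.run_id, records)

This only works because the sensor runs on the same Dagster instance and machine as the job, so the storage directory is shared.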