# ask-community
t
Hello! A complete dagster newbie here. I was looking for some guidance using dynamic partitioning. We have an IoT-type streaming dataset with measurements belonging to different batches. We do not know the batch IDs beforehand. I wanted to set up a pipeline that would consume the incoming data and then save it to parquet using batch IDs as partitions.

Initially, I went with a DynamicOutput solution where I split the entries by batch ID and then materialised the assets inside the op that was mapped onto them. I stored the batch IDs separately when materialising the assets and supplied a function that could return all of them to the dynamic_partitioned_config as they were being added. This felt like a hack though, as obviously I needed to leave the job config empty since all the partitions were materialised in the same job.

The second solution was to save the incoming source dataset as a materialised asset and add the batch IDs included in that particular file as metadata. Then, in an asset sensor monitoring that asset, I retrieved all the batches and submitted a separate RunRequest for each, supplying the batch ID via config, so everything should work nicely as each partition would have its own job.

However, the result is very similar to what I got the hackish way: the partitions are registered for the asset, but Dagit shows them all as missing, and submitting a backfill request doesn't do anything. I assume this is because the function passed to the dynamic_partitioned_config returns all the correct keys, but the asset materialisations do not get registered correctly. Any ideas on how this setup could work, or maybe some better architectural design that would be more in line with Dagster's way of working? Thanks in advance!
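A minimal sketch of what that second approach looks like (the op and job names match the snippets further down this thread; "source" as the monitored asset key and _batch_ids_from_event, a helper that pulls the batch IDs out of the materialization's metadata, are hypothetical):
from dagster import AssetKey, RunRequest, asset_sensor


@asset_sensor(asset_key=AssetKey("source"), job=stage_job_persist_batch)
def source_batch_sensor(context, asset_event):
    # One RunRequest per batch ID found in the source asset's metadata
    for batch_id in _batch_ids_from_event(asset_event):  # hypothetical helper
        yield RunRequest(
            run_key=f"{asset_event.run_id}:{batch_id}",  # de-duplicates per file/batch
            run_config={
                "ops": {"stage_op_persist_batch": {"config": {"batch_id": batch_id}}}
            },
        )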
c
just to make sure, are you logging the partition key on the assetmaterialization? If not, that's probably why they aren't showing up in dagit
t
The op that handles each batch looks like this:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from dagster import AssetMaterialization, op


@op(
    config_schema={"batch_id": str},
    required_resource_keys={"source_data_path", "stage_data_path"},
)
def stage_op_persist_batch(context):
    batch_id = context.op_config["batch_id"]
    _put_key(batch_id)  # record the batch ID so _get_keys can return it later

    # Read the batch's source file and convert it to an Arrow table
    file_path = f"{context.resources.source_data_path}/{batch_id}.pickle"
    df = pd.read_pickle(file_path)
    table = pa.Table.from_pandas(df)

    # Write the batch as a Hive-style partition of the parquet dataset
    # (using the stage_data_path resource rather than a module-level constant)
    pq.write_to_dataset(
        table,
        root_path=f"{context.resources.stage_data_path}/BatchID={batch_id}",
    )

    # Log the materialization with the partition key so Dagit can pick it up
    context.log_event(
        AssetMaterialization(
            asset_key="stage",
            description=f"Persisted partition {batch_id} to storage",
            partition=batch_id,
        )
    )
It is passed the following config:
from dagster import dynamic_partitioned_config


@dynamic_partitioned_config(partition_fn=_get_keys)
def batch_config(batch_id: str):
    # Map each partition key to the run config for that batch
    return {"ops": {"stage_op_persist_batch": {"config": {"batch_id": batch_id}}}}
Using a job:
stage_job_persist_batch = stage_graph_persist_batch.to_job(
    name="stage_job_persist_batch",
    config=batch_config,
    resource_defs={
        "source_data_path": source_data_path,
        "stage_data_path": stage_data_path,
    },
)
So I think I’m using it correctly, but the partitions aren’t getting logged? I was also wondering if this is a reasonable pattern for dealing with this type of use-case in general, as I cannot find many examples of dynamic partitioning.
c
This sounds pretty reasonable to me - I'm wondering if your _get_keys function could potentially have issues? Are you seeing the actual partitions in the UI that you expect?
t
That is kind of the issue: it looks like _get_keys works fine, but the runs don't get registered with the partition. In the UI I can see all the partitions showing up with correct names, but they are listed as missing (see screenshot). The function looks like this:
def _get_keys(_ts):
    # Partition function for dynamic_partitioned_config: returns the
    # de-duplicated batch IDs recorded so far, or none if the file
    # doesn't exist yet.
    try:
        with open(KEYS_FILE, "r") as f:
            return list({line.rstrip() for line in f})
    except FileNotFoundError:
        return []


def _put_key(key):
    # Record a batch ID so _get_keys can later return it as a partition key
    with open(KEYS_FILE, "a+") as f:
        f.write(f"{key}\n")
OK, so after a bit more investigation:
1. No matter how many times I run the partitioned job, and with whatever batch_id, the partition never gets updated, despite the actual file getting created (ergo, the op ran). However, if I use the backfill functionality in the UI, I do get it to update. Is this how it's supposed to work?
2. Even when the partition tab in the job section shows the partition correctly, there is still no mention of partitioning when I look at the asset directly. Did I misunderstand, and can you actually not see partitions of a materialised asset, i.e. only for software-defined assets?
Even more investigation. I have run the code from here without any changes and I see the same behaviour (i.e. the runs don't get registered as partition runs) if I just fill in the config in the UI manually. However, when I use the dropdown it creates exactly the same config, but adds 2 tags: dagster/partition and dagster/partition_set. Now, the part that I don't get: it seems that the registration of a partition run in the UI gets refreshed based on the content of the dagster/partition tag, not what's in the actual config. So, even if my config looks like this:
ops:
  continent_op:
    config:
      continent_name: Africa
when I add a tag dagster/partition: Europe, it will show partition Europe as green. In the same way, if there is no tag, the run doesn't register a partition. Should I then just add the tag to my job, or is this some sort of bug / am I missing something? @chris
c
You're correct that the tag is what gets used to populate the UI elements - however, I'm not sure why that tag isn't automatically getting added, or whether this is expected behavior, for the case where you're explicitly specifying a materialization of a particular asset. Checking with team members more familiar with the UI side of this functionality
👍 1
okay apologies - so this is pretty confusing (I didn't even realize this), but there's essentially a difference between job partitions and asset partitions. The job partition matrix, which is what you're looking at, is populated when a job's run has the relevant partition tag. The asset materialization matrix, which can be found on the asset's details page, is populated when an asset materialization is logged with a particular partition key. So in order to get a run to be represented on the partition matrix, you're going to want to add that tag. From a sensor/schedule you can do this via the tags param on RunRequest. Does that make sense?
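Concretely, something like this - a sketch reusing the config shape from batch_config earlier in the thread (run_request_for_batch is just an illustrative helper name):
from dagster import RunRequest


def run_request_for_batch(batch_id: str) -> RunRequest:
    # Same run config as before, plus the partition tag that
    # populates the job partition matrix in Dagit.
    return RunRequest(
        run_key=batch_id,
        run_config={
            "ops": {"stage_op_persist_batch": {"config": {"batch_id": batch_id}}}
        },
        tags={"dagster/partition": batch_id},
    )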
❤️ 1
t
Ok, that makes so much sense now 😅 Thanks so much for looking into this for me. The only thing that I still have an issue with is that I do log the partition using AssetMaterialization (as in the code snippet I shared), but it seems not to be reflected anywhere in the asset tab. You can see in the 2 screenshots that the job partition worked with the proper tag, but the asset partition is still empty (and again, the op definitely ran successfully, because I can see that the parquet file got updated). I didn't manage to get any materialised assets to show the partitions. Could it be that this is only possible for software-defined assets?
c
cc @sandy regarding the asset bit here
s
hey @Tomasz Nowak - currently, the partitions bar only shows on the asset details page if the asset is a software-defined asset
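for reference, a rough sketch of that route - with a partitions_def on the @asset, materializations logged against a partition key do populate the partitions bar on the asset details page (the batch IDs here are placeholders, since a truly dynamic set of batches would need a dynamic partitions definition rather than a static one):
from dagster import StaticPartitionsDefinition, asset


@asset(partitions_def=StaticPartitionsDefinition(["batch_a", "batch_b"]))
def stage(context):
    batch_id = context.partition_key  # the partition being materialized
    ...  # persist this batch's parquet file, as in stage_op_persist_batch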
👍 1
t
Great! I kind of got the impression from the documentation that it was supposed to work for materialised assets as well 😅 But if that's the case, now everything makes sense. Thanks a lot! 🤗
s
we should probably make that more clear