Tomasz Nowak
08/25/2022, 8:35 AM
chris
08/25/2022, 8:48 PM
Tomasz Nowak
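08/25/2022, 9:52 PM
```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dagster import AssetMaterialization, op


@op(
    config_schema={"batch_id": str},
    required_resource_keys={"source_data_path", "stage_data_path"},
)
def stage_op_persist_batch(context):
    batch_id = context.op_config["batch_id"]
    # Record the key so that _get_keys() lists it as a partition
    _put_key(batch_id)
    file_path = f"{context.resources.source_data_path}/{batch_id}.pickle"
    df = pd.read_pickle(file_path)
    table = pa.Table.from_pandas(df)
    # Write under a Hive-style BatchID=<id> directory in the stage_data_path resource
    pq.write_to_dataset(
        table,
        root_path=f"{context.resources.stage_data_path}/BatchID={batch_id}",
    )
    context.log_event(
        AssetMaterialization(
            asset_key="stage",
            description=f"Persisted partition {batch_id} to storage",
            partition=batch_id,
        )
    )
```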
It is passed the following config:
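```python
from dagster import dynamic_partitioned_config


@dynamic_partitioned_config(partition_fn=_get_keys)
def batch_config(batch_id: str):
    # Map a partition key to the op config for that batch
    return {"ops": {"stage_op_persist_batch": {"config": {"batch_id": batch_id}}}}
```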
Using a job:
```python
stage_job_persist_batch = stage_graph_persist_batch.to_job(
    name="stage_job_persist_batch",
    config=batch_config,
    resource_defs={
        "source_data_path": source_data_path,
        "stage_data_path": stage_data_path,
    },
)
```
So I think I’m using it correctly, but the partitions aren’t getting logged? I was also wondering whether this is a reasonable pattern for this type of use case in general, as I can’t find many examples of dynamic partitioning.
chris
08/25/2022, 10:01 PM
`_get_keys` function could potentially have issues? Are you seeing the actual partitions in the UI that you expect?
Tomasz Nowak
08/26/2022, 5:28 AM
`_get_keys` works fine, but the runs don’t get registered with the partition. In the UI I can see all the partitions showing up with the correct names, but they are listed as missing (see screenshot).
The function looks like this:
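```python
def _get_keys(_ts):
    """Return the known batch ids, stored one per line in KEYS_FILE."""
    try:
        with open(KEYS_FILE, "r") as f:
            # De-duplicate, skip blank lines, and sort for a stable partition order
            return sorted({line.rstrip() for line in f if line.strip()})
    except FileNotFoundError:
        return []


def _put_key(key):
    # Append the batch id; duplicates are removed when the file is read
    with open(KEYS_FILE, "a") as f:
        f.write(f"{key}\n")
```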
Tomasz Nowak
08/26/2022, 12:15 PM
Tomasz Nowak
08/26/2022, 2:26 PM
```yaml
ops:
  continent_op:
    config:
      continent_name: Africa
```
When I add the tag `dagster/partition: Europe`, it shows partition Europe as green, even though the config says Africa. In the same way, if there is no tag, the run doesn’t register a partition.
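That experiment suggests the partition status view is driven by the `dagster/partition` run tag rather than by the op config. The toy model below restates the observation in plain Python. It is an illustration under that assumption, not Dagster’s implementation, and the run dict shape (`status`, `tags`, `run_config`) is made up for the example.

```python
from typing import Dict, Iterable, List, Set

PARTITION_TAG = "dagster/partition"


def materialized_partitions(runs: Iterable[Dict]) -> Set[str]:
    """Toy model: partition keys credited by successful, tagged runs.

    Run dicts here are illustrative, not Dagster's run records.
    """
    done = set()
    for run in runs:
        key = run.get("tags", {}).get(PARTITION_TAG)
        # The run config is ignored entirely: only the tag decides.
        if run.get("status") == "SUCCESS" and key is not None:
            done.add(key)
    return done


runs: List[Dict] = [
    # Config says Africa, tag says Europe -> Europe is credited.
    {"status": "SUCCESS", "tags": {PARTITION_TAG: "Europe"},
     "run_config": {"ops": {"continent_op": {"config": {"continent_name": "Africa"}}}}},
    # No tag at all -> no partition is credited.
    {"status": "SUCCESS", "tags": {},
     "run_config": {"ops": {"continent_op": {"config": {"continent_name": "Asia"}}}}},
]

print(sorted(materialized_partitions(runs)))  # ['Europe']
```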
Should I then just add the tag to my job, or is this some sort of bug / am I missing something? @chris
chris
08/26/2022, 5:22 PM
chris
08/26/2022, 5:54 PM
`tags` param on `RunRequest`. Does that make sense?
Tomasz Nowak
08/26/2022, 6:32 PM
chris
08/26/2022, 6:33 PM
sandy
08/26/2022, 10:24 PM
Tomasz Nowak
08/26/2022, 10:30 PM
sandy
08/26/2022, 10:31 PM
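Setting the partition through the `tags` param on `RunRequest`, as mentioned above, could look roughly like the helper below. It is a dependency-free sketch that assumes the batch runs are launched from a sensor: `partition_run_request_kwargs` is a hypothetical name, and it only builds the keyword arguments (`run_key`, `run_config`, `tags`, which are real `RunRequest` parameters) instead of importing Dagster.

```python
from typing import Any, Dict

# Run tag that associates a run with a partition (the tag that turned
# partition "Europe" green in the experiment above).
PARTITION_TAG = "dagster/partition"


def partition_run_request_kwargs(batch_id: str) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments for one batch's RunRequest.

    Intended use inside a sensor (sketch):
        yield RunRequest(**partition_run_request_kwargs(batch_id))
    """
    return {
        # One run key per batch: a sensor skips run requests whose run_key
        # it has already launched, so each batch runs at most once.
        "run_key": batch_id,
        # Same shape as the dict returned by batch_config.
        "run_config": {
            "ops": {"stage_op_persist_batch": {"config": {"batch_id": batch_id}}}
        },
        # The partition tag, so the run is credited to this partition.
        "tags": {PARTITION_TAG: batch_id},
    }


print(partition_run_request_kwargs("batch_001")["tags"])  # {'dagster/partition': 'batch_001'}
```

Building `run_config` and `tags` in one place keeps the op config and the partition tag from drifting apart, which is the kind of mismatch the Africa/Europe experiment exposes.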