Miguel Rodriguez
05/01/2023, 11:17 PMio_manager
does?
class JsonIOManager(IOManager):
    """IO manager that persists each asset output as a JSON file.

    The file path is built from the asset key's path components joined with
    "/"; for partitioned assets the partition key is appended as the final
    component. Paths are relative to the current working directory.
    """

    def _get_path(self, context) -> str:
        """Return the file path for the asset described by ``context``."""
        parts = list(context.asset_key.path)
        if context.has_partition_key:
            parts.append(context.asset_partition_key)
        return "/".join(parts)

    def handle_output(self, context, obj):
        """Serialize ``obj`` to JSON and write it to the asset's path.

        A ``None`` output is skipped and no file is written.
        """
        if obj is None:
            return

        import os

        path = self._get_path(context)
        # Serialize before opening so a non-serializable object does not
        # leave a truncated, empty file behind.
        payload = json.dumps(obj)
        # Multi-component asset keys and partitioned assets map to nested
        # paths; make sure the parent directory exists before writing.
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, "w", encoding="utf-8") as file:
            file.write(payload)

    def load_input(self, context):
        """Load and return the JSON value written by the upstream output."""
        # Use a context manager so the file handle is closed deterministically.
        with open(self._get_path(context.upstream_output), encoding="utf-8") as file:
            return json.load(file)
should i be inheriting from ConfigurableIOManager instead? i'm a bit confused about when i should use one over the other. also, is there an example of how to inherit from UPathIOManager? the api looked a bit overwhelming :blob_smile_sweat:
yuhan
05/01/2023, 11:35 PM
`context.instance.storage_directory()`, so the easiest path could be to call that instance method in your `_get_path`
yuhan
05/01/2023, 11:36 PMMiguel Rodriguez
05/01/2023, 11:37 PMMiguel Rodriguez
05/01/2023, 11:57 PMdagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "test_raw_json"::
AttributeError: 'OutputContext' object has no attribute 'instance'
and i just checked my installation and I am on dagster==1.3.2
yuhan
05/02/2023, 12:04 AMclass MyIOManager(ConfigurableIOManager):
root_path: str
def _get_path(self, asset_key: AssetKey) -> str:
    """Return the storage path for ``asset_key`` under ``self.root_path``.

    The asset key's path components are joined with "/" and appended to
    ``root_path``. A "/" separator is inserted when ``root_path`` is
    non-empty and does not already end with one, so ``"/tmp/data"`` and
    ``"/tmp/data/"`` resolve to the same location.
    """
    prefix = self.root_path
    # Plain concatenation would glue the last directory of root_path onto
    # the first asset key component (e.g. "/tmp/data" + "a/b").
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return prefix + "/".join(asset_key.path)
def handle_output(self, context: OutputContext, obj):
    """Print the Dagster instance's storage directory.

    Debugging stub: ``obj`` is not persisted. The instance is reached
    through the resource context on ``self`` rather than through ``context``.
    """
    # write_csv(self._get_path(context.asset_key), obj)
    instance = self.get_resource_context().instance
    storage_dir = instance.storage_directory()
    print(storage_dir)
def load_input(self, context: InputContext):
    """Read and return the CSV stored at the path for the input's asset key."""
    csv_path = self._get_path(context.asset_key)
    return read_csv(csv_path)
yuhan
05/02/2023, 12:05 AMself.get_resource_context().instance.storage_directory()
Miguel Rodriguez
05/02/2023, 12:08 AMAttributeError: 'OutputContext' object has no attribute 'get_resource_context'
Miguel Rodriguez
05/02/2023, 12:09 AM[
"__annotations__",
"__class__",
"__del__",
"__delattr__",
"__dict__",
"__dir__",
"__doc__",
"__enter__",
"__eq__",
"__exit__",
"__format__",
"__ge__",
"__getattribute__",
"__gt__",
"__hash__",
"__init__",
"__init_subclass__",
"__le__",
"__lt__",
"__module__",
"__ne__",
"__new__",
"__reduce__",
"__reduce_ex__",
"__repr__",
"__setattr__",
"__sizeof__",
"__str__",
"__subclasshook__",
"__weakref__",
"_asset_info",
"_config",
"_dagster_type",
"_events",
"_log",
"_mapping_key",
"_metadata",
"_name",
"_op_def",
"_partition_key",
"_pipeline_name",
"_resource_config",
"_resources",
"_resources_cm",
"_run_id",
"_step_context",
"_step_key",
"_user_events",
"_user_generated_metadata",
"_version",
"_warn_on_step_context_use",
"add_output_metadata",
"asset_info",
"asset_key",
"asset_partition_key",
"asset_partition_key_range",
"asset_partition_keys",
"asset_partitions_def",
"asset_partitions_time_window",
"config",
"consume_events",
"consume_logged_metadata",
"dagster_type",
"get_asset_identifier",
"get_asset_output_identifier",
"get_identifier",
"get_logged_events",
"get_logged_metadata",
"get_output_identifier",
"get_run_scoped_output_identifier",
"has_asset_key",
"has_asset_partitions",
"has_partition_key",
"log",
"log_event",
"mapping_key",
"metadata",
"name",
"op_def",
"partition_key",
"pipeline_name",
"resource_config",
"resources",
"run_id",
"step_context",
"step_key",
"version",
]
yuhan
05/02/2023, 12:13 AMyuhan
05/02/2023, 12:14 AM
`self` rather than `context`
Miguel Rodriguez
05/02/2023, 12:18 AM