Martim Passos
05/03/2021, 4:45 PM
fs_io_manager “allows users to specify a base directory where all the step outputs will be stored”, but the provided example does not explicitly show how to define such a directory. Could anyone shed some light on this, please? Or is the base directory the same as DAGSTER_HOME? Also, if I want to store .csv instead of .pickle, should I declare my own IOManager?
owen
05/03/2021, 5:07 PM
fs_io_manager, which means it can be set either through the configured API:
fs_io_manager.configured({"base_dir": "path/to/basedir"})
or by passing in configuration through the run config. You can see an example of the first option in the docs for custom_path_fs_io_manager (where this configuration is required instead of optional). As for storing information as a CSV, you will have to write your own IOManager, although its implementation can be quite similar to that of fs_io_manager.
Martim Passos
05/03/2021, 8:56 PM
pandas_csv_iomanager. Now, because of the way I’m structuring my project, I have many small, atomic solids inside a composite solid, of which I only intend to persist the last output. Do I need to set an OutputDefinition for all of them (using a default one on the upstream solids), or is there a way to say “just use my custom IOManager in the last solid’s output”?
owen
05/03/2021, 9:00 PM
Martim Passos
05/03/2021, 9:26 PM
@dg.pipeline(mode_defs=[dg.ModeDefinition(resource_defs={"pandas_csv": df_csv_io_manager})])
def main():
    catalog_df = catalog_main()

@dg.composite_solid(output_defs=[dg.OutputDefinition(io_manager_key="pandas_csv")])
def catalog_main():
    root = read_xml()
    outDict = find_uids(root)
    formated_table = fill_records(root, outDict)
    catalog_df = load(formated_table)
    catalog_df = rename_columns(catalog_df)
    catalog_df = select_columns(catalog_df)
    catalog_df = remove_extension(catalog_df)
    catalog_df = remove_duplicates(catalog_df)
    catalog_df = reverse_creators_name(catalog_df)
    catalog_df = dates_accuracy(catalog_df)
    catalog_df = extract_dimensions(catalog_df)
    return catalog_df
owen
05/03/2021, 9:42 PM
extract_dimensions solid instead of the composite solid, but I see why this can be cumbersome for your use case
from dagster import ModeDefinition, OutputDefinition, composite_solid, fs_io_manager, pipeline, solid

@solid
def solid_a(_):
    return 1

@solid
def solid_b(_, x):
    return x

@composite_solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")])
def my_composite_solid():
    x = solid_a()
    x = solid_b(x)
    # this output will still be processed with the default io_manager
    return x

@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_io_manager": fs_io_manager})])
def my_pipeline():
    my_composite_solid()
Martim Passos
05/03/2021, 9:49 PM
owen
05/03/2021, 9:50 PM
Martim Passos
05/03/2021, 10:37 PM
sandy
05/03/2021, 10:53 PM
Martim Passos
05/03/2021, 11:06 PM
sandy
05/04/2021, 6:07 AM
Laura Moraes
05/11/2021, 2:17 PM
Martim Passos
05/11/2021, 2:19 PM
sandy
05/11/2021, 3:47 PM
Laura Moraes
05/11/2021, 3:49 PM