
sephi

03/12/2020, 7:50 AM
Hi, we want to understand the best practice for running solids with configuration files. Our simple use case is that we have a pipeline that we need to run on different files (each file with a different configuration). From this https://dagster.phacility.com/D2060 link and this PR #2049 we are not sure which direction is preferred (the former is more appealing for our use case). Any guidance would be appreciated.

abhi

03/12/2020, 8:18 AM
So these are sort of different use cases. Could you elaborate more precisely on what you mean by "running on different files with different configs"?

sephi

03/12/2020, 9:16 AM
I guess it's more like the former one. We currently have several Python files that run with YAML configuration files for ETL processes. The configurations specify how to run functions from installed packages (e.g. changing the args in dask.to_parquet) or custom functions that we have written that mainly manipulate dataframes (e.g. adding or removing fields in dataframes).
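A hypothetical sketch of the kind of config-driven ETL step described here, where a YAML file supplies paths and kwargs for dask.dataframe.to_parquet; the file and key names are illustrative only:
import yaml
import dask.dataframe as dd

# Each data source gets its own YAML file with paths and function kwargs.
with open("etl_config.yaml") as f:
    cfg = yaml.safe_load(f)

df = dd.read_csv(cfg["input_path"])
# ... custom dataframe manipulations driven by cfg would go here ...
df.to_parquet(cfg["output_path"], **cfg.get("to_parquet_kwargs", {}))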

Simon Späti

03/12/2020, 9:40 AM
I would probably use this: https://docs.dagster.io/latest/tutorial/config
This way you can set a default for dask.to_parquet but always have the convenience to overwrite the default config in your yaml with:
solids:
  your_ETL_pipeline_function:
    config:
      output: "dask.to_delta"
👍 1

abhi

03/12/2020, 4:07 PM
That’s right. The config system is what you are looking for, and if you want a way to hydrate/dehydrate these files via solid config, check out the input hydration/output materialization configs in the tutorial as well.

sephi

03/17/2020, 9:38 AM
Hi, we decided to work with spark since it seems to be a bit more mature in this project. Since we want to have a separate pipeline for each data source, and want to make the code as generic as possible (and not write a solid for every API call in pyspark), we wrote the following function that is called from the YAML pipeline. However, it fails if the function tries to use a new column that is generated from within the pipeline.
1. Is there a way to write such a function that will work without using collect (the df.cache() did not help)?
2. If not, is our only option to write every step of the ETL as a separate solid for every API call? Since we have many data sources with many transformations, that would not be optimal for maintaining and generating new pipelines.
from pyspark.sql import DataFrame
from dagster import solid, Permissive

import transform  # our implementation of pyspark.sql.functions-style helpers

@solid(config={"base_func": Permissive(), "transform": Permissive()})
def run_transform(context, df: DataFrame) -> DataFrame:
    # "base_func" entries are resolved against the DataFrame class itself
    # (e.g. fillna, dropna); "transform" entries against our transform module.
    d = {"base_func": DataFrame, "transform": transform}
    for name, module in d.items():
        for func, attr in context.solid_config[name].items():
            df = getattr(module, func)(df, **attr)
    df = df.cache()   # did not help
    return df
The transform.py is our implementation of pyspark.sql.functions.
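A hypothetical sketch of what the rename_cols / assignF helpers referenced in the config below might look like; the signatures are inferred from the YAML and are not confirmed:
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def rename_cols(df: DataFrame, renamed_cols: dict) -> DataFrame:
    # Rename columns according to an {old_name: new_name} mapping.
    for old, new in renamed_cols.items():
        df = df.withColumnRenamed(old, new)
    return df

def assignF(df: DataFrame, col: str, func: str, attr: list) -> DataFrame:
    # Apply a pyspark.sql.functions function (e.g. concat) to the listed
    # columns and store the result in a new column.
    return df.withColumn(col, getattr(F, func)(*[F.col(c) for c in attr]))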
The YAML pipeline file is:
resources:
  spark:
    config:
      spark_conf:
        .....
solids:
  read_file:
    config:
      path: "<hdfs://path/to/file.csv>"
      format: "csv"
  run_transform:
    config:
      base_func:
        fillna: {value: {"date_col": "2199-12-31 00:00:00"}}
        dropna: {how: "any", subset: ['col1', 'col2']}
      transform:
        rename_cols: {renamed_cols: {col1: "colA"}}
        assignF: {col: "new_col", func: "concat", attr: ["date_col", "colA"]}
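A hedged sketch of driving such a pipeline from Python with this YAML; the pipeline name is made up, and the kwarg for passing the config has changed across dagster versions (environment_dict in 0.7.x, run_config later):
import yaml
from dagster import execute_pipeline
from my_pipelines import etl_pipeline   # hypothetical pipeline definition

with open("pipeline.yaml") as f:
    env = yaml.safe_load(f)

result = execute_pipeline(etl_pipeline, environment_dict=env)
assert result.success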
BTW, when running run_transform from outside of dagster (e.g. in a jupyter notebook) we did not have any problems.