sephi  03/12/2020, 7:50 AM
abhi  03/12/2020, 8:18 AM
sephi  03/12/2020, 9:16 AM
Simon Späti  03/12/2020, 9:40 AM
dask.to_parquet, but you always have the convenience of overwriting the default config in your yaml with:
solids:
  your_ETL_pipeline_fuction:
    config:
      output: "dask.to_delta"
abhi  03/12/2020, 4:07 PM
sephi  03/17/2020, 9:38 AM
spark, since it seems to be a bit more mature in this project.
Since we want a separate pipeline for each data source, and want to keep the code as generic as possible (and not write a solid for every pyspark API call), we wrote the following function that is called from the YAML pipeline. However, it fails if a function tries to use a new column that is generated from within the pipeline.
1. Is there a way to write such a function so that it works without using collect? (The df.cache() did not help.)
2. If not, is our only option to write every step of the ETL as a separate solid for every API call? (Since we have many data sources with many transformations, that would not be optimal for maintaining and generating new pipelines.)
import pyspark
from pyspark.sql import DataFrame
from dagster import solid, Permissive
import transform  # our implementation of pyspark.sql.functions helpers


@solid(config={"base_func": Permissive(),
               "transform": Permissive()})
def run_transform(context, df: DataFrame) -> DataFrame:
    # each config section maps to the object whose attributes hold the callables
    d = {"base_func": pyspark.sql.dataframe.DataFrame, "transform": transform}
    for name, module in d.items():
        for func, attr in context.solid_config[name].items():
            df = getattr(module, func)(df, **attr)
            df = df.cache()  # did not help
    return df
The transform.py module is our own implementation of helpers on top of pyspark.sql.functions.
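A hedged sketch of what two of those helpers might look like, inferred from the config keys in the YAML below; rename_cols and assignF here are guesses at the signatures, not the actual contents of transform.py:

import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def rename_cols(df: DataFrame, renamed_cols: dict) -> DataFrame:
    # rename each old_name -> new_name pair
    for old_name, new_name in renamed_cols.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df


def assignF(df: DataFrame, col: str, func: str, attr: list) -> DataFrame:
    # look up a pyspark.sql.functions callable by name (e.g. "concat")
    # and apply it to the listed columns to build the new column
    spark_func = getattr(F, func)
    return df.withColumn(col, spark_func(*[F.col(c) for c in attr]))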
The YAML pipeline file is:
resources:
  spark:
    config:
      spark_conf:
        .....
solids:
  read_file:
    config:
      path: "hdfs://path/to/file.csv"
      format: "csv"
  run_transform:
    config:
      base_func:
        fillna: {value: {date_col: "2199-12-31 00:00:00"}}
        dropna: {how: "any", subset: ["col1", "col2"]}
      transform:
        rename_cols: {renamed_cols: {col1: "colA"}}
        assignF: {col: "new_col", func: "concat", attr: ["date_col", "colA"]}
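For reference, a sketch of the chain of calls that the run_transform config above resolves to (assuming the helper signatures sketched earlier):

df = df.fillna(value={"date_col": "2199-12-31 00:00:00"})       # base_func
df = df.dropna(how="any", subset=["col1", "col2"])              # base_func
df = transform.rename_cols(df, renamed_cols={"col1": "colA"})   # transform
df = transform.assignF(df, col="new_col", func="concat",
                       attr=["date_col", "colA"])               # transform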
When we run the run_transform function from outside of dagster (e.g. in a jupyter notebook), we do not have any problems.