Noah K  11/27/2020, 5:28 AM
paul.q  11/27/2020, 5:31 AM
Noah K  11/27/2020, 5:32 AM
paul.q  11/27/2020, 5:36 AM
Noah K  11/27/2020, 5:41 AM
Keval  11/27/2020, 11:35 AM
Alexandre Miyazaki  11/28/2020, 1:58 PM
Paul Wyatt  11/30/2020, 6:57 PM
Balázs Dukai  11/30/2020, 7:44 PM
I have an input, bag_cleaned_schema, which I want to pass in the configuration, and preferably declare it in only one place and refer to that in the solid inputs.
Now I have this configuration:
solids:
  bag_ahn_date:
    inputs:
      bag_cleaned: "bag_cleaned.pandactueelbestaand"
      acquisition_dates: "ahn3.acquisition_dates"
      bag_cleaned_schema: "bag_cleaned"
  bag_ahn_date_index:
    inputs:
      bag_cleaned_schema: "bag_cleaned"
  bag_ahn_overlap:
    inputs:
      bag_cleaned: "bag_cleaned.pandactueelbestaand"
      buffer: "schema_name.buffer_table"
      bag_cleaned_schema: "bag_cleaned"
  bag_ahn_overlap_index:
    inputs:
      bag_cleaned_schema: "bag_cleaned"
And I would like to have a configuration like this one, where global is not a solid, but a way to declare a global input parameter:
solids:
  global:
    bag_cleaned_schema: "bag_cleaned"
  bag_ahn_date:
    inputs:
      bag_cleaned: "bag_cleaned.pandactueelbestaand"
      acquisition_dates: "ahn3.acquisition_dates"
      bag_cleaned_schema: $bag_cleaned_schema
  bag_ahn_date_index:
    inputs:
      bag_cleaned_schema: $bag_cleaned_schema
  bag_ahn_overlap:
    inputs:
      bag_cleaned: "bag_cleaned.pandactueelbestaand"
      buffer: "schema_name.buffer_table"
      bag_cleaned_schema: $bag_cleaned_schema
  bag_ahn_overlap_index:
    inputs:
      bag_cleaned_schema: $bag_cleaned_schema
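Standard YAML supports anchors (&name) and aliases (*name) for declaring a value once and reusing it, which may already cover this case since dagster run config files are parsed as ordinary YAML. When the run config is built in Python instead, a single constant gives the same declare-once behavior. A minimal sketch under that assumption (the pipeline name bag_pipeline is hypothetical):

from dagster import execute_pipeline

# Declared once; every solid input below refers to this single value.
BAG_CLEANED_SCHEMA = "bag_cleaned"

run_config = {
    "solids": {
        "bag_ahn_date": {
            "inputs": {
                "bag_cleaned": "bag_cleaned.pandactueelbestaand",
                "acquisition_dates": "ahn3.acquisition_dates",
                "bag_cleaned_schema": BAG_CLEANED_SCHEMA,
            }
        },
        "bag_ahn_date_index": {"inputs": {"bag_cleaned_schema": BAG_CLEANED_SCHEMA}},
        "bag_ahn_overlap": {
            "inputs": {
                "bag_cleaned": "bag_cleaned.pandactueelbestaand",
                "buffer": "schema_name.buffer_table",
                "bag_cleaned_schema": BAG_CLEANED_SCHEMA,
            }
        },
        "bag_ahn_overlap_index": {"inputs": {"bag_cleaned_schema": BAG_CLEANED_SCHEMA}},
    }
}

# execute_pipeline(bag_pipeline, run_config=run_config)  # bag_pipeline is hypothetical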
Kaushik Visvanathan  11/30/2020, 9:10 PM
0.9.3 allows dagster to infer input and output descriptions from docstrings. The issue is that it appears to do this at runtime and throws an error if the docstring isn't formatted properly (e.g. if you use hyphens instead of colons after argument names). Is there a way to turn this off? The docstring_parser module brought in by dagster throws this error at runtime:
pypi__36__docstring_parser_0_7_1_linux_x86_64/docstring_parser/google.py", line 106, in _build_meta
    before, desc = text.split(":", 1)
ValueError: not enough values to unpack (expected 2, got 1)
if you have, for example:
Args:
    arg_name - description of arg
instead of:
Args:
    arg_name: description of the arg
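For reference, a Google-style docstring in the shape the parser accepts, with a colon after each argument name; the solid and argument names here are invented for illustration:

from dagster import solid

@solid
def normalize_table(context, table_name: str):
    """Normalize a raw table.

    Args:
        table_name: Name of the table to normalize.

    Returns:
        None. Only logs which table would be processed.
    """
    context.log.info(f"Normalizing {table_name}")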
Istvan Darvas  12/01/2020, 11:47 AM
Noah K  12/01/2020, 1:58 PM
Noah K  12/01/2020, 4:52 PM
Noah K  12/01/2020, 4:53 PM
Noah K  12/01/2020, 4:54 PM
William Sheu  12/01/2020, 5:40 PM
Xu Zhang  12/01/2020, 6:21 PM
Noah K  12/02/2020, 3:43 PM
dagster.check.CheckError: Invariant failed. Description: Pipeline run condition_asset (20b0921f-2547-4e10-a237-05b83b27f696) in state PipelineRunStatus.STARTED, expected PipelineRunStatus.NOT_STARTED
Noah K  12/02/2020, 3:44 PM
Tobias Macey  12/02/2020, 9:25 PM
Tobias Macey  12/02/2020, 9:25 PM
Daniel Kim  12/03/2020, 1:49 PM
from dagster import (
    execute_pipeline,
    pipeline,
    solid,
)

@solid
def get_name(_, plant_code: str):
    print(f"Plant code: {plant_code}")

@solid
def hello(context, plant_code: str):
    context.log.info('Hello, {name}!'.format(name=plant_code))

@pipeline
def hello_pipeline():
    get_name()
    hello()

if __name__ == "__main__":
    run_config = {
        "solids": {
            "get_name": {"inputs": {"plant_code": {"value": "Plant1"}}},
            "hello": {"inputs": {"plant_code": {"value": "Plant1"}}},
        }
    }
    execute_pipeline(hello_pipeline, run_config=run_config)
# want to do something like this:
value = ["Plant1", "Plant2", "Plant3"]
for plant_code in value:
    execute_my_pipeline(plant_code)  # etc...
I can create a pipeline for one plant code, "Plant1", but let's say I want to execute this set of tasks in this pipeline for each additional plant code. With the run_config, I'm not sure if it is possible to loop through a list of values passed into the "value" parameter. I hope my question makes sense. Any help is much appreciated!
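One straightforward approach, sketched under the assumption that hello_pipeline is defined as above: build a fresh run_config per plant code and call execute_pipeline once per value.

plant_codes = ["Plant1", "Plant2", "Plant3"]

for plant_code in plant_codes:
    run_config = {
        "solids": {
            "get_name": {"inputs": {"plant_code": {"value": plant_code}}},
            "hello": {"inputs": {"plant_code": {"value": plant_code}}},
        }
    }
    # Each call is a separate pipeline run with its own config.
    execute_pipeline(hello_pipeline, run_config=run_config)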
Michiel Ghyselinck  12/03/2020, 2:23 PM
dhume  12/03/2020, 7:20 PM
dagster_ge library?
Daniel Kim  12/03/2020, 8:19 PM
user  12/03/2020, 10:50 PM
Richard Fisher  12/04/2020, 9:22 AM
bob  12/04/2020, 5:21 PM
mher/flower:0.9.5 for the Flower pod.
Alexandre Miyazaki  12/04/2020, 7:11 PM
generate_job_params is a solid (a simple date manipulation) and spark_solid is another solid. What happens when I run is that I get an error because spark_solid needs APPLICATION_PARAMS to be defined, and it is, but with a <dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x10920c790> where I expected a formatted datetime.
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"spark": spark_resource})]
)
def pipeline():
    yd = generate_job_params()
    os.environ["APPLICATION_PARAMS"] = f"--app-name CountCSV --input-csv-path /tmp/aksmiyazaki/dummy.csv --group-column name --output-data-path /tmp/aksmiyazaki/{yd}"
    spark_solid = create_spark_solid("spark_process_simple_csv", "GroupedCount")
    spark_solid()
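The underlying issue is that the body of a @pipeline function runs at composition time: yd is a handle describing a future output, not a string, so interpolating it into APPLICATION_PARAMS embeds the handle's repr. One workaround is to set the environment variable inside a solid that executes before the spark solid, ordered via a Nothing dependency. A minimal self-contained sketch; generate_job_params and spark_solid below are stand-ins for the real solids:

import os
from datetime import datetime

from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid

@solid
def generate_job_params(_) -> str:
    # Stand-in for the real date-manipulation solid.
    return datetime.now().strftime("%Y-%m-%d")

@solid(output_defs=[OutputDefinition(Nothing)])
def set_application_params(context, yd: str):
    # Executes at run time, so yd is the actual formatted date,
    # not an InvokedSolidOutputHandle.
    os.environ["APPLICATION_PARAMS"] = (
        "--app-name CountCSV "
        "--input-csv-path /tmp/aksmiyazaki/dummy.csv "
        "--group-column name "
        f"--output-data-path /tmp/aksmiyazaki/{yd}"
    )

@solid(input_defs=[InputDefinition("start", Nothing)])
def spark_solid(context):
    # Stand-in for the solid produced by create_spark_solid; the Nothing
    # input only forces it to run after set_application_params.
    context.log.info(os.environ["APPLICATION_PARAMS"])

@pipeline
def spark_pipeline():
    spark_solid(start=set_application_params(generate_job_params()))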
Dimitris Stafylarakis  12/04/2020, 9:47 PM