matas
06/26/2020, 12:29 PM
Cris
06/26/2020, 9:25 PM
King Chung Huang
06/27/2020, 7:52 PM
I have a path field that can either take a single string or a list of strings (corresponding to the path argument in dask.dataframe.read_parquet). Separately, the two value types can be written as:
Field(String, …)
Field([String], …)
Is there something like a union type that can combine the two types?
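(A sketch of one possibility, not verified against the docs: dagster's ScalarUnion config type pairs a scalar type with an alternative non-scalar schema, which may cover the string-or-list case here.)

from dagster import Field, ScalarUnion, String

# Sketch: a single config field that accepts either one path string or a
# list of path strings; whether this composes with input hydration as
# intended is an assumption.
path_field = Field(
    ScalarUnion(scalar_type=String, non_scalar_schema=[String]),
    description="A path or list of paths, as accepted by dask.dataframe.read_parquet.",
)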
Binh Pham
06/27/2020, 10:48 PM
@solid
def test_solid(context):
    objs = map(lambda x: x, [1, 2, 3])
    return objs
AttributeError: Can't pickle local object 'test_solid.<locals>.<lambda>'
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_plan.py", line 153, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 272, in core_dagster_event_sequence_for_step
for evt in _create_step_events_for_output(step_context, user_event):
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 302, in _create_step_events_for_output
for evt in _set_intermediates(step_context, step_output, step_output_handle, output):
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 314, in _set_intermediates
value=output.value,
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/storage/intermediates_manager.py", line 130, in set_intermediate
paths=self._get_paths(step_output_handle),
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/storage/intermediate_store.py", line 89, in set_value
return self.set_object(obj, context, dagster_type, paths)
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/storage/intermediate_store.py", line 42, in set_object
key, obj, serialization_strategy=dagster_type.serialization_strategy
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster_aws/s3/object_store.py", line 42, in set_object
serialization_strategy.serialize(obj, bytes_io)
File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/types/marshal.py", line 70, in serialize
pickle.dump(value, write_file_obj, PICKLE_PROTOCOL)
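(A sketch of one way around the error above, assuming the failure comes from pickling the lazy map object that still references the local lambda: materialize the values before returning them.)

@solid
def test_solid(context):
    # map() returns a lazy iterator that still references the local lambda,
    # which the pickle-based intermediate store cannot serialize.
    # Materializing it into a plain list of values pickles fine.
    return list(map(lambda x: x, [1, 2, 3]))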
Shaun Ryan
06/28/2020, 10:40 AM
Rafal
06/28/2020, 2:59 PM
King Chung Huang
06/28/2020, 4:56 PM
Is it possible to define an @input_hydration_config with required_resource_keys that are dependent on the config? For example, in the following config, I want to say that a glue resource is required if the hydration is specified to be glue. Otherwise, it's not required.
@input_hydration_config(
    Selector(
        {
            "glue": Permissive({
                "database": Field(String, is_required=True, description="Name of the Glue Data Catalog database."),
                "table": Field(String, is_required=True, description="Name of the table in the database."),
            }),
            "csv": Permissive({
                "path": Field(ReadPathType, is_required=True, description="Path to read from."),
            }),
            "parquet": Permissive({
                "path": Field(ReadPathType, is_required=True, description="Path to read from."),
                "columns": Field([String], is_required=False, description="Field names to read in as columns."),
            }),
        }
    ),
    required_resource_keys={"glue"},  # But, only if "glue" is specified in the input hydration config
)
King Chung Huang
06/28/2020, 6:51 PM
In this hydration config, I'd like to say that either path is required, or glue_database and glue_table are required, but not both. columns is optional in either case. I've been looking at Selector for (path or (glue_database and glue_table)), but I can't think of a way to set this up. Is such a thing possible?
@input_hydration_config(
    Selector(
        {
            "parquet": Permissive({
                "path": Field(ReadPathType, is_required=True, description="Path to read from."),
                "glue_database": Field(String, is_required=True, description="Glue database to read from."),
                "glue_table": Field(String, is_required=True, description="Glue table to read from."),
                "columns": Field([String], is_required=False, description="Field names to read in as columns."),
            }),
        }
    )
)
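(A sketch of one way this might be expressed, not a verified pattern: nest a second Selector inside the parquet config so that exactly one of a path or a glue block must be supplied. The "source" key, the "glue" grouping, and the hydration function name are illustrative; the other names are taken from the snippet above.)

@input_hydration_config(
    Selector(
        {
            "parquet": Permissive({
                "source": Field(
                    Selector({
                        "path": Field(ReadPathType, description="Path to read from."),
                        "glue": Permissive({
                            "glue_database": Field(String, description="Glue database to read from."),
                            "glue_table": Field(String, description="Glue table to read from."),
                        }),
                    }),
                    is_required=True,
                ),
                "columns": Field([String], is_required=False, description="Field names to read in as columns."),
            }),
        }
    )
)
def parquet_hydration(_context, config):
    # Exactly one of config["source"]["path"] or config["source"]["glue"]
    # is present, because the inner Selector requires a single choice.
    ...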
Shaun Ryan
06/28/2020, 8:44 PM
dagster-celery worker start --config-yaml celery_config.yaml
narom
06/29/2020, 9:04 AM
I have a project on v0.7.13 and I'm currently trying to update it to v0.8.5. I'm encountering an error where the scheduler for v0.8.5 can't find my pipelines directory.
For context, here's how v0.7.13 was structured:
dagster
- solids
  - test.py
- pipelines
  - test.py
- scheduler.py
- repository.py
- repository.yaml
- dagster.yaml
And here's how I structured v0.8.5:
dagster
- solids
  - test.py
- pipelines
  - test.py
- repository.py
- dagster.yaml
- workspace.yaml
Everything's working fine except for the scheduled run, which raises ModuleNotFoundError: No module named 'pipelines'. It can access the pipelines folder when run manually (using dagster run and by running the generated script) but not when scheduled.
Added a simplified version of my repository.py here: https://gist.github.com/santosnarom/7f42ba94d29fb8395f06ae2b12b8cb85
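(A sketch of a possible workaround, not an official fix: pin the project root onto sys.path at the top of repository.py so that "import pipelines" resolves the same way for scheduled runs as it does for manual ones. The pipeline import is illustrative.)

import os
import sys

# repository.py sits in the project root, so its own directory is what the
# scheduler needs on the import path for the pipelines package to resolve.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from pipelines.test import my_pipeline  # illustrative import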
nate
06/29/2020, 5:15 PM
Danny
06/29/2020, 7:41 PM
dagster:
  image: company/project
  build: .
  command: sh -c "cp pipelines/dagster.yaml tmp/dagster/ && dagit -h 0.0.0.0 -p 9090 -w pipelines/workspace.yaml"
  environment:
    DAGSTER_HOME: tmp/dagster
  ports:
    - 9090:9090
  volumes:
    - ./tmp/dagster:/src/tmp/dagster
Worked fine on 0.7.13, but today I upgraded to 0.8.5 and I'm getting this error:
dagster.core.errors.DagsterInvariantViolationError: DAGSTER_HOME must be absolute path: tmp/dagster
Changing the path to be absolute in the docker-compose file is a no-go, it'll break on another dev's machine. Any non-hacky suggestions?
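(A sketch of one option, assuming the container's working directory is /src as the existing volume mount suggests: DAGSTER_HOME is resolved inside the container, so making it absolute there doesn't depend on any developer's host layout, and the host side of the mount can stay relative.)

dagster:
  image: company/project
  build: .
  command: sh -c "cp pipelines/dagster.yaml /src/tmp/dagster/ && dagit -h 0.0.0.0 -p 9090 -w pipelines/workspace.yaml"
  environment:
    DAGSTER_HOME: /src/tmp/dagster  # absolute inside the container, host-independent
  ports:
    - 9090:9090
  volumes:
    - ./tmp/dagster:/src/tmp/dagster  # host side stays relative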
Danny
06/29/2020, 9:00 PM
dagster.core.errors.DagsterLaunchFailedError: Host http://127.0.0.1:9090 failed sanity check. It is not a dagit server.
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/utils.py", line 14, in _fn
return fn(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 19, in launch_pipeline_execution
return _launch_pipeline_execution(graphene_info, execution_params)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 49, in _launch_pipeline_execution
run = do_launch(graphene_info, execution_params, is_reexecuted)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 40, in do_launch
pipeline_run.run_id, external_pipeline=external_pipeline
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 964, in launch_run
return self._run_launcher.launch_run(self, run, external_pipeline=external_pipeline)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/launcher/__init__.py", line 80, in launch_run
self.validate()
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/launcher/__init__.py", line 74, in validate
host=self._address
My dagster.yaml config:
run_launcher:
  module: dagster_graphql.launcher
  class: RemoteDagitRunLauncher
  config:
    address: "http://127.0.0.1:9090"
dagit:
  execution_manager:
    disabled: False
    max_concurrent_runs: 4
This worked fine before on 0.7.13.
wbonelli
06/30/2020, 2:09 AM
Danny
06/30/2020, 3:27 PM
I'm using the launchPipelineExecution mutation to start a pipeline. Looking to have it start in process on the dagit instance. I can't figure out the new 0.8 selector I need to specify. Here's what I got so far:
launchPipelineExecution(
  executionParams: {
    selector: {
      repositoryLocationName: "<<in_process>>",
      repositoryName: "my_repository",
      pipelineName: "my_pipeline",
    },
    mode: "default",
  }
)
Getting this error:
RESPONSE >>> {'data': {'launchPipelineExecution': {'__typename': 'PipelineNotFoundError', 'message': 'Could not find Pipeline <<in_process>>.my_repository.my_pipeline', 'pipelineName': 'my_pipeline'}}}
I'm trying to copy how the dagster_graphql test suite does it but not succeeding. On a side note, it might be useful to incorporate the "execute pipelines over graphql" machinery into the main dagster_graphql API, because it's very useful for use cases like mine where I have pipelines launching other pipelines, and it's a shame all that useful client code is buried in tests.
King Chung Huang
06/30/2020, 7:03 PM
How do I invoke the same solid more than once in a pipeline? I tried alias, as in df_test().alias("another_name"), but that doesn't seem to be correct. df_test is a solid here.
@pipeline(
    mode_defs=[
        ModeDefinition("default", resource_defs={'glue': glue_resource})
    ]
)
def scratch_pipeline():
    result1 = df_test()
    result2 = df_test(result1)
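(A sketch of the aliasing pattern as I understand it, not confirmed here: alias is called on the solid definition before invoking it, rather than on the invocation result, so each invocation gets its own name in the pipeline body above.)

def scratch_pipeline():
    result1 = df_test()
    # .alias() returns a copy of the solid definition under a new name,
    # which is then invoked like the original solid.
    result2 = df_test.alias("df_test_2")(result1)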
Kevin
06/30/2020, 7:19 PM
Patrick Merlot
06/30/2020, 10:20 PM
solids:
  get_list_tables:
    inputs:
      table_list:
        value:
          - "table1"
          - "table2"
but Dagster doesn't like it.
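(A sketch, untested: give the input an explicit List[str] annotation so Dagster knows how to hydrate a list of strings from the run config; the exact config shape, including the value: key, may differ by version.)

from typing import List

from dagster import solid

@solid
def get_list_tables(context, table_list: List[str]):
    # table_list arrives already hydrated as a Python list of strings.
    context.log.info(f"tables: {table_list}")
    return table_list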
Cam Marquis
07/01/2020, 2:06 PM
Danny
07/01/2020, 4:12 PM
repository.yaml
https://docs.dagster.io/docs/deploying/celery#4-dagster-code
Binh Pham
07/01/2020, 9:26 PM
solids:
  solid_a:
    config:
      dtype:
        col_a: type_option_a
  solid_b:
    config:
      dtype:
        col_b: type_option_a
        col_c: type_option_b
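(If the goal is a solid config schema that accepts arbitrary column names under dtype, here is a sketch, untested; config= is the 0.8-era keyword argument, and Permissive leaves the per-column values unconstrained.)

from dagster import Field, Permissive, solid

@solid(config={"dtype": Field(Permissive(), is_required=False)})
def solid_a(context):
    # Whatever keys were supplied under dtype come through on solid_config.
    context.log.info(str(context.solid_config.get("dtype")))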
Cris
07/02/2020, 3:53 PM
Danny
07/02/2020, 7:18 PM
Binh Pham
07/02/2020, 8:32 PM
Muthu
07/02/2020, 9:11 PM
Fran Sanchez
07/02/2020, 9:33 PM
Message: Cannot query field "isOptional" on type "ConfigTypeField".
Path:
Locations: [{"line":106,"column":7}]
It happened after upgrading from 0.8.3 to 0.8.5 in a testing environment.
Fran Sanchez
07/02/2020, 9:57 PM
@solid(output_defs=[OutputDefinition(name="num", dagster_type=int, is_required=False)])
def optional_input_output(_, num: Optional[int] = None):
    if num:
        yield Output(num, "num")

@solid
def takes_optional(_, num: Optional[int] = 333) -> int:
    return num * 2

@pipeline
def optional_pipeline():
    takes_optional(optional_input_output())
If I don't provide any input to optional_input_output, then takes_optional is always skipped, regardless of the input being marked as Optional and having a default value... how can I achieve this then?
user
07/02/2020, 10:38 PM
johann
07/02/2020, 10:45 PM
John Mav
07/02/2020, 11:28 PM
If I wanted to add required_resource_keys to a solid, would it look something like
@solid(
    required_resource_keys={'s3', 'database'}
)
def my_solid(context):
    # stuff
And then the pipeline that invokes the solid would look something like
@pipeline(
    mode_defs=[
        ModeDefinition("test", resource_defs={'s3': s3_resource, 'database': postgres_resource})
    ]
)
def my_pipeline():
    r = my_solid()
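(For completeness, a sketch of how the solid body reaches those resources; the log lines are illustrative.)

@solid(required_resource_keys={'s3', 'database'})
def my_solid(context):
    # Resources named in required_resource_keys are supplied by the active
    # mode's resource_defs and are available on context.resources.
    context.log.info(f"s3 resource: {context.resources.s3}")
    context.log.info(f"database resource: {context.resources.database}")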