Simon Späti
07/20/2020, 10:24 AM
0.7.x.
I have two small issues for now:
1. When I run `from pyspark.sql.functions import explode`, I get:
module>
from pyspark.sql.functions import explode
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 945, in _find_and_load_unlocked
KeyError: 'pyspark.sql'
Am I doing something wrong? I need `explode` in my PySpark code. Full stack trace in the thread.
2. I defined some custom Druid and Delta types with `dict_with_fields`, the helper used for `S3Coordinate` here. I guess that’s not the way to go, as I also had to import `dict_with_fields` manually. What would be the best way to define a Dict type like the following:
from dagster import Field, String
from dagster_aws.s3.solids import dict_with_fields

DeltaCoordinate = dict_with_fields(
    'DeltaCoordinate',
    fields={
        'database': Field(String, description='database or schema of the delta table'),
        'table_name': Field(String, description='table name of the delta table'),
        's3_coordinate_bucket': Field(String, description='s3 bucket'),
        's3_coordinate_key': Field(String, description='s3 delta table path'),
    },
)
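If the goal is only to carry these four values between solids, the shape of the record can be pinned down independently of Dagster. The following is a framework-free sketch. It assumes a plain dataclass is acceptable as the in-memory value. `from_dict` and `s3_path` are hypothetical helpers, and wiring the class into Dagster's type system is a separate step not shown here:

```python
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass(frozen=True)
class DeltaCoordinate:
    """Plain-Python record mirroring the DeltaCoordinate fields (hypothetical, not a Dagster type)."""

    database: str              # database or schema of the delta table
    table_name: str            # table name of the delta table
    s3_coordinate_bucket: str  # s3 bucket
    s3_coordinate_key: str     # s3 delta table path

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "DeltaCoordinate":
        """Build a coordinate from a config-style dict, rejecting missing, unknown or non-string fields."""
        expected = {f.name for f in fields(cls)}
        missing = expected - set(raw)
        unknown = set(raw) - expected
        if missing or unknown:
            raise ValueError(f"missing={sorted(missing)}, unknown={sorted(unknown)}")
        for name in sorted(expected):
            if not isinstance(raw[name], str):
                raise TypeError(f"{name} must be a string, got {type(raw[name]).__name__}")
        return cls(**raw)

    @property
    def s3_path(self) -> str:
        """Hypothetical convenience: the full s3:// path of the delta table."""
        return f"s3://{self.s3_coordinate_bucket}/{self.s3_coordinate_key}"


coord = DeltaCoordinate.from_dict({
    "database": "analytics",
    "table_name": "events",
    "s3_coordinate_bucket": "my-bucket",
    "s3_coordinate_key": "delta/events",
})
print(coord.s3_path)  # s3://my-bucket/delta/events
```

Freezing the dataclass keeps a coordinate from being mutated after it has been validated.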
Simon Späti
07/20/2020, 10:25 AM
~/Doc/git/dagster-r/src/latest/images/docker/pipelines/mnt > dagit -w workspace.yaml
Loading repository...
Traceback (most recent call last):
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/utils.py", line 11, in execute_command_in_subprocess
subprocess.check_output(parts, stderr=subprocess.STDOUT)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 336, in check_output
**kwargs).stdout
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 418, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/Users/sspaeti/.venvs/dagster/bin/python3', '-m', 'dagster', 'api', 'repository', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmp8wmo70wo', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmpick8v9u_']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/sspaeti/.venvs/dagster/bin/dagit", line 8, in <module>
sys.exit(main())
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 197, in main
cli(auto_envvar_prefix='DAGIT') # pylint:disable=E1120
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 91, in ui
host_dagit_ui(host, port, path_prefix, storage_fallback, port_lookup, **kwargs)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 102, in host_dagit_ui
workspace, host, port, path_prefix, storage_fallback, port_lookup
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 124, in host_dagit_ui_with_workspace
app = create_app_from_workspace(workspace, instance, path_prefix)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/app.py", line 233, in create_app_from_workspace
locations.append(PythonEnvRepositoryLocation(repository_location_handle))
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/host_representation/repository_location.py", line 246, in __init__
er.name: er for er in sync_get_external_repositories(self._handle)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/snapshot_repository.py", line 27, in sync_get_external_repositories
RepositoryPythonOrigin(repository_location_handle.executable_path, pointer),
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/utils.py", line 32, in execute_unary_api_cli_command
execute_command_in_subprocess(parts)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/utils.py", line 14, in execute_command_in_subprocess
"Error when executing API command {cmd}: {output}".format(cmd=e.cmd, output=e.output)
dagster.serdes.ipc.DagsterIPCProtocolError: Error when executing API command ['/Users/sspaeti/.venvs/dagster/bin/python3', '-m', 'dagster', 'api', 'repository', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmp8wmo70wo', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmpick8v9u_']:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/__main__.py", line 3, in <module>
main()
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/cli/__init__.py", line 38, in main
cli(obj={}) # pylint:disable=E1123
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/cli/api.py", line 103, in command
output = check.inst(fn(args), output_cls)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/cli/api.py", line 151, in repository_snapshot_command
return external_repository_data_from_def(recon_repo.get_definition())
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/definitions/reconstructable.py", line 35, in get_definition
return repository_def_from_pointer(self.pointer)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/definitions/reconstructable.py", line 309, in repository_def_from_pointer
target = def_from_pointer(pointer)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/definitions/reconstructable.py", line 272, in def_from_pointer
target = pointer.load_target()
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/code_pointer.py", line 178, in load_target
module = load_python_file(self.python_file, None)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/code_pointer.py", line 104, in load_python_file
module = import_module_from_path(module_name, python_file)
File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/seven/__init__.py", line 95, in import_module_from_path
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/sspaeti/Documents/git/dagster-rs-latest/src/latest/images/docker/pipelines/mnt/pipelines.py", line 37, in <module>
from solids import (
File "/Users/sspaeti/Documents/git/dagster-rs-latest/src/latest/images/docker/pipelines/mnt/solids.py", line 12, in <module>
from pyspark.sql.functions import explode
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 945, in _find_and_load_unlocked
KeyError: 'pyspark.sql'
schrockn
07/20/2020, 1:17 PM
`__init__.py` file?)
Simon Späti
07/20/2020, 1:57 PM
`pyspark.sql.functions` (`pip install pyspark`) and it’s installed on my machine (virtualenv). If so, it shouldn’t depend on any local `__init__.py`, correct?
Simon Späti
07/20/2020, 1:59 PM
`from pyspark.sql import DataFrame` works fine 🤔
Simon Späti
07/20/2020, 2:26 PM
`Permissive` or `Shape` type I should use, or is `dict_with_fields` still ok? 🤔
Simon Späti
07/20/2020, 4:04 PM
`from types import DeltaCoordinate`, same as `S3Coordinate` in the old airline example here. But I’m struggling a lot; I tried `Permissive`, `DagsterType` and `dagster_type_loader`.
What is the best way to define a `DeltaCoordinate` that I can use as a type in my solids, like this:
from dagster import solid
from pyspark.sql import DataFrame
# DeltaCoordinate is imported from my local types module

@solid(
    required_resource_keys={'spark', 's3'},
    description='''Creates the delta table on S3 and returns the DeltaCoordinates.
    It will remove existing data on that path and/or delete an existing delta table.''',
    config_schema={
        # ...... here is my config ....
    },
)
def create_delta_table(
    context, data_frame: DataFrame, delta_coordinate: DeltaCoordinate
) -> DeltaCoordinate:
    # my code here
    pass
Any help or hint is highly appreciated. This worked in older versions.
sandy
07/20/2020, 6:15 PM
`make_python_type_usable_as_dagster_type`?
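Conceptually, the function sandy points to ties a Python class to a Dagster type so the class can be used in solid annotations. The toy below only illustrates that idea with a plain dictionary-based registry. It is not Dagster's implementation, and `TypeDescriptor`, `register_python_type` and `resolve_annotations` are hypothetical names:

```python
from dataclasses import dataclass
from typing import Callable, Dict, get_type_hints

REQUIRED_KEYS = ("database", "table_name", "s3_coordinate_bucket", "s3_coordinate_key")


@dataclass(frozen=True)
class TypeDescriptor:
    """Hypothetical stand-in for a framework-level type: a name plus a runtime check."""
    name: str
    check: Callable[[object], bool]


# Maps Python classes (as used in annotations) to framework-level descriptors.
_REGISTRY: Dict[type, TypeDescriptor] = {}


def register_python_type(python_type: type, descriptor: TypeDescriptor) -> None:
    """Record that annotations using `python_type` should resolve to `descriptor`."""
    _REGISTRY[python_type] = descriptor


def resolve_annotations(fn: Callable) -> Dict[str, TypeDescriptor]:
    """Return descriptors for every annotated parameter (and 'return') whose type is registered."""
    hints = get_type_hints(fn)
    return {name: _REGISTRY[hint] for name, hint in hints.items() if hint in _REGISTRY}


class DeltaCoordinate(dict):
    """Dedicated Python class used only as an annotation target."""


register_python_type(
    DeltaCoordinate,
    TypeDescriptor(
        name="DeltaCoordinate",
        check=lambda value: isinstance(value, dict) and all(k in value for k in REQUIRED_KEYS),
    ),
)


def create_delta_table(context, data_frame, delta_coordinate: DeltaCoordinate) -> DeltaCoordinate:
    return delta_coordinate


resolved = resolve_annotations(create_delta_table)
print(sorted(resolved))  # ['delta_coordinate', 'return']
```

Keying the registry on a dedicated class rather than on a general-purpose type such as `dict` keeps the lookup from matching unrelated annotations.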
sandy
07/20/2020, 6:15 PM
Simon Späti
07/20/2020, 8:21 PM
"""Type definitions for the ..."""
from dagster import Field, String, make_python_type_usable_as_dagster_type, Dict
# from types import make_python_type_usable_as_dagster_type
from dagster_aws.s3.solids import dict_with_fields
DeltaCoordinate = dict_with_fields(
    name='DeltaCoordinate',
    fields={
        'database': Field(String, description='database or schema of the delta table'),
        'table_name': Field(String, description='table name of the delta table'),
        's3_coordinate_bucket': Field(String, description='s3 bucket'),
        's3_coordinate_key': Field(String, description='s3 delta table path'),
    },
)
make_python_type_usable_as_dagster_type(python_type=Dict, dagster_type=DeltaCoordinate)
How can I import `types.py`? Before, I could do `from types import DruidCoordinate`. Do I need to add it to workspace.yaml, or how do I do it in 0.8.8? I want to avoid re-defining these types in my different solids.
sandy
07/20/2020, 8:22 PM
`from types import DruidCoordinate` should work fine. Is it not working for you?
Simon Späti
07/20/2020, 8:30 PM
sandy
07/20/2020, 8:33 PM
prha
07/20/2020, 8:43 PM
Simon Späti
07/20/2020, 8:54 PM
`S3FileHandle` which have it. I copied the code in again (as initially, but including the `make_python_type_usable_as_dagster_type` call), and now it seems to work.
I will now add back all my solid/pipeline code and check for further errors, but hopefully I will manage now. Thanks again!
Simon Späti
07/20/2020, 9:04 PM
`types.py`. But then it didn’t work any longer! If I rename `types.py` to `types2.py`, it works. Do you have any idea why this happens? Is `types.py` a reserved name?
prha
07/20/2020, 9:06 PM
`types` that it’s probably conflicting with
prha
07/20/2020, 9:08 PM
`0.8.7` we made some changes to the order of module resolution
prha
07/20/2020, 9:09 PM
`workspace.yaml`, might be able to suggest some changes to resolve them more explicitly
Simon Späti
07/20/2020, 9:11 PM
Simon Späti
07/20/2020, 9:12 PM
load_from:
  - python_file:
      relative_path: pipeline_testing.py
      location_name: networkscore
prha
07/20/2020, 9:12 PM
prha
07/20/2020, 9:13 PM
`working_directory` that is an option to `python_file`
prha
07/20/2020, 9:14 PM
load_from:
  - python_file:
      relative_path: pipeline_testing.py
      location_name: networkscore
      working_directory: .
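The module-resolution issue prha describes can be reproduced with the standard library alone. Which `types.py` gets picked up depends only on the order of the directories searched. The following is a minimal sketch. `resolve_origin` and `shadowing_demo` are hypothetical helpers, and the sketch does not model how Dagster 0.8.7 or `working_directory` actually build the search path:

```python
import importlib.machinery
import sys
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple


def resolve_origin(module_name: str, search_path: List[str]) -> Optional[str]:
    """Return the file a top-level module would be loaded from, searching `search_path` in order."""
    spec = importlib.machinery.PathFinder.find_spec(module_name, search_path)
    return spec.origin if spec is not None else None


def shadowing_demo() -> Tuple[str, Optional[str], Optional[str]]:
    """Create a throwaway project containing types.py and compare two search orders."""
    with tempfile.TemporaryDirectory() as project_dir:
        (Path(project_dir) / "types.py").write_text("DeltaCoordinate = None\n")
        # Project directory searched first: the local types.py wins.
        local_first = resolve_origin("types", [project_dir] + sys.path)
        # Project directory searched last: the standard library's types.py wins.
        stdlib_first = resolve_origin("types", sys.path + [project_dir])
        return project_dir, local_first, stdlib_first


project_dir, local_first, stdlib_first = shadowing_demo()
print(local_first)
print(stdlib_first)
```

Nothing in `sys.path` or `sys.modules` is modified; the sketch only asks where each search order would find `types`.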
Simon Späti
07/20/2020, 9:17 PM
`types2.py`. `types.py` does not work either.
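Renaming to `types2.py` most likely helps because any top-level module in the project directory can hide a standard-library or installed module with the same name. The small stdlib-only check below lists such collisions so they can be renamed up front. It assumes the project's modules sit directly in one directory, and `find_shadowing_modules` is a hypothetical helper, not part of Dagster:

```python
import importlib.machinery
import sys
import tempfile
from pathlib import Path
from typing import Dict, List, Optional


def find_shadowing_modules(project_dir: str, search_path: Optional[List[str]] = None) -> Dict[str, str]:
    """Map each top-level module defined in `project_dir` to what it would hide elsewhere.

    A module "shadows" another when the same top-level name is also provided by a
    built-in module or by some other directory on the search path (stdlib, site-packages).
    """
    project = Path(project_dir).resolve()
    # Search everywhere except the project itself; empty entries ('' = cwd) are skipped.
    others = [p for p in (sys.path if search_path is None else search_path)
              if p and Path(p).resolve() != project]
    collisions: Dict[str, str] = {}
    for entry in sorted(project.iterdir()):
        if entry.is_file() and entry.suffix == ".py":
            name = entry.stem
        elif entry.is_dir() and (entry / "__init__.py").is_file():
            name = entry.name
        else:
            continue
        # Built-in modules are compiled into the interpreter and live on no path entry.
        if name in sys.builtin_module_names:
            collisions[name] = "built-in"
            continue
        spec = importlib.machinery.PathFinder.find_spec(name, others)
        # has_location is False for namespace packages, which are not real collisions here.
        if spec is not None and spec.has_location:
            collisions[name] = spec.origin
    return collisions


with tempfile.TemporaryDirectory() as demo_project:
    for filename in ("types.py", "pipeline_testing.py"):
        (Path(demo_project) / filename).write_text("")
    print(find_shadowing_modules(demo_project))
```

Built-in modules such as `sys` are reported as `"built-in"` because they are not files on any path entry.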