quick question regarding upgrade `0.8`: I’m trying...
# announcements
s
Quick question regarding the upgrade to `0.8`: I’m trying to upgrade my old pipelines, which were running under `0.7.x`. I have two small issues for now:
1. When I run `from pyspark.sql.functions import explode`, I get `module>\n    from pyspark.sql.functions import explode\n  File "<frozen importlib._bootstrap>", line 971, in _find_and_load\n  File "<frozen importlib._bootstrap>", line 945, in _find_and_load_unlocked\nKeyError: \'pyspark.sql\'\n'`. Am I doing something wrong? I need `explode` in my pyspark code. Full stack trace in thread.
2. I used some special Druid and Delta types. I used the `dict_with_fields` that is used in the `S3Coordinate` here. I guess that’s not the way to go, as I also had to manually import the module `dict_with_fields`. What would be the best way to define a Dict type like the following:
DeltaCoordinate = dict_with_fields(
    'DeltaCoordinate',
    fields={
        'database': Field(String, description='database or schema or delta table'),
        'table_name': Field(String, description='table name of the delta table'),
        's3_coordinate_bucket': Field(String, description='s3 bucket'),
        's3_coordinate_key': Field(String, description='s3 delta table path'),
    },
)
Full stack trace for point 1, not sure if it helps:
~/Doc/git/dagster-r/src/latest/images/docker/pipelines/mnt master ?18 > dagit -w workspace.yaml                                                                                 6s py dagster rb 1.9.3-p374
Loading repository...
Traceback (most recent call last):
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/utils.py", line 11, in execute_command_in_subprocess
    subprocess.check_output(parts, stderr=subprocess.STDOUT)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 336, in check_output
    **kwargs).stdout
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/subprocess.py", line 418, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/Users/sspaeti/.venvs/dagster/bin/python3', '-m', 'dagster', 'api', 'repository', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmp8wmo70wo', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmpick8v9u_']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/sspaeti/.venvs/dagster/bin/dagit", line 8, in <module>
    sys.exit(main())
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 197, in main
    cli(auto_envvar_prefix='DAGIT')  # pylint:disable=E1120
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 91, in ui
    host_dagit_ui(host, port, path_prefix, storage_fallback, port_lookup, **kwargs)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 102, in host_dagit_ui
    workspace, host, port, path_prefix, storage_fallback, port_lookup
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/cli.py", line 124, in host_dagit_ui_with_workspace
    app = create_app_from_workspace(workspace, instance, path_prefix)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagit/app.py", line 233, in create_app_from_workspace
    locations.append(PythonEnvRepositoryLocation(repository_location_handle))
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/host_representation/repository_location.py", line 246, in __init__
    er.name: er for er in sync_get_external_repositories(self._handle)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/snapshot_repository.py", line 27, in sync_get_external_repositories
    RepositoryPythonOrigin(repository_location_handle.executable_path, pointer),
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/utils.py", line 32, in execute_unary_api_cli_command
    execute_command_in_subprocess(parts)
  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/api/utils.py", line 14, in execute_command_in_subprocess
    "Error when executing API command {cmd}: {output}".format(cmd=e.cmd, output=e.output)
dagster.serdes.ipc.DagsterIPCProtocolError: Error when executing API command ['/Users/sspaeti/.venvs/dagster/bin/python3', '-m', 'dagster', 'api', 'repository', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmp8wmo70wo', '/var/folders/1s/xgp0swn93wd1b403jsf4t9bc0000gn/T/tmpick8v9u_']: b'Traceback (most recent call last):\n  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/runpy.py", line 193, in _run_module_as_main\n    "__main__", mod_spec)\n  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/runpy.py", line 85, in _run_code\n    exec(code, run_globals)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/__main__.py", line 3, in <module>\n    main()\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/cli/__init__.py", line 38, in main\n    cli(obj={})  # pylint:disable=E1123\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 764, in __call__\n    return self.main(*args, **kwargs)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 717, in main\n    rv = self.invoke(ctx)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 1137, in invoke\n    return _process_result(sub_ctx.command.invoke(sub_ctx))\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 1137, in invoke\n    return _process_result(sub_ctx.command.invoke(sub_ctx))\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 956, in invoke\n    return ctx.invoke(self.callback, **ctx.params)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/click/core.py", line 555, in invoke\n    return callback(*args, **kwargs)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/cli/api.py", line 103, in command\n    output = check.inst(fn(args), output_cls)\n  File 
"/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/cli/api.py", line 151, in repository_snapshot_command\n    return external_repository_data_from_def(recon_repo.get_definition())\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/definitions/reconstructable.py", line 35, in get_definition\n    return repository_def_from_pointer(self.pointer)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/definitions/reconstructable.py", line 309, in repository_def_from_pointer\n    target = def_from_pointer(pointer)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/definitions/reconstructable.py", line 272, in def_from_pointer\n    target = pointer.load_target()\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/code_pointer.py", line 178, in load_target\n    module = load_python_file(self.python_file, None)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/core/code_pointer.py", line 104, in load_python_file\n    module = import_module_from_path(module_name, python_file)\n  File "/Users/sspaeti/.venvs/dagster/lib/python3.6/site-packages/dagster/seven/__init__.py", line 95, in import_module_from_path\n    spec.loader.exec_module(module)\n  File "<frozen importlib._bootstrap_external>", line 678, in exec_module\n  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed\n  File "/Users/sspaeti/Documents/git/dagster-rs-latest/src/latest/images/docker/pipelines/mnt/pipelines.py", line 37, in <module>\n    from solids import (\n  File "/Users/sspaeti/Documents/git/dagster-rs-latest/src/latest/images/docker/pipelines/mnt/solids.py", line 12, in <module>\n    from pyspark.sql.functions import explode\n  File "<frozen importlib._bootstrap>", line 971, in _find_and_load\n  File "<frozen importlib._bootstrap>", line 945, in _find_and_load_unlocked\nKeyError: \'pyspark.sql\'\n'
(missing `__init__.py` file?)
s
This module is not a local module; I import it from `pyspark.sql.functions` (pip install pyspark), and it’s installed on my machine (virtualenv). If so, it shouldn’t depend on any local `__init__.py`, correct?
What’s really strange is that two lines above, `from pyspark.sql import DataFrame` works fine 🤔
Somehow I have pyspark==2.4.5, not 2.4.4; maybe that’s the issue, so I’m trying to change the version. What about question 2: should I use the `Permissive` or `Shape` type, or is `dict_with_fields` still OK? 🤔
I’m a bit lost on point 2: I’m trying to import my custom dict with e.g. `from types import DeltaCoordinate`, the same way as `S3Coordinate` in the old airline example here. But I’m struggling a lot; I tried `Permissive`, `DagsterType`, and `dagster_type_loader`. What is the best way to define a DeltaCoordinate that I can use as a type in my solids, like this:
@solid(
    required_resource_keys={'spark', 's3'},
    description='''Creates the delta table on S3 and returns the DeltaCoordinates

    It will remove existing data on that path and/or delete the existing delta table.''',
    config_schema={
        # ... my config goes here ...
    },
)
def create_delta_table(
    context, data_frame: DataFrame, delta_coordinate: DeltaCoordinate) -> DeltaCoordinate:
    #my code here
    pass
Any help or hint is highly appreciated. This worked in older versions.
s
Hi @Simon Späti - are you using `make_python_type_usable_as_dagster_type`? That's required to enable using DeltaCoordinate as a type annotation
s
Thanks @sandy. No, only for DataFrame. OK I’m trying now with DeltaCoordinate. I guess I am a step further. But still fighting imports. If I have my file `types.py`:
"""Type definitions for the ..."""

from dagster import Field, String, make_python_type_usable_as_dagster_type, Dict

# from types import make_python_type_usable_as_dagster_type
from dagster_aws.s3.solids import dict_with_fields

DeltaCoordinate = dict_with_fields(
    name='DeltaCoordinate',
    fields={
        'database': Field(String, description='database or schema or delta table'),
        'table_name': Field(String, description='table name of the delta table'),
        's3_coordinate_bucket': Field(String, description='s3 bucket'),
        's3_coordinate_key': Field(String, description='s3 delta table path'),
    },
)

make_python_type_usable_as_dagster_type(python_type=Dict, dagster_type=DeltaCoordinate)
How can I import `types.py`? Before, I could do `from types import DruidCoordinate`. Do I need to add it to workspace.yaml, or how do I do it in 0.8.8? I want to avoid redefining these types in my different solids.
s
I believe `from types import DruidCoordinate` should work fine - is it not working for you?
s
It doesn’t, but I guess it’s more related to the new workspace.yaml or DAGSTER_HOME / PYTHONPATH. Or I don’t know, I’m confused; I always get a KeyError.
s
looking at that error, it seems to be happening inside boto as far as I can tell? possibly when boto is loading config?
p
@Simon Späti what does your workspace.yaml file look like?
s
Argh, OK, now you’ve solved the mystery, thank you so much!! I stripped it down to almost nothing, and it was boto3. I don’t use boto3 at all, which is why I didn’t pay much attention to it, but the import `from dagster_aws.s3.solids import dict_with_fields` pulls in `S3FileHandle`, which depends on it. I have now copied the code again (as I did initially, but this time including `make_python_type_usable_as_dagster_type`), and now it seems to work. I will now add back all my solid/pipeline code and check for further errors. Hopefully I will manage now. Thanks again
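One way to see which modules an import drags in, as happened here with `dagster_aws.s3.solids` pulling in boto3, is to compare `sys.modules` before and after the import. The following is a minimal sketch using only the standard library; the helper name `modules_loaded_by` is hypothetical, and `colorsys` is just a stand-in module for the demo.

```python
import importlib
import sys


def modules_loaded_by(module_name):
    """Return the names of modules that importing `module_name` adds to sys.modules."""
    before = set(sys.modules)
    importlib.import_module(module_name)
    return set(sys.modules) - before


# Demo with a small stdlib module; drop any cached copy first so the import really runs.
sys.modules.pop("colorsys", None)
newly_loaded = modules_loaded_by("colorsys")
print(sorted(newly_loaded))
```

In a fresh interpreter with `dagster_aws` installed, calling `modules_loaded_by("dagster_aws.s3.solids")` would presumably list the boto-related modules too, though the exact result depends on the installed versions.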
@sandy @prha Actually, I just found another nasty error. I copied my `types_testing.py` into my original `types.py`, but then it didn’t work any longer!! If I rename types.py to types2.py, it works. Do you have any idea why this happens? Is types.py a reserved name??
p
well, I suspect it has to do with how we’re importing modules
there’s a built-in module `types` that it’s probably conflicting with
with `0.8.7` we made some changes to the order of module resolution, to not make it dependent on the working directory
If you shared how you’re loading your code in your `workspace.yaml`, we might be able to suggest some changes to resolve them more explicitly
s
OK, this is probably the reason. But it’s very dangerous, since the error is a strange KeyError. I guess there should be a list of reserved names then? If you have a trick to get around it, that would be great as well. But hopefully I’m not unlucky twice and find another reserved name 🙂
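On the question of a list of reserved names: Python does not reserve module names, but a local file can clash with a standard-library module of the same name. The following is a rough sketch of a check one could run before naming a file; the helper `shadows_stdlib` is hypothetical and not part of Dagster. It relies on `sys.stdlib_module_names` (available from Python 3.10) and falls back to inspecting where the module is found on older interpreters.

```python
import importlib.util
import sys
import sysconfig


def shadows_stdlib(module_name):
    """Return True if `module_name` is also the name of a standard-library module."""
    top_level = module_name.split(".")[0]

    # Python 3.10+ ships an explicit set of stdlib module names.
    stdlib_names = getattr(sys, "stdlib_module_names", None)
    if stdlib_names is not None:
        return top_level in stdlib_names

    # Fallback for older interpreters: built-ins, or modules located in the stdlib dir.
    if top_level in sys.builtin_module_names:
        return True
    try:
        spec = importlib.util.find_spec(top_level)
    except (ImportError, ValueError):
        return False
    origin = getattr(spec, "origin", None) or ""
    stdlib_dir = sysconfig.get_paths()["stdlib"]
    return origin.startswith(stdlib_dir) and "site-packages" not in origin


for candidate in ("types", "types2", "json"):
    print(candidate, shadows_stdlib(candidate))
```

This only covers the standard library; a clash with an installed third-party package would need a separate check.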
my workspace.yaml is very simple:
load_from:
  - python_file:
      relative_path: pipeline_testing.py
      location_name: networkscore
p
great
we have another argument `working_directory` that is an option to `python_file`, so you can do something like:
load_from:
  - python_file:
      relative_path: pipeline_testing.py
      location_name: networkscore
      working_directory: .
s
OK, that works as well, but only when I use `types2.py`. `types.py` still does not work.
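A likely explanation for why `working_directory` does not rescue `types.py`, assuming standard CPython import behaviour: the standard-library `types` module is normally already cached in `sys.modules` before any user code runs, so `from types import ...` never looks at the local file, regardless of `sys.path` order. The sketch below reproduces this outside Dagster; the helper `local_file_wins` and the `MARKER` attribute are made up for the demo.

```python
import importlib
import sys
import tempfile
import types  # stdlib; many stdlib modules import it early, so it is cached anyway
from pathlib import Path


def local_file_wins(names):
    """Create <name>.py files in a temp dir placed first on sys.path and
    report, per name, whether the import resolved to that local file."""
    result = {}
    with tempfile.TemporaryDirectory() as tmp:
        for name in names:
            Path(tmp, f"{name}.py").write_text("MARKER = 'local file'\n")
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            for name in names:
                module = importlib.import_module(name)
                result[name] = hasattr(module, "MARKER")
        finally:
            sys.path.remove(tmp)
            # Drop only the demo modules we loaded, never the real stdlib ones.
            for name in names:
                cached = sys.modules.get(name)
                if cached is not None and hasattr(cached, "MARKER"):
                    del sys.modules[name]
    return result


outcome = local_file_wins(["types", "types2"])
print(outcome)  # {'types': False, 'types2': True}
```

Renaming the file, as done above with `types2.py`, sidesteps the cache entirely, which matches what was observed in the thread.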