Can I define my solids in a python class and use t...
# announcements
m
Can I define my solids in a Python class and use them in defining the pipeline using the class object, when I create my DAG?
a
i think so? Not sure we’ve tried. The decorators turn the functions into instances of SolidDefinition, PipelineDefinition etc so as long as those instances get wired up correctly i think it should work
I recommend giving it a shot with a small toy version to verify
the top level thing that returns the RepositoryDefinition will have to be a function
m
This is what I tried:
from dagster import execute_pipeline, pipeline, solid, DagsterType
import pandas as pd
import json
import os


class Foo:
    def __init__(context, self):
        self.target_sheets = {}
        self.targets = None
        self.target_path = r'/Users/foo/target'
        self.target_sheets = self.get_sheet_config()

    def get_sheet_config(self):
        with open(r'config.json') as sheets:
            return json.load(sheets)

    def change_current_directory(self, path):
        dir = os.path.join(os.path.expanduser('~'), path)
        os.chdir(dir)
        return dir

    @solid
    def check_for_targets(self, context):
        context.log.info(f'Changing paths: {target_path}')
        change_current_directory(target_path)
        files = os.listdir(target_path)
        context.log.info(f'Available files: {files}')
        if len(files) > 0:
            return True
            context.log.info(f'Path and files: {target_path}, {files[0]}')
        else:
            return False

    @solid
    def check_sheet(self, context, check: bool):
        if check:
            try:
                change_current_directory(target_path)
                self.targets = pd.ExcelFile(file_name)
                if sorted(list(self.targets.sheet_names)) == sorted(self.target_sheets.keys()):
                    return True
                else:
                    return False
            except Exception as ex:
                context.log.info(f'Error to check {ex}')
                return False
        else:
            context.log.info(f'Targets file does not exist')
            return False


foo = Foo()


# dagit -f hubba_targets.py -n targets_pipeline
@pipeline
def foo_pipeline():
    foo.check_sheet(
        foo.check_sheet()
    )
And the error is ..
Loading repository...
Traceback (most recent call last):
  File "/opt/anaconda3/envs/grand/bin/dagit-cli", line 8, in <module>
    sys.exit(main())
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagit/cli.py", line 209, in main
    cli(auto_envvar_prefix='DAGIT')  # pylint:disable=E1120
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagit/cli.py", line 123, in ui
    host_dagit_ui(host, port, storage_fallback, reload_trigger, port_lookup, **kwargs)
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagit/cli.py", line 131, in host_dagit_ui
    return host_dagit_ui_with_execution_handle(
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagit/cli.py", line 154, in host_dagit_ui_with_execution_handle
    app = create_app_with_execution_handle(handle, instance, reloader)
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagit/app.py", line 223, in create_app_with_execution_handle
    context = DagsterGraphQLInProcessRepositoryContext(
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagster_graphql/implementation/context.py", line 114, in __init__
    self.repository_definition = handle.build_repository_definition()
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 406, in build_repository_definition
    obj = self.entrypoint.perform_load()
  File "/opt/anaconda3/envs/grand/lib/python3.8/site-packages/dagster/core/definitions/handle.py", line 441, in entrypoint
I generally prefer to define things in a class rather than a file. Also, being able to decorate a class as a solid would be ideal, but I shouldn't be suggesting features. I'm kind of new to dagster
a
I don't see targets_pipeline
so if you’re trying to target a @pipeline via the CLI it will have to be a top level function
your @solid functions on the class are going to need to be @staticmethod - we can't work with instance methods
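A rough, dagster-free sketch of why this matters: once a decorator replaces a function with a definition object, Python no longer treats it as a method, so `self` is never bound. `ToyDefinition` and `toy_solid` below are hypothetical stand-ins invented for illustration. They are not dagster's implementation, which may validate arguments differently.

```python
import inspect


class ToyDefinition:
    """Hypothetical stand-in for the object a decorator returns (not dagster code)."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__
        # Record the parameter names exactly as written on the function.
        self.params = list(inspect.signature(fn).parameters)

    def __call__(self, *args):
        return self.fn(*args)


def toy_solid(fn):
    """Hypothetical decorator: replaces the function with a ToyDefinition."""
    return ToyDefinition(fn)


class Foo:
    @toy_solid
    def on_instance(self, context):
        return f"{context}: instance"

    @staticmethod
    @toy_solid
    def on_static(context):
        return f"{context}: static"


foo = Foo()

# ToyDefinition has no __get__, so Python never binds `self`; it stays an
# ordinary, unfilled first parameter.
print(foo.on_instance.params)  # ['self', 'context']
print(foo.on_static.params)    # ['context']

print(foo.on_static("ctx"))    # ctx: static

try:
    foo.on_instance("ctx")     # "ctx" lands in `self`; `context` is missing
except TypeError as err:
    print(f"TypeError: {err}")
```

With `@staticmethod` on the outside, the function has no `self` parameter at all, so whatever calls the definition only has to supply `context`.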
m
No that is an old comment
a
how were you invoking dagit? is there any part of the stack trace that's missing? I don't see a specific error
m
Thanks...I will do that. Also, thanks for the prompt responses
dagit -f test.py -n foo_pipeline
a
another thing to try before running dagit is just python test.py to make sure it evaluates correctly
m
Hey it works. Changes: Make the solids also static methods and it also didn't like the f string...I don't know why
I found out about the f string by running python test.py
The f string issue only shows up when I run python test.py
Thanks for the help
m
fwiw, you can also use the SolidDefinition class directly!
m
I started using dagster today, besides reading about it, and really like it. I have to read about SolidDefinition and jump on it.