# ask-community
k
```
/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py:93: UserWarning: While in @job context 'ticker_single_pipeline', received an uninvoked op 'get_ticker_data'.
  warnings.warn(warning_message.strip())
/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/context.py:554: UserWarning: Error loading repository location demo:dagster.core.errors.DagsterInvalidDefinitionError: In @job ticker_single_pipeline, received invalid type <class 'str'> for input "start_date" (passed by keyword) in op invocation "get_ticker_data". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.

Stack Trace:
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 224, in __init__
    self._loaded_repositories = LoadedRepositories(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 98, in __init__
    loadable_targets = get_loadable_targets(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/utils.py", line 53, in get_loadable_targets
    else loadable_targets_from_python_package(package_name, working_directory)
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 48, in loadable_targets_from_python_package
    module = load_python_module(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 136, in load_python_module
    return importlib.import_module(module_name)
  File "/usr/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/kayvan/projects/dagster-demo/demo/demo/__init__.py", line 1, in <module>
    from .repository import demo, yahoo_finance
  File "/home/kayvan/projects/dagster-demo/demo/demo/repository.py", line 4, in <module>
    from demo.jobs.yahoofin.multi_ticker import ticker_mutli_pipeline
  File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/multi_ticker.py", line 2, in <module>
    from demo.jobs.yahoofin.single_ticker import ticker_single_pipeline
  File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 11, in <module>
    def ticker_single_pipeline(ticker_dict: dict):
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 204, in job
    return _Job()(name)
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 75, in __call__
    ) = do_composition(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 1012, in do_composition
    output = fn(**kwargs)
  File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 12, in ticker_single_pipeline
    data = get_ticker_data(ticker_dict, start_date="2022-05-01")
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/solid_definition.py", line 146, in __call__
    return super(SolidDefinition, self).__call__(*args, **kwargs)
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/node_definition.py", line 162, in __call__
    return PendingNodeInvocation(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 387, in __call__
    self._process_argument_node(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 510, in _process_argument_node
    raise DagsterInvalidDefinitionError(

  warnings.warn(
```
z
Could you please post the code that you're having problems with (probably the code for your `ticker_single_pipeline` job)? It looks like your graph isn't being constructed correctly, possibly because inputs aren't being passed between your ops properly.
k
The file with Ops
Single ticker graph file
Job where the graph is called
Passing inputs from one op to another seems very confusing from the docs
z
Okay, there are a few things going on here. Let's start with your graph. One thing to understand is that normal Python operations mostly shouldn't occur within a graph definition, because the graph definition exists purely for Dagster to determine how the ops are connected. So `start_date` and `end_date` for `get_ticker_data()` need to come from an upstream op, from a graph input, or from op configuration.

Another fundamental piece is that graphs are a generalization of jobs; another way of saying that is that a job is an instance of a graph. So usually you'll define the structure of your ops in a graph, then configure that graph with resources and other configuration to determine the behavior of the ops (see the `to_job` sketch after the graph example below). There's a nice example of this here.

You'll also want to look into op configuration vs. op inputs. At a high level, ops can be configured at runtime with values for the options they define in their `config_schema`. To access these configuration variables within an op, you need the `context` variable, which is optionally present as the first parameter of every op. So, for example, your `get_ticker_data` op would look something like this:
```
from dagster import op
import pandas_datareader as pdr


@op(
    config_schema={
        "start_date": str,
        "end_date": str,
    }
)
def get_ticker_data(context, ticker_dict):
    ticker = ticker_dict["ticker_code"]
    # Runtime configuration is read off the context, not passed as an input.
    start_date = context.op_config["start_date"]
    end_date = context.op_config["end_date"]
    data = pdr.get_data_yahoo(ticker, start=start_date, end=end_date)
    return data
```
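With that `config_schema`, the run config you supply (in the launchpad or in code) would take roughly this shape; a minimal sketch, with placeholder dates:
```
# Sketch of run config matching the config_schema above; dates are placeholders.
run_config = {
    "ops": {
        "get_ticker_data": {
            "config": {
                "start_date": "2022-05-01",
                "end_date": "2022-06-01",
            }
        }
    }
}
```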
In this case you have a start and end date that can be configured at runtime, either in the Dagit launchpad or by passing configuration (like your `default_config`) to a `.execute_in_process()` call. I also left `ticker_dict` as an op input because it looks like you want that generated dynamically by an upstream op. Inputs are often best suited for situations where the data needs to be generated during the job by an upstream op, or where you need to declare a dependency between one op and another (meaning one op must execute after the other).

It also looks like you're trying to do a fan-out in `ticker_multi_pipeline`; you'll want to read up on the dynamic mapping feature to achieve this. I think a good place to start would be to get your `ticker_single_pipeline` job working first, then refactor it to use a dynamic graph to achieve the `ticker_multi_pipeline` functionality. In the end, one solution would be to have a single graph that uses dynamic mapping to map the operations in `ticker_single_pipeline` across all the `ticker_dict` items in your config (see the dynamic-mapping sketch after the test snippet below). If you modify your `get_ticker_data` op as shown above, your graph could look like this:
```
from dagster import graph


@graph(name="singleTicker")
def ticker_single_pipeline(ticker_dict: dict):
    # Only op invocations belong here -- this just wires the ops together.
    data = get_ticker_data(ticker_dict)
    data = clean_ticker_data(data)
    data = basic_preprocess(data)
    save_ticker_data(ticker_dict, data)
```
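Since a job is just a configured instance of a graph, you could also bake configuration in when you build the job; a rough sketch, assuming the `default_config` dict from your code and a hypothetical `my_resource`:
```
# Sketch: turning the graph into a job with baked-in config and resources.
# `default_config` is the dict from your code; `my_resource` is hypothetical.
ticker_single_job = ticker_single_pipeline.to_job(
    name="ticker_single_pipeline_job",
    config=default_config,
    resource_defs={"my_resource": my_resource},
)
```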
Then you can test that graph with something like this:
```
ticker_single_pipeline_job_testing = ticker_single_pipeline.to_job(name="ticker_single_pipeline_job")
ticker_single_pipeline_job_testing.execute_in_process(
    {
        "ops": {
            "get_ticker_data": {
                "config": {
                    "start_date": "2022/01/01",
                    "end_date": "2022/01/01",
                }
            }
        },
        "inputs": {"ticker_dict": {"ticker": "info"}},
    }
)
```
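For the fan-out, here's a rough sketch of what the dynamic-mapping version could look like; `fan_out_tickers` and its config shape are hypothetical, and the other ops are the ones from above:
```
from dagster import DynamicOut, DynamicOutput, graph, op


@op(config_schema={"tickers": [str]}, out=DynamicOut())
def fan_out_tickers(context):
    # Emit one DynamicOutput per ticker; mapping_key must be unique
    # and contain only letters, numbers, and underscores.
    for code in context.op_config["tickers"]:
        yield DynamicOutput({"ticker_code": code}, mapping_key=code)


@graph
def ticker_multi_pipeline():
    def process_one(ticker_dict):
        data = get_ticker_data(ticker_dict)
        data = clean_ticker_data(data)
        data = basic_preprocess(data)
        save_ticker_data(ticker_dict, data)

    # .map() re-runs the inner ops once per dynamic output.
    fan_out_tickers().map(process_one)
```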
Overall this is a non-trivial example to start with; I'd suggest scoping your work down a bit in order to better understand some of the fundamentals like inputs vs. outputs, job compilation, and dynamic mapping.