
gazpot

02/27/2021, 6:33 PM
Hello all, I'm having a bit of trouble getting a solid factory to work. The solid runs as expected when it is not wrapped in a factory function, so I must not be passing the correct information to the decorator. Could anyone look at the examples below and let me know if anything looks off? Thanks for your help. This is the factory, which does not get invoked by the pipeline.
def get_market_ohlc_factory(name, market):
    @solid(name=name, required_resource_keys={"api"})
    def _get_market_ohlc(context):

        ts_ohlc = context.resources.api.get_ohlc(market)

        return ts_ohlc

    return _get_market_ohlc
This runs as expected
@solid(required_resource_keys={"api"})
def _get_market_ohlc(context):

    market = 'BTC/USDT'
    
    ts_ohlc = context.resources.api.get_ohlc(market)

    return ts_ohlc
The purpose of the factory is to pass parameters from a list to create solids. The pipeline function looks like this. get_market_list() returns a Python list from a file.
@pipeline(
        mode_defs=[
            ModeDefinition(
                resource_defs={"postgres": postgres_resource,
                               "api": api_resource,
                               "redis": redis_resource})
        ])
def run_pipeline():
    market_list = get_market_list()
    for market in market_list:
      ts_ohlc = get_market_ohlc_factory(name=f'_get_market_ohlc', market=market)

schrockn

02/27/2021, 6:46 PM
I think the issue is that you need to do
get_market_ohlc_factory(name=f'_get_market_ohlc', market=market)()
get_market_ohlc_factory returns a solid, which needs to be called
you are also going to need to give the solids unique names or aliases
here’s a snippet that does what you want:
from dagster import pipeline, solid


def get_market_list():
    return ["A", "B"]


def get_market_ohlc_factory(name, market):
    @solid(name=name)
    def _get_market_ohlc(context):
        pass
        # ts_ohlc = context.resources.api.get_ohlc(market)
        # return ts_ohlc

    return _get_market_ohlc


@pipeline(
    # mode_defs=[
    #     ModeDefinition(
    #         resource_defs={
    #             "postgres": postgres_resource,
    #             "api": api_resource,
    #             "redis": redis_resource,
    #         }
    #     )
    # ]
)
def run_pipeline():
    market_list = get_market_list()
    for market in market_list:
        solid_def_for_market = get_market_ohlc_factory(name=f"market_{market}", market=market)
        solid_def_for_market()
hope that helps!
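On the unique-names point: real market symbols such as 'BTC/USDT' contain a slash, and as far as I know Dagster only accepts letters, digits, and underscores in solid names. A minimal sketch of one way to derive a distinct, safe name per market follows. The helper solid_name_for and the "get_market_ohlc" prefix are hypothetical, not part of Dagster or of the code above.

```python
import re


def solid_name_for(prefix, market):
    """Build a name that contains only letters, digits, and underscores."""
    # Replace every run of other characters (such as the "/" in "BTC/USDT") with "_".
    safe_market = re.sub(r"[^A-Za-z0-9_]+", "_", market)
    return f"{prefix}_{safe_market}"


markets = ["BTC/USDT", "ETH/USDT"]
names = [solid_name_for("get_market_ohlc", m) for m in markets]

# Each market should map to its own name so the generated solids do not collide.
assert len(set(names)) == len(names)
```

The result could then be passed as the name argument of the factory. Note that two different symbols can collapse to the same name (for example "BTC/USDT" and "BTC_USDT"), so the uniqueness check is worth keeping.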

gazpot

02/28/2021, 3:17 AM
Thanks for the quick reply @schrockn. Yes, that worked, and it makes sense now. One other thing: the loop in the above example seems to work for the first two solids in a sequence, but when it gets to a third one I now get an error:
dagster.core.errors.DagsterInvariantViolationError: Attempted to index in to an InvokedSolidOutputHandle. This object represents the output "result" from the solid "_process_data_BTC_USDT". Consider yielding multiple Outputs if you seek to pass different parts of this output to different solids.
Reading some posts, this seems like a separate issue related to loops in pipelines. What is strange is that the first two solids run correctly, and they all return similar numpy arrays (I have also tried pandas and lists). Before I make a new thread, I'm wondering whether there is a maximum depth of solids that can be executed in a for loop?