gazpot
02/27/2021, 6:33 PMdef get_market_ohlc_factory(name, market):
@solid(name=name, required_resource_keys={"api"})
def _get_market_ohlc(context):
ts_ohlc = context.resources.api.get_ohlc(market)
return ts_ohlc
return _get_market_ohlc
This runs as expected
@solid(required_resource_keys={"api"})
def _get_market_ohlc(context):
market = 'BTC/USDT'
ts_ohlc = context.resources.api.get_ohlc(market)
return ts_ohlc
The purpose of the factory is to pass parameters form a list to create solids. The pipeline func looks like this. The get_market_list()
returns a python list from a file.
@pipeline(
mode_defs=[
ModeDefinition(
resource_defs={"postgres": postgres_resource,
"api": api_resource,
"redis": redis_resource})
])
def run_pipeline():
market_list = get_market_list()
for market in market_list:
ts_ohlc = get_market_ohlc_factory(name=f'_get_market_ohlc', market=market)
schrockn
02/27/2021, 6:46 PMget_market_ohlc_factory(name=f'_get_market_ohlc', market=market)()
from dagster import pipeline, solid
def get_market_list():
return ["A", "B"]
def get_market_ohlc_factory(name, market):
@solid(name=name)
def _get_market_ohlc(context):
pass
# ts_ohlc = context.resources.api.get_ohlc(market)
# return ts_ohlc
return _get_market_ohlc
@pipeline(
# mode_defs=[
# ModeDefinition(
# resource_defs={
# "postgres": postgres_resource,
# "api": api_resource,
# "redis": redis_resource,
# }
# )
# ]
)
def run_pipeline():
market_list = get_market_list()
for market in market_list:
solid_def_for_market = get_market_ohlc_factory(name=f"market_{market}", market=market)
solid_def_for_market()
gazpot
02/28/2021, 3:17 AMdagster.core.errors.DagsterInvariantViolationError: Attempted to index in to an InvokedSolidOutputHandle. This object represents the output "result" from the solid "_process_data_BTC_USDT". Consider yielding multiple Outputs if you seek to pass different parts of this output to different solids.
Reading some posts, this seems like a separate issue and related to loops in pipelines. What is strange is that the first two solids run correctly and they are all returning similar numpy arrays (have also tried pandas and list). Before I make a new thread, wondering if there is a maximum depth of solids which can be executed in a for loop?