I’m wondering if there is a way to dynamically add...
# announcements
j
I’m wondering if there is a way to dynamically add solids to the execution graph? I have a daily schedule that retrieves a set of URLs of unknown size, and for each URL I would like to retrieve the data, parse it, and upload the parsed results to our database. While I could do this all in one step, it would be nice if each URL fetch and each parsing of that URL's results could throw errors and fail without stopping the full process, e.g.
import pandas as pd
import requests
from dagster import Output, pipeline, solid


@solid
def get_urls(_):
    # Assumes the CSV has a column named "url" (the column name is a guess)
    url_list = pd.read_csv('urls_to_update.csv')['url'].tolist()
    # This can be a list of size 0-100
    # Desired behavior: emit one output per URL
    for url in url_list:
        yield Output(url, "url")


@solid
def parse_data(_, url):
    res = requests.get(url).json()

    structured_data = dict(
        url=url,
        col_a=res["col_a"],
        col_b=res["col_b"],
    )

    yield Output(structured_data, 'result')


@solid
def upsert_data(_, data):
    # Some logic to load the data into the warehouse
    pass


@pipeline
def my_dynamic_pipeline():
    upsert_data(
        data=parse_data(
            url=get_urls()
        )
    )
Is this just not feasible? Not a huge issue if not; I can work around it.
m
hi @John Mav, this isn't currently possible, but we are considering something like this for 0.9.0
would you mind commenting with this use case on our tracking issue? https://github.com/dagster-io/dagster/issues/462
j
Thanks Max! That’s kind of what I figured 🙂 glad to hear this is something y’all are thinking of though!
m
you can of course write solids with List-typed inputs/outputs to implement this kind of thing, but without the nicely isolated error handling
j
Yeah I had something working before with List input/outputs and then started down this little rabbit hole 😄
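For reference, the List-typed workaround mentioned above can be sketched roughly as follows. This is plain Python rather than Dagster code: in a real pipeline each function would presumably be a solid with List-typed inputs/outputs. The sample URLs, the `fake_fetch` stand-in for `requests.get(url).json()`, the dict-based `warehouse`, and the per-item `try`/`except` are all illustrative assumptions, not anything Dagster provides. Catching errors per item inside the step is one way to keep a single bad URL from stopping the run, though failures are then only recorded in a list rather than surfaced as separate step failures.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def get_urls() -> List[str]:
    # Hypothetical stand-in for reading urls_to_update.csv
    return [
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c",
    ]


def parse_data(
    urls: List[str], fetch: Callable[[str], Record]
) -> Tuple[List[Record], List[Tuple[str, str]]]:
    # Takes the whole list at once; errors are caught per URL so that
    # one failing URL does not abort the remaining ones.
    parsed: List[Record] = []
    failures: List[Tuple[str, str]] = []
    for url in urls:
        try:
            res = fetch(url)
            parsed.append(
                {"url": url, "col_a": res["col_a"], "col_b": res["col_b"]}
            )
        except Exception as exc:
            failures.append((url, repr(exc)))
    return parsed, failures


def upsert_data(rows: List[Record], warehouse: Dict[str, Record]) -> int:
    # Stand-in for the warehouse load: upsert keyed by URL.
    for row in rows:
        warehouse[row["url"]] = row
    return len(rows)


def fake_fetch(url: str) -> Record:
    # Hypothetical replacement for requests.get(url).json();
    # simulates one URL returning a bad payload.
    if url.endswith("/b"):
        raise ValueError("bad payload")
    return {"col_a": 1, "col_b": 2}


def run_pipeline(
    fetch: Callable[[str], Record] = fake_fetch,
) -> Tuple[Dict[str, Record], List[Tuple[str, str]]]:
    warehouse: Dict[str, Record] = {}
    parsed, failures = parse_data(get_urls(), fetch)
    upsert_data(parsed, warehouse)
    return warehouse, failures
```

Calling `run_pipeline()` loads the two URLs that parse cleanly and returns the failing URL in the `failures` list, which could then be logged or retried on the next scheduled run.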