Alexey Kuntsevich
08/13/2020, 4:53 PM

alex
08/13/2020, 5:18 PM

Alexey Kuntsevich
08/13/2020, 6:45 PM

alex
08/13/2020, 6:52 PM
@pipeline
def example():
    for customer in fetch_customers():
        configured(my_solid, name='{}_my_solid'.format(customer.name))({'customer_id': customer.id})()
Alexey Kuntsevich
08/13/2020, 8:13 PM
def produce_account_ids():
    return [%some_stub_ids_here%]

@solid(output_defs=[OutputDefinition(String)])
def search_query_log_solid(context, account_id: int, dtstamp):
    df = get_data_from_elsewhere()
    fname = f"./log_{account_id}_{dtstamp:%Y%m%d}.parquet"
    df.to_parquet(fname)
    yield Output(fname)

@pipeline
def scraping_pipeline():
    accs = produce_account_ids()
    for acc in accs:
        configured(
            search_query_log_solid,
            name=f"scrape_{acc}"
        )({'account_id': acc, 'dtstamp': datetime.now()})
So it starts and yields some debug logs, but no result. If I run it as a plain Python script (replacing the configured piece with just a function call, of course), I do get the resulting files. What am I missing?

2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - PIPELINE_START - Started execution of pipeline "scraping_pipeline".
pid = 23854
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - ENGINE_EVENT - Executing steps in process (pid: 23854)
event_specific_data = {"error": null, "marker_end": null, "marker_start": null, "metadata_entries": [["pid", null, ["23854"]]]}
pid = 23854
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - ENGINE_EVENT - Finished steps in process (pid: 23854) in 0.91ms
event_specific_data = {"error": null, "marker_end": null, "marker_start": null, "metadata_entries": [["pid", null, ["23854"]]]}
pid = 23854
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - PIPELINE_SUCCESS - Finished execution of pipeline "scraping_pipeline".
pid = 23854
alex
08/13/2020, 9:03 PM
()
at the end
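In other words, the definition returned by configured(...) still has to be called inside the pipeline body; without the trailing call the loop only creates solid definitions and never adds steps to the DAG, which is why the run above finishes without doing anything. A minimal sketch of the change, reusing the names and imports from the snippet above and otherwise untested:

# imports and solid definitions as in the snippet above
@pipeline
def scraping_pipeline():
    for acc in produce_account_ids():
        configured(
            search_query_log_solid,
            name=f"scrape_{acc}"
        )({'account_id': acc, 'dtstamp': datetime.now()})()  # trailing () invokes the solid so a step is added

Doing this then surfaces the config error below, since the solid's account_id and dtstamp inputs still have to come from somewhere.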
Alexey Kuntsevich
08/14/2020, 8:23 AM
...
some big chunk of error stack here
...
File "/Users/alexeykuntsevich/.local/share/virtualenvs/slogger-R0opV2jW/lib/python3.8/site-packages/dagster/core/execution/api.py", line 565, in create_execution_plan
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
File "/Users/alexeykuntsevich/.local/share/virtualenvs/slogger-R0opV2jW/lib/python3.8/site-packages/dagster/core/system_config/objects.py", line 97, in build
raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline scraping_pipeline
Error 1: Missing required field "solids" at the root. Available Fields: "['execution', 'intermediate_storage', 'loggers', 'resources', 'solids', 'storage']".
With all that, does this pipeline design approach make sense? I was wondering whether going for a two-dimensional PartitionSet would make more sense. Is that supported, and would the execution engine handle several thousand partitions (I have around a hundred customer_ids and some hundreds of dates to backfill)?

alex
08/14/2020, 2:29 PM
The configured utility sets config for the solid, not inputs, so you need to adjust search_query_log_solid to take that information as config instead of inputs.
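A rough sketch of that adjustment, assuming a config_schema with plain int/str fields read from context.solid_config, and a string-formatted dtstamp (produce_account_ids and get_data_from_elsewhere are the stubs from the earlier snippet; details untested and may differ by Dagster version):

from datetime import datetime

from dagster import Output, OutputDefinition, String, configured, pipeline, solid


@solid(
    config_schema={'account_id': int, 'dtstamp': str},
    output_defs=[OutputDefinition(String)],
)
def search_query_log_solid(context):
    # read per-account values from solid config instead of inputs
    account_id = context.solid_config['account_id']
    dtstamp = context.solid_config['dtstamp']
    df = get_data_from_elsewhere()
    fname = f"./log_{account_id}_{dtstamp}.parquet"
    df.to_parquet(fname)
    yield Output(fname)


@pipeline
def scraping_pipeline():
    for acc in produce_account_ids():
        configured(
            search_query_log_solid,
            name=f"scrape_{acc}",
        )({'account_id': acc, 'dtstamp': datetime.now().strftime('%Y%m%d')})()

With the per-account values baked in via configured and no remaining inputs on the solid, nothing is left that has to be supplied under solids in the run_config, which should also resolve the DagsterInvalidConfigError above.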