Alexey Kuntsevich

08/13/2020, 4:53 PM
Hi everyone! I got stuck structuring an ETL pipeline with Dagster, and I'd appreciate it if someone could point out what I'm misunderstanding. I'm trying to run a simple process (load from an external source -> some pandas transformations -> offload to S3 as parquet) over data partitioned by date and customer ID. The process runs daily and will have to be backfilled to some fixed date in the past (this part is clear and documented in the guide). What confuses me is how to run things for each customer separately. What I used to do with Luigi was create a task (which AFAIU is similar to a Dagster solid) that acquires the list of customers and then spawns instances of the data-processing task, one per customer. My attempts to iterate over a solid output failed, and that's where I got stuck. Running the processing over all customers at once isn't feasible, and I'd prefer to avoid it anyway due to several technical limitations. What are the options in this case? Thanks!

alex

08/13/2020, 5:18 PM
how is the customer list computed? We do not yet support mapping over solid outputs, but you may be able to move this to when the pipeline is defined

Alexey Kuntsevich

08/13/2020, 6:45 PM
There's a call to some remote API which returns the list of customer IDs to be processed. The list might change from time to time, but not that often (once every couple of days).

alex

08/13/2020, 6:52 PM
ya so I think one option for now would be to handle this fanout at @pipeline time
Copy code
@pipeline
def example():
  for customer in fetch_customers():
    configured(my_solid, name='{}_my_solid'.format(customer.name))({'customer_id': customer.id})()
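For context, here is a minimal self-contained sketch of that fan-out pattern; the Customer tuple, the fetch_customers stub, and the config-based my_solid below are illustrative placeholders, not from the thread:
Copy code
from collections import namedtuple

from dagster import configured, pipeline, solid

# hypothetical stand-ins for the names used in the snippet above
Customer = namedtuple("Customer", ["id", "name"])


def fetch_customers():
    # in the real pipeline this would call the remote API mentioned earlier
    return [Customer(1, "acme"), Customer(2, "globex")]


@solid(config_schema={"customer_id": int})
def my_solid(context):
    context.log.info(f"processing customer {context.solid_config['customer_id']}")


@pipeline
def example():
    for customer in fetch_customers():
        # configured() fixes the config on a renamed copy of the solid;
        # the trailing () invokes it so it becomes a node in the pipeline graph
        configured(my_solid, name="{}_my_solid".format(customer.name))(
            {"customer_id": customer.id}
        )()
Because the loop runs at pipeline-definition time, the customer list is fetched when the pipeline definition is loaded rather than on every run, which fits the earlier point that the list only changes every couple of days.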

Alexey Kuntsevich

08/13/2020, 8:13 PM
Thanks for the advice! Here's where I'm currently at:
Copy code
from datetime import datetime

from dagster import Output, OutputDefinition, String, configured, pipeline, solid


def produce_account_ids():
    return [%some_stub_ids_here%]


@solid(output_defs=[OutputDefinition(String)])
def search_query_log_solid(context, account_id: int, dtstamp):
    df = get_data_from_elsewhere()
    fname = f"./log_{account_id}_{dtstamp:%Y%m%d}.parquet"
    df.to_parquet(fname)
    yield Output(fname)


@pipeline
def scraping_pipeline():
    accs = produce_account_ids()
    for acc in accs:
        configured(
            search_query_log_solid,
            name=f"scrape_{acc}"
        )({'account_id': acc, 'dtstamp': datetime.now()})
So it starts and yields some debug logs, but no result. If I run it as a plain Python script (replacing the configured piece with just a function call, ofc), I get the resulting files. What am I missing?
Here's the whole resulting log I get:
Copy code
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - PIPELINE_START - Started execution of pipeline "scraping_pipeline".
                 pid = 23854
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - ENGINE_EVENT - Executing steps in process (pid: 23854)
 event_specific_data = {"error": null, "marker_end": null, "marker_start": null, "metadata_entries": [["pid", null, ["23854"]]]}
                 pid = 23854
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - ENGINE_EVENT - Finished steps in process (pid: 23854) in 0.91ms
 event_specific_data = {"error": null, "marker_end": null, "marker_start": null, "metadata_entries": [["pid", null, ["23854"]]]}
                 pid = 23854
2020-08-13 22:11:41 - dagster - DEBUG - scraping_pipeline - 866f1df7-1108-411c-9691-5c0857bf79b3 - PIPELINE_SUCCESS - Finished execution of pipeline "scraping_pipeline".
                 pid = 23854

alex

08/13/2020, 9:03 PM
oop you need to invoke the solids too, so just add () at the end

Alexey Kuntsevich

08/14/2020, 8:23 AM
Ah, I thought that passing the parameter dict already invokes the solid. Got your point, thanks. I still get a bunch of errors afterwards, like
Copy code
...
some big chunk of error stack here
...
File "/Users/alexeykuntsevich/.local/share/virtualenvs/slogger-R0opV2jW/lib/python3.8/site-packages/dagster/core/execution/api.py", line 565, in create_execution_plan
    environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
  File "/Users/alexeykuntsevich/.local/share/virtualenvs/slogger-R0opV2jW/lib/python3.8/site-packages/dagster/core/system_config/objects.py", line 97, in build
    raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline scraping_pipeline
    Error 1: Missing required field "solids" at the root. Available Fields: "['execution', 'intermediate_storage', 'loggers', 'resources', 'solids', 'storage']".
With all that, does this pipeline design approach make sense? I was wondering whether going for a two-dimensional PartitionSet would make more sense. Is that supported, and would the execution engine handle several thousand partitions (I have around a hundred customer_ids and a few hundred dates to backfill)?

alex

08/14/2020, 2:29 PM
so the configured utility sets config for the solid, not inputs, so you need to adjust search_query_log_solid to take that information as config instead of inputs
the 2-d partition thing would be elegant too but we don’t support it cleanly right now as far as I know
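To make that concrete, here is a minimal sketch of the solid reworked so that account_id and dtstamp come from config rather than inputs, letting the configured fan-out supply them at pipeline-definition time; the stub helpers and example IDs are placeholders, not from the thread:
Copy code
from datetime import datetime

import pandas as pd
from dagster import Output, OutputDefinition, String, configured, pipeline, solid


def produce_account_ids():
    # stand-in for the stubbed ID list in the snippet above
    return [101, 102]


def get_data_from_elsewhere():
    # stand-in for the real data fetch
    return pd.DataFrame({"query": ["foo", "bar"]})


@solid(
    # account_id and dtstamp move from inputs to config so that configured()
    # can supply them when the pipeline is defined
    config_schema={"account_id": int, "dtstamp": str},
    output_defs=[OutputDefinition(String)],
)
def search_query_log_solid(context):
    account_id = context.solid_config["account_id"]
    dtstamp = datetime.fromisoformat(context.solid_config["dtstamp"])
    df = get_data_from_elsewhere()
    fname = f"./log_{account_id}_{dtstamp:%Y%m%d}.parquet"
    df.to_parquet(fname)
    yield Output(fname)


@pipeline
def scraping_pipeline():
    for acc in produce_account_ids():
        configured(search_query_log_solid, name=f"scrape_{acc}")(
            {"account_id": acc, "dtstamp": datetime.now().isoformat()}
        )()
With the solid taking no inputs, the "Missing required field 'solids'" config error should go away, since nothing is left for the run config to supply.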