#announcements

Noah K

11/11/2020, 9:07 AM
I did it using resources rather than solid configs, and that seemed to be easier

Eric Bellet

11/11/2020, 9:08 AM
Do you have an example? I’m learning Dagster

Noah K

11/11/2020, 9:09 AM
Just had to redact some bucket names 🙂

Eric Bellet

11/11/2020, 9:10 AM
I didn’t see this https://github.com/dagster-io/dagster/discussions/ for questions please 🙏🏻🙏🏻🙏🏻… I removed my question from here
thanks 😄
OK, how about the for loop? I received this error:
dagster.core.errors.DagsterInvariantViolationError: Attempted to iterate over an InvokedSolidOutputHandle. This object represents the output "result" from the solid "setDatesToProcess". Consider yielding multiple Outputs if you seek to pass different parts of this output to different solids.
from dagster import (
    DagsterType,
    InputDefinition,
    OutputDefinition,
    String,
    execute_pipeline,
    pipeline,
    solid,
)
from datetime import datetime, timedelta


@solid
def setDatesToProcess(context, startDt, endDt):
    startDt = datetime.strptime(startDt, "%Y-%m-%d")
    endDt = datetime.strptime(endDt, "%Y-%m-%d")
    dates = [startDt + timedelta(days=x) for x in range((endDt - startDt).days + 1)]
    return dates

@solid
def obtainDataFromKafka(context, date):
    context.log.info(f"Extracting data from {date}")
    return ["Example", "Data"]

@solid
def saveFileS3(context, data, date):
    path = f"s3://mybucket/myfolder/{date}"
    context.log.info(f"Saving {data} in the path {path}")
    return path

@solid
def copys3FileToTableRedshift(context, path):
    context.log.info(f"Copying to Redshift using S3 file {path}")


@pipeline
def my_pipeline():
    dates = setDatesToProcess()

    for date in dates:
        copys3FileToTableRedshift(saveFileS3(obtainDataFromKafka(date), date))


run_config = {
    "solids": {
        "setDatesToProcess": {
            "inputs": {
                "startDt": {"value": "2020-10-05"},
                "endDt": {"value": "2020-10-10"}
            }
        }
    }
}

if __name__ == "__main__":
    execute_pipeline(my_pipeline, run_config=run_config)

Noah K

11/11/2020, 9:23 AM
Loops in a pipeline are not something Dagster supports
(yet)
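
One workaround that fits this constraint is to keep the pipeline graph static and move the loop into the body of a single solid, so the iteration happens at run time on real values. A minimal sketch in plain Python (no Dagster imports, so it runs anywhere); the function name `process_date_range` is hypothetical, the bucket path is a placeholder, and the bodies only mimic the logging in the question's solids:

```python
from datetime import datetime, timedelta


def dates_to_process(start_dt: str, end_dt: str) -> list:
    """Return every date from start_dt to end_dt, inclusive."""
    start = datetime.strptime(start_dt, "%Y-%m-%d")
    end = datetime.strptime(end_dt, "%Y-%m-%d")
    return [start + timedelta(days=x) for x in range((end - start).days + 1)]


def process_date_range(start_dt: str, end_dt: str) -> list:
    """Hypothetical body of a single solid: the loop runs on real values."""
    paths = []
    for date in dates_to_process(start_dt, end_dt):
        data = ["Example", "Data"]  # stand-in for the Kafka extraction step
        path = f"s3://mybucket/myfolder/{date:%Y-%m-%d}"  # placeholder bucket
        print(f"Saving {data} in the path {path}")
        paths.append(path)
    return paths


if __name__ == "__main__":
    process_date_range("2020-10-05", "2020-10-10")
```

The likely trade-off is that the orchestrator then sees one step for the whole range rather than one step per date, so per-date retries and logs are not tracked separately.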

Eric Bellet

11/11/2020, 9:24 AM
OK, do you have an estimated date?

Noah K

11/11/2020, 9:24 AM
I don't work on Dagster itself, just another new user 🙂
I've just been poking them about loops too.

Eric Bellet

11/11/2020, 9:24 AM
Heheh ok thank you very much 🙂

Noah K

11/11/2020, 9:25 AM
Basically, when working with pipelines, remember that the Python code is just a DSL for generating a purely static PipelineDefinition object
So all values flowing around between solids are just placeholders
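
To make the placeholder idea concrete, here is a toy sketch in plain Python. It is not Dagster's implementation; `OutputHandle`, `step`, `GRAPH`, and the two step functions are hypothetical names invented for illustration. Calling a decorated function while the graph is being built returns a handle instead of running the body, which is why a `for` loop over that handle has nothing to iterate:

```python
class OutputHandle:
    """Placeholder for a step's future output; holds no real data."""

    def __init__(self, step_name):
        self.step_name = step_name

    def __iter__(self):
        # The real value does not exist until the graph is executed.
        raise TypeError(
            f"cannot iterate over the placeholder output of '{self.step_name}'"
        )


GRAPH = []  # static record of (step name, upstream step names)


def step(fn):
    """Toy decorator: calling the step records a node instead of running fn."""

    def invoke(*inputs):
        GRAPH.append((fn.__name__, [h.step_name for h in inputs]))
        return OutputHandle(fn.__name__)

    return invoke


@step
def set_dates():
    return ["2020-10-05", "2020-10-06"]


@step
def extract(date):
    return ["Example", "Data"]


dates = set_dates()        # a handle, not a list of dates
extract(dates)             # wiring handles together is fine
try:
    for date in dates:     # fails: there is nothing to loop over yet
        extract(date)
except TypeError as err:
    print(err)
```

After this runs, `GRAPH` holds only the two statically wired nodes: the loop body never executed, because the dates do not exist at definition time.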

Eric Bellet

11/11/2020, 9:30 AM
I see, thanks again

sashank

11/11/2020, 4:18 PM
cc @cat re: loops

cat

11/11/2020, 4:40 PM
hey, yup i’m currently working on it. don’t have a firm eta yet. feel free to message me with your use case. the v0 is intended to only do one level of fan-out without a collect step

Eric Bellet

11/11/2020, 4:41 PM