I did it using resources rather than solidconfigs ...
# announcements
n
I did it using resources rather than solidconfigs and that's seemed to be easier
e
Have you an example? Iโ€™m learning Dagster
n
Just had to redact some bucket names ๐Ÿ™‚
e
I didnโ€™t see this https://github.com/dagster-io/dagster/discussions/ for questions please ๐Ÿ™๐Ÿป๐Ÿ™๐Ÿป๐Ÿ™๐Ÿปโ€ฆ I removed my question from here
thanks ๐Ÿ˜„
Ok, how about the for bucle? I received this error
dagster.core.errors.DagsterInvariantViolationError: Attempted to iterate over an InvokedSolidOutputHandle. This object represents the output "result" from the solid "setDatesToProcess". Consider yielding multiple Outputs if you seek to pass different parts of this output to different solids.
Copy code
from dagster import (
    DagsterType,
    InputDefinition,
    OutputDefinition,
    String,
    execute_pipeline,
    pipeline,
    solid,
)
from datetime import datetime, timedelta


@solid
def setDatesToProcess(context, startDt, endDt):
    startDt = datetime.strptime(startDt, "%Y-%m-%d")
    endDt = datetime.strptime(endDt, "%Y-%m-%d")
    dates = [startDt + timedelta(days=x) for x in range((endDt - startDt).days + 1)]
    return dates

@solid
def obtainDataFromKafka(context, date):
    <http://context.log.info|context.log.info>(f"Extracting data from {date}")
    return ["Example", "Data"]

@solid
def saveFileS3(context, data, date):
    path = f"s3:mybucket/myfolder/{date}"
    <http://context.log.info|context.log.info>(f"Saving {data} in the path {path}")
    return path

@solid
def copys3FileToTableRedshift(context, path):
    <http://context.log.info|context.log.info>(f"Copying to Redshift using S3 file {path}")


@pipeline
def my_pipeline():
    dates = setDatesToProcess()

    for date in dates:
        copys3FileToTableRedshift(saveFileS3(obtainDataFromKafka(date)))


run_config = {
    "solids": {
        "setDatesToProcess": {
            "inputs": {
                "startDt": {"value": "2020-10-05"},
                "endDt": {"value": "2020-10-10"}
            }
        }
    }
}

if __name__ == "__main__":
    execute_pipeline(my_pipeline, run_config=run_config)
n
Loops in a pipeline is not something Dagster supports
(yet)
e
Ok, do you have an estimation date?
n
I don't work on Dagster itself, just another new user ๐Ÿ™‚
I've just been poking them about loops too.
๐Ÿ˜‹ 1
e
Heheh ok thank you very much ๐Ÿ™‚
n
Basically when working with pipelines, remember the Python code is just a DSL for generating a purely static PipelineConfig object
So all values flowing around between solids are just placeholders
e
I see, thanks again
s
cc @cat re: loops
c
hey, yup iโ€™m currently working on it. donโ€™t have a firm eta yet. feel free to message me with your use case. the v0 is intended to only do one level of fan-out without a collect step
๐Ÿฆœ 1
e