Hi Guys! I am very new with Dagster, actually I a...
# announcements
i
Hi guys! I am very new to Dagster; I am actually evaluating this cool technology. I am trying to set up a pipeline with external parameters. I have read the documentation and watched some presentations on YouTube, and I saw some solutions for externally triggered (not scheduled) pipeline invocation. I am trying to simulate this scenario, but it is not clear to me how I can pass parameters to this event-based pipeline during development, because I always get an error 😉. I am going to put some code and screenshots into the thread. Any help would be appreciated.
@pipeline(
    mode_defs=[
        ModeDefinition("Live", resource_defs={"df_reader": dataframe_reader_for_pgsql}),
    ],
    input_defs=[InputDefinition(name="candidate", dagster_type=dict)],
)
def ignition_aggregate_pipeline(candidate):
    candidate = {  # This would be my preset
        "device_inventory_id": 1,
        "device_id_type": 2,
        "device_id_name": "58909",
        "proc_until": "2020-11-18 15:38:23+01",
        "receive_time": "2020-11-19 12:35:34+01",
    }
    ignition_agg_base = new_ignitions_in_range(candidate)
This is the solid I am trying to invoke:
@solid(
    required_resource_keys={"df_reader"},
    input_defs=[InputDefinition(name="candidate", dagster_type=dict)],
    output_defs=[OutputDefinition(name="debug", dagster_type=int)],
)
def new_ignitions_in_range(context, candidate: dict) -> int:
I always get this error:
dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline ignition_aggregate_pipeline, received invalid type <class 'dict'> for input "candidate" (at position 0) in solid invocation "new_ignitions_in_range". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
My question is: how can I develop with some predefined parameters? And how can I pass them once my pipeline is deployed? (I am not at the deployment stage yet 🙂)
s
hi @Istvan Darvas - how are you executing your pipeline? with execute_pipeline? via the command line? in dagit?
i
Hi @sandy!
import yaml
from dagster import execute_pipeline

if __name__ == "__main__":
    # Load the run config (resources etc.) from a YAML file next to the script.
    run_config = {}
    with open("dagster-test2_resources.yaml", 'r') as yaml_cfg:
        try:
            run_config = yaml.safe_load(yaml_cfg)
        except yaml.YAMLError as exc:
            print(exc)

    execute_pipeline(ignition_aggregate_pipeline, run_config=run_config)
I put it into the code because I would like to use PyCharm during development, so I can debug.
This is why I would also like to set up a preset for development,
but I am still not sure whether I can pass parameters as arguments to the decorated pipeline function.
s
one way you could accomplish this is:
from dagster import ModeDefinition, OutputDefinition, execute_pipeline, pipeline, solid


# The solid is defined first: @pipeline runs the composition function as soon
# as the decorator is applied, so the solid's name must already exist.
@solid(
    required_resource_keys={"df_reader"},
    output_defs=[OutputDefinition(name="debug", dagster_type=int)],
    config_schema={
        "device_inventory_id": int,
        "device_id_type": int,
        "device_id_name": str,
        "proc_until": str,
        "receive_time": str,
    },
)
def new_ignitions_in_range(context) -> int:
    # The values supplied in run_config arrive here as a dict.
    candidate = context.solid_config


@pipeline(
    mode_defs=[ModeDefinition("Live", resource_defs={"df_reader": dataframe_reader_for_pgsql})],
)
def ignition_aggregate_pipeline():
    ignition_agg_base = new_ignitions_in_range()


execute_pipeline(
    ignition_aggregate_pipeline,
    run_config={
        "solids": {
            "new_ignitions_in_range": {
                # Solid config is nested under the "config" key.
                "config": {
                    "device_inventory_id": 1,
                    "device_id_type": 2,
                    "device_id_name": "58909",
                    "proc_until": "2020-11-18 15:38:23+01",
                    "receive_time": "2020-11-19 12:35:34+01",
                }
            }
        }
    },
)
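When running from an IDE, the solid config usually has to be combined with whatever is loaded from the YAML file. A minimal sketch in plain Python, assuming the YAML file only carries a `resources` section and that solid config is nested under a `config` key in `run_config`. `solid_config` and `deep_merge` are hypothetical helper names (not Dagster APIs), and the `df_reader` settings below are placeholders standing in for the result of `yaml.safe_load`:

```python
import copy

# Development preset, taken from the values used in this thread.
DEV_CANDIDATE = {
    "device_inventory_id": 1,
    "device_id_type": 2,
    "device_id_name": "58909",
    "proc_until": "2020-11-18 15:38:23+01",
    "receive_time": "2020-11-19 12:35:34+01",
}


def solid_config(solid_name, config):
    """Wrap a config dict in the run_config shape for a single solid."""
    return {"solids": {solid_name: {"config": dict(config)}}}


def deep_merge(base, extra):
    """Return a new dict with `extra` merged into `base`, recursing into nested dicts."""
    merged = copy.deepcopy(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Placeholder for the dict that yaml.safe_load would return (assumed shape).
resources_config = {"resources": {"df_reader": {"config": {"placeholder": "value"}}}}

run_config = deep_merge(
    resources_config,
    solid_config("new_ignitions_in_range", DEV_CANDIDATE),
)
```

The resulting `run_config` could then be passed to `execute_pipeline` in place of the hand-written dict, which keeps the development preset in one place.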
i
maybe "this is the way" 😄
I created a test solid that just returns my initial development preset.
If there is another, better way and someone is not afraid to share it, it would be appreciated.
c
@Istvan Darvas was wondering if the code snippet sandy sent above works for you?
i
@cat @sandy I have to admit, I did not notice it 😞 I am going to test it immediately, and thanks for pointing it out again.
This will work, I can see the logic, thanks. I just need to rewrite that function so that it gets these parameters from the context instead of having them passed in as an input.
I am going to do it this way.
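One possible shape for that rewrite is to keep the real work in a plain function and let the solid body only unpack the context, which also keeps the logic debuggable without Dagster. This is only a sketch: `count_new_ignitions` and the `read_rows` callable are hypothetical stand-ins (the real `df_reader` resource interface is not shown in this thread), and `SimpleNamespace` fakes the `context` object with its `solid_config` and `resources` attributes.

```python
from types import SimpleNamespace

REQUIRED_KEYS = (
    "device_inventory_id",
    "device_id_type",
    "device_id_name",
    "proc_until",
    "receive_time",
)


def count_new_ignitions(candidate, read_rows):
    """Plain function holding the logic; knows nothing about Dagster."""
    missing = [key for key in REQUIRED_KEYS if key not in candidate]
    if missing:
        raise KeyError(f"candidate is missing keys: {missing}")
    # Hypothetical reader call: device name plus the time range to scan.
    rows = read_rows(
        candidate["device_id_name"],
        candidate["proc_until"],
        candidate["receive_time"],
    )
    return len(rows)


def new_ignitions_in_range_body(context):
    """What the decorated solid body would do: unpack the context and delegate."""
    return count_new_ignitions(context.solid_config, context.resources.df_reader)


# Stand-in context for local debugging; the fake reader always returns 3 rows.
fake_context = SimpleNamespace(
    solid_config={
        "device_inventory_id": 1,
        "device_id_type": 2,
        "device_id_name": "58909",
        "proc_until": "2020-11-18 15:38:23+01",
        "receive_time": "2020-11-19 12:35:34+01",
    },
    resources=SimpleNamespace(df_reader=lambda name, start, end: [("row",)] * 3),
)

result = new_ignitions_in_range_body(fake_context)
```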
Thanks guys
s
glad to help!
i
It's really working 😉
@sandy thanks dude!
c
awesome 👍 🎉