
Eric

11/22/2021, 1:48 AM
Hey all, looking for a way to be able to pass `**kwargs` to an op. I see the documentation references passing kwargs for Resources. Maybe I missed it somewhere?

Stefan Adelbert

11/22/2021, 2:09 AM
Hi @Eric I'm new to dagster, but I'm interested in your question. In your case, what would you want to pass to the op in the `**kwargs`?
My understanding of the framework is that inputs to an op should be described in the op config schema or as `Inputs`. You could also give the op access to a resource which provides other information for the op to use. Adding additional parameters to an op means that dagster won't know how to call the op and provide relevant values for those parameters.

Eric

11/22/2021, 2:46 AM
Hi Stefan, thanks for the response. I should have provided an example for clarity. I have an op that simply reads either a csv or xlsx file into a pandas DataFrame. Depending on the file type it calls the underlying `read_csv` or `read_excel` function. There will be some situations where I would like to define a schema (e.g. `{"col1": str, "col2": int, ...}`) for the DataFrame. There are also other keyword args for both of those functions I would like to use whenever needed. So rather than recreating all the parameters for the pandas functions, it would be great if I could simply pass kwargs instead and define them per job. Here is a quick example of the op:
import os

import pandas as pd
from dagster import StringSource, op

@op(config_schema={
    "path": StringSource
})
def df_from_file(context, **kwargs):
    path = context.op_config["path"]
    
    if not os.path.exists(path):
        raise FileNotFoundError(f"Could not find file at path '{path}'")
    
    if not path.lower().endswith((".csv", ".xlsx", ".xls")):
        raise ValueError(f"File '{path}' must be of type csv or excel")
    
    if path.endswith(".csv"):
        readf = pd.read_csv
    else:
        readf = pd.read_excel
    
    return readf(path, **kwargs)

Stefan Adelbert

11/22/2021, 3:22 AM
OK, so the op config contains a `path`, which is used to decide whether to call `read_csv` or `read_excel`. The nature of the `kwargs` depends on the path too, i.e. you'll need to provide one set of `kwargs` if a CSV is being read and another set if an Excel file is being read.
I don't think there is a mechanism in dagster to allow an op to be called with additional parameters as your example shows, but I'm new to dagster and so maybe I'm missing something. In your scenario, where would those `kwargs` come from? At runtime, what part of the system would be instantiating them? If those `kwargs` are somewhat static, you could create a resource which wraps a dict of dicts, with `csv` and `excel` as top-level keys and the two dicts of `kwargs` as their values. You could then make your op dependent on that resource and pull out the relevant `kwargs` depending on the type of the file being read.
Alternatively, you could specify the `kwargs` in the op config, and that might make sense if those `kwargs` need to change a fair amount (like between runs of the job). You could use a dagster [Shape](https://docs.dagster.io/_apidocs/config#dagster.Shape) as the data type for the `kwargs` in the config schema, I think.

Eric

11/22/2021, 3:33 AM
Right, the path can be defined in the yaml file. That's not a problem. However, the source file type (csv or excel) is known when the filepath is put in. In the case of a csv (in my case) I would need the "sep" keyword argument for `read_csv`. In the case of an excel file (again in my case) I would need the "sheet" keyword argument to specify a sheet other than the default. For the definition, I was planning on defining the dictionary in the job, but your suggestion of a central Resource is likely much more appropriate and cleaner. Here is an example of the test I was trying to run with the single op for an excel file input. _Note the extra kwarg params (converters and sheet) that are not defined in the config_schema of the op._
def test_df_from_file_excel_with_converters():
    context = build_op_context(config={
        "path": r"some\path\data.xlsx"
    })

    schema = {
        "ID_NUMBER_COL": int,
        "OTHER_NUMBER_COL": str
    }

    df = df_from_file(context, converters=schema, sheet=2)

    assert df.shape[0] > 1

Stefan Adelbert

11/22/2021, 3:43 AM
I haven't written a test for an op that is dependent on a resource yet, but I will need to do that really soon. The idea would be to create a context at test time using [build_op_context](https://docs.dagster.io/_apidocs/execution#dagster.build_op_context) and pass a resource in to it. You could then monkeypatch `pd` in your test case and assert that the relevant function is called on `pd` with the correct `kwargs` from the resource. Something like this (treat this as pseudo code),
@resource
def dummy_resource(_init_context):
    return {
        "csv": {"a": 1},
        "excel": {"b": 2}
    }

def test_df_from_file_excel_with_converters():
    my_ops.pd = mock.Mock()
    context = build_op_context(
        config={"path": r"some\path\data.xlsx"},
        resources={"kwargs": dummy_resource}
    )
    df = my_ops.df_from_file(context)

    assert my_ops.pd.read_excel.call_count == 1
    my_ops.pd.read_excel.assert_called_with(mock.ANY, b=2)

Marcel M

11/22/2021, 8:46 AM
This is a problem I've run into as well (being a Dagster newbie). It seems like such a natural thing to do. @Stefan Adelbert it seems like you want to "call" the op twice, once for the CSV, once for the XLSX, by providing a kw argument. The solution I came up with was to define an op factory (https://docs.dagster.io/concepts/ops-jobs-graphs/ops#op-factory). This is my code:
def ingest_factory(name):
    @op(name=f'ingest_{name}',
        required_resource_keys={"openac"},
        out={"df_name": Out(str), "df": Out(pd.DataFrame)}
        )
    def _op(context):
        df_name = name
        context.log.info(f"Ingesting {df_name} from OpenAC")
        df = context.resources.openac.get_dataset(df_name)

        yield Output(df_name, output_name='df_name')
        yield Output(df, output_name='df')

    return _op()
and then “call” it like:
@graph
def ingest_openac():
    targets = ['acties', 'activiteit_codes', 'verwijzingen']
    targets = ['normtijden', 'verrichting_codes']
    results = []
    for t in targets:
        df_name, df = ingest_factory(t)
        name = save_to_relwh(df_name, df)
        results.append(name)
        _, path = save_to_fswh(df_name, df)
        results.append(path)

    collect_openac_ingest(results)
My use is somewhat convoluted; in your case it would be something simple like:
res1 = my_op_factory('path to csv')
res2 = my_op_factory('path to xlsx')
I won't say this is idiomatic Dagster, but it fit my brain better than defining an additional resource. And it worked better than using dynamic output (which I couldn't fathom).
👍 1

Stefan Adelbert

11/22/2021, 11:39 PM
@Marcel M That's really useful. I hadn't read that part of the docs yet, but I can see how the op factory approach solves some other use cases I had in mind. Thanks for that. 🤔 Should your `ingest_factory` return `_op` instead of `_op()`, or is there something I'm missing?
I think that the OP (@Eric) needs dynamic runtime behaviour from the op, where the input is a path to a file which could be CSV or Excel, as opposed to knowing ahead of time what the filetype is and building the graph accordingly. I'm not a fan of an op that has many possible execution paths - there is of course the potential for this op to have to keep changing to cater for ever more complex needs. It would be far better to have an op for each use case, i.e. one for CSV and a different one for Excel files. But then the problem becomes how to select the correct op at runtime based on a filetype that could change from one run to the next. I know there are some patterns in the dagster docs for branching logic in a graph (https://docs.dagster.io/concepts/ops-jobs-graphs/jobs-graphs#conditional-branching), but it seemed a little clunky to me, particularly as the number of branches gets beyond a few.
@Eric Have a look at `make_values_resource` (https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job), which is more or less the pattern I had in mind.

Marcel M

11/24/2021, 9:51 PM
@Stefan Adelbert "Should your `ingest_factory` return `_op` instead of `_op()`, or is there something I'm missing?" You are sharp-eyed! But possibly wrong 😉 I just tested:
return _op()     # works

return _op       # as in the docs, but fails (TypeError: cannot unpack non-iterable OpDefinition object)
Could this be a documentation bug?

Stefan Adelbert

11/24/2021, 10:36 PM
@Marcel M Touché! @sandy 👀

sandy

11/24/2021, 11:12 PM
@Marcel M - I think the issue is that you need to invoke the op produced by your factory inside the graph that it's part of. e.g.
@graph
def my_graph():
    csv_op = my_op_factory('path to csv')
    xlsx_op = my_op_factory('path to xlsx')
    res1 = csv_op()
    res2 = xlsx_op()
or, more concisely:
@graph
def my_graph():
    res1 = my_op_factory('path to csv')()
    res2 = my_op_factory('path to xlsx')()

Marcel M

11/25/2021, 11:09 AM
@sandy That makes sense. I have verified that that works.