Eric
11/22/2021, 1:48 AM

Stefan Adelbert
11/22/2021, 2:09 AM
The extra parameters in the `**kwargs`? Those should be described in the `op` config schema or as `Input`s. You could also give the `op` access to a resource which provides other information for the `op` to use.
Adding additional parameters to an `op` means that dagster won't know how to call the `op` and provide relevant values for those parameters.

Eric
11/22/2021, 2:46 AM
```python
@op(config_schema={
    "path": StringSource
})
def df_from_file(context, **kwargs):
    path = context.op_config["path"]
    if not os.path.exists(path):
        raise FileNotFoundError(f"Could not find file at path '{path}'")
    if not path.lower().endswith((".csv", ".xlsx", ".xls")):
        raise ValueError(f"File '{path}' must be of type csv or excel")
    if path.lower().endswith(".csv"):
        readf = pd.read_csv
    else:
        readf = pd.read_excel
    return readf(path, **kwargs)
```
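The extension dispatch inside `df_from_file` can be factored into a plain helper that is testable without constructing a dagster context (a sketch; the helper name and the mapping dict are illustrative, not from the thread):

```python
import os

# Map file extensions to the name of the pandas reader to use.
# Keeping this as a plain dict makes the dispatch decision easy to
# unit test without dagster or pandas.
_READERS = {".csv": "read_csv", ".xlsx": "read_excel", ".xls": "read_excel"}

def reader_name_for(path):
    """Return the pandas reader name for *path*, validating the extension."""
    ext = os.path.splitext(path.lower())[1]
    if ext not in _READERS:
        raise ValueError(f"File '{path}' must be of type csv or excel")
    return _READERS[ext]
```

The op body then reduces to `getattr(pd, reader_name_for(path))(path, **kwargs)`, and the validation branch is covered by ordinary tests.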
Stefan Adelbert
11/22/2021, 3:22 AM
The `op` config contains a `path`, which is used to decide whether to call `read_csv` or `read_excel`. The nature of the `kwargs` depends on the path too, i.e. you'll need to provide one set of `kwargs` if a CSV is being read and another set of `kwargs` if an Excel file is being read. I didn't think dagster allowed an `op` to be called with additional parameters as your example shows, but I'm new to dagster and so maybe I'm missing something.
In your scenario, where would those `kwargs` come from? At runtime, what part of the system would be instantiating them?
If those `kwargs` are somewhat static, you could create a resource which wraps a dict of dicts, with `csv` and `excel` as top-level keys and two dicts of `kwargs` as their values. And then you could make your `op` dependent on that resource and pull out the relevant `kwargs` depending on the type of the file being read.
You could also put the `kwargs` in the `op` config, and that might make sense if those `kwargs` need to change a fair amount (like between runs of the job). You could use a dagster [Shape](https://docs.dagster.io/_apidocs/config#dagster.Shape) as the data type for the `kwargs` in the config schema, I think.

Eric
11/22/2021, 3:33 AM
```python
def test_df_from_file_excel_with_converters():
    context = build_op_context(config={
        "path": r"some\path\data.xlsx"
    })
    schema = {
        "ID_NUMBER_COL": int,
        "OTHER_NUMBER_COL": str
    }
    df = df_from_file(context, converters=schema, sheet_name=2)
    assert df.shape[0] > 1
```
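Stefan's suggestion of a `Shape` for the reader `kwargs` in the config schema might look something like the sketch below (not tested against a specific dagster version; the `read_kwargs` field name and its inner fields are illustrative, and dagster's `Permissive` is an alternative when the set of keys isn't fixed):

```python
from dagster import Field, Shape, StringSource, op

@op(
    config_schema={
        "path": StringSource,
        # Shape validates a closed set of keys; unknown keys are rejected
        # at config-validation time rather than inside pandas.
        "read_kwargs": Field(
            Shape({"sheet_name": Field(int, is_required=False)}),
            default_value={},
        ),
    }
)
def df_from_file(context):
    path = context.op_config["path"]
    read_kwargs = context.op_config["read_kwargs"]
    # ...dispatch to pd.read_csv / pd.read_excel with **read_kwargs...
    ...
```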
Stefan Adelbert
11/22/2021, 3:43 AM
I haven't written an `op` that is dependent on a resource yet, but I will need to do that really soon.
But the idea would be to create a context at test time using [build_op_context](https://docs.dagster.io/_apidocs/execution#dagster.build_op_context), and then you could pass a resource in to that.
You could then monkeypatch `pd` in your test case and assert that the relevant function is called on `pd` with the correct `kwargs` from the resource.
Something like this (treat this as pseudocode):
```python
@resource
def dummy_resource(init_context):
    return {
        "csv": {"a": 1},
        "excel": {"b": 2}
    }

def test_df_from_file_excel_with_converters():
    my_ops.pd = mock.Mock()
    context = build_op_context(
        config={"path": r"some\path\data.xlsx"},
        resources={"kwargs": dummy_resource}
    )
    my_ops.df_from_file(context)
    assert my_ops.pd.read_excel.call_count == 1
    my_ops.pd.read_excel.assert_called_with(b=2)
```
Marcel M
11/22/2021, 8:46 AM
```python
def ingest_factory(name):
    @op(name=f'ingest_{name}',
        required_resource_keys={"openac"},
        out={"df_name": Out(str), "df": Out(pd.DataFrame)}
    )
    def _op(context):
        df_name = name
        context.log.info(f"Ingesting {df_name} from OpenAC")
        df = context.resources.openac.get_dataset(df_name)
        yield Output(df_name, output_name='df_name')
        yield Output(df, output_name='df')
    return _op()
```
and then “call” it like:
```python
@graph
def ingest_openac():
    targets = ['acties', 'activiteit_codes', 'verwijzingen']
    targets = ['normtijden', 'verrichting_codes']
    results = []
    for t in targets:
        df_name, df = ingest_factory(t)
        name = save_to_relwh(df_name, df)
        results.append(name)
        _, path = save_to_fswh(df_name, df)
        results.append(path)
    collect_openac_ingest(results)
```
My use is somewhat convoluted; in your case it would be something simple like:
```python
res1 = my_op_factory('path to csv')
res2 = my_op_factory('path to xlsx')
```
I won't say this is idiomatic Dagster, but it fit my brain better than defining an additional resource. And it worked better than using dynamic output (which I couldn't fathom).

Stefan Adelbert
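Marcel's `my_op_factory(...)` one-liners leave the factory body to the imagination, but the closure mechanics it relies on are plain Python. A minimal sketch of the pattern, with the dagster `@op(name=...)` wrapper elided and all names illustrative:

```python
import os

def my_op_factory(path):
    """Return a zero-argument op body specialized for *path*.

    In dagster the inner function would additionally be wrapped with
    @op(name=f"ingest_{...}") so each generated op gets a unique name.
    """
    # Bake the dispatch decision into the closure at factory time.
    reader = "read_csv" if path.lower().endswith(".csv") else "read_excel"

    def _op():
        # A real body would call getattr(pd, reader)(path, ...);
        # here we just report which reader was chosen for which path.
        return (reader, path)

    # Give each generated op a distinct name, as Marcel's factory does.
    _op.__name__ = f"ingest_{os.path.splitext(os.path.basename(path))[0]}"
    return _op
```

Each call to the factory produces an independent, fully configured op, which is why the graph body can simply loop over targets.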
11/22/2021, 11:39 PM
Shouldn't `ingest_factory` return `_op` instead of `_op()`, or is there something I'm missing?
I think that the OP (@Eric) needs dynamic runtime behaviour from the `op`, where the input is a path to a file which could be CSV or Excel - as opposed to knowing ahead of time what the filetype is and building the graph accordingly.
I'm not a fan of an `op` that has many possible execution paths - there is of course the potential for this `op` to have to keep changing to cater for ever more complex needs. It would be far better to have an `op` for each use case, i.e. one for CSV and a different one for Excel files. But then the problem becomes how to select the correct `op` at runtime based on the filetype, which could change from one run to the next. I know there are some patterns in the dagster docs for branching logic in a graph (https://docs.dagster.io/concepts/ops-jobs-graphs/jobs-graphs#conditional-branching), but it seemed a little clunky to me, particularly as the number of branches gets beyond a few.
I also came across `make_values_resource` (https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job), which is more or less the pattern I had in mind.

Marcel M
11/24/2021, 9:51 PM
“Shouldn't `ingest_factory` return `_op` instead of `_op()`, or is there something I'm missing?”
You are sharp-eyed! But possibly wrong 😉
I just tested:
```python
return _op()  # works
return _op    # as in the docs, but fails (TypeError: cannot unpack non-iterable OpDefinition object)
```
Could this be a documentation bug?

Stefan Adelbert
11/24/2021, 10:36 PM

sandy
11/24/2021, 11:12 PM
```python
@graph
def my_graph():
    csv_op = my_op_factory('path to csv')
    xlsx_op = my_op_factory('path to xlsx')
    res1 = csv_op()
    res2 = xlsx_op()
```
or, more concisely:
```python
@graph
def my_graph():
    res1 = my_op_factory('path to csv')()
    res2 = my_op_factory('path to xlsx')()
```

Marcel M
11/25/2021, 11:09 AM
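The `return _op` vs `return _op()` question running through the thread comes down to whether the factory hands back a definition or the result of invoking it: sandy's snippets call the returned definition inside the graph, while Marcel's factory invokes it before returning. Stripped of dagster, the distinction is (an illustrative sketch):

```python
def factory_returns_definition():
    def _op():
        return "result"
    return _op            # caller must invoke it: factory_returns_definition()()

def factory_returns_invocation():
    def _op():
        return "result"
    return _op()          # already invoked; caller receives the value directly
```

Both styles work, but only consistently: a graph body that unpacks outputs (as in `df_name, df = ingest_factory(t)`) needs the invocation, whereas sandy's `my_op_factory('path to csv')()` expects the definition.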