Joe Schmid

11/08/2021, 8:24 PM
Hello, I am trying to get an AssetMaterialization test running based on the website example, but am running into the following error after using:
from dagster import job, op, Output
owen

11/08/2021, 8:58 PM
what's the error you're seeing? just based on the import statement, you probably want to import AssetMaterialization from dagster as well, if you haven't already
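For reference, the corrected import would just add AssetMaterialization to the existing line:

from dagster import job, op, Output, AssetMaterialization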
Joe Schmid

11/08/2021, 9:00 PM
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "my_materialization_op": The above exception was caused by the following exception: NameError: name 'AssetMaterialization' is not defined
Trying to import now...will test and let you know.
Okay, it looks like it should be working...however, I'm not seeing anything updating in the remote_storage_path location:
owen

11/08/2021, 9:24 PM
as written, this op will just output the object that is inside remote_storage_path, which in this case is the string '.csv'. if you want dagster to write the contents of df to a remote storage system, you have a few options. What remote storage system are you using, and how do you usually store stuff there?
Joe Schmid

11/08/2021, 9:27 PM
Ah, that makes sense. At this time, I am just trying to save locally (same folder as the read_csv). Eventually, we'll use Azure.
owen

11/08/2021, 9:33 PM
ah ok! what does the pipeline that you're trying to build look like? will you be using this output (or the remote_storage_path) in other ops? or do you just want to read the data in this file, modify it, and store it remotely all in one op?
Joe Schmid

11/08/2021, 9:35 PM
For this scenario, I was hoping to read, modify, and store remotely all in one. This is my first try with materializations, so still exploring all available options.
owen

11/08/2021, 9:40 PM
for the local case, you can just add a df.to_csv("your_chosen_path.csv") before your AssetMaterialization event, which will store it to whatever local path you want. to store remotely, you'll probably want to get the contents of the df as a csv-formatted string (which would be csv_string = df.to_csv()), at which point you'd have to call the azure API to upload that string to a specific location
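A rough sketch of the local, all-in-one op owen describes; the file names, asset key, and the transformation step are placeholders, not from the original snippet:

import pandas as pd
from dagster import job, op, Output, AssetMaterialization

@op
def my_materialization_op():
    # read and (hypothetically) modify the data
    df = pd.read_csv("input.csv")

    # write the result to a local path before emitting the materialization event
    local_path = "output.csv"  # placeholder path
    df.to_csv(local_path, index=False)

    # tell Dagster that this asset was written, then emit the op's output
    yield AssetMaterialization(asset_key="my_csv", description=f"Wrote {local_path}")
    yield Output(local_path)

@job
def my_materialization_job():
    my_materialization_op()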
Joe Schmid

11/08/2021, 9:41 PM
Sounds good...appreciate the help!
owen

11/08/2021, 9:41 PM
no problem 👍
Joe Schmid

11/08/2021, 9:46 PM
Sorry, a couple questions from one of my coworkers: Do I have it correct that AssetMaterializations are for persisting op outputs for other downstream ops to be able to use? Also, is it best to use AssetMaterializations in prod only, and skip them for dev/testing?
owen

11/08/2021, 10:03 PM
AssetMaterializations are just events that let Dagster know what's going on inside your job. They'll be stored in the event database, and will help you visualize/track how assets are changed/updated over time. Depending on your needs and how you deploy your jobs, it might be useful to emit AssetMaterializations in all environments, or just prod, but there's no single right answer there.
The abstraction used for persisting op outputs for other downstream ops to use is called an IOManager. These are useful because you can define two io_managers, one that stores a pandas dataframe to a local file on your machine, and another that stores that dataframe to a blob on Azure, and just switch out the implementation between your dev / prod environments (no need to change code inside the ops)
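A sketch of that dev/prod swap, assuming the built-in fs_io_manager for local runs; the Azure side is only referenced in a comment, since the storage details weren't discussed here:

import pandas as pd
from dagster import fs_io_manager, job, op

@op
def make_df() -> pd.DataFrame:
    return pd.DataFrame({"a": [1, 2, 3]})

# dev: pickle outputs to the local filesystem
@job(resource_defs={"io_manager": fs_io_manager})
def my_job_dev():
    make_df()

# prod: attach an Azure-backed io_manager instead (e.g. adls2_pickle_io_manager
# from the dagster-azure package) -- the op code itself does not change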
Joe Schmid

11/09/2021, 7:17 PM
Hi Owen, I'm trying to use fs_io_manager for local testing and am getting the following error. Am I approaching the base path in the incorrect way? (Dagster documentation snapshot included too)
owen

11/09/2021, 7:22 PM
when you call configured, you're setting specific configurable fields on the fs_io_manager. One of those configurable fields is "base_path", which is the base directory under which it will store all of its files. The argument to this configured function is a dictionary where each key is the name of a configurable field, and the value is the value you want to set it to. So in this case you'd want this to be fs_io_manager.configured({"base_path": "/mnt/c/psf/dagstertesting/"}) (i.e. "set base_path to be /mnt/c/...")
Joe Schmid

11/09/2021, 7:28 PM
Okay, thank you...I originally had it set up like what you suggested, but got the same error message that I just sent. Anything else I might be missing from this?
owen

11/09/2021, 7:32 PM
ah sorry, this should be "base_dir", not "base_path". apologies for the confusion/bad docs example
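For reference, the working call from this exchange would be:

from dagster import fs_io_manager

# "base_dir" (not "base_path") is the config field fs_io_manager accepts
local_io_manager = fs_io_manager.configured({"base_dir": "/mnt/c/psf/dagstertesting/"})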
Joe Schmid

11/09/2021, 7:36 PM
Ah, that worked...very exciting, thanks!
Is there a way to have the job output something other than a File? (e.g. a .csv?)
owen

11/09/2021, 10:32 PM
@Joe Schmid there is, but not a built-in one. you would need to write your own IOManager that implements this functionality. This would probably look very similar to the default PickledObjectFilesystemIOManager, but the handle_output function would write to a csv directly (rather than serializing the object using pickle and writing it to a generic file).
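A hypothetical sketch of such an IOManager; the class name, pathing scheme, and config field are illustrative, and error handling is omitted:

import os
import pandas as pd
from dagster import IOManager, io_manager

class CsvIOManager(IOManager):
    def __init__(self, base_dir: str):
        self._base_dir = base_dir

    def _path(self, context) -> str:
        # one csv per op output, named after the step that produced it
        return os.path.join(self._base_dir, f"{context.step_key}.csv")

    def handle_output(self, context, obj: pd.DataFrame):
        obj.to_csv(self._path(context), index=False)

    def load_input(self, context) -> pd.DataFrame:
        return pd.read_csv(self._path(context.upstream_output))

@io_manager(config_schema={"base_dir": str})
def csv_io_manager(init_context):
    return CsvIOManager(init_context.resource_config["base_dir"])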
Joe Schmid

11/09/2021, 10:33 PM
Got it, thanks!
Hi @owen, for the sample extract job below, would you recommend any changes to the job/op structure?  And where would AssetMaterializations be used?
job: ODBC Extract DAG

op: open connection to ODBC data source

op: pull table list & definitions

op: for each table:
- read rows into dataframe
- save dataframe to csv
- save dataframe to SQL table in db example-db-yyyymmdd (create db if it does not exist)

op: send notification to slack
owen

11/17/2021, 5:50 PM
hi @Joe Schmid! In general, this seems like a pretty reasonable setup. I'm not familiar with ODBC, but do you need to pass that opened connection object to the ops that interface with your database? if so, I'd recommend using a resource for that functionality, and adding it as a required_resource_key on the ops that talk to the database. This has a few benefits, including allowing you to substitute this resource for a mock one if you want to test your ops. As for the AssetMaterializations, you can optionally yield an AssetMaterialization every time that you save a dataframe to a SQL table. This won't impact the functionality of your job in any way, and is totally optional, but it will help Dagster keep track of changes to that table over time, which might be useful. You could also yield AssetMaterializations for those saved csv files if you want those to be tracked as well.
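A sketch of the resource approach owen suggests; pyodbc and the connection string are assumptions, since the thread doesn't show the actual connection code:

import pyodbc  # assumed driver; any DB-API style connection works the same way
from dagster import op, resource

@resource(config_schema={"connection_string": str})
def odbc_connection(init_context):
    return pyodbc.connect(init_context.resource_config["connection_string"])

@op(required_resource_keys={"odbc"})
def pull_table_list(context):
    # the op reaches the live (or mocked) connection through context.resources
    cursor = context.resources.odbc.cursor()
    return [row.table_name for row in cursor.tables(tableType="TABLE")]

# attach it with @job(resource_defs={"odbc": odbc_connection}), and swap in a
# mock resource under the same key when testing the ops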
You might also want to look at the Dynamic Mapping functionality. This would let you write the "for each table" op in a way that just processes a single table at a time, and then Dagster will just make a copy of that op for each of the tables in the table list. This way, multiple of these ops can run at the same time, and if one fails, you don't have to restart the entire job, just the one that didn't succeed. Might not be a concern for your use case, but figured I'd mention it.
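A sketch of what the dynamic-mapping version might look like, with placeholder table names and op bodies:

from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def table_names():
    # one DynamicOutput per table; mapping_key must be a valid identifier
    for name in ["customers", "orders"]:  # placeholder table list
        yield DynamicOutput(name, mapping_key=name)

@op
def process_table(table_name: str) -> str:
    # read rows into a dataframe, save the csv, and load the SQL table
    # (this is also where you'd record an AssetMaterialization)
    return table_name

@op
def notify_slack(table_results):
    ...  # send one summary message once every table has been processed

@job
def odbc_extract():
    notify_slack(table_names().map(process_table).collect())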
Joe Schmid

11/18/2021, 11:50 PM
Thanks, Owen! We'll try it out.