# ask-community
c
Hi, I’m new to orchestration tools like Dagster and I’m trying to replicate one of our Fivetran Custom Connector data pipelines as a proof of concept. I have managed to get the data I want from the API and have it in a Python dict. I then use json.dumps on the dict and output it from my asset. I have assigned the gcs_pickle_io_manager to the asset and it is loading the file successfully into my GCS bucket. The issue comes when I try to load it into Snowflake: there are weird characters at the beginning and end of the JSON string which cause the load to fail. They change depending on which text editor I open the file with, but I think it starts with b-, which indicates that it is a byte string? How do I create the JSON file in the GCS bucket without the characters at the front? There seems to be limited documentation on the IO manager, so I'm not sure if there are options there to change this? Thanks!
r
gcs_pickle_io_manager will create python pickle files not json
I'm guessing this is happening:
>>> import json, pickle
>>> d = {'a': 1, 'b': 2}
>>> j = json.dumps(d)
>>> pickle.dumps(j)   # pickle wraps the JSON string in its own framing bytes
b'\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x10{"a": 1, "b": 2}\x94.'
c
So does this mean we would need to develop our own IO managers for this?
Yes, I think those are the characters it is adding to the start and end
r
how are you loading the resulting file into snowflake?
out of dagster?
c
Yes, we want to load the file into Snowflake. I was looking at the Snowflake library, but it looked like you could only execute a query or load a file from a local Parquet file. The API returns JSON data, so I assume we’d need to execute a query for this. However, doing it this way you need to loop through the JSON records and load them in one at a time, otherwise it loads the whole JSON file into a single row. I've just seen that there’s another library now called Snowflake with Pandas which I hadn’t seen before; maybe that can achieve loading it in without going through a JSON file in cloud storage?
r
yeah that might be easier. few options here (I'm no dagster expert):
• a custom io manager that can read the gcs pickles and write to snowflake (rough sketch of a JSON-to-GCS variant below)
• a non-asset job where you write the json.dumps output to gcs yourself
• try the snowflake+pandas io manager for the whole thing and avoid the intermediary
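For the custom io manager route, something along these lines might work: an untested sketch that writes the asset's dict to GCS as plain JSON instead of a pickle, so the file Snowflake reads has no pickle bytes in it. The bucket config and key layout are made up for the example.

import json

from dagster import IOManager, io_manager
from google.cloud import storage


class GCSJsonIOManager(IOManager):
    def __init__(self, bucket_name: str):
        self._bucket = storage.Client().bucket(bucket_name)

    def _blob_name(self, asset_key) -> str:
        # one object per asset, e.g. "my_api/raw_orders.json"
        return "/".join(asset_key.path) + ".json"

    def handle_output(self, context, obj):
        # obj is the plain dict/list the asset returned; upload it as JSON
        # so there are no pickle framing bytes in the file
        blob = self._bucket.blob(self._blob_name(context.asset_key))
        blob.upload_from_string(json.dumps(obj), content_type="application/json")

    def load_input(self, context):
        blob = self._bucket.blob(self._blob_name(context.asset_key))
        return json.loads(blob.download_as_text())


@io_manager(config_schema={"bucket_name": str})
def gcs_json_io_manager(init_context):
    return GCSJsonIOManager(init_context.resource_config["bucket_name"])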
c
Ok, yeah, that does seem to match up with what I was thinking in the end, thanks. The reason we still want it as JSON is to be able to load it directly into a VARIANT column in the raw table in Snowflake. We would then use dbt to stage the data, pulling it out of the VARIANT column. I think it is much easier this way because it is much more robust to any schema changes coming from the API responses: we can just store the raw data as JSON and then pull out whatever fields we want in the staging view model. If we convert it to a dataframe, I’m not sure how this would work with the pandas IO manager, as there isn’t much documentation on how things like schema changes are handled? I assume it just loads into a premade table, so it would error if the dataframe has changed from the original table definition?
r
yeah I'm not a fan of pandas dataframes either. hang around and you'll get an official reply from support. The io manager interface is quite flexible so someone who works at Dagster will have better ideas than me
c
Still really helpful to chat through it and confirm what I was originally thinking, thanks Ravi. Yeah, it would be good to see what the best method is for this, as my experience is with Fivetran, which handles most of this for you; you just have to return the data back to it in a specific JSON format.
Looks like if we knew the data structure and knew that it wouldn’t be changing then the pandas IO manager could be the solution, but not for this particular use case
s
I've used json lines (rather than pickling) as the serialization format for much of my data. A custom io manager that takes a list of dicts (or a pandas dataframe, or a generator function for large files that you don't want to pull into memory) and then outputs and inputs that format can be quite flexible. If you use the UPathIOManager, then you can output and input to local or cloud storage by just changing the base_path. Most of the cloud warehouses can deal with json lines files directly.
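Very roughly, something like this (untested; the config, base_path values and asset shape are just assumptions for the example):

import json

from dagster import InputContext, OutputContext, UPathIOManager, io_manager
from upath import UPath


class JsonLinesIOManager(UPathIOManager):
    extension: str = ".jsonl"

    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        # one JSON object per line, a format most warehouses can COPY
        # straight from cloud storage
        with path.open("w") as f:
            for record in obj:
                f.write(json.dumps(record) + "\n")

    def load_from_path(self, context: InputContext, path: UPath):
        with path.open("r") as f:
            return [json.loads(line) for line in f if line.strip()]


@io_manager(config_schema={"base_path": str})
def jsonl_io_manager(init_context):
    # point base_path at "gs://my-bucket/raw", "s3://..." or a local dir
    # (the gs:// case assumes gcsfs is installed for fsspec)
    return JsonLinesIOManager(base_path=UPath(init_context.resource_config["base_path"]))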
c
Hi Sean, just so I understand correctly: are you suggesting using the UPathIOManager to output the JSON file to a GCS bucket instead of using the GCS IO manager, and then using the Snowflake IO manager to execute the COPY command into the relevant VARIANT table in Snowflake? Or did you mean using the UPathIOManager to save the JSON file locally, then using the Snowflake IO manager to execute a PUT command to load the file into an internal stage and COPY it in from there?
s
I meant sending to GCS (or any other cloud location). However, you could stage things locally if that is more convenient. I tend to stage the files in GCS so that I can reuse them as an asset for loading into other data stores (like clickhouse) if needed.
The cool thing about a UPathIOManager is that simply changing the base_path from "gs://..." to "s3://..." will allow you to use the same logic to write to GCS, S3, HDFS, or even a local file.
I'm not a snowflake user, but with other data warehouses, there are often capabilities for reading directly from cloud storage. For loading to bigquery or clickhouse, for example, I simply have an op that issues the SQL query to load from cloud storage and then supplies an AssetMaterialization. No data transit back to the op.
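I'd guess the Snowflake version of that pattern looks roughly like this (just a sketch; the table, stage and file names are invented, the target table is assumed to be a single VARIANT column, and it assumes the dagster-snowflake resource is bound under the key "snowflake"):

from dagster import AssetMaterialization, Output, op


@op(required_resource_keys={"snowflake"})
def copy_raw_json_into_snowflake(context):
    # ask Snowflake to pull the JSON file from an external stage over the
    # GCS bucket straight into the raw VARIANT table; no data comes back
    # through the op
    context.resources.snowflake.execute_query(
        """
        COPY INTO raw.my_source
        FROM @raw.gcs_stage/my_source/latest.jsonl
        FILE_FORMAT = (TYPE = 'JSON')
        """
    )
    yield AssetMaterialization(asset_key="raw_my_source")
    yield Output(None)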
c
Ah ok, yeah that sounds like the first of the two solutions I asked you about. Snowflake can copy from cloud storage (whichever of the main 3 providers) directly into a table, which is similar to what you’ve said. I will have a look at the UPathIOManager and see if I can get it working, thanks!