# ask-community

Nicolas May

07/05/2022, 7:45 PM
Hi channel! I've searched the docs, Google, and this channel, but I can't seem to find how to do a basic task... I'm trying to materialize an asset result to GCS... Could someone point me to an example of code that successfully writes the result of a function decorated with `@asset` to a GCS bucket? Thx!

prha

07/05/2022, 7:52 PM
Hi Nicolas. I think you should be able to use the same snippet as outlined here, but instead of the `s3_pickle_io_manager` and `s3_resource`, you would use the `gcs_pickle_io_manager` and the `gcs_resource`: https://docs.dagster.io/concepts/io-management/io-managers#using-an-io-manager
You can read more about using GCS for IO management here: https://docs.dagster.io/deployment/guides/gcp#using-gcs-for-io-management

Nicolas May

07/05/2022, 8:17 PM
Thanks @prha! To keep it super simple, I tried this...
@asset
def rides():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]

assets_with_io_manager = with_resources(
    [rides],
    resource_defs={
        "io_manager": gcs_pickle_io_manager,
        "gcs": gcs_resource
    },
)
but it only writes locally

prha

07/05/2022, 8:44 PM
That’s odd… I was just able to run that snippet and materialize it to GCS. Can you confirm that it’s this code that’s being hit when you materialize?

Nicolas May

07/05/2022, 8:45 PM
I've added this to my `dagster.yaml` and am about to test...
resources:
    io_manager:
        config:
            gcs_bucket: my-cool-bucket
            gcs_prefix: good/prefix-for-files-

prha

07/05/2022, 8:46 PM
fwiw, I had to slightly change your code to add the configured bucket:
assets_with_io_manager = with_resources(
    [rides],
    resource_defs={
        "io_manager": gcs_pickle_io_manager.configured({"gcs_bucket": "my_custom_bucket_name"}),
        "gcs": gcs_resource,
    },
)

Nicolas May

07/05/2022, 8:49 PM
Thanks I'll give that a try
I tried the `gcs_pickle_io_manager.configured(...)` approach... that also just results in a local write
Writing to GCS works fine with the `@job` decorator:
@job(
    resource_defs={
        "gcs": gcs_resource,
        "io_manager": gcs_pickle_io_manager,
    },
    config={
        "resources": {
            "io_manager": {
                "config": {
                    "gcs_bucket": "my-bucket",
                    "gcs_prefix": "my-prefix",
                }
            }
        }
    }
)
def gcs_job():
    gcs_op()
Why is writing an asset any different? And what is the difference?

prha

07/05/2022, 9:01 PM
Are we sure that the `rides` asset is getting picked up by the repository correctly? When I ran your snippet, I got an error when the bucket wasn’t configured. The GCS IO manager has no fallback to local writes, so I suspect that something isn’t getting picked up correctly. Can we rename `rides` just to sanity check?

Nicolas May

07/05/2022, 9:06 PM
Trying it now
Renamed `rides` to `cereal`... same local write

prha

07/05/2022, 9:18 PM
And you’re seeing the resource initialization for gcs and the io manager in the logs?
Just to sanity check, are you using a version of dagster >= `0.15.0`?

Nicolas May

07/05/2022, 9:30 PM
v0.15.2
Got it working... but here's the summary... using the `with_resources()` approach, only `io_manager` initializes:
Starting initialization of resources [io_manager].
... but I get
Starting initialization of resources [gcs, io_manager].
... when defining the resource in the `@asset` decorator and it works:
@asset(
    resource_defs={
        "io_manager": gcs_pickle_io_manager.configured(
            {
                "gcs_bucket": "my-bucket",
                "gcs_prefix": "my-prefix",
            }
        ),
        "gcs": gcs_resource,
    },
)
def cereal():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]
"works" = "writes to GCS"

prha

07/05/2022, 9:40 PM
Definitely something strange going on with `with_resources`. cc @chris to take a look

Nicolas May

07/05/2022, 9:48 PM
Thanks for your help @prha!

Nicolas May

07/06/2022, 12:37 PM
Thanks @Oliver... I don't think these issues are related

chris

07/06/2022, 8:54 PM
Hmm. Are you actually placing the result of `with_resources` into the repository?
I wasn't able to reproduce your error with GCS here, and I'm wondering if that's because you're placing the wrong definition in the repo

Nicolas May

07/06/2022, 8:56 PM
I tried placing the result of `with_resources` in the repo, but it errors... It says that what's included in the repo must be a list of assets, jobs, etc.

chris

07/06/2022, 8:56 PM
like this?
@repository
def the_repo():
    return [*with_resources(...)]
(`with_resources` produces a sequence, so you need to unpack it)
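To illustrate the unpacking point, here is a minimal plain-Python sketch. It does not use Dagster at all: `fake_with_resources` is a hypothetical stand-in that only mimics returning a new sequence, so that the difference between nesting and unpacking is visible.

```python
from typing import List, Sequence


def fake_with_resources(definitions: Sequence[str]) -> List[str]:
    """Hypothetical stand-in: returns a NEW list with one item per input."""
    return [f"{name}+resources" for name in definitions]


bound = fake_with_resources(["rides"])

# Nesting: the outer list has one element, and that element is itself a list.
nested = [bound]

# Unpacking: the outer list has one element per definition.
flat = [*bound]

print(nested)  # [['rides+resources']]
print(flat)    # ['rides+resources']
```

A repository that expects a flat list of definitions would reject the nested form, which matches the error described above.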

Nicolas May

07/06/2022, 8:57 PM
Ah... so I should include the result in the repo?

chris

07/06/2022, 8:57 PM
yes

Nicolas May

07/06/2022, 8:57 PM
I'll give that a shot, thanks!

chris

07/06/2022, 8:58 PM
`with_resources` constructs a new asset definition

Nicolas May

07/07/2022, 4:21 PM
Just to follow up @chris... placing the unpacked `with_resources` asset definition in the repo solved my problem... Thanks again!

chris

07/07/2022, 5:25 PM
great!