# ask-community

Vinnie

06/10/2022, 11:25 AM
Hi folks, bit of a general question regarding resource usage and whether what I’m trying to do is the intended way: I have a sensor that needs to assume a role in a different account to check an S3 Access Point for new files. If a new file is found, it should trigger a job to fetch it and do stuff™ with it. My original plan was to have the s3 client as a resource, use the build_resources() function to pass it to the get_s3_keys() function, and then to the job/op that needs to download the file, so I don’t need to assume the role twice. When passing this resource, though, I get the error “Object of type S3 is not JSON serializable”. I assume this is because Dagster tries to convert the S3 resource into YAML for the job configuration, so passing the s3 client is likely not the “correct” way to go about things. I wouldn’t like to pass the STS credentials, as, well, they’re credentials. The question is: what is the “proper” way to use this S3 client pattern as a resource? Or to avoid having the AssumeRole call go out multiple times during execution? I couldn’t find anything in the docs, but is it possible to pass an argument to a job so it can be forwarded to the ops? Something like the following.
Copy code
@job
def foo(s3_client):
    downloaded_file_path = download_from_s3(s3_client)
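To make it concrete, the sensor side of what I’m after looks roughly like the sketch below. The role ARN, bucket/access point alias, op names, and the exact get_s3_keys signature are from memory, so treat it as a sketch rather than working code; the serialization error shows up as soon as I try to hand resources.s3 itself to the downstream job instead of just the key.

import boto3
from dagster import RunRequest, build_resources, job, op, resource, sensor
from dagster_aws.s3.sensor import get_s3_keys


@resource
def assumed_role_s3_resource(_):
    # Assume the cross-account role once and hand back a boto3 S3 client.
    # The role ARN and session name are placeholders.
    creds = boto3.client("sts").assume_role(
        RoleArn="arn:aws:iam::111111111111:role/cross-account-read",
        RoleSessionName="dagster-s3-sensor",
    )["Credentials"]
    return boto3.client(
        "s3",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )


@op(config_schema={"key": str})
def download_from_s3(context):
    # Stand-in for the op that fetches the file and does stuff with it.
    context.log.info(f"would download {context.op_config['key']}")


@job
def fetch_new_file():
    download_from_s3()


@sensor(job=fetch_new_file)
def new_file_sensor(context):
    # Build the s3 resource in-process for the sensor tick only; the run
    # itself executes in another process, so the client can't be handed over.
    with build_resources({"s3": assumed_role_s3_resource}) as resources:
        new_keys = get_s3_keys(
            bucket="my-access-point-alias",  # placeholder
            since_key=context.cursor,
            s3_session=resources.s3,
        )
    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={"ops": {"download_from_s3": {"config": {"key": key}}}},
        )
    if new_keys:
        context.update_cursor(new_keys[-1])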

johann

06/10/2022, 3:45 PM
Hi Vinnie,
> Object of type S3 is not JSON serializable
When the sensor emits a run request, that run will start in a new subprocess of the gRPC container (or elsewhere if you’ve configured a run launcher, e.g. a K8s Job or an ECS task). There’s currently no way to share a resource across that process boundary. If you include the s3 client resource on the job (by passing config rather than the actual Python object), it will be reinitialized by every op that needs it, because by default, with the multiprocess_executor, every op runs in a new subprocess. With the in_process_executor, on the other hand, the resource will only be initialized once for the run and shared across ops.
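For example, a minimal sketch of opting a job into the in-process executor (the op and job names here are just placeholders):

from dagster import in_process_executor, job, op


@op
def fetch_file():
    # Placeholder op standing in for the download step.
    ...


@job(executor_def=in_process_executor)
def fetch_and_process():
    # Every op in this run shares one process, so a resource attached to
    # this job is initialized once per run instead of once per op.
    fetch_file()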
> I wouldn’t like to pass the STS credentials, as, well, they’re credentials
A few options here. A common approach is to surface them in the environment of wherever your job is running (this might be done with something like Vault, by attaching secrets on K8s, etc.), then use StringSource in your resource to pull the config from the env.
Example config schema: https://github.com/dagster-io/dagster/blob/7ecd4b3d3b28d0852ebec92658442b1cbd15f03[…]odules/libraries/dagster-snowflake/dagster_snowflake/configs.py
Example of using the schema, pointing at an env var: https://github.com/dagster-io/dagster/blob/7207a6e2dc3fd3a6e9705ca361b9f5a18204c1e[…]ules/libraries/dagster-snowflake/dagster_snowflake/resources.py
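As a rough sketch of that pattern (the resource name, env var, and session name here are just examples, not from your code):

import boto3
from dagster import StringSource, resource


@resource(config_schema={"role_arn": StringSource})
def assumed_role_s3_resource(context):
    # The role ARN comes from config, and StringSource lets the config value
    # point at an env var (e.g. {"role_arn": {"env": "CROSS_ACCOUNT_ROLE_ARN"}}),
    # so nothing sensitive is hardcoded in run config.
    creds = boto3.client("sts").assume_role(
        RoleArn=context.resource_config["role_arn"],
        RoleSessionName="dagster",
    )["Credentials"]
    return boto3.client(
        "s3",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )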

Vinnie

06/10/2022, 5:44 PM
Hi @johann, thanks for the reply. Surfacing them would mean building a try/except block in my case, since they time out after a few minutes. I guess I’ll just pass the role name and run the AssumeRole in the op that needs it. If I do need it again in subsequent ops, I can turn the whole AssumeRole call into a single op and pass the returned value; that seemed to work for me. I’d just have liked to avoid running the AssumeRole in the sensor and then again when each job starts.
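For anyone finding this later, the op-level version looks roughly like this (the role ARN, bucket, and local path are placeholders; just a sketch of what worked for me):

import boto3
from dagster import job, op


@op(config_schema={"role_arn": str})
def assume_role(context) -> dict:
    # Single AssumeRole call; the returned credentials dict is plain data,
    # so it can be passed between ops via the IO manager.
    return boto3.client("sts").assume_role(
        RoleArn=context.op_config["role_arn"],
        RoleSessionName="dagster-run",
    )["Credentials"]


@op(config_schema={"bucket": str, "key": str})
def download_from_s3(context, creds: dict) -> str:
    s3 = boto3.client(
        "s3",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    local_path = f"/tmp/{context.op_config['key'].split('/')[-1]}"
    s3.download_file(context.op_config["bucket"], context.op_config["key"], local_path)
    return local_path


@job
def fetch_and_do_stuff():
    download_from_s3(assume_role())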
👍 1

johann

06/10/2022, 6:01 PM
Yeah, we appreciate that feedback! It’s something we’ll definitely take into account as we think about how to improve the workings of sensors.

Vinnie

06/10/2022, 6:19 PM
Honestly though, using Dagster has been such a joy. I literally pulled the trigger on finally finishing the migration because of this use case, since we have so many restrictions with the access point (and Airflow sensors are terrible). After working with (rather, against) Airflow for a while, Dagster really feels like a breath of fresh air. I’m super happy with it and have been trying my damnedest to get other teams to migrate.
❤️ 1
Hi @johann, this is something I noticed this morning that’s related to this conversation. I just flipped on all my schedules and they all failed; reason for that was I tried to pass a custom IOManager resource with the schedules rather than directly on the job decorator. I’m not sure if I would file this under “content gap”, but I noticed it takes some getting used to which resources can be passed to what. The problem was fairly obvious knowing what I know from our conversation, but maybe in the future you could think of splitting purely config-based resources from things like IOManagers or my S3 Resource attempt to avoid further confusion? Just throwing some ideas at the wall based on what I’ve been noticing. Either way, I do have to thank you and the whole team for being so supportive through and through! I’ll definitely be emailing the community email in a few months when the dust settles at my new role to do some writing on the top secret up-and-coming data platform 🙂

johann

06/14/2022, 2:19 PM
Will surface the feedback to the team 🙂
❤️ 1

sandy

06/14/2022, 2:40 PM
Hey @Vinnie - when you say that you passed a custom IOManager with the schedules, do you mean that you constructed it using build_resources inside the decorated schedule function?

Vinnie

06/14/2022, 2:42 PM
Hey, this is what I had set (I think; it’s from a few commits ago):
Copy code
schedules[f"{job_name}_schedule"] = ScheduleDefinition(
    name=f"{job_name}_schedule",
    job=run_datahub_pipeline,
    cron_schedule=job_options["schedule"],
    run_config={
        "resources": {"values": {"config": job_options},
                      "io_manager": dagster_bucket_io_manager}
    }
)
with dagster_bucket_io_manager being:
Copy code
@io_manager
def dagster_bucket_io_manager():
    return s3_pickle_io_manager(
        build_init_resource_context(
            config={"s3_bucket": os.getenv("S3_IO_BUCKET")},
            resources={"s3": s3_resource} # s3_resource being dagster_aws.s3.s3_resource
        )
    )

sandy

06/16/2022, 6:04 PM
Ah, got it. Makes sense.
👍 1

Vinnie

06/16/2022, 6:40 PM
Glad to be of help! Was there a different way to pass the resource that would have worked and that I missed, or is the functionality just not supported?

sandy

06/16/2022, 8:23 PM
The way we’d recommend passing the resource is when you construct the job, e.g.
Copy code
run_datahub_pipeline = some_graph.to_job(resource_defs={"io_manager": dagster_bucket_io_manager})
or
Copy code
@job(resource_defs={"io_manager": dagster_bucket_io_manager})
def run_datahub_pipeline():
    ...
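With the resources defined on the job, the schedule then only needs to pass plain config values. A sketch reusing the names from your earlier snippet (so assume this slots into the same loop):

from dagster import ScheduleDefinition

schedules[f"{job_name}_schedule"] = ScheduleDefinition(
    name=f"{job_name}_schedule",
    job=run_datahub_pipeline,
    cron_schedule=job_options["schedule"],
    # Only JSON-serializable config here; resource definitions live on the job.
    run_config={"resources": {"values": {"config": job_options}}},
)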
would that work for you?

Vinnie

06/17/2022, 6:37 AM
That was the solution I landed on after seeing the error, so yes 🙂 Although to be fair, the way I sketched this out is probably misusing what schedules are built for (which explains why the limitation would be there). In 0.15 terms, having it built like I did would mean that each schedule defines/materializes a completely different asset (each schedule dynamically loads from a different source into a separate sink/db table, while there’s only one job), so I likely have some refactoring to do: transforming run_datahub_pipeline into a graph and then generating assets and jobs, instead of passing the configs through the schedule to the job. Overall I’d still argue that a separation in resource definitions between purely config-based resources and “everything else” would make sense for added clarity, but having the asset documentation front and center made it extremely obvious that what I was doing wasn’t the recommended way to go, so maybe other people will just “get it” now.

sandy

06/17/2022, 6:38 PM
Got it - let me know if it would be helpful to talk through this on a Zoom sometime. We’re trying to make our docs clearer, so learning how people experience them and what they try is really helpful for us.

Vinnie

06/20/2022, 7:40 AM
@sandy I’ll DM you so we can set it up 🙂