
Kevin Haynes

01/28/2022, 9:39 PM
Hey y'all - I'm working on creating a new job that executes based on an S3 sensor, so I was following this example. The sensor appears to work correctly: when I add new files, it kicks off the job as I'd expect. I also followed this example to write a unit test, which passes when run locally with pytest. However, when pytest runs as part of our CI pipeline in GitHub, it throws a `DagsterInvalidConfigError` because the `slack` resource key is not defined. The only difference between my computer and the GitHub action is the OS and the Python version (I'm on 3.9.8 but the runner is 3.9.10). The same error is thrown while building the op context for the unit test written for the op itself. Another thing I noticed is that the S3 example (first link) uses the sensor context's `last_run_key`, but the docs indicate that this is deprecated. Any ideas what might be going wrong, or suggestions for how I can fix this? I'm attaching the console output for the failure in GitHub, the py file for the op/job/sensor, as well as the one for the unit tests.

prha

01/28/2022, 10:28 PM
Hi Kevin. For the deprecated `last_run_key`, we recommend using `context.cursor` instead (it's more generic). The resulting snippet would look like this:
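from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys

# my_job is assumed to be defined elsewhere in the project.
@sensor(job=my_job)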
def my_s3_sensor(context):
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=context.cursor)
    if not new_s3_keys:
        yield SkipReason("No new s3 files found for bucket my_s3_bucket.")
        return
    for s3_key in new_s3_keys:
        yield RunRequest(run_key=s3_key, run_config={})
        context.update_cursor(s3_key)
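
To make the cursor idea concrete, here is a minimal stdlib-only sketch of the bookkeeping the snippet above relies on. It is not Dagster code: `keys_since` and `sensor_tick` are hypothetical helpers, and the assumption that keys arrive ordered oldest to newest is mine.

```python
from typing import List, Optional, Tuple


def keys_since(ordered_keys: List[str], since_key: Optional[str]) -> List[str]:
    """Return the keys that come after since_key.

    Assumes ordered_keys is sorted oldest to newest. If there is no cursor
    yet, or the cursor is not in the list, every key counts as new.
    """
    if since_key is None or since_key not in ordered_keys:
        return list(ordered_keys)
    return ordered_keys[ordered_keys.index(since_key) + 1:]


def sensor_tick(ordered_keys: List[str], cursor: Optional[str]) -> Tuple[List[str], Optional[str]]:
    """Simulate one sensor evaluation: return (keys to launch runs for, new cursor)."""
    new_keys = keys_since(ordered_keys, cursor)
    if not new_keys:
        # Nothing new: the cursor stays where it was (the "skip" case).
        return [], cursor
    # Advance the cursor to the last key a run was requested for.
    return new_keys, new_keys[-1]


# Three consecutive ticks against a bucket that gains one file before the third tick.
first_keys, cursor_1 = sensor_tick(["a.csv", "b.csv"], None)
second_keys, cursor_2 = sensor_tick(["a.csv", "b.csv"], cursor_1)
third_keys, cursor_3 = sensor_tick(["a.csv", "b.csv", "c.csv"], cursor_2)
```

The first tick picks up both files, the second finds nothing and leaves the cursor alone, and the third picks up only `c.csv`.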

Kevin Haynes

01/28/2022, 10:35 PM
Thanks @prha, I didn't realize it was that simple to just slide the cursor in there and add the update step. It looks like this resolved my problem for the first test, the one for the sensor itself (`test_s3_sensor`), but I'm still getting the same error on the other test, the one for the op (`test_log_file`). Am I building the op context incorrectly, by chance?

prha

01/28/2022, 11:07 PM
Hmm, the op context looks okay, but it looks like it's complaining about the resource config. Can you include the snippet for `lyn_slack_resource`?
If the `lyn_slack_resource` requires config, you either have to provide it in your `build_op_context` call (using `configured`), or provide it in the run_config (from the sensor's `RunRequest`).

Kevin Haynes

01/31/2022, 4:15 PM
This is the definition for `lyn_slack_resource`:
@configured(slack_resource)
def lyn_slack_resource(_):
    return {"token": os.getenv("SLACK_TOKEN")}
The env variable is set in the actions using a github secret, so it doesn't require any config.

prha

01/31/2022, 4:54 PM
The error looks like it's not finding a value for `token` in the config… can you try:
@configured(slack_resource)
def lyn_slack_resource(_):
    return {"token": {"env": "SLACK_TOKEN"}}
Wondering if there’s a difference in the actual execution time environment and the definition time environment…
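
As a rough illustration of that hypothesis, the stdlib-only sketch below contrasts a value read from the environment eagerly with an `{"env": ...}` placeholder that is looked up later. It is a simplified model, not Dagster's actual config resolution: `resolve_config` is a hypothetical helper and the token value is made up.

```python
import os


def resolve_config(config: dict) -> dict:
    """Hypothetical resolver: swap {"env": NAME} placeholders for the
    environment value at the moment this function is called."""
    resolved = {}
    for key, value in config.items():
        if isinstance(value, dict) and set(value) == {"env"}:
            resolved[key] = os.environ.get(value["env"])
        else:
            resolved[key] = value
    return resolved


# "Definition time": the variable is not set yet.
os.environ.pop("SLACK_TOKEN", None)
eager = {"token": os.getenv("SLACK_TOKEN")}  # reads the environment now, gets None
lazy = {"token": {"env": "SLACK_TOKEN"}}     # only records which variable to read

# "Execution time": the variable is now available (made-up value).
os.environ["SLACK_TOKEN"] = "xoxb-example"

eager_result = resolve_config(eager)  # {"token": None}
lazy_result = resolve_config(lazy)    # {"token": "xoxb-example"}
```

In this model the eager form keeps whatever value was present when the dictionary was built, while the placeholder form picks up the value that is set when the config is finally resolved.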

Kevin Haynes

01/31/2022, 5:02 PM
🤦 I should have known that. That's how I have our other environment-dependent resources configured. I think there was also a problem with how we were setting the environment variable in the github actions. With these two changes, the test now passes. Thank you @prha for the help!

prha

01/31/2022, 5:05 PM
no problem, glad that worked out!