# ask-community

geoHeil

03/30/2022, 7:28 AM
I am trying to modify my example SFTP sensor from https://github.com/geoHeil/dagster-asset-demo/blob/master/ASSET_DEMO/sensors/example_sensor.py#L89 so that it gets its configuration, e.g. credentials (https://docs.dagster.io/concepts/resources#providing-resources-to-a-job), externally. I have introduced a `the_credentials` resource:
```python
@resource(config_schema={"username": str, "password": str})
def the_credentials(context):
    user_resource = context.op_config["username"]

    # TODO: perhaps it is better to read the password from the environment?
    pass_resource = context.op_config["password"]
    return user_resource, pass_resource

@sensor(job=log_file_job_remote, default_status=DefaultSensorStatus.RUNNING)
def my_directory_sensor_SFTP():
    with build_resources(
        { "credentials": the_credentials}
    ) as resources:
        user, password = the_credentials
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect('localhost', port=2222, username=user, password=password)
```
and am also feeding it to the sensor. Questions:
1) Unlike a normal job, where I could e.g. set the config in Dagit, it is a bit unclear to me where to assemble the configuration of the sensor.
2) Regarding credentials: is it correct to assume that the password, for example, is best read from the environment?
3) `@sensor(job=log_file_job_remote, ...)` triggers a job. When I instead want to trigger an asset which works with the file paths and perhaps reads some CSV files and stores them in the database, how can I change this to assets?
I think I need something like:
```python
resource_defs = {
    "credentials": {
        'username': 'foo',
        'password': 'bar'
    },
}
```
This mirrors the resource definition above. But:
4) Where would be the best place to store this configuration? What I mean is: assuming I want a generic, reusable SFTP sensor that outputs run requests for unseen file names, where should this `resource_defs` go?
It looks like https://docs.dagster.io/_apidocs/libraries/dagster-ssh might be a useful underlying resource to build upon, but it is not yet fully clear to me how to do so.
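One way to keep the core of such a generic SFTP sensor reusable is to isolate the "which files are new?" logic from both Dagster and SFTP. Below is a minimal sketch, assuming the remote listing arrives as a plain list of file names and the already-seen names are kept as a JSON string; the function name `find_unseen_files` is hypothetical. Inside a Dagster sensor, `context.cursor` could be passed in, the returned cursor stored with `context.update_cursor(...)`, and each new name used as the `run_key` of a `RunRequest`.

```python
import json
from typing import Iterable, List, Optional, Tuple


def find_unseen_files(
    listing: Iterable[str], cursor: Optional[str]
) -> Tuple[List[str], str]:
    """Return (new file names, updated cursor).

    Hypothetical helper: the cursor is a JSON-encoded sorted list of every
    file name seen so far; None means the sensor has never run.
    """
    seen = set(json.loads(cursor)) if cursor else set()
    # Sort so run requests are emitted in a deterministic order.
    new_files = sorted(name for name in set(listing) if name not in seen)
    updated_cursor = json.dumps(sorted(seen | set(new_files)))
    return new_files, updated_cursor


if __name__ == "__main__":
    new, cursor = find_unseen_files(["a.csv", "b.csv"], None)
    print(new)  # ['a.csv', 'b.csv']
    new, cursor = find_unseen_files(["a.csv", "b.csv", "c.csv"], cursor)
    print(new)  # ['c.csv']
```

This cursor grows with every file ever seen; for large directories, a cursor based on e.g. the latest modification time may scale better.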
Meanwhile (with the following changes):
```python
resource_defs = {
    "credentials": {
        'config': {
            'username': 'foo',
            'password': 'bar'
        }
    }
}

@resource(config_schema={"username": str, "password": str})
def the_credentials(init_context):
    user_resource = init_context.resource_config["username"]

    # TODO: perhaps it is better to read the password from the environment?
    pass_resource = init_context.resource_config["password"]
    return user_resource, pass_resource


with build_resources(
    {"credentials": the_credentials}, resource_config=resource_defs
) as resources:
    credentials = resources.credentials
    user, password = credentials
```
This gets it running again, but my questions about best practices are still open.
When trying to answer (4) for how to use dagster-ssh:
```python
resource_defs = {
    "ssh": {
        "config": {
            "remote_host": "localhost",
            "remote_port": 2222,
            "username": "foo",
            "password": "bar",
            "no_host_key_check": True
        }
    }
}

with build_resources(
    {"ssh": sshresource}, resource_config=resource_defs
) as resources:
    ssh = resources.ssh
    ssh.get_connection()
```
Paramiko fails with:
```
  File "/path/to/example/sensors/sftp_sensor.py", line 150, in my_directory_sensor_SFTP
    ssh.get_connection()
  File "/path/to/conda_example_env/lib/python3.9/site-packages/dagster_ssh/resources.py", line 105, in get_connection
    client.connect(
  File "/path/to/conda_example_env/lib/python3.9/site-packages/paramiko/client.py", line 420, in connect
    if our_key != server_key:
  File "/path/to/conda_example_env/lib/python3.9/site-packages/paramiko/pkey.py", line 143, in __eq__
    return self._fields == other._fields
AttributeError: 'NoneType' object has no attribute '_fields'
```
This is quite confusing to me, as the following worked just fine before:
```python
credentials = resources.credentials
user, password = credentials
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('localhost', port=2222, username=user, password=password)
```

Zach

03/30/2022, 2:08 PM
Not sure about the dagster-ssh problem. I've had similar questions about where to store sensor config. For sensors whose configuration I want to be able to change without redeploying, I've been storing the configuration in AWS Systems Manager Parameter Store and retrieving it dynamically during sensor evaluation. A downside is that a bad configuration deployment can cause erroneous sensor evaluations, so you have to build validation into your configuration deployment process.
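A minimal sketch of that pattern, assuming the sensor's settings are stored as one JSON string in a single Parameter Store parameter. The parameter name `/sensors/sftp/config` and the required keys are hypothetical. The same `parse_sensor_config` check could run in the deployment pipeline before the value is written, which is one way to get the validation Zach mentions; `boto3` is imported lazily so the parsing logic can be tested without AWS access.

```python
import json
from typing import Any, Dict

# Hypothetical schema: keys a generic SFTP sensor might read at evaluation time.
REQUIRED_KEYS = {"remote_host": str, "remote_port": int, "directory": str}


def parse_sensor_config(raw: str) -> Dict[str, Any]:
    """Parse and validate the JSON string stored in Parameter Store.

    Raises ValueError (json.JSONDecodeError is a subclass) on bad input, so
    the same check can gate the deployment step before the value is written.
    """
    config = json.loads(raw)
    if not isinstance(config, dict):
        raise ValueError("sensor config must be a JSON object")
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in config:
            raise ValueError(f"missing key: {key}")
        value = config[key]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            raise ValueError(f"{key} must be of type {expected_type.__name__}")
    return config


def load_sensor_config(name: str = "/sensors/sftp/config") -> Dict[str, Any]:
    """Fetch and validate the config during sensor evaluation (needs AWS access)."""
    import boto3  # imported lazily so parse_sensor_config has no AWS dependency

    ssm = boto3.client("ssm")
    response = ssm.get_parameter(Name=name, WithDecryption=True)
    return parse_sensor_config(response["Parameter"]["Value"])
```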

geoHeil

03/31/2022, 2:36 PM

sandy

03/31/2022, 3:07 PM
> 2) Regarding credentials: is it correct to assume that the password, for example, is best read from the environment?
Yes, that's what I would recommend.
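A minimal sketch of that recommendation, assuming the credentials are exported as the hypothetical environment variables `SFTP_USERNAME` and `SFTP_PASSWORD`. It builds a dict in the same `{"credentials": {"config": {...}}}` shape as the `resource_defs` that `build_resources(..., resource_config=...)` accepted earlier in this thread, so no secret has to live in the code. Dagster's `StringSource` config type, which accepts `{"env": "SFTP_PASSWORD"}` as a config value, may be another option worth checking in the docs.

```python
import os
from typing import Dict, Mapping, Optional


def credentials_resource_config(
    env: Optional[Mapping[str, str]] = None,
    resource_key: str = "credentials",
) -> Dict[str, Dict[str, Dict[str, str]]]:
    """Build resource config for build_resources() from environment variables.

    SFTP_USERNAME and SFTP_PASSWORD are hypothetical variable names.
    """
    env = os.environ if env is None else env
    missing = [n for n in ("SFTP_USERNAME", "SFTP_PASSWORD") if not env.get(n)]
    if missing:
        # Fail loudly rather than connecting with empty credentials.
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        resource_key: {
            "config": {
                "username": env["SFTP_USERNAME"],
                "password": env["SFTP_PASSWORD"],
            }
        }
    }
```

With this in place, `build_resources({"credentials": the_credentials}, resource_config=credentials_resource_config())` would replace the hard-coded dictionary.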
> 3) `@sensor(job=log_file_job_remote, ...)` triggers a job. When I instead want to trigger an asset which works with the file paths and perhaps reads some CSV files and stores them in the database, how can I change this to assets?
The recommended pattern for this is to build the job from an asset group, e.g.:
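```python
my_asset_group = AssetGroup.from_modules(...)

# "my_asset_job" is a placeholder; build_job expects a job name
@sensor(job=my_asset_group.build_job(name="my_asset_job"))
...
```
or
```python
my_asset_group = AssetGroup.from_modules(...)

# Only the selected assets are materialized when the sensor fires
@sensor(job=my_asset_group.build_job(name="my_asset_job", selection=["asset1", "asset2"]))
...
```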
Does that work for you?
> 1) Unlike a normal job, where I could e.g. set the config in Dagit, it is a bit unclear to me where to assemble the configuration of the sensor.
We don't currently have the capability to put config or resources on sensors. It's an issue that we're tracking and considering addressing in our next major release: https://github.com/dagster-io/dagster/issues/3794. If you take a look at Chris's comment on that issue, there's a workaround that achieves effectively the same thing: https://github.com/dagster-io/dagster/issues/3794#issuecomment-1026339128

geoHeil

03/31/2022, 3:26 PM
1, 2, and 3 are clear to me. Do you have any thoughts regarding (4)?

sandy

03/31/2022, 4:18 PM
On (4), I'm not sure. Is the resource config not making it through somehow? Are you able to, e.g., add prints to see whether that's happening?

geoHeil

03/31/2022, 4:19 PM
No, the error is outside my code. Something is None even though it should not be (probably with regard to the key). But I do not even want to use a key; I am passing username and password in the configuration, as you can see.
@sandy regarding (4), I created a fully reproducible example here: https://github.com/geoHeil/dagster-ssh-demo. Could you please take a look and comment on why the minimal change (switching to the existing resource) breaks the code? So far I am still searching for the error.

sandy

04/01/2022, 2:52 PM
Where exactly should I be looking inside that repository to compare what works with what does not?

geoHeil

04/01/2022, 5:43 PM
The README names the files directly, i.e. which one leads to which error: https://github.com/geoHeil/dagster-ssh-demo

sandy

04/01/2022, 7:36 PM
Got it, I found the two different examples. I noticed a difference between the implementation inside Dagster's SSH resource and the resource that you implemented: we set `look_for_keys=False`, while you do not. Could that be related to the error? The Dagster `ssh_resource` isn't heavily maintained, so if you have a resource of your own that works, I'd recommend using that.
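A minimal sketch of a password-only connection of your own that mirrors the difference Sandy points out: it passes `look_for_keys=False` (and, as an additional assumption, `allow_agent=False`) to paramiko's `SSHClient.connect`, so neither local key files nor ssh-agent keys are tried. The helper names are hypothetical and this is not a confirmed fix for the traceback above; paramiko is imported lazily so the argument-building part can be checked without a server.

```python
from typing import Any, Dict


def password_connect_kwargs(
    host: str, port: int, username: str, password: str
) -> Dict[str, Any]:
    """Keyword arguments for paramiko.SSHClient.connect() using only a password."""
    return {
        "hostname": host,
        "port": port,
        "username": username,
        "password": password,
        "look_for_keys": False,  # don't try ~/.ssh/id_* key files
        "allow_agent": False,  # don't try keys offered by a running ssh-agent
    }


def open_sftp(host: str, port: int, username: str, password: str):
    """Connect with password auth and return (client, sftp); caller closes both."""
    import paramiko  # imported lazily; needs paramiko and a reachable server

    client = paramiko.SSHClient()
    # Same host-key policy as the example that worked earlier in this thread.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(**password_connect_kwargs(host, port, username, password))
    return client, client.open_sftp()
```

For example, after `client, sftp = open_sftp("localhost", 2222, user, password)`, `sftp.listdir(".")` returns the remote file names a sensor could compare against what it has already seen.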