Glen Aultman-Bettridge
02/21/2024, 11:56 PM
Scout
02/21/2024, 11:56 PM
To use the SSHResource from the dagster-ssh library, you can define an asset that requires the ssh resource and then provide the configuration for the SSHResource when you instantiate your asset with the with_resources function. Here's an example from the documentation:
python
from dagster import asset, with_resources
from dagster_ssh import ssh_resource

@asset(required_resource_keys={'ssh'})
def my_asset(context):
    context.resources.ssh.sftp_get("/path/to/remote.csv", "path/to/local.csv")

assets = with_resources([my_asset], resource_defs={
    'ssh': ssh_resource.configured({
        'remote_host': 'foo.com',
        'key_file': 'path/to/id_rsa',
    })
})
In this example, the my_asset function is an asset that requires an SSH resource to perform an SFTP operation. The ssh_resource is then configured with the necessary parameters such as remote_host and key_file. The with_resources function is used to bind the ssh_resource configuration to the asset.
Remember to replace 'foo.com' with the actual remote host you want to connect to and 'path/to/id_rsa' with the path to your SSH key file. Additional configuration options such as username, password, and others can also be provided based on your requirements.
Glen Aultman-Bettridge
02/21/2024, 11:57 PM
Scout
02/21/2024, 11:57 PM
You can use Dagster's {'env': ...} configuration syntax, which the dagster-ssh resource accepts for these fields, to pull these values from the environment at runtime. Here's how you can modify the previous example to use environment variables for the username and password:
python
from dagster import asset, with_resources
from dagster_ssh import ssh_resource

@asset(required_resource_keys={'ssh'})
def my_asset(context):
    context.resources.ssh.sftp_get("/path/to/remote.csv", "path/to/local.csv")

assets = with_resources([my_asset], resource_defs={
    'ssh': ssh_resource.configured({
        'remote_host': 'foo.com',
        'username': {'env': 'SSH_USERNAME'},
        'password': {'env': 'SSH_PASSWORD'},
        'key_file': 'path/to/id_rsa',
    })
})
In this configuration, 'SSH_USERNAME' and 'SSH_PASSWORD' are the names of the environment variables that store the SSH username and password, respectively. The {'env': 'SSH_USERNAME'} and {'env': 'SSH_PASSWORD'} syntax tells Dagster to fetch the values for these fields from the specified environment variables at runtime.
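Because these values are only fetched at runtime, an unset variable tends to show up late, as a configuration error when the resource is initialized. A minimal pre-flight sketch, using only the standard library, can report which variables are missing before a run is launched. The helper name missing_env_vars is hypothetical and is not part of Dagster or dagster-ssh:

```python
import os

# Variable names taken from the example configuration above.
REQUIRED_VARS = ("SSH_USERNAME", "SSH_PASSWORD")


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given environment.

    Hypothetical helper for illustration; `environ` defaults to os.environ.
    """
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(REQUIRED_VARS)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All SSH environment variables are set.")
```

Running this in the same shell or container that launches the Dagster code checks the environment that the resource will actually see.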
Make sure that the environment variables SSH_USERNAME and SSH_PASSWORD are set in the environment where your Dagster code is running. This can be done by exporting them in your shell before running the Dagster code, or by setting them in a .env file or a similar configuration if you're using a tool that supports this.
Glen Aultman-Bettridge
02/22/2024, 12:10 AM
Scout
02/22/2024, 12:10 AM
The dagster-ssh library provides an option to disable host key verification via the no_host_key_check configuration parameter. By default, this is set to True, which means that host key verification is not performed. This is useful for development environments but is not recommended for production due to security concerns.
If you want to perform host key verification, you would set the no_host_key_check parameter to False. When set to False, the SSHResource will perform host key verification against the known hosts file on the system where the code is running. Here's an example of how to configure the SSHResource to perform host key verification:
python
from dagster import asset, with_resources
from dagster_ssh import ssh_resource

@asset(required_resource_keys={'ssh'})
def my_asset(context):
    context.resources.ssh.sftp_get("/path/to/remote.csv", "path/to/local.csv")

assets = with_resources([my_asset], resource_defs={
    'ssh': ssh_resource.configured({
        'remote_host': 'foo.com',
        'username': {'env': 'SSH_USERNAME'},
        'password': {'env': 'SSH_PASSWORD'},
        'key_file': 'path/to/id_rsa',
        'no_host_key_check': False,  # Enable host key verification
    })
})
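Since verification relies on the known hosts file, it can be useful to confirm that the server is listed there before setting no_host_key_check to False. The following is a rough standard-library sketch and not part of dagster-ssh. The helper host_in_known_hosts is hypothetical, it only matches plain host names, and it cannot match hashed entries (lines starting with |1|), which OpenSSH writes when HashKnownHosts is enabled:

```python
from pathlib import Path


def host_in_known_hosts(host, known_hosts_path="~/.ssh/known_hosts"):
    """Return True if `host` appears as a plain (unhashed) entry in the file.

    Hypothetical helper for illustration only.
    """
    path = Path(known_hosts_path).expanduser()
    if not path.is_file():
        return False
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        if line.startswith("@"):
            continue  # marker lines (@cert-authority, @revoked) are ignored here
        hosts_field = line.split()[0]
        if hosts_field.startswith("|1|"):
            continue  # hashed entries cannot be matched by plain comparison
        if host in hosts_field.split(","):
            return True
    return False
```

For example, host_in_known_hosts("foo.com") checks the default file for the current user. A False result on a system that hashes its entries does not prove the key is absent.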
With no_host_key_check set to False, the SSHResource will check the host key against the known hosts file. If the host key is not found or does not match, the connection will fail. This helps prevent man-in-the-middle attacks by ensuring that the SSH client is connecting to the correct server.
For the host key verification to work, the known hosts file must contain the host key for the server you are connecting to. This file is typically located at ~/.ssh/known_hosts on Unix-like systems. You can add a host key to the known hosts file by connecting to the server with an SSH client, which will prompt you to verify the host key and, upon acceptance, add it to the file.
Please note that the specifics of the dagster-ssh library's behavior and configuration options may have changed after my knowledge cutoff date, so I recommend checking the latest dagster-ssh documentation for the most up-to-date information.
Glen Aultman-Bettridge
02/22/2024, 1:31 AM
Scout
02/22/2024, 1:31 AM
You would define the SSHResource for SFTP access and the S3Resource for AWS S3 access. Then you would create an op that uses these resources to perform the file transfer.
Here's a step-by-step guide on how to set this up:
1. Define the SSHResource for SFTP access as shown in previous examples.
2. Define the S3Resource using the dagster-aws library, which provides a resource for interacting with AWS S3. You will need to provide AWS credentials, and the op needs the name of the target S3 bucket.
3. Create an op that uses both the ssh and s3 resources to transfer a file from the SFTP server to the S3 bucket.
Below is an example of how you might set this up:
python
import os
from tempfile import TemporaryDirectory

from dagster import job, op
from dagster_aws.s3 import s3_resource
from dagster_ssh import ssh_resource

# The bucket and prefix are kept as constants here rather than in the s3_resource config
S3_BUCKET = 'my-s3-bucket'
S3_PREFIX = 'my-s3-prefix'

# Define the op that performs the file transfer
@op(required_resource_keys={'ssh', 's3'})
def transfer_file_from_sftp_to_s3(context, sftp_filepath: str, s3_key: str):
    full_key = f"{S3_PREFIX}/{s3_key}"
    with TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, os.path.basename(sftp_filepath))
        # Use the SSHResource to download the file from the SFTP server
        context.resources.ssh.sftp_get(sftp_filepath, local_path)
        # The s3 resource is a boto3 S3 client; upload the downloaded file to the bucket
        context.resources.s3.upload_file(local_path, S3_BUCKET, full_key)
    context.log.info(f"File {sftp_filepath} transferred to S3 bucket {S3_BUCKET} with key {full_key}")

# Define the job with the required resources
@job(resource_defs={
    'ssh': ssh_resource.configured({
        'remote_host': 'sftp.example.com',
        'username': {'env': 'SSH_USERNAME'},
        'password': {'env': 'SSH_PASSWORD'},
        'key_file': 'path/to/id_rsa',
        'no_host_key_check': False,
    }),
    's3': s3_resource.configured({
        'region_name': 'us-west-2',
        'use_unsigned_session': False,
    })
})
def my_transfer_job():
    # Both inputs are unconnected here, so they are supplied through run_config below
    transfer_file_from_sftp_to_s3()

# Run the job
if __name__ == "__main__":
    result = my_transfer_job.execute_in_process(
        run_config={
            'ops': {
                'transfer_file_from_sftp_to_s3': {
                    'inputs': {
                        'sftp_filepath': '/path/to/remote/file.txt',
                        's3_key': 'path/in/s3/file.txt',
                    }
                }
            }
        }
    )
In this example, the transfer_file_from_sftp_to_s3 op takes two parameters: sftp_filepath, which is the path to the file on the SFTP server, and s3_key, which is the key (path) where the file should be stored in the S3 bucket.
The s3_resource is configured with the necessary
Scout
02/22/2024, 1:32 AM
ssh_resource is configured as before, with the addition of the SFTP server details. Please ensure that the AWS credentials provided have the necessary permissions to write to the specified S3 bucket. Also, ensure that the environment variables SSH_USERNAME and SSH_PASSWORD are set with the correct values for the SFTP server.
Remember to replace placeholder values like sftp.example.com, path/to/id_rsa, my-s3-bucket, and my-s3-prefix with your actual SFTP server details, SSH key path, S3 bucket name, and S3 prefix.
Finally, the job is executed with the execute_in_process method for testing purposes. In a production environment, you would typically execute the job using Dagster's scheduler or other execution environment.
Please note that the specifics of the dagster-aws and dagster-ssh libraries' behavior and configuration options may have changed after my knowledge cutoff date, so I recommend checking the latest documentation for the most up-to-date information.
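One detail worth pinning down is how an S3 key is derived from a prefix and a remote path. Here is a small sketch, assuming the key should be the prefix followed by the remote file's base name. The helper build_s3_key is hypothetical, uses only the standard library, and is not part of dagster-aws:

```python
import posixpath


def build_s3_key(prefix, sftp_filepath):
    """Join an S3 prefix and the remote file's base name with forward slashes.

    Hypothetical helper for illustration only.
    """
    filename = posixpath.basename(sftp_filepath)
    if not filename:
        raise ValueError(f"No file name in remote path: {sftp_filepath!r}")
    cleaned_prefix = prefix.strip("/")
    return f"{cleaned_prefix}/{filename}" if cleaned_prefix else filename
```

Using posixpath instead of os.path keeps the separator a forward slash regardless of the operating system running the job, which matches how both SFTP paths and S3 keys are written.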