Hi, could somebody help me here? https://dagster.slack.com/archives/C01U954MEER/p1690719637014879
# ask-community
r
Hi, could somebody help me with this question? https://dagster.slack.com/archives/C01U954MEER/p1690719637014879 I suspect my current code is neither the best nor the correct approach for my use case:
Copy code
from datetime import datetime
import os
import posixpath
from stat import S_ISDIR, S_ISREG
import zipfile

from paramiko import SFTPClient, SSHClient
import pandas as pd
import requests
from dagster import FilesystemIOManager, FreshnessPolicy, Out, op, asset, graph, with_resources, OpExecutionContext
from dagster_ssh import SSHResource, ssh_resource

def list_files(sftp_client: "SFTPClient", path='.') -> list[tuple[str, datetime]]:
    """Recursively list regular files under *path* on an SFTP server.

    Args:
        sftp_client: An open SFTP client; only ``listdir_attr`` is called.
        path: Remote directory to start from (default ``'.'``, the session's
            current directory).

    Returns:
        A list of ``(remote_path, modified_at)`` tuples, one per regular file,
        in the order the server lists entries, descending into directories
        depth-first as they are encountered. ``modified_at`` is the entry's
        ``st_mtime`` converted with ``datetime.fromtimestamp`` (naive, local
        time). Entries whose mode is neither a directory nor a regular file
        are skipped.
    """
    file_list: list[tuple[str, datetime]] = []
    for entry in sftp_client.listdir_attr(path):
        mode = entry.st_mode
        # Remote SFTP paths are always '/'-separated; os.path.join would
        # produce '\\'-separated paths when this runs on Windows.
        full_path = posixpath.join(path, entry.filename)
        if S_ISDIR(mode):
            file_list.extend(list_files(sftp_client, full_path))
        elif S_ISREG(mode):
            # st_mtime is the last-modification time, not a creation time.
            modified_at = datetime.fromtimestamp(entry.st_mtime)
            file_list.append((full_path, modified_at))
    return file_list

@asset(required_resource_keys={'cj_ssh'}, freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60))
def file_paths_from_sftp(context: OpExecutionContext) -> list[str]:
    """List every regular file on the ``cj_ssh`` SFTP server, oldest first.

    Uses the ``cj_ssh`` resource to open an SSH connection and an SFTP
    session. Both are closed before the asset returns.

    Returns:
        Remote file paths (as produced by ``list_files`` starting at ``'.'``),
        sorted ascending by modification time (``st_mtime``).
    """
    cj_ssh: SSHResource = context.resources.cj_ssh
    ssh_client: SSHClient = cj_ssh.get_connection()
    # Release the SFTP channel and the SSH transport as soon as the listing
    # is done instead of leaving them open.
    with ssh_client, ssh_client.open_sftp() as sftp_client:
        files = list_files(sftp_client)

    # Sort by modification time (list_files reports st_mtime).
    files.sort(key=lambda item: item[1])

    context.log.info(f"Found {len(files)} files on SFTP server")
    return [remote_path for remote_path, _ in files]

@asset(required_resource_keys={'cj_ssh'})
def files_from_sftp(context: OpExecutionContext, file_paths_from_sftp: list[str]) -> list[tuple[str, str]]:
    """Download each remote file into the local ``zips`` directory.

    Args:
        file_paths_from_sftp: Remote paths to download, in the desired order.

    Returns:
        ``(local_path, remote_path)`` tuples in the same order as
        *file_paths_from_sftp*. Local names are the remote basenames. Two
        remote files with the same name in different directories therefore
        overwrite each other locally.
    """
    cj_ssh: SSHResource = context.resources.cj_ssh
    ssh_client: SSHClient = cj_ssh.get_connection()

    # "zips" is relative to the process's current working directory.
    os.makedirs("zips", exist_ok=True)

    # A generator asset body is treated by Dagster as a stream of events
    # (Output, AssetMaterialization, ...); yielding bare tuples is rejected.
    # Collect the results and return them as the asset's single value.
    downloaded: list[tuple[str, str]] = []
    with ssh_client, ssh_client.open_sftp() as sftp_client:
        for file_path in file_paths_from_sftp:
            # Remote paths are '/'-separated regardless of the local OS.
            local_file = os.path.join("zips", posixpath.basename(file_path))
            context.log.info(f"Downloading file from {file_path} to {local_file}")
            sftp_client.get(file_path, local_file)
            context.log.info(f"Downloaded file from {file_path} to {local_file}")
            downloaded.append((local_file, file_path))
    return downloaded

@asset()
def csvs_from_sftp(context: OpExecutionContext, files_from_sftp: list[tuple[str, str]]) -> list[str]:
    """Extract the ``.txt``/``.csv`` members of each downloaded zip into ``csvs``.

    Args:
        files_from_sftp: ``(local_zip_path, remote_path)`` tuples. Only the
            local path is used here.

    Returns:
        Paths of the extracted files, ordered by zip and then by member order
        within each zip. Members with other extensions are logged and skipped.
    """
    # "csvs" is relative to the process's current working directory.
    os.makedirs("csvs", exist_ok=True)

    # Return a list rather than yielding: Dagster expects a generator asset
    # body to yield Output/materialization events, not raw values.
    extracted: list[str] = []
    for local_file, _remote_file in files_from_sftp:
        with zipfile.ZipFile(local_file, "r") as zip_ref:
            for member in zip_ref.infolist():
                if member.filename.lower().endswith((".txt", ".csv")):
                    # ZipFile.extract drops absolute and ".." path components,
                    # so members cannot be written outside "csvs".
                    extracted.append(zip_ref.extract(member, "csvs"))
                else:
                    context.log.info(f"Skipping file {member.filename} in zip {local_file}")
    return extracted

@asset()
def csv_data_from_csvs(csvs_from_sftp: list[str]) -> list[pd.DataFrame]:
    """Load every extracted file into a DataFrame.

    Args:
        csvs_from_sftp: Local paths of extracted ``.csv``/``.txt`` files.

    Returns:
        One DataFrame per path, in the same order. Each file is parsed by
        ``pd.read_csv`` with its default options, so ``.txt`` files are also
        read as comma-separated.
    """
    # Return a list rather than yielding: Dagster expects a generator asset
    # body to yield Output/materialization events, not raw values.
    return [pd.read_csv(csv_path) for csv_path in csvs_from_sftp]