# ask-ai

Nehil Jain

02/24/2024, 1:18 AM
Is there a postgres resource available? Like snowflake resource? If not what would a custom implementation look like?

Scout

02/24/2024, 1:18 AM
As of the last update from the documentation, there isn't a specific mention of a dedicated PostgreSQL resource analogous to the `SnowflakeResource` provided for Snowflake. However, Dagster integrates well with various databases through resources and I/O managers, and it is common to use SQLAlchemy for database interactions, including with PostgreSQL.
For a custom implementation, you would typically define a resource that creates and manages a connection to your PostgreSQL database. This resource can then be used within your ops or assets to perform database operations. Here's a conceptual example based on common practices with database resources in Dagster:
Copy code
python
from dagster import Field, String, op, resource
import psycopg2

@resource({
    'username': Field(String),
    'password': Field(String),
    'hostname': Field(String),
    'database': Field(String),
})
def postgres_resource(context):
    conn = psycopg2.connect(
        dbname=context.resource_config['database'],
        user=context.resource_config['username'],
        password=context.resource_config['password'],
        host=context.resource_config['hostname']
    )
    return conn

# Usage in an op
@op(required_resource_keys={'postgres'})
def my_op(context):
    conn = context.resources.postgres
    # Perform database operations with conn
In this example, `psycopg2` is used as the PostgreSQL client for Python, and the `postgres_resource` function is decorated with `@resource`, which allows you to configure the connection parameters. The `my_op` function is an example of an op that requires the `postgres` resource and uses it to interact with the database.
Please note that this is a conceptual example and not directly from the documentation. For an actual implementation, you would need to handle connection pooling, error handling, and other best practices for database connections. You might also want to use SQLAlchemy, which provides an ORM layer and is commonly used in the Python ecosystem for database interactions, including with PostgreSQL.

Tim Castillo

02/24/2024, 1:22 AM
Apologies for the outdated docs that Scout gave you this time! We don't have a dedicated `PostgresResource`, but if you want to get going quickly without having to write one yourself, you can use a standard `SQLAlchemy` instance or any other object to query Postgres, and then write a `ConfigurableResource` when you need it (e.g., when you need to configure the connection at the start of a run).

Nehil Jain

02/24/2024, 1:25 AM
Thanks @Tim Castillo giving this a read now
@Scout rewrite the following code as a dagster dag.
Copy code
from tqdm import tqdm

import os
import zipfile
from urllib.request import urlopen
from io import BytesIO


def zipfile_from_github(repo_url):
    http_response = urlopen(repo_url)
    zf = BytesIO(http_response.read())
    return zipfile.ZipFile(zf, "r")


def understand_files_in_repo(repo_url):
    with zipfile_from_github(repo_url) as zip_ref:
        zip_file_list = zip_ref.namelist()
        output = {}
        pbar = tqdm(zip_file_list, desc=f"Total tokens: 0")
        for file_name in pbar:
            if (
                file_name.endswith("/")
                or any(f in file_name for f in [".DS_Store", ".gitignore"])
                or any(file_name.endswith(ext) for ext in [".png", ".jpg", ".jpeg"])
            ):
                continue
            else:
                with zip_ref.open(file_name, "r") as file:
                    file_contents = str(file.read())
                    file_name_trunc = str(file_name).replace("the-algorithm-main/", "")
                    # TODO: Extract metadata from the file
                    output[file_name_trunc] = file_contents
    return output

Scout

03/01/2024, 4:08 PM
To rewrite the provided Python script as a Dagster job, we'll define two assets: one for downloading and extracting the zip file from GitHub, and another for processing the files within the zip file. We'll use the `@asset` decorator to define these assets. Here's how you might structure the code as a Dagster job:
Copy code
python
from dagster import asset, IOManager, io_manager
from urllib.request import urlopen
from io import BytesIO
import zipfile
from tqdm import tqdm

class InMemoryIOManager(IOManager):
    def __init__(self):
        self.values = {}

    def handle_output(self, context, obj):
        self.values[context.asset_key] = obj

    def load_input(self, context):
        return self.values[context.upstream_output.asset_key]

@io_manager
def in_memory_io_manager(_):
    return InMemoryIOManager()

@asset(io_manager_key="in_memory_io_manager")
def zipfile_from_github():
    repo_url = "https://github.com/the-algorithms/Python/archive/refs/heads/main.zip"
    http_response = urlopen(repo_url)
    zf = BytesIO(http_response.read())
    return zipfile.ZipFile(zf, "r")

@asset(io_manager_key="in_memory_io_manager")
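def understand_files_in_repo(zipfile_from_github):
    # Dagster wires asset inputs by parameter name, so the argument must match the upstream asset.
    zip_ref = zipfile_from_github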
    zip_file_list = zip_ref.namelist()
    output = {}
    pbar = tqdm(zip_file_list, desc="Total tokens: 0")
    for file_name in pbar:
        if (
            file_name.endswith("/")
            or any(f in file_name for f in [".DS_Store", ".gitignore"])
            or any(file_name.endswith(ext) for ext in [".png", ".jpg", ".jpeg"])
        ):
            continue
        else:
            with zip_ref.open(file_name, "r") as file:
                file_contents = str(file.read())
                file_name_trunc = str(file_name).replace("the-algorithms-main/", "")
                # TODO: Extract metadata from the file
                output[file_name_trunc] = file_contents
    return output
In this example, we've created an `InMemoryIOManager` to pass data between assets. The `zipfile_from_github` asset fetches and returns the zip file from the specified GitHub URL. The `understand_files_in_repo` asset processes the files within the zip file, skipping certain files based on the provided conditions.
Please note that the `tqdm` progress bar may not work as expected in the context of a Dagster job, especially when running in a non-interactive environment like Dagster's Dagit UI. You might want to replace it with Dagster's logging functionality to track progress. Also, the `repo_url` is hardcoded in this example. In a real-world scenario, you would likely want to pass this as configuration to your assets.
This is a conceptual example to illustrate how you might structure your code as a Dagster job. Depending on your specific requirements and environment, you may need to adjust the code accordingly.
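As a rough illustration of the file-processing step on its own, the skip rules and the read loop can be pulled into plain functions that take the archive as bytes and report progress through a callable instead of `tqdm`. This is a sketch under assumptions: `should_skip`, `read_text_files`, `strip_prefix`, and `log` are hypothetical names, and decoding as UTF-8 is a choice made here (the original stores `str(file.read())`, which keeps the `b'...'` representation).

```python
import zipfile
from io import BytesIO

SKIP_NAMES = (".DS_Store", ".gitignore")
SKIP_EXTENSIONS = (".png", ".jpg", ".jpeg")


def should_skip(file_name: str) -> bool:
    """Mirror the skip rules from the thread: directories, housekeeping files, images."""
    return (
        file_name.endswith("/")
        or any(marker in file_name for marker in SKIP_NAMES)
        or file_name.endswith(SKIP_EXTENSIONS)
    )


def read_text_files(zip_bytes: bytes, strip_prefix: str = "", log=print) -> dict:
    """Return {path: text} for every file in the archive that is not skipped."""
    output = {}
    with zipfile.ZipFile(BytesIO(zip_bytes), "r") as zip_ref:
        names = [name for name in zip_ref.namelist() if not should_skip(name)]
        for index, file_name in enumerate(names, start=1):
            # Decode the bytes rather than calling str() on them.
            text = zip_ref.read(file_name).decode("utf-8", errors="replace")
            # Remove the first occurrence of the archive's top-level folder prefix.
            output[file_name.replace(strip_prefix, "", 1)] = text
            log(f"Processed {index}/{len(names)} files")
    return output


# Build a tiny archive in memory so the example runs without network access.
buffer = BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("repo-main/README.md", "hello")
    zf.writestr("repo-main/logo.png", b"\x89PNG")
    zf.writestr("repo-main/.gitignore", "*.pyc")

files = read_text_files(buffer.getvalue(), strip_prefix="repo-main/", log=lambda msg: None)
print(files)  # {'README.md': 'hello'}
```

Inside an asset, `context.log.info` could be passed as `log` so progress shows up in Dagster's run logs. Passing bytes and dicts between assets, rather than an open `ZipFile`, also keeps the values easy to pickle.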