# ask-community
r
Hi all, I’m having a difficult time understanding how assets work in an ETL pipeline. The tutorial demonstrates assets, but the data is kept within Dagster, whereas in a real-life scenario data is often stored in a database. In general, how should an ETL pipeline be defined in Dagster, and what are the best practices? Some related questions that arise:
1. What should the asset return? Simply a copy of the loaded data?
2. Should all ETL steps be included in an asset?
3. Can an asset consist of multiple ops? E.g. one for each step in the ETL pipeline.
x
Hi Remco, I'm new to Dagster and had the same difficulty understanding assets. I'll try to explain what I've understood (not sure it's all right, but I'll do my best). An asset represents which data you want and how you want it. It can pull data from an external source (like a URL) or from other assets' outputs. Materializing the asset creates its data. How the data is materialized (local file, S3, DB) is the io_manager's responsibility => the asset works only on the data, not on the way it's materialized. It can simply be a copy of the loaded data, or modify it (aggregation over several assets, filtering...). An asset is a particular kind of op, and its materialization can be retrieved by its AssetKey. You can build an ETL pipeline with assets alone.
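A tiny sketch of the idea (the asset names here are just made up):
Copy code
from dagster import asset


@asset
def raw_data():
    # pull data from some external source; where the result ends up
    # (file, S3, DB) is the io_manager's job, not the asset's
    return [1, 2, 3]


@asset
def filtered_data(raw_data):
    # depends on raw_data's output; storage is again handled by the io_manager
    return [x for x in raw_data if x > 1]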
r
Thanks for clarifying, Xavier. The dots are starting to connect for me. So for an ETL pipeline one would have an asset for the Extracted data and an asset for the Transformed data, while the IOManager takes care of Loading the Transformed asset into a database and the Extracted asset into a local file?
x
yes
I wrote a simple standalone example with two assets:
• the first, 'extract_asset', downloads the Titanic passengers CSV (the URL comes from the 'titanic_source' resource)
• the second, 'titanic_survivors', transforms the previous data by filtering out everyone but the survivors
The first asset uses the builtin fs_io_manager, which writes the data to a local file. The second returns a List[dict] handled by a custom csv_io_manager, which writes it to a CSV file.
Copy code
import csv
import io
from csv import DictReader
from typing import List

import requests
from dagster import (
    AssetIn,
    IOManager,
    InputContext,
    OpExecutionContext,
    OutputContext,
    asset,
    define_asset_job,
    fs_io_manager,
    io_manager,
    repository,
    resource,
    with_resources,
)


@resource()
def titanic_source(context):
    return (
        "<https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv>"
    )


class CsvIOManager(IOManager):
    def __init__(self, file_path: str) -> None:
        self._file_path = file_path
        super().__init__()

    def load_input(self, _: InputContext) -> List[dict]:
        with open(self._file_path) as f:
            r = DictReader(f, dialect="unix")
            return [row for row in r]

    def handle_output(self, context: OutputContext, obj: List[dict]) -> None:
        context.log.info(f"obj = {obj}")
        with open(self._file_path, "w") as f:
            writer = csv.DictWriter(f, fieldnames=obj[0].keys())
            writer.writeheader()
            for row in obj:
                writer.writerow(row)


@io_manager
def csv_io_manager():
    return CsvIOManager("data.csv")


@asset(
    name="extract_asset",
    io_manager_key="extract_io_manager",
    required_resource_keys={"source_url"},
)
def extract_asset(context: OpExecutionContext) -> str:
    url = context.resources.source_url
    response = requests.get(url)

    if response.status_code != 200:
        raise requests.RequestException(f"cannot get url {url}")

    context.add_output_metadata({"url": url})

    return response.text


@asset(
    name="transform_asset",
    ins={"extract": AssetIn("extract_asset", input_manager_key="extract_io_manager")},
    io_manager_key="transform_io_manager",
)
def titanic_survivors(context: OpExecutionContext, extract) -> List[dict]:
    file = io.StringIO(extract)
    data = DictReader(file, dialect="unix")
    survivors = [row for row in data if row["Survived"] == "1"]
    print(f"survivor = {survivors}")
    return survivors


assets_with_resources = with_resources(
    [extract_asset, titanic_survivors],
    resource_defs={
        "extract_io_manager": fs_io_manager.configured({"base_dir": "."}),
        "transform_io_manager": csv_io_manager,
        "source_url": titanic_source,
    },
)


all_assets_job = define_asset_job(
    name="all_assets_job",
    description="run all assets",
)


@repository
def repo():
    return [*assets_with_resources, all_assets_job]
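To try it, save the example in a file and run dagit -f <file>.py, then launch all_assets_job (or materialize the assets directly) from the UI; that's how I run it locally.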
r
Thanks Xavier, this helps a lot! Greatly appreciate the time and effort you put into helping me :)
x
You're welcome Remco :) I think the official Dagster samples aren't well suited for beginners, and simple examples like this one make the logic easier to understand.
Good luck with your project, and don't hesitate to ask if you have other questions.
r
I agree! They lack examples and best practices for common use cases, e.g. ETL pipelines. Have a nice day too!
❤️ 1
s
@Xavier BALESI’s answer is accurate, and I'll add some additional details.
3. Can an asset consist of multiple ops? E.g. one for each step in the ETL pipeline.
Am I understanding correctly that you want to be able to use multiple ops to create a single asset? Graph-backed assets let you accomplish this: https://docs.dagster.io/concepts/assets/software-defined-assets#graph-backed-assets. @Remco Loof - do you remember which docs pages you used to learn what you've learned so far? I'm asking because I'd like to edit them to clear up this question
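Roughly, that pattern looks like this (the op and graph names here are placeholders, not something from your code or the docs):
Copy code
from dagster import AssetsDefinition, graph, op


@op
def extract_op():
    # first step of the pipeline
    return [1, 2, 3]


@op
def transform_op(raw):
    # second step; its output becomes the asset's value
    return [x * 2 for x in raw]


@graph
def etl_graph():
    return transform_op(extract_op())


# the whole graph is materialized as a single asset
etl_asset = AssetsDefinition.from_graph(etl_graph)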
👍 2
r
@sandy thanks for stepping in! The main goal of my question was to understand how Dagster, and assets in particular, should be used to declare ETL pipelines, since I could not find a clear, concrete example of a simple ETL pipeline where data is loaded into a database. The IOManager is not mentioned in the tutorial, although I think it is an essential part of Dagster. This is one of the pages, along with the hacker_news project. The latter I found too complex and unclear for starters to derive how certain parts can be used in their own pipelines, and I believe it lacks documentation as an example.
For me, and likely others, it would be really helpful if the documentation or blog included a simpler ETL pipeline example than the hacker_news project: for instance, one that loads data from the internet, then applies some transformations, and finally loads the result into a Postgres database using the IOManager. This would demonstrate the best practices for Dagster and assets in a common use case. For instance, should each ETL step be an asset? Does the Loading part belong in the asset definition or in the IOManager?
It would also be good to include a more complex example that deals with an incremental ETL pipeline (i.e. where only new data is extracted), since these are often used for performance reasons. Is this even possible (probably using Partitions)? Or does this violate the design principles of an asset? Thanks again for your help! Please let me know whether this clarifies what I am missing in the documentation 🙂
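To make concrete what I mean by Loading via the IOManager, something like this is what I have in mind (just a sketch; the class name, table naming, and connection handling are all made up for illustration):
Copy code
import psycopg2  # hypothetical dependency, only to sketch the idea

from dagster import IOManager, InputContext, OutputContext, io_manager


class PostgresIOManager(IOManager):
    """Sketch: write a list of dicts into a Postgres table named after the asset."""

    def __init__(self, conn_str: str) -> None:
        self._conn_str = conn_str

    def handle_output(self, context: OutputContext, obj) -> None:
        table = context.asset_key.path[-1]  # e.g. "titanic_survivors"
        cols = list(obj[0].keys())
        placeholders = ", ".join(["%s"] * len(cols))
        with psycopg2.connect(self._conn_str) as conn, conn.cursor() as cur:
            # naive row-by-row insert, just to show where the Load step would live
            for row in obj:
                cur.execute(
                    f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})",
                    [row[c] for c in cols],
                )

    def load_input(self, context: InputContext):
        raise NotImplementedError("sketch only; reading back is left out")


@io_manager(config_schema={"conn_str": str})
def postgres_io_manager(init_context):
    return PostgresIOManager(init_context.resource_config["conn_str"])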
Just thought a bit more about it. I think it would be great and useful to include an example project structure with some explanations, similar to the one FastAPI provides.