# ask-community
o
hey all 🙂 Is there any way to aggregate partitions, e.g. a daily task aggregated by a monthly task over all days? I see `PartitionMapping` (marked experimental), but it's not clear to me where that gets used and whether the io manager needs to handle it?
s
the default partition mapping should actually do this if you have a monthly task that depends on a daily task, i.e. the `asset_partitions_key_range` and `asset_partitions_time_window` methods on `InputContext` will return the correct values. we don't yet offer the ability to have a single run / job that materializes assets with different partitions definitions
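For illustration, a minimal sketch of that behaviour (the asset and IO manager names are made up, and wiring the IO manager up via `with_resources` is omitted): a weekly asset depending on a daily one, where `load_input` sees the full range of daily keys:

```python
from dagster import DailyPartitionsDefinition, IOManager, WeeklyPartitionsDefinition, asset

class RangeAwareIOManager(IOManager):
    def handle_output(self, context, obj):
        # Store `obj` under the single partition key being materialized.
        ...

    def load_input(self, context):
        # With the default partition mapping, a weekly asset reading a daily
        # upstream sees the whole week here: `start` and `end` are the first
        # and last daily partition keys covered by the weekly partition.
        start, end = context.asset_partition_key_range
        ...  # load and concatenate every daily partition in [start, end]

@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-06-23"))
def daily_posts():
    ...

@asset(partitions_def=WeeklyPartitionsDefinition(start_date="2022-06-23"))
def weekly_posts(daily_posts):
    # Identity for the sketch; receives whatever load_input concatenated.
    return daily_posts
```

Note that, per the above, the daily and weekly assets still have to be materialized in separate runs.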
o
ah cool, oddly I'm getting `ValueError: Tried to get asset partitions for an input that correponds to a partitioned asset that is not partitioned with a TimeWindowPartitionsDefinition` for the weekly partitioned asset in `load_input`, and working around that by using the key range instead, I later get `dagster._core.errors.DagsterInvariantViolationError: Attempting to access asset_key, but it was not provided when constructing the OutputContext`. I am defining the asset from a graph, if that makes a difference.
s
hmm - what version are you on? do you have a code snippet you'd be willing to share that I could try to use to reproduce?
o
version
```
λ  reddit_scrape dagster --version
dagster, version 1.0.2
```
And the repro -- comment out the `asset_partitions_time_window` line in `load_input` (marked below) to get the second behaviour:
```python
import os
from dagster import (
    AssetKey,
    AssetsDefinition,
    DailyPartitionsDefinition,
    GraphIn,
    GraphOut,
    IOManager,
    MetadataValue,
    WeeklyPartitionsDefinition,
    asset,
    graph,
    io_manager,
    op,
    repository,
    with_resources,
)
from psaw import PushshiftAPI
import pandas as pd

import datetime as dt
START_DATE = dt.datetime(2022, 6, 23)

from pathlib import Path

class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem.
    
    from:
    https://docs.dagster.io/guides/dagster/software-defined-assets#connecting-assets-to-external-services
    """

    def _get_fs_path(self, asset_key: AssetKey, partition_key: str) -> str:

        rpath = os.path.join('data', partition_key, *asset_key.path) + ".csv"
        rpath = Path(rpath).absolute()
        rpath.parent.mkdir(parents=True, exist_ok=True)
        return os.path.abspath(rpath)

    def handle_output(self, context, obj: pd.DataFrame):
        """This saves the dataframe as a CSV."""
        
        # context.add_output_metadata({
            # 'asset_partition_key_range': MetadataValue.text(str(context.asset_partition_key_range)),
        #     'asset_partitions_time_window': MetadataValue.text(str(context.asset_partitions_time_window)),
        # })
        fpath = self._get_fs_path(context.asset_key, context.partition_key)
        obj.to_csv(fpath)
        context.add_output_metadata({
            "path": MetadataValue.path(os.path.abspath(fpath)),
            "rows": <http://MetadataValue.int|MetadataValue.int>(len(obj))
        })
        


    def load_input(self, context):
        """This reads a dataframe from a CSV."""
        
        context.log.info({
            'asset_partition_key_range': str([key for key in context.asset_partition_key_range]),
            'asset_partitions_time_window': str(context.asset_partitions_time_window),  # <- comment this line out for the second error
        })

        # y m d
        start_date, end_date = map(lambda x: dt.datetime(*map(int, x.split('-'))), context.asset_partition_key_range)

        results = []
        delta = dt.timedelta(days=1)
        while start_date <= end_date:
            fpath = self._get_fs_path(context.asset_key, start_date.strftime("%Y-%m-%d"))
            start_date += delta
            results.append(pd.read_csv(fpath))

        return pd.concat(results, ignore_index=True)
    
@io_manager
def csv_io():
    return LocalFileSystemIOManager()

def subreddit_scraper_factory(subreddit, start_date=None):


    if start_date is None:
        start_date= START_DATE
        # start_date= dt.datetime(2005, 6, 23)

    partition = DailyPartitionsDefinition(
        start_date, 
        minute_offset=0, 
        timezone=None, 
        fmt=None, 
        end_offset=0, 
        # tags_for_partition_fn=None
    )

    @asset(
        name=f'subreddit_{subreddit}',
        io_manager_key='pandas_to_table',
        partitions_def=partition
    )
    def subreddit_asset(context):
        
        # partition key ~ '2022-08-10'
        partition = context.asset_partition_key_for_output()
        
        context.log.info(
            f"Processing asset partition '{partition}'"
        )

        y, m, d = partition.split('-')

        y, m, d = map(int, (y, m, d))
        # h, min = map(int, t.split(':'))

        start_time = dt.datetime(y, m, d)
        end_time = start_time + dt.timedelta(days=1)


        api = PushshiftAPI()

        posts_generator = api.search_submissions(
            after=int(start_time.timestamp()),
            before=int(end_time.timestamp()),
            subreddit=subreddit,
            filter=['id', 'url','author', 'title', 'subreddit'],
            # limit=
        )
        posts_dicts = list(map(lambda x: x.d_, posts_generator))
        posts = pd.DataFrame(posts_dicts)

        posts = posts.set_index('id')

        context.log.info(api.metadata_)


        return posts
    
    return subreddit_asset


def dataset_factory(subreddits, start_date=None):

    if start_date is None:
        start_date = START_DATE

    @op
    def subgraph(df):
        return df

    @op
    def collect(dfs):
        return dfs

    partition = WeeklyPartitionsDefinition(
        start_date, 
        minute_offset=0, 
        timezone=None, 
        # fmt="%d-%m-%Y", 
        end_offset=0, 
        # tags_for_partition_fn=None
    )
    
    @graph(
        name='dataset',
        ins={
            **{f'subreddit_{subreddit}': GraphIn(f'subreddit_{subreddit}') for subreddit in subreddits}
        },
        out={"dataset": GraphOut(),}

    )
    def outer(**kwargs):
        # keys, values = kwargs.items()
        return collect(list(map(subgraph, kwargs.values())))
        
    outer = AssetsDefinition.from_graph(
        outer,
        keys_by_input_name={
            f'subreddit_{subreddit}': AssetKey(f'subreddit_{subreddit}') for subreddit in subreddits
        },
        keys_by_output_name={
            'dataset': AssetKey('dataset')
        },
        partitions_def=partition,
        # io_manager_key='pandas_to_table', ## How to set io_manager_key for asset from graph?
        resource_defs={
            "io_manager": csv_io,
        },
        # partition_mappings=
    )

    

    return outer



@repository
def repository():

    return with_resources(
        [
            subreddit_scraper_factory('ProgrammerHumor'),
            subreddit_scraper_factory('science'),
            subreddit_scraper_factory('brisbane'),
            subreddit_scraper_factory('machinelearning'),
            
            dataset_factory((
                'ProgrammerHumor',
                'science',
                'brisbane',
                'machinelearning',
            ))
        ],
        
        resource_defs={
            "pandas_to_table": csv_io,
        },
    )
```
requirements.txt
```
psaw==0.1.0
dagster==1.0.2
dagit==1.0.2
pandas==1.4.3
```
hmm I think I understand: setting the io manager in the resource defs of `AssetsDefinition.from_graph` applies to the internal steps, and the io manager key needs to be set on the graph out, which doesn't accept it; and my io manager doesn't work for internal steps. I monkey patched this in:
```python
# imports needed for the patch (internal paths as of dagster 1.0.x)
from typing import NamedTuple, Optional

from dagster._annotations import PublicAttr
from dagster._core.definitions.output import OutputDefinition


class GraphOut(NamedTuple("_GraphOut", [("description", PublicAttr[Optional[str]]), ("io_manager_key", PublicAttr[Optional[str]])])):
    """
    Represents information about the outputs that a graph maps.

    Args:
        description (Optional[str]): Human-readable description of the output.
        io_manager_key (Optional[str]): Key of the io manager used to handle this output.
    """

    def __new__(cls, description: Optional[str] = None, io_manager_key: Optional[str] = None):
        return super(GraphOut, cls).__new__(cls, description=description, io_manager_key=io_manager_key)

    def to_definition(self, name: Optional[str]) -> "OutputDefinition":
        return OutputDefinition(name=name, description=self.description, io_manager_key=self.io_manager_key)
```
and it works, is there a suggested way? ...no, it uses the default io manager anyway
s
I haven't yet had a chance to try this out - are you still facing this issue?
o
I have worked around it by defining a downstream asset that acts as an identity function; by using `@asset` I can set the `io_manager_key`. The issue itself is just that I cannot see a way to set the asset's io_manager when defining an asset using `AssetsDefinition.from_graph`; only the io_manager for the internal ops can be set
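Roughly, the workaround looks like this (a minimal sketch; `dataset` is the graph-backed asset above, and `dataset_csv` is a made-up name):

```python
from dagster import asset

# Pass-through asset downstream of the graph-backed `dataset` asset. Unlike
# AssetsDefinition.from_graph, @asset accepts io_manager_key directly, so this
# identity step is what actually lands in the custom io manager.
@asset(io_manager_key="pandas_to_table")
def dataset_csv(dataset):
    return dataset
```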