Hi, I am new to dagster, so sorry for the basic qu...
# ask-community
b
Hi, I am new to dagster, so sorry for the basic question: I have several json endpoints that need to be scraped regularly and get processed by the same logic. I can make an
asset
for each endpoint but then I need to copy paste the same boilerplate code:
Copy code
@asset
def fetch_one():
        return requests.get("<http://example.com/one>").json()

@asset
def fetch_two():
        return requests.get("<http://example.com/two>").json()
    
@asset
def clean_one(fetch_one):
    df = pd.json_normalize(fetch_one)
    return df.dropna()

@asset
def clean_two(fetch_two):
    df = pd.json_normalize(fetch_two)
    return df.dropna()
What would be the recommended way to avoid that? • Do I work with a
config
like https://docs.dagster.io/concepts/configuration/config-schema where each endpoint is a different
RunConfig
? That seems a bit heavy. • Another option is maybe with
context
and
multi-asset
like https://docs.dagster.io/concepts/assets/multi-assets#subsetting-multi-assets ? • Or do I use an asset-factory pattern? (Any example of this?)
🤖 1
h
I had a similar thing where I was doing boilerplate code between Ops. First do it with a few Assets like that. If it works, then write a python function to encapsulate the boilerplate and have your Asset call that function to get the boilerplate code. Simplified example: I have an pure python function and ops calling it:
Copy code
def shell_script_runner(context, script_name):
    execute_shell_command(f"{script_name}")

@op
def script1(context):
    shell_script_runner(context, "hello_world.sh")

@op
def script2(context):
    shell_script_runner(context, "hello_dagster.sh")
I’m thinking you could do something similar for assets.
b
Thx for the suggestion. But script1 and script2 in your example are still very much copy pasted. For a second step in the pipeline, I would need to write the same individual functions. Like process_script1(script1) and process_script2(script2)
h
Yeah, they are copy paste looking because I’m doing Ops and migrating some existing scripts. For ops that are dependent on each other, I alter the op signature so I can make them dependent in the job. Here’s how I make an op dependent:
Copy code
@op
def data_reporter(context):
    pass
    
@op(ins={"start": In(Nothing)})
def data_transform(context):
    pass

@job()
def Sample_etl_job():
    data = data_transform(start=data_reporter())
I think if you’d like a better answer, I’d suggest putting some example of your boilerplate asset code so you can get a better answer.
b
Sure, fair enough. How about this:
Copy code
@asset
def fetch_one():
        return requests.get("<http://example.com/one>").json()

@asset
def fetch_two():
        return requests.get("<http://example.com/two>").json()
    
@asset
def clean_one(fetch_one):
    df = pd.json_normalize(fetch_one)
    return df.dropna()

@asset
def clean_two(fetch_two):
    df = pd.json_normalize(fetch_two)
    return df.dropna()
Now, I have many one, two, .... endpoints
h
Oh I see now, have you thought about making clean_json an Op since it seems to be doing compute on the asset passed to it? I think you’re in the same boat I was. Everything in Dagster tutorial starts from the assets, but if you have an existing process, you should probably start from the Op and Job. Because One and Two could be partition keys. Even Fetch looks like an Op to me.
b
I don't think I need Ops per se. I can rewrite it as assets. But the problem remains. Maybe I need partition keys?
h
yeah maybe as
Copy code
endpoint_partitions_def = StaticPartitionsDefinition(["one", "two", "three"])

@asset(partitions_def=endpoint_partitions_def)
def raw_json_asset():
    …

@asset(partitions_def=endpoint_partitions_def)
def cleaned_asset(raw_json_asset):
   …
b
Thx. I will try that...
s
Hi Carl, What you should do here depends on whether you want to manage these as separate assets or not within Dagster-- it’s hard to say whether you should do that without knowing more about your use case. If you do want to manage them as separate assets, but just want to cut down on boilerplate defining them, you could use an asset factory approach;
Copy code
endpoints = ["one", "two"]
assets=[]

# necessary to create a new scope to make sure endpoint varies
def make_fetch(endpoint):
    @asset(name=f"fetch_{endpoint}")
    def fetch():
        return requests.get("<http://example.com/{endpoint}>").json()
    return fetch

for endpoint in endpoints:
    assets.append(make_fetch(endpoint))
If you don’t want to manage them as separate assets, then partitions make sense as you and Harry have been discussing.
h
if you don’t manage them as separate assets, @sean could you show how you would do the body of the raw_json_asset asset? I’m trying to understand how you get the partition_key used.
Copy code
@asset(partitions_def=endpoint_partitions_def)
def raw_json_asset():
   return requests.get(f"<http://example.com/{get_asset_partition_key()}>").json()
get_asset_partition_key
is a guess at a method
b
Ah I see. So with seperate assets you mean that they represent different nodes in the graph? With partitions, it will be one node (but parametrized in a way). So the last option, would not allow parallel execution?
s
if you don’t manage them as separate assets, @sean could you show how you would do the body of the raw_json_asset asset? I’m trying to understand how you get the partition_key used.
Use `context.partition_key`:
Copy code
@asset(partitions_def=endpoint_partitions_def)
def raw_json_asset(context: AssetExecutionContext):
   return requests.get(f"<http://example.com/{context.partition_key}>").json()
Ah I see. So with seperate assets you mean that they represent different nodes in the graph? With partitions, it will be one node (but parametrized in a way).
Yes.
So the last option, would not allow parallel execution?
The answer here is complicated. It will require multiple runs to materialize the two different partition keys-- those runs can be in parallel though. With Option 1, both endpoints can be done within the same run in parallel.
🙏 1
❤️ 1
b
Great. Thanks a lot. I will try that. Awesame that you guys have this kind of support 🙂
h
@sean, for the factory approach,
Copy code
assets=[]
for endpoint in endpoints:
    assets.append(make_fetch(endpoint))
you’d then add those assets in the Definitions block in the
__init__.py
?
s
you’d then add those assets in the Definitions block in the init.py ?
exactly
🌈 1