
HardSchrockCafe

01/21/2022, 12:48 AM
Hey folks. I'm trying to use the same op twice in a job, but passing different configuration to it each time. I'm quite new to the whole config_schema stuff. Is this possible? Code below (obviously Dagster doesn't like the fact I'm using strings as input variables right now):
@op
def push_to_kafka(parsed_feed, topic):
    producer = KafkaProducer(bootstrap_servers='XXXXXXX',
                             key_serializer=lambda x: x.encode('utf8'),
                             value_serializer=lambda x: json.dumps(x).encode('utf8'))
    for entry in parsed_feed:
        producer.send(topic,
                      key=entry['id'],
                      value=entry)
    get_dagster_logger().info(f"Successfully pushed to Kafka {topic}")


@job
def download_hacker_news_job():
    metadata = parse_rss_feed(download_rss_feed())
    push_to_kafka(metadata, 'news-metadata')
    push_to_kafka(download_article_content(metadata), 'news-content')

Alex Service

01/21/2022, 3:02 AM
Check out this section on reusing ops: https://docs.dagster.io/concepts/ops-jobs-graphs/jobs-graphs#using-the-same-op-twice. I haven't tried it, but my guess is that you would reference the op alias in your config.
👍 1

sandy

01/21/2022, 4:13 PM