HardSchrockCafe
01/21/2022, 12:48 AM@op
def push_to_kafka(parsed_feed, topic):
producer = KafkaProducer(bootstrap_servers='XXXXXXX',
key_serializer=lambda x: x.encode('utf8'),
value_serializer=lambda x: json.dumps(x).encode('utf8'))
for entry in parsed_feed:
producer.send(topic,
key=entry['id'],
value=entry)
get_dagster_logger().info(f"Successfully pushed to Kafka {topic}")
@job
def download_hacker_news_job():
metadata = parse_rss_feed(download_rss_feed())
push_to_kafka(metadata, 'news-metadata')
push_to_kafka(download_article_content(metadata), 'news-content')
Alex Service
01/21/2022, 3:02 AMsandy
01/21/2022, 4:13 PM