Simon Späti
03/05/2024, 1:02 PM
`dagster_pyspark` resource when I want to run a custom query (SQL)? This worked with an older version:
```python
from dagster import Out, op

# PropertyDataFrame is a custom Dagster type defined elsewhere in this project


@op(
    out=Out(dagster_type=PropertyDataFrame, is_required=False),
    required_resource_keys={"pyspark", "s3"},
    description="""This will check if property is already downloaded. If so, check if price or other
    columns have changed in the meantime, or if date is very old, download again""",
)
def get_changed_or_new_properties(context, properties: PropertyDataFrame):
    # prepare ids and fingerprints from fetched properties
    ids_tmp: list = [p["id"] for p in properties]
    ids: str = ", ".join(ids_tmp)
    context.log.info("Fetched propertyDetails_id's: [{}]".format(ids))
    cols_props = ["propertyDetails_id", "fingerprint"]
    cols_PropertyDataFrame = [
        "id",
        "fingerprint",
        "is_prefix",
        "rentOrBuy",
        "city",
        "propertyType",
        "radius",
        "last_normalized_price",
    ]
    # get a list of existing property_ids with its fingerprint
    existing_props: list = (
        context.resources.pyspark.spark_session.sql(
            """SELECT propertyDetails_id
                    , CAST(propertyDetails_id AS STRING)
                        || '-'
                        || propertyDetails_normalizedPrice AS fingerprint
                FROM delta.`s3a://real-estate/lake/bronze/property`
                WHERE propertyDetails_id IN ( {ids} )
            """.format(
                ids=ids
            )
        )
        .select("propertyDetails_id", "fingerprint")
        .collect()
    )
```
I specifically used `context.resources.pyspark.spark_session.sql`. Now, with the latest `pyspark` resource, I get:
`pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: delta; line 5 pos 17`
Scout
03/05/2024, 1:02 PM
The error `pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: delta; line 5 pos 17` suggests that Spark cannot run a direct SQL query against a Delta Lake path with the PySpark setup you are using. The cause is probably not the `dagster_pyspark` resource itself but the underlying PySpark configuration or the Delta Lake setup. Here are a few steps to troubleshoot and potentially resolve the issue:

1. Check PySpark Version Compatibility: Make sure the PySpark version you are using is compatible with your Delta Lake version. Each Delta Lake release supports specific Spark versions.
2. Include Delta Lake Dependency: Make sure the Delta Lake package is available in your PySpark environment. You may need to add it to the `spark.jars.packages` configuration when setting up your PySpark resource.
3. Review PySpark Configuration: Verify that your PySpark configuration includes the options Delta Lake needs, such as setting `spark.sql.extensions` to `io.delta.sql.DeltaSparkSessionExtension` and `spark.sql.catalog.spark_catalog` to `org.apache.spark.sql.delta.catalog.DeltaCatalog`.
4. Use Delta Table Utility: Instead of querying the Delta Lake table directly by file path, consider loading it with the `DeltaTable` utility, registering it as a temporary view, and then running your SQL against that view (for example `FROM property` instead of the `delta.` path). For example:

   ```python
   from delta.tables import DeltaTable

   # spark_session is the SparkSession from the resource, e.g. context.resources.pyspark.spark_session
   delta_table = DeltaTable.forPath(spark_session, "s3a://real-estate/lake/bronze/property")
   delta_df = delta_table.toDF()
   # Register the DataFrame as a temporary view so spark_session.sql(...) can query it by name
   delta_df.createOrReplaceTempView("property")
   ```
5. Check Filesystem Configuration: Make sure your environment is configured to access S3 paths (such as `s3a://`). This may involve setting Hadoop configuration options for your AWS access key and secret key.
6. Consult Documentation and Community: Review the Delta Lake and PySpark documentation for changes that might affect how you interact with Delta Lake tables, and consider searching or asking in the community forums or issue trackers of both projects.

If you continue to run into issues, please share more details about your environment, such as the PySpark and Delta Lake versions you are using and any relevant configuration. This will help diagnose the problem more accurately.

Simon Späti
03/05/2024, 1:59 PM

Scout
03/05/2024, 1:59 PM
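Following up on step 3 of the answer: a minimal sketch of the Spark settings that register Delta Lake's SQL extension and catalog, which is what the ``delta.`<path>` `` syntax in the original query relies on. It assumes Delta Lake 3.x (Maven artifact `io.delta:delta-spark_<scala>`) and a Scala 2.12 Spark build; the function name `delta_spark_config` and the default version numbers are hypothetical placeholders, so pick the Delta release that matches your Spark version.

```python
from typing import Dict, Optional


def delta_spark_config(
    delta_version: str = "3.1.0",  # assumption: placeholder, Delta 3.1.x targets Spark 3.5
    scala_version: str = "2.12",  # assumption: Scala build of your Spark distribution
    hadoop_aws_version: Optional[str] = None,  # optional: adds hadoop-aws for s3a:// paths
) -> Dict[str, str]:
    """Return Spark settings that register Delta Lake's SQL extension and catalog."""
    # Maven coordinates that Spark downloads when the session starts
    packages = [f"io.delta:delta-spark_{scala_version}:{delta_version}"]
    if hadoop_aws_version is not None:
        # hadoop-aws should match the Hadoop version bundled with your Spark
        packages.append(f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}")
    return {
        "spark.jars.packages": ",".join(packages),
        # Adds Delta's SQL commands and analyzer rules to the session
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        # Lets the session catalog resolve Delta tables, including path identifiers
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }


if __name__ == "__main__":
    # Example hadoop-aws version only; use the one matching your Hadoop build
    for key, value in delta_spark_config(hadoop_aws_version="3.3.4").items():
        print(f"{key}={value}")
```

In Dagster, the resulting dict could be passed as `spark_config` to `dagster_pyspark.PySparkResource`, for example `PySparkResource(spark_config=delta_spark_config())`, assuming the cluster can fetch the packages from Maven. Settings such as `spark.jars.packages` and `spark.sql.extensions` are read when the `SparkSession` is created, so they belong in the resource configuration rather than in a `spark.conf.set(...)` call inside the op.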