How do I use the `dagster_pyspark` resource when I...
# ask-ai
s
How do I use the
dagster_pyspark
resource when I want to run a custom Query (SQL)? This worked with older version:
Copy code
@op(
    out=Out(dagster_type=PropertyDataFrame, is_required=False),
    required_resource_keys={"pyspark", "s3"},
    description="""This will check if property is already downloaded. If so, check if price or other
    columns have changed in the meantime, or if date is very old, download again""",
)
def get_changed_or_new_properties(context, properties: PropertyDataFrame):
    # prepare ids and fingerprints from fetched properties
    ids_tmp: list = [p["id"] for p in properties]
    ids: str = ", ".join(ids_tmp)

    <http://context.log.info|context.log.info>("Fetched propertyDetails_id's: [{}]".format(ids))

    cols_props = ["propertyDetails_id", "fingerprint"]
    cols_PropertyDataFrame = [
        "id",
        "fingerprint",
        "is_prefix",
        "rentOrBuy",
        "city",
        "propertyType",
        "radius",
        "last_normalized_price",
    ]
    # get a list of existing property_ids with its fingerprint
    existing_props: list = (
        context.resources.pyspark.spark_session.sql(
            """SELECT propertyDetails_id
                , CAST(propertyDetails_id AS STRING)
                    || '-'
                    || propertyDetails_normalizedPrice AS fingerprint
            FROM delta.`<s3a://real-estate/lake/bronze/property>`
            WHERE propertyDetails_id IN ( {ids} )
            """.format(
                ids=ids
            )
        )
        .select("propertyDetails_id", "fingerprint")
        .collect()
    )
I used specifically
context.resources.pyspark.spark_session.sql
. Now with latest pyspark resource I get:
Copy code
pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: delta; line 5 pos 17
s
The error message you're encountering,
pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY] Unsupported data source type for direct query on files: delta; line 5 pos 17
, suggests that there is an issue with using direct SQL queries on Delta Lake tables in the version of PySpark you are using. This issue may not be directly related to the
dagster_pyspark
resource itself, but rather to the underlying PySpark configuration or the Delta Lake setup. Here are a few steps you can take to troubleshoot and potentially resolve the issue: 1. Check PySpark Version Compatibility: Ensure that the version of PySpark you are using is compatible with Delta Lake. Delta Lake has specific compatibility requirements for the Spark version. 2. Include Delta Lake Dependency: If you are using Delta Lake, make sure that the Delta Lake package is included in your PySpark environment. You may need to add the Delta Lake package to the
spark.jars.packages
configuration when setting up your PySpark resource. 3. Review PySpark Configuration: Verify that your PySpark configuration includes all necessary options to work with Delta Lake. This might include setting configurations like
spark.sql.extensions
to
io.delta.sql.DeltaSparkSessionExtension
and
spark.sql.catalog.spark_catalog
to
org.apache.spark.sql.delta.catalog.DeltaCatalog
. 4. Use Delta Table Utility: Instead of querying the Delta Lake table directly with a file path, consider using the Delta Table utility to load the table and then perform SQL operations. For example:
Copy code
python    from delta.tables import DeltaTable    delta_table = DeltaTable.forPath(spark_session, "<s3a://real-estate/lake/bronze/property>")    delta_df = delta_table.toDF()    # Now you can perform SQL operations on delta_df
5. Check Filesystem Configuration: Ensure that your environment is correctly configured to access S3 paths (like
s3a://
). This may involve setting up Hadoop configuration options for AWS access keys and secret keys. 6. Consult Documentation and Community: Review the Delta Lake and PySpark documentation for any updates or changes that might affect how you interact with Delta Lake tables. Additionally, consider searching for or asking questions in the community forums or issue trackers for both projects. If you continue to encounter issues, you might want to provide more details about your environment, such as the versions of PySpark and Delta Lake you are using, as well as any relevant configurations. This information can help in diagnosing the problem more accurately.
s
@Scout how would I setup spark and delta lake within docker so I wouldn't be dependend on my local state and install of spark , java, and all the jars
s
There was an error processing your request. Please try again.