# ask-community
hello here, is there anyone who uses `assets` with...
b
hello here, is there anyone who uses `assets` with `pyspark` + `deltalake`? I am trying to understand how I can materialize assets as `deltalake` tables to `s3`.
z
I've done this - is there anything specific you're having trouble with? It's essentially just like the pyspark example repo, except you use an asset instead of ops, and your spark session needs to be configured with the delta-io jar. Then it's just `obj.write.format("delta").save(path)`.
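A minimal sketch of that shape (a hedged illustration, not the exact setup from this thread: the delta-core version, bucket path, and asset body are all assumptions; delta-core 2.0.x is what pairs with the Spark 3.2.2 visible in the logs below):

```python
from dagster import asset
from pyspark.sql import SparkSession

# Assumed session setup: pull in the delta-io jar and register Delta's SQL
# extension and catalog. The delta-core version must match your Spark build
# (2.0.x pairs with Spark 3.2).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.2")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

@asset
def people_delta_table() -> None:
    # Placeholder data; a real asset would compute something meaningful.
    df = spark.createDataFrame(
        [(1, "anna", 34), (2, "bob", 61)], ["id", "name", "age"]
    )
    # Write out as a Delta table on S3. The s3a:// scheme additionally needs
    # the hadoop-aws jar and AWS credentials configured on the cluster.
    df.write.format("delta").mode("overwrite").save("s3a://my-bucket/people")
```

The Delta write could also live in a custom IO manager instead, so the asset just returns the DataFrame and the write is handled like any other output.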
b
I have tried this, but I'm having issues with `spark` now: https://dagster.slack.com/archives/C01U954MEER/p1661339761830559
z
where is the error showing up? is there more to the stack trace than that line?
b
```
2022-08-24 11:52:39 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 1 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 1)
2022-08-24 11:52:39 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 1 - make_people - STEP_WORKER_STARTING - Launching subprocess for "make_people".
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/lib/python3.7/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/24 11:53:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/24 11:53:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2022-08-24 11:53:18 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - STEP_WORKER_STARTED - Executing step "make_people" in subprocess.
2022-08-24 11:53:18 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2022-08-24 11:53:18 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2022-08-24 11:53:18 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - LOGS_CAPTURED - Started capturing logs for step: make_people.
2022-08-24 11:53:18 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - STEP_START - Started execution of step "make_people".
2022-08-24 11:53:44 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - STEP_OUTPUT - Yielded output "result" of type "DataFrame". (Type check passed).

[Stage 0:>                                                          (0 + 0) / 1]
[Stage 0:>                                                          (0 + 1) / 1]
2022-08-24 11:54:08 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-08-24 11:54:08 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 113 - make_people - STEP_SUCCESS - Finished execution of step "make_people" in 50.23s.
2022-08-24 11:54:09 +0000 - dagster - DEBUG - make_and_filter_data - 93eb470b-5ce0-4d38-b1dd-57e308cb9db0 - 1 - filter_over_50 - STEP_WORKER_STARTING - Launching subprocess for "filter_over_50".
22/08/24 11:54:10 WARN AbstractConnector: 
java.io.IOException: Thread signal failed
	at java.base/sun.nio.ch.NativeThread.signal(Native Method)
	at java.base/sun.nio.ch.ServerSocketChannelImpl.implCloseSelectableChannel(ServerSocketChannelImpl.java:365)
	at java.base/java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(AbstractSelectableChannel.java:242)
	at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.close(AbstractInterruptibleChannel.java:112)
	at org.sparkproject.jetty.server.ServerConnector.close(ServerConnector.java:371)
	at org.sparkproject.jetty.server.AbstractNetworkConnector.shutdown(AbstractNetworkConnector.java:104)
	at org.sparkproject.jetty.server.Server.doStop(Server.java:444)
	at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
```
here is part of the error
z
is there more to it? usually the top of a pyspark error isn't particularly informative
z
hmm, a shot in the dark, but maybe an out-of-memory error?
I'm assuming the dataframe doesn't get written out to disk?
b
Nope, this is just the same example from dagster.
hey @Zach, thanks for the comments. The issue was with the spark image: I had tried to install java and pyspark on top of the dagster-provided docker image, but it worked once I instead installed the dagster components on top of the spark-provided dockerfile. Most probably the issue was java itself.
it is running like a charm now.
thanks :D
z
great! yeah spark installs are never straightforward
b
hey @Zach, just another issue: in local mode it works, but when I try to submit jobs to K8S with multiple executors, it fails. I think the executors are unable to connect back to the driver, because it is a job, not a pod, right? Do you have any suggestions?
`Failed to connect to dagster-run-66fd6f2a-84d1-4edc-816a-07a1e56813a1-7527w:34517`
z
I don't, unfortunately - I don't know anything about K8S, so that's out of my wheelhouse. A shot in the dark would be checking whether the right ports are open on the driver container, but you've probably already done that.
b
hey @Zach, thanks for the comment. The issue was with spark client networking: pyspark runs in client mode when the dagster k8s job starts, and a headless service is required for driver-executor communication. See https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode
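For reference, a hedged sketch of that client-mode wiring (every name, image tag, and port here is illustrative, and it assumes a headless Service already exists that selects the dagster run pod):

```python
from pyspark.sql import SparkSession

# In client mode the driver lives inside the dagster run pod, so executors
# must be able to resolve and dial back to it. A headless Service selecting
# the run pod gives the driver a stable DNS name for that.
spark = (
    SparkSession.builder
    .master("k8s://https://kubernetes.default.svc:443")
    .config("spark.kubernetes.namespace", "data")
    .config("spark.kubernetes.container.image", "my-registry/spark-py:3.2.2")
    .config("spark.executor.instances", "2")
    # DNS name supplied by the assumed headless Service.
    .config("spark.driver.host", "dagster-driver-headless.data.svc.cluster.local")
    # Pin the driver ports so the Service can expose exactly these.
    .config("spark.driver.port", "34517")
    .config("spark.blockManager.port", "34518")
    .getOrCreate()
)
```

Without the headless Service, executors try to reach the bare pod/job hostname, which is exactly what the `Failed to connect to dagster-run-...:34517` error above shows.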