Resampling in PySpark with tabular data
I'm trying to create a pandas UDF in PySpark to perform resampling, but I keep getting errors about the structure of the data.
My data has two columns: the first is a dense vector that compresses 44 features (it is the output of spark.ml.PCA), while the second column is just an ID index:
| pca_output| id_index|
+--------------------+-----------+
|[2.73000047560632...| 2194.0|
|[2.44850608593683...| 22983.0|
|[-0.1671784601274...| 83217.0|
|[-1.5240170259869...| 117934.0|
|[2.71468811512717...| 13520.0|
|[2.44325199555203...| 9394.0|
|[2.59260415577290...| 129256.0|
|[0.30590890462495...| 9520.0|
|[-1.9972896872877...| 30837.0|
|[0.97893872166372...| 44594.0|
|[0.25748638769748...| 2194.0|
|[-1.9956717604948...| 136583.0|
|[2.46351832667918...| 13520.0|
|[1.56077668931431...| 30974.0|
|[-0.6866795986598...| 72286.0|
|[-1.9790937590434...| 101421.0|
|[0.54742750900979...| 104264.0|
|[0.77711265099807...| 20996.0|
|[-1.9418766290579...| 30837.0|
|[0.08507079104073...| 116191.0|
+--------------------+-----------+
only showing top 20 rows
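For context, the DataFrame above was produced by a pipeline roughly like the following. This is a simplified reconstruction, not my exact code; raw_df and feature_cols are placeholders for my source table and its input columns, and k stands in for the number of principal components:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PCA

# feature_cols: placeholder list of the numeric input columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
pca = PCA(k=k, inputCol="features", outputCol="pca_output")

df = (Pipeline(stages=[assembler, pca])
      .fit(raw_df)
      .transform(raw_df)
      .select("pca_output", "id_index"))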
Here is what I wrote:
import time

from pyspark.sql.functions import array, col, explode, lit, rand, pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType

df = df.withColumn('random', lit(''))
schema = StructType([StructField("random", StringType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def openrationudf(i):
    start_time = time.monotonic()
    a = i.shape[0]
    ratio1 = int(139 / a)
    # replicate each row ratio1+1 times, shuffle, and keep 139 rows
    oversampled_df1 = i.withColumn(
        "dummy",
        explode(array([lit(x) for x in range(ratio1 + 1)]))).drop('dummy')
    oversampled_df1 = oversampled_df1.orderBy(rand())
    oversampled_df1 = oversampled_df1.limit(139)
    batch_resampled = batch_resampled.unionAll(oversampled_df1)
    writeit(batch_resampled)
    # batch_resampled = spark.createDataFrame([], major_df.schema)
    print('time in seconds: ', time.monotonic() - start_time)
    return i['random']

aa = df.select(["id_index", "pca_output"]).filter(col("id_index") <= 10).groupby('id_index').apply(openrationudf).collect()
This fails with the following error:
Py4JJavaError: An error occurred while calling o266.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 157 in stage 27.0 failed 1 times, most recent failure: Lost task 157.0 in stage 27.0 (TID 229, 10.0.0.10, executor driver): java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
at org.apache.spark.sql.util.ArrowUtils$.toArrowType(ArrowUtils.scala:57)
at org.apache.spark.sql.util.ArrowUtils$.toArrowField(ArrowUtils.scala:103)
at org.apache.spark.sql.util.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:132)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
at org.apache.spark.sql.util.ArrowUtils$.toArrowSchema(ArrowUtils.scala:131)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:76)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3450)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3447)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
at org.apache.spark.sql.util.ArrowUtils$.toArrowType(ArrowUtils.scala:57)
at org.apache.spark.sql.util.ArrowUtils$.toArrowField(ArrowUtils.scala:103)
at org.apache.spark.sql.util.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:132)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
at org.apache.spark.sql.util.ArrowUtils$.toArrowSchema(ArrowUtils.scala:131)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:76)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Any thoughts about it?
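Edit: if I read the traceback correctly, the job fails while Arrow serializes the pca_output column, because the ML vector type (struct<type,size,indices,values>) has no Arrow equivalent. If that is the actual cause, I imagine a workaround would be to convert the vector to a plain array first and do the resampling with pandas operations inside the UDF (the function receives a pandas DataFrame, not a Spark one). Below is an untested sketch of what I mean; vector_to_array needs Spark >= 3.0, and the 139-row target and column types are just my guesses carried over from the code above:

import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType
from pyspark.ml.functions import vector_to_array

# replace the unsupported vector column with a plain array<double>
df_arr = (df.withColumn("pca_array", vector_to_array(col("pca_output")))
            .select("id_index", "pca_array"))

out_schema = StructType([
    StructField("id_index", DoubleType(), True),
    StructField("pca_array", ArrayType(DoubleType()), True),
])

def oversample(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds one id_index group as a pandas DataFrame,
    # so the resampling is done with pandas, not the Spark API
    return pdf.sample(n=139, replace=len(pdf) < 139, random_state=42)

resampled = df_arr.groupby("id_index").applyInPandas(oversample, schema=out_schema)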