2021-08-27

Resampling in pySpark with tabular data

I'm trying to create a pandas UDF in PySpark to perform resampling, but I keep getting errors about the structure of the data.

My data has two columns: the first is a dense vector that compresses 44 features (it is the output of spark.ml.PCA), while the second column is just an ID index.

|          pca_output|   id_index|
+--------------------+-----------+
|[2.73000047560632...|     2194.0|
|[2.44850608593683...|    22983.0|
|[-0.1671784601274...|    83217.0|
|[-1.5240170259869...|   117934.0|
|[2.71468811512717...|    13520.0|
|[2.44325199555203...|     9394.0|
|[2.59260415577290...|   129256.0|
|[0.30590890462495...|     9520.0|
|[-1.9972896872877...|    30837.0|
|[0.97893872166372...|    44594.0|
|[0.25748638769748...|     2194.0|
|[-1.9956717604948...|   136583.0|
|[2.46351832667918...|    13520.0|
|[1.56077668931431...|    30974.0|
|[-0.6866795986598...|    72286.0|
|[-1.9790937590434...|   101421.0|
|[0.54742750900979...|   104264.0|
|[0.77711265099807...|    20996.0|
|[-1.9418766290579...|    30837.0|
|[0.08507079104073...|   116191.0|
+--------------------+-----------+
only showing top 20 rows

Here is what I wrote:

# Add a constant empty-string column named 'random' to df, and declare the
# UDF's output schema as a single nullable string field with that same name.
df = df.withColumn('random', lit(''))
schema = StructType([StructField("random", StringType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def openrationudf(i):
    """Grouped-map pandas UDF that appears intended to oversample one group.

    The body repeats the rows of ``i``, shuffles them, keeps 139 of them,
    appends them to ``batch_resampled``, passes the result to ``writeit`` and
    prints the elapsed time. It returns the ``'random'`` column of ``i``.

    NOTE(review): with ``PandasUDFType.GROUPED_MAP`` the argument is
    presumably a pandas DataFrame, whereas ``withColumn``, ``orderBy``,
    ``limit`` and ``unionAll`` are Spark DataFrame methods -- confirm which
    API is meant here.
    NOTE(review): the declared output schema is a struct, but a single column
    (``i['random']``) is returned; the caller also selects only ``id_index``
    and ``pca_output`` before grouping, so ``'random'`` may be absent -- verify.
    """
    start_time = time.monotonic()
    a = i.shape[0]  # number of rows in this group
    # Whole number of times the group fits into the 139-row target.
    ratio1 = int(139/a)
    # Attach an array of ratio1 + 1 literals and explode it, which repeats
    # each row ratio1 + 1 times; the helper column is dropped afterwards.
    oversampled_df1 = i.withColumn("dummy", 
                                            explode(array([lit(x) for x in range(ratio1+1)]))).drop('dummy')
    # Shuffle the repeated rows, then keep at most 139 of them.
    oversampled_df1 = oversampled_df1.orderBy(rand())
    oversampled_df1 = oversampled_df1.limit(139)
    # NOTE(review): batch_resampled is assigned in this function, so Python
    # treats it as a local; it is read here before any assignment is visible
    # in this block (the initialisation below is commented out).
    batch_resampled = batch_resampled.unionAll(oversampled_df1)
    writeit(batch_resampled)
    # batch_resampled = spark.createDataFrame([], major_df.schema)
    print('time in seconds: ', time.monotonic() - start_time)
    return i['random']

# Keep only id_index and pca_output, restrict to id_index <= 10, apply the UDF
# once per id_index group, and collect the results to the driver.
# NOTE(review): the quoted trace fails in ArrowUtils.toArrowType on
# struct<type,size,indices,values>, which looks like the pca_output vector
# column being sent to the pandas UDF -- presumably it must be converted to a
# plain array column first (e.g. pyspark.ml.functions.vector_to_array); confirm.
aa = df.select(["id_index", "pca_output"]).filter(col("id_index") <= 10).groupby('id_index').apply(openrationudf).collect()



Py4JJavaError: An error occurred while calling o266.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 157 in stage 27.0 failed 1 times, most recent failure: Lost task 157.0 in stage 27.0 (TID 229, 10.0.0.10, executor driver): java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
    at org.apache.spark.sql.util.ArrowUtils$.toArrowType(ArrowUtils.scala:57)
    at org.apache.spark.sql.util.ArrowUtils$.toArrowField(ArrowUtils.scala:103)
    at org.apache.spark.sql.util.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:132)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
    at org.apache.spark.sql.util.ArrowUtils$.toArrowSchema(ArrowUtils.scala:131)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:76)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3450)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3447)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
    at org.apache.spark.sql.util.ArrowUtils$.toArrowType(ArrowUtils.scala:57)
    at org.apache.spark.sql.util.ArrowUtils$.toArrowField(ArrowUtils.scala:103)
    at org.apache.spark.sql.util.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:132)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
    at org.apache.spark.sql.util.ArrowUtils$.toArrowSchema(ArrowUtils.scala:131)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:76)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1934)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Any thoughts about it?



from Recent Questions - Stack Overflow https://ift.tt/3zmLasY
https://ift.tt/eA8V8J

No comments:

Post a Comment