Sparklyr: Java heap space reading table from hive

Created on 23 Nov 2016 · 4 comments · Source: sparklyr/sparklyr

I have stored a dataset in Hive in ORC format, and now I am trying to access it through sparklyr, but I get an error:

Caused by: java.lang.OutOfMemoryError: Java heap space

I used the following script:

library(DBI)
library(sparklyr)
library(dplyr)

Sys.setenv(SPARK_HOME="/usr/hdp/current/spark-client/")

sc <- spark_connect(master = "yarn-client",
                    config = list(
                      spark.submit.deployMode = "client",
                      spark.executor.instances = 5,
                      spark.executor.memory = "10G",
                      spark.executor.cores = 4,
                      spark.driver.memory = "8G",
                      spark.yarn.executor.memoryOverhead = 1
                    ))

Then, when I tried to load the table, sparklyr shut down with this error:

Error in if (ncol(df) > 0) df else invisible(df) : 
  argument is of length zero
In addition: Warning message:
In value[[3L]](cond) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.lang.IllegalStateException: unread block data
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.sched [... truncated]

And the log output:

16/11/23 10:11:54 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:258)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:310)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:256)
    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:588)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/11/23 10:11:56 WARN TransportChannelHandler: Exception in connection from lnxbig06.cajarural.gcr/10.1.246.20:34139
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1160)
    at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:939)
    at org.apache.spark.network.buffer.NettyManagedBuffer.nioByteBuffer(NettyManagedBuffer.java:45)
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/11/23 10:11:55 WARN TransportChannelHandler: Exception in connection from lnxbig04.cajarural.gcr/10.1.246.18:38460
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1160)
    at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:939)
    at org.apache.spark.network.buffer.NettyManagedBuffer.nioByteBuffer(NettyManagedBuffer.java:45)
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
Label: question

All 4 comments

@Tarqui which version of Spark and distribution? How are you loading the table? Could you share your sessionInfo()?

There are a couple of instances of this error attributed to mismatched library versions being installed:

https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-java-lang-IllegalStateException-unread-block-data/td-p/21836

http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-giving-me-error-of-unread-block-data-td19226.html

Given that this looks library related, you could try adding the sparklyr.csv.embedded = NULL connection option:

sc <- spark_connect(master = "yarn-client",
                    config = list(
                      spark.submit.deployMode = "client",
                      spark.executor.instances = 5,
                      spark.executor.memory = "10G",
                      spark.executor.cores = 4,
                      spark.driver.memory = "8G",
                      spark.yarn.executor.memoryOverhead = 1,
                      sparklyr.csv.embedded = NULL
                    ))

@javierluraschi thanks for your answer.

I am sorry, I was not clear enough. What I wanted was to load the following SELECT from Hive into a Spark DataFrame:

table = dbGetQuery(sc,
                   "SELECT *
                    FROM table_orc 
                    WHERE fecha = '2016-07-31'")

But I noticed that dbGetQuery() brings the data into a local R data frame, which is why I get the Java heap space error.
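
The eager route is still fine for a quick look at a few rows; a sketch of that, capping the same query with LIMIT so only a small result crosses into R:

# dbGetQuery() is eager: the full result lands in local R memory,
# so it is only safe when the result is known to be small.
preview <- dbGetQuery(sc,
                      "SELECT *
                       FROM table_orc
                       WHERE fecha = '2016-07-31'
                       LIMIT 100")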

For the full selection, I solved it with:

table = tbl(sc,sql("SELECT *
                    FROM table_orc 
                    WHERE fecha = '2016-07-31'"))

I want to ask if there is another way to access Hive tables without using SQL syntax.

Here is my sessionInfo():

R version 3.2.3 (2015-12-10)
Platform: x86_64-redhat-linux-gnu (64-bit)
Running under: CentOS release 6.7 (Final)

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_0.5.0  sparklyr_0.4 DBI_0.4-1   

loaded via a namespace (and not attached):
 [1] magrittr_1.5    R6_2.1.1        assertthat_0.1  rprojroot_1.0-2
 [5] parallel_3.2.3  tools_3.2.3     withr_1.0.2     yaml_2.1.13    
 [9] tibble_1.1      Rcpp_0.12.3     config_0.1.0    digest_0.6.8   
> spark_version(sc)
[1] ‘1.6.0’

Accessing a Hive table using the tbl() function from dplyr is easy:

library(DBI)
library(dplyr)

# Change from the default database to the one that contains the Hive table
dbGetQuery(sc, "use workds")

# Register the table in Spark
bookings <- tbl(sc, "bv_bookings")

# Now one can use dplyr verbs
count(bookings)
head(bookings)

regionDF = bookings %>% 
  filter(fiscal_year_number_int == 2006) %>% 
  group_by(bk_sub_business_entity_name) %>%
  summarise(totalNetBooking = sum(booking_net_amount)) %>%
  collect
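
Everything before collect runs in Spark; only the summarised result crosses into R. To verify this, dplyr's show_query() prints the SQL generated for the pipeline, as in this sketch based on the code above:

# Replace collect with show_query() to inspect the generated SQL
bookings %>%
  filter(fiscal_year_number_int == 2006) %>%
  group_by(bk_sub_business_entity_name) %>%
  summarise(totalNetBooking = sum(booking_net_amount)) %>%
  show_query()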

Registering a dplyr table using a Hive SQL query:

hiveQuery = "select b.business_entity_name, b.sub_business_entity_name,
sum(a.booking_net_amount) total_booking from bv_bookings a, be_h b 
where bk_sub_business_entity_name = b.sub_business_entity_name  
group by b.business_entity_name, b.sub_business_entity_name
" 
bookTbl = tbl(sc, sql(hiveQuery))

head(bookTbl)

bookTbl %>% select(business_entity_name, total_booking)

# Get mean and min for the total_bookings
bookTbl %>% 
  filter(total_booking > 5000) %>%
  mutate(meanBooking = mean(total_booking),
         minBooking = min(total_booking))
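
Applied back to the original question, reading one day of the ORC table without writing any SQL would look roughly like this sketch, assuming the table_orc and fecha names from earlier in the thread:

# Lazy reference to the Hive table; no data is moved yet
table_orc <- tbl(sc, "table_orc")

# The filter is translated to Spark SQL and runs on the cluster;
# collect() then brings back only the matching rows
one_day <- table_orc %>%
  filter(fecha == "2016-07-31") %>%
  collect()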

Question answered by rvij, closing.
