I have stored a dataset in Hive in ORC format, and now I am trying to access it through sparklyr, but I get an error:
Caused by: java.lang.OutOfMemoryError: Java heap space
I used the following script:
library(DBI)
library(sparklyr)
library(dplyr)
Sys.setenv(SPARK_HOME="/usr/hdp/current/spark-client/")
sc <- spark_connect(master = "yarn-client",
                    config = list(
                      spark.submit.deployMode = "client",
                      spark.executor.instances = 5,
                      spark.executor.memory = "10G",
                      spark.executor.cores = 4,
                      spark.driver.memory = "8G",
                      spark.yarn.executor.memoryOverhead = 1
                    ))
And then, when I tried to load the table, sparklyr shut down with this error:
Error in if (ncol(df) > 0) df else invisible(df) :
argument is of length zero
In addition: Warning message:
In value[[3L]](cond) :
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.lang.IllegalStateException: unread block data
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.sched [... truncated]
and the log output:
16/11/23 10:11:54 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:258)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:310)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:256)
at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:588)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
16/11/23 10:11:56 WARN TransportChannelHandler: Exception in connection from lnxbig06.cajarural.gcr/10.1.246.20:34139
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1160)
at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:939)
at org.apache.spark.network.buffer.NettyManagedBuffer.nioByteBuffer(NettyManagedBuffer.java:45)
at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
16/11/23 10:11:55 WARN TransportChannelHandler: Exception in connection from lnxbig04.cajarural.gcr/10.1.246.18:38460
java.lang.OutOfMemoryError: Java heap space
	(identical stack trace to the one above)
@Tarqui which version of Spark and which distribution? How are you loading the table? Could you share your sessionInfo()?
There are a couple of instances of this error attributed to different library versions being installed.
Given that this looks library related, you could try adding the sparklyr.csv.embedded = NULL connection option:
sc <- spark_connect(master = "yarn-client",
                    config = list(
                      spark.submit.deployMode = "client",
                      spark.executor.instances = 5,
                      spark.executor.memory = "10G",
                      spark.executor.cores = 4,
                      spark.driver.memory = "8G",
                      spark.yarn.executor.memoryOverhead = 1,
                      sparklyr.csv.embedded = NULL
                    ))
@javierluraschi thanks for your answer.
I am sorry, I was not clear enough. What I wanted was to load the result of the following select from Hive into a Spark DataFrame:
table = dbGetQuery(sc,
                   "SELECT *
                    FROM table_orc
                    WHERE fecha = '2016-07-31'")
But I noticed that dbGetQuery brings the data into a local R data frame, which is why I get the Java heap error.
I did it instead with:
table = tbl(sc, sql("SELECT *
                     FROM table_orc
                     WHERE fecha = '2016-07-31'"))
I want to ask if there is another way to access Hive tables without using SQL syntax.
R version 3.2.3 (2015-12-10)
Platform: x86_64-redhat-linux-gnu (64-bit)
Running under: CentOS release 6.7 (Final)
locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
[3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
[7] LC_PAPER=en_US.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] dplyr_0.5.0 sparklyr_0.4 DBI_0.4-1
loaded via a namespace (and not attached):
[1] magrittr_1.5 R6_2.1.1 assertthat_0.1 rprojroot_1.0-2
[5] parallel_3.2.3 tools_3.2.3 withr_1.0.2 yaml_2.1.13
[9] tibble_1.1 Rcpp_0.12.3 config_0.1.0 digest_0.6.8
> spark_version(sc)
[1] ‘1.6.0’
Accessing a Hive table using the tbl function in dplyr is easy:
library(DBI)
library(dplyr)
# change from the default database to the database that contains the Hive table
dbGetQuery(sc, "use workds")
# Register the table in Spark
bookings <- tbl(sc, "bv_bookings")
# Now one can use dplyr verbs
count(bookings)
head(bookings)
regionDF = bookings %>%
  filter(fiscal_year_number_int == 2006) %>%
  group_by(bk_sub_business_entity_name) %>%
  summarise(totalNetBooking = sum(booking_net_amount)) %>%
  collect()
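Note that the final collect() is what actually brings rows into R; everything before it runs in Spark, so only the small summarised result crosses the wire.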
Registering a dplyr table using a Hive SQL query:
hiveQuery = "select b.business_entity_name, b.sub_business_entity_name,
               sum(a.booking_net_amount) total_booking
             from bv_bookings a, be_h b
             where a.bk_sub_business_entity_name = b.sub_business_entity_name
             group by b.business_entity_name, b.sub_business_entity_name"
bookTbl = tbl(sc, sql(hiveQuery))
head(bookTbl)
bookTbl %>% select(business_entity_name, total_booking)
# Get the mean and min of total_booking
bookTbl %>%
  filter(total_booking > 5000) %>%
  mutate(meanBooking = mean(total_booking),
         minBooking = min(total_booking))
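A minimal sketch of the same idea applied to bookTbl (the topBookings name is illustrative): reduce the data in Spark first and only collect() the small result, which avoids the heap errors that come from pulling a full table into R.

# filter() runs in Spark; collect() transfers only the surviving rows.
topBookings <- bookTbl %>%
  filter(total_booking > 5000) %>%
  collect()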
Question answered by rvij, closing.