Fix rename operation for under-replicated files

Description

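Renaming a file fails on the NameNode during snapshot maintenance of the rename transaction (FSDirRenameOp) with "IllegalStateException: No replica should have been changed during the Tx" raised for UnderReplicatedBlockContext. In the log below, this causes Spark's HDFS-backed state store commit (atomic write via temp file + rename) to abort:

2019-05-22 13:38:48,356 INFO pname,TrafficDetectionOneTopic,2,application_1558465083344_0029 CheckpointFileManager: Writing atomically to hdfs://10.0.2.15:8020/tmp/temporary-998d3131-f7fb-4d85-acb7-5a5fd5947f8c/state/0/43/2.delta using temp file hdfs://10.0.2.15:8020/tmp/temporary-998d3131-f7fb-4d85-acb7-5a5fd5947f8c/state/0/43/.2.delta.eacb0227-3fda-4eeb-90ef-d11e59095cf0.TID249.tmp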
2019-05-22 13:38:48,827 ERROR pname,TrafficDetectionOneTopic,2,application_1558465083344_0029 Utils: Aborting task
java.lang.IllegalStateException: Error committing version 2 into HDFSStateStore[id=(op=0,part=43),dir=hdfs://10.0.2.15:8020/tmp/temporary-998d3131-f7fb-4d85-acb7-5a5fd5947f8c/state/0/43]
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1$$anonfun$close$1.apply$mcV$sp(statefulOperators.scala:362)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1$$anonfun$close$1.apply(statefulOperators.scala:362)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1$$anonfun$close$1.apply(statefulOperators.scala:362)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:277)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.close(statefulOperators.scala:362)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.IllegalStateException): No replica should have been changed during the Tx ( class io.hops.transaction.context.UnderReplicatedBlockContext)
at io.hops.transaction.context.BaseReplicaContext.checkForSnapshotChange(BaseReplicaContext.java:199)
at io.hops.transaction.context.BaseReplicaContext.snapshotMaintenance(BaseReplicaContext.java:165)
at io.hops.transaction.context.TransactionContext.snapshotMaintenance(TransactionContext.java:170)
at io.hops.transaction.EntityManager.snapshotMaintenance(EntityManager.java:119)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp$RenameOperation.snapshotMaintenance(FSDirRenameOp.java:944)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp$2.performTask(FSDirRenameOp.java:680)
at io.hops.transaction.handler.TransactionalRequestHandler.execute(TransactionalRequestHandler.java:100)
at io.hops.transaction.handler.HopsTransactionalRequestHandler.execute(HopsTransactionalRequestHandler.java:50)
at io.hops.transaction.handler.RequestHandler.handle(RequestHandler.java:65)
at io.hops.transaction.handler.RequestHandler.handle(RequestHandler.java:60)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToTransaction(FSDirRenameOp.java:698)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:531)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:415)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:397)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3306)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename2(NameNodeRpcServer.java:787)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename2(ClientNamenodeProtocolServerSideTranslatorPB.java:678)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:996)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1929)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2775)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1535)
at org.apache.hadoop.ipc.Client.call(Client.java:1481)
at org.apache.hadoop.ipc.Client.call(Client.java:1391)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy18.rename2(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename2(ClientNamenodeProtocolTranslatorPB.java:592)
at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
at com.sun.proxy.$Proxy19.rename2(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1861)
at org.apache.hadoop.fs.Hdfs.renameInternal(Hdfs.java:348)
at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:691)
at org.apache.hadoop.fs.FileContext.rename(FileContext.java:966)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
... 28 more

Status

Assignee

Salman Niazi

Reporter

Salman Niazi

Labels

None

Fix versions

Priority

Medium