Java – Hadoop 2.2.0 appending to a file with AlreadyBeingCreatedException

Hadoop 2.2.0 appending to a file with AlreadyBeingCreatedException… here is a solution to the problem.

Hadoop 2.2.0 appending to a file with AlreadyBeingCreatedException

I’m having a problem with the Hadoop 2.2.0 append operation. I append some bytes to an HDFS file via the HDFS Java API. If the file doesn’t exist before the append operation, I first create it, with the following code:

String fileUri = "hdfs://hadoopmaster:9000/in/append_test.txt";
// create the HDFS file if it does not exist
HdfsClient.createPathIfNotExist(fileUri);
// do the append operation 5 times
for (int i = 0; i < 5; i++) {
    HdfsClient.appendTo(fileUri, ("append content" + i).getBytes("UTF-8"));
}

createPathIfNotExist function:

Path p = null;
FileSystem fs = null;
try {
    fs = FileSystem.get(URI.create(uri), conf);
    p = new Path(uri);
    if (!fs.exists(p)) {
        if (uri.charAt(uri.length() - 1) == '/') { // create a directory
            if (fs.mkdirs(p)) {
                // created successfully
            }
        } else { // create a file
            FSDataOutputStream fos = fs.create(p);
            fos.close();
        }
    } else {
        System.out.println(uri + " existing");
    }
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (fs != null) {
        try {
            fs.close();
            fs = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

appendTo function:

ByteArrayInputStream in = null;
OutputStream out = null;
FileSystem fs = null;
try {
    in = new ByteArrayInputStream(bytes);
    fs = FileSystem.get(URI.create(uri), conf);
    out = fs.append(new Path(uri)); // get the append output stream
    IOUtils.copyBytes(in, out, bufferSize, false);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (in != null) IOUtils.closeStream(in);
    if (out != null) IOUtils.closeStream(out);
    if (fs != null) {
        try {
            fs.close();
            fs = null;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The result is that append_test.txt is created, but the content is only:

append content0

and an exception occurs:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to create file [/in/append_test.txt] for [DFSClient_NONMAPREDUCE_-1148656837_1] on client [192.168.141.1], because this file is already being created by [DFSClient_NONMAPREDUCE_2099912242_1] on [192.168.141.1]
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2320)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2153)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2386)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2347)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:320)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59572)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

        at org.apache.hadoop.ipc.Client.call(Client.java:1347)
        at org.apache.hadoop.ipc.Client.call(Client.java:1300)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
        at com.sun.proxy.$Proxy10.append(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy10.append(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:245)
        at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1480)
        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1520)
        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1508)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:310)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:306)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:306)
        at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1160)
        at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:130)
        at org.lh.blog.message.hadoop.HdfsClient$2.run(HdfsClient.java:1)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:356)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471)
        at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:121)
        at org.lh.blog.message.hadoop.HdfsClient.appendTo(HdfsClient.java:110)
        at org.lh.blog.message.test.HdfsClientTests.testCreateFileBeforeAppend(HdfsClientTests.java:26)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

That is, only the first append succeeds after the non-existent file is created; the other four append operations fail with the above error.
I created the file before appending, yet it reports AlreadyBeingCreatedException, which leaves me a little confused.

I have also made some other attempts. I found that HDFS files created with the Java API cannot be appended to in this way. However, HDFS files created by the hdfs command line (e.g., hdfs dfs -put) can be appended to.

Can you help me and give me some advice?

Thanks and regards.

Solution

To solve this problem:

  1. Read the contents of the file and store them in a variable.
  2. Add the new content that you want to append to this variable.
  3. Recreate the file and write the combined content back to it.

The process worked well for me and solved the problem.

The append operation is costly, and this problem arises when you attempt it in parallel, so recreate the file and write the contents back into it instead of appending.
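
For illustration, here is a minimal sketch of this read-modify-rewrite approach, assuming the same Configuration object and URI as in the question; the class and method names (RewriteAppend, readAndRewrite) and the 4096-byte copy buffer are illustrative, not from the original post:

import java.io.ByteArrayOutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class RewriteAppend {

    // "Appends" bytes to the file at uri by reading the old content,
    // adding the new bytes, and recreating the file with the combined data.
    public static void readAndRewrite(String uri, byte[] bytes, Configuration conf)
            throws Exception {
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path p = new Path(uri);

        // 1. Read the existing contents into a buffer, if the file exists.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        if (fs.exists(p)) {
            FSDataInputStream in = fs.open(p);
            try {
                IOUtils.copyBytes(in, buffer, 4096, false); // 4096 is an illustrative buffer size
            } finally {
                IOUtils.closeStream(in);
            }
        }

        // 2. Add the new content to the buffer.
        buffer.write(bytes);

        // 3. Recreate the file (overwrite = true) and write everything back.
        FSDataOutputStream out = fs.create(p, true);
        try {
            out.write(buffer.toByteArray());
        } finally {
            IOUtils.closeStream(out);
        }
        // FileSystem.get returns a cached, shared instance by default,
        // so it is deliberately not closed here; close it once when the
        // application shuts down.
    }
}

Calling readAndRewrite(fileUri, ("append content" + i).getBytes("UTF-8"), conf) in the loop from the question would then replace the failing appendTo call.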
