频道栏目
首页 > 网络 > 云计算 > 正文

Hadoop源码分析——数据节点写数据2

2016-06-23 09:08:11         来源:lfdanding的专栏  
收藏   我要投稿

数据接收
客户端写往数据节点的数据由org.apache.hadoop.hdfs.server.datanode.BlockReceiver.java中的receiveBlock方法接收

 void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,  // output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      int numTargets) throws IOException {

      mirrorOut = mirrOut;
      mirrorAddr = mirrAddr;
      throttler = throttlerArg;

    try {
      // write data chunk header
      if (!finalized) {
        BlockMetadataHeader.writeHeader(checksumOut, checksum);
      }
      if (clientName.length() > 0) {
        responder = new Daemon(datanode.threadGroup, 
                               new PacketResponder(this, block, mirrIn, 
                                                   replyOut, numTargets,
                                                   Thread.currentThread()));
        responder.start(); // start thread to processes reponses
      }

      /* 
       * Receive until packet length is zero.
       */
      while (receivePacket() > 0) {}

      // flush the mirror out
      if (mirrorOut != null) {
        try {
          mirrorOut.writeInt(0); // mark the end of the block
          mirrorOut.flush();
        } catch (IOException e) {
          handleMirrorOutError(e);
        }
      }

      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();
      }

      // if this write is for a replication request (and not
      // from a client), then finalize block. For client-writes, 
      // the block is finalized in the PacketResponder.
      if (clientName.length() == 0) {
        // close the block/crc files
        close();

        // Finalize the block. Does this fsync()?
        block.setNumBytes(offsetInBlock);
        datanode.data.finalizeBlock(block);
        datanode.myMetrics.incrBlocksWritten();
      }

    } catch (IOException ioe) {
      LOG.info("Exception in receiveBlock for " + block + " " + ioe);
      IOUtils.closeStream(this);
      if (responder != null) {
        responder.interrupt();
      }
      cleanupBlock();
      throw ioe;
    } finally {
      if (responder != null) {
        try {
          responder.join();
        } catch (InterruptedException e) {
          throw new IOException("Interrupted receiveBlock");
        }
        responder = null;
      }
    }
  }

receiveBlock()将大部分处理工作交给receivePacket()。BlockReceiver.receiveBlock()循环调用receivePacket(),处理这次写请求的所有数据包。当所有的数据接收完毕后,它进行下述清理工作:发送一个空的数据包到下游节点,通知后面的数据节点这次写操作结束;等待PacketResponder线程结束,线程结束,表明所有的响应已经发送给上游节点,同时,本节点的处理结果也已经通知名字节点;关闭文件,并更新数据块文件的长度,然后,通过FSDataset.finalizeBlock()提交数据块。至此,写数据请求处理完毕
receivePacket()代码如下:

 /** 
   * Receives and processes a packet. It can contain many chunks.
   * returns size of the packet.
   */
  private int receivePacket() throws IOException {

    int payloadLen = readNextPacket();

    if (payloadLen <= 0) {
      return payloadLen;
    }

    buf.mark();
    //read the header
    buf.getInt(); // packet length
    offsetInBlock = buf.getLong(); // get offset of packet in block
    long seqno = buf.getLong();    // get seqno
    boolean lastPacketInBlock = (buf.get() != 0);

    int endOfHeader = buf.position();
    buf.reset();

    if (LOG.isDebugEnabled()){
      LOG.debug("Receiving one packet for " + block +
                " of length " + payloadLen +
                " seqno " + seqno +
                " offsetInBlock " + offsetInBlock +
                " lastPacketInBlock " + lastPacketInBlock);
    }

    setBlockPosition(offsetInBlock);

    // First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
        mirrorOut.flush();
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }

    buf.position(endOfHeader);        
    int len = buf.getInt();

    if (len < 0) {
      throw new IOException("Got wrong length during writeBlock(" + block + 
                            ") from " + inAddr + " at offset " + 
                            offsetInBlock + ": " + len); 
    } 

    if (len == 0) {
      LOG.debug("Receiving empty packet for " + block);
    } else {
      offsetInBlock += len;

      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                            checksumSize;

      if ( buf.remaining() != (checksumLen + len)) {
        throw new IOException("Data remaining in packet does not match " +
                              "sum of checksumLen and dataLen");
      }
      int checksumOff = buf.position();
      int dataOff = checksumOff + checksumLen;
      byte pktBuf[] = buf.array();

      buf.position(buf.limit()); // move to the end of the data.

      /* skip verifying checksum iff this is not the last one in the 
       * pipeline and clientName is non-null. i.e. Checksum is verified
       * on all the datanodes when the data is being written by a 
       * datanode rather than a client. Whe client is writing the data, 
       * protocol includes acks and only the last datanode needs to verify 
       * checksum.
       */
      if (mirrorOut == null || clientName.length() == 0) {
        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
      }

      try {
        if (!finalized) {
          //finally write to the disk :
          out.write(pktBuf, dataOff, len);

          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (partialCrc != null) {
            if (len > bytesPerChecksum) {
              throw new IOException("Got wrong length during writeBlock(" + 
                                    block + ") from " + inAddr + " " +
                                    "A packet can have only one partial chunk."+
                                    " len = " + len + 
                                    " bytesPerChecksum " + bytesPerChecksum);
            }
            partialCrc.update(pktBuf, dataOff, len);
            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
            checksumOut.write(buf);
            LOG.debug("Writing out partial crc for data len " + len);
            partialCrc = null;
          } else {
            checksumOut.write(pktBuf, checksumOff, checksumLen);
          }
          datanode.myMetrics.incrBytesWritten(len);
          /// flush entire packet before sending ack
          flush();

          // update length only after flush to disk
          datanode.data.setVisibleLength(block, offsetInBlock);
          dropOsCacheBehindWriter(offsetInBlock);
        }
      } catch (IOException iex) {
        datanode.checkDiskError(iex);
        throw iex;
      }
    }

    // put in queue for pending acks
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                      lastPacketInBlock); 
    }

    if (throttler != null) { // throttle I/O
      throttler.throttle(payloadLen);
    }

    return payloadLen;
  }

receivePacket()方法一开始就调用readNextPacket(),通过该方法最少读入一个写请求数据包。读入的数据放在类型为ByteBuffer的缓冲区buf中。ByteBuffer是java.nio.Buffer的子类
readNextPacket()的实现比较琐碎,需要保证缓冲区剩余空间可以读入整个数据包,必须考虑数据包的长度(必要时需要重新分配缓冲区)、并根据需要调整数据在缓冲区中的位置。总而言之,readNextPacket()一次读入一个完整的数据包。

BlockReceiver.receivePacket()从缓冲区读入数据包的包头信息,并赋值到对象的成员变量,接下来的准备动作是:调用setBlockPosition()方法,设置写数据时的文件位置。代码如下

  buf.mark();
    //read the header
    buf.getInt(); // packet length
    offsetInBlock = buf.getLong(); // get offset of packet in block
    long seqno = buf.getLong();    // get seqno
    boolean lastPacketInBlock = (buf.get() != 0);

    int endOfHeader = buf.position();
    buf.reset();

    if (LOG.isDebugEnabled()){
      LOG.debug("Receiving one packet for " + block +
                " of length " + payloadLen +
                " seqno " + seqno +
                " offsetInBlock " + offsetInBlock +
                " lastPacketInBlock " + lastPacketInBlock);
    }

    setBlockPosition(offsetInBlock);

我们知道,写数据的时候其实需要同时写两个文件,数据块文件和它的校验信息文件,BlockReceiver.setBlockPositiion()会设置这两个文件的写位置,为后续持久化数据做准备。设置数据块文件的写位置比较简单,但校验信息文件的就相对复杂了,这又是一个和分块校验相关的调整。校验信息的计算需要数据以块的形式组织,但用户以流的形式写数据,如果写(追加)数据的开始点落在某个校验块的中间,需要在确定校验信息文件写位置的时候,计算不完整数据校验块(图中的校验块n)的校验信息,为后续写校验数据做准备。
这里写图片描述

setBlockPZ喎"http://www.2cto.com/kf/ware/vc/" target="_blank" class="keylink">vc2l0aW9uKCm3vbeo1rvT0NK7uPayzsr9b2Zmc2V0SW5CbG9ja6OsvLTK/b7dv+nOxLz+tcTQtM671sOjrMjnufu4w8671sOyu7Sm09rQo9Hpv+m1xLHfveejrL7Nu+G199PDY29tcHV0ZVBhcnRpYWxDaHVua0NyYygpvMbL48nPyvbQo9Hp0MXPoqGjuMO3vbeo0OjSqsj9uPayzsr9o6y9+Mjrt723qMqxo6zHsMPmwb249rLOyv1ibGtvZmYsY2tvZma1xNa1yOfJz828o6y12sj9uPayzsr9Ynl0ZXNQZXJDaGVja3N1bcrH0KPR6b/ptcS089ChoaO4+b7d1eLQqbLOyv2jrMrXz8i/ydLUvMbL47P2srvN6tX7yv2+3b/ptcSzpLbIc2l6ZVBhcnRpYWxDaHVua6OsyLu686Ost9bF5Lu6s+XH+LbByOvK/b7do6y8xsvj1eLQqcr9vt21xNCj0em6zaGj0KPR6brNtcS94bn7saO05tTaQmxvY2tSZWNlaXZlcrXEs8nUsbHkwb9wYXJ0aWFsQ3Jj1tCjrLrzw+a74brN0MLQtMjrtcTK/b7d0rvG8KOsvMbL47P20MK1xNCj0em6zbXE1rWjrLKisaOz1tTa0KPR6dDFz6LOxLz+tcS1scewzrvWw6Gj16LS4qOsttTT2sr9vt2/6c7EvP6jrNDC0LTI67XEyv2+3bvh17e809Tayv2+3b/pzsS8/rXEzsS8/s6yo6y1q9Cj0enQxc+izsS8/qOsyOe5+7bU06a1xMrH0ru49rK7zerV+8r9vt3Qo9Hpv+mjrMTHw7SjrM7EvP61xNfuuvOyv7fWo6i002Nrb2Zmv6rKvLXEtdi3vaOpu+Gxu7j80MKjrLb4srvE3LzytaW12NaxvdO9+NDQ17e806GjtPrC68jnz8KjujwvcD4NCjxwcmUgY2xhc3M9"brush:java;"> /** * Sets the file pointer in the local block file to the specified value. */ private void setBlockPosition(long offsetInBlock) throws IOException { if (finalized) { if (!isRecovery) { throw new IOException("Write to offset " + offsetInBlock + " of block " + block + " that is already finalized."); } if (offsetInBlock > datanode.data.getLength(block)) { throw new IOException("Write to offset " + offsetInBlock + " of block " + block + " that is already finalized and is of size " + datanode.data.getLength(block)); } return; } if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) { return; // nothing to do } long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + offsetInBlock / bytesPerChecksum * checksumSize; if (out != null) { out.flush(); } if (checksumOut != null) { checksumOut.flush(); } // If this is a partial chunk, then read in pre-existing checksum if (offsetInBlock % bytesPerChecksum != 0) { LOG.info("setBlockPosition trying to set position to " + offsetInBlock + " for " + block + " which is not a multiple of bytesPerChecksum " + bytesPerChecksum); computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum); } if (LOG.isDebugEnabled()) { LOG.debug("Changing block file offset of block " + block + " from " + datanode.data.getChannelPosition(block, streams) + " to " + offsetInBlock + " meta file offset to " + offsetInChecksum); } // set the position of the block file datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum); } /** * reads in the partial crc chunk and computes checksum * of pre-existing data in partial chunk. */ private void computePartialChunkCrc(long blkoff, long ckoff, int bytesPerChecksum) throws IOException { // find offset of the beginning of partial chunk. // int sizePartialChunk = (int) (blkoff % bytesPerChecksum); int checksumSize = checksum.getChecksumSize(); blkoff = blkoff - sizePartialChunk; LOG.info("computePartialChunkCrc sizePartialChunk " + sizePartialChunk + " " + block + " offset in block " + blkoff + " offset in metafile " + ckoff); // create an input stream from the block file // and read in partial crc chunk into temporary buffer // byte[] buf = new byte[sizePartialChunk]; byte[] crcbuf = new byte[checksumSize]; FSDataset.BlockInputStreams instr = null; try { instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff); IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length); } finally { IOUtils.closeStream(instr); } // compute crc of partial chunk from data read in the block file. partialCrc = new PureJavaCrc32(); partialCrc.update(buf, 0, sizePartialChunk); LOG.info("Read in partial CRC chunk from disk for " + block); // paranoia! verify that the pre-computed crc matches what we // recalculated just now if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) { String msg = "Partial CRC " + partialCrc.getValue() + " does not match value computed the " + " last time file was closed " + FSInputChecker.checksum2long(crcbuf); throw new IOException(msg); } //LOG.debug("Partial CRC matches 0x" + // Long.toHexString(partialCrc.getValue())); }

上述准备工作完成后,receivePacket()把请求的数据包发送到下游的数据节点,通过这种方式,写数据的数据包就会流经数据流管道上的所有数据节点。代码如下

 if (mirrorOut != null && !mirrorError) {
      try {
        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
        mirrorOut.flush();
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }

接下来要提及的是数据长度为0的数据包,这是一种特殊的数据包,当客户端长时间没有输出数据时,就会发送这样的心跳包,用于维护客户端和数据流管道上的各个数据节点的TCP连接。,如果数据包的长度不是0,就要将请求包中的数据和校验和写入文件。代码如下:

 if (len == 0) {
      LOG.debug("Receiving empty packet for " + block);
    } else {
      offsetInBlock += len;

      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                            checksumSize;

      if ( buf.remaining() != (checksumLen + len)) {
        throw new IOException("Data remaining in packet does not match " +
                              "sum of checksumLen and dataLen");
      }
      int checksumOff = buf.position();
      int dataOff = checksumOff + checksumLen;
      byte pktBuf[] = buf.array();

      buf.position(buf.limit()); // move to the end of the data.

      /* skip verifying checksum iff this is not the last one in the 
       * pipeline and clientName is non-null. i.e. Checksum is verified
       * on all the datanodes when the data is being written by a 
       * datanode rather than a client. Whe client is writing the data, 
       * protocol includes acks and only the last datanode needs to verify 
       * checksum.
       */
      if (mirrorOut == null || clientName.length() == 0) {
        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
      }

      try {
        if (!finalized) {
          //finally write to the disk :
          out.write(pktBuf, dataOff, len);

          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (partialCrc != null) {
            if (len > bytesPerChecksum) {
              throw new IOException("Got wrong length during writeBlock(" + 
                                    block + ") from " + inAddr + " " +
                                    "A packet can have only one partial chunk."+
                                    " len = " + len + 
                                    " bytesPerChecksum " + bytesPerChecksum);
            }
            partialCrc.update(pktBuf, dataOff, len);
            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
            checksumOut.write(buf);
            LOG.debug("Writing out partial crc for data len " + len);
            partialCrc = null;
          } else {
            checksumOut.write(pktBuf, checksumOff, checksumLen);
          }
          datanode.myMetrics.incrBytesWritten(len);
          /// flush entire packet before sending ack
          flush();

          // update length only after flush to disk
          datanode.data.setVisibleLength(block, offsetInBlock);
          dropOsCacheBehindWriter(offsetInBlock);
        }
      } catch (IOException iex) {
        datanode.checkDiskError(iex);
        throw iex;
      }
    }

    // put in queue for pending acks
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                      lastPacketInBlock); 
    }

    if (throttler != null) { // throttle I/O
      throttler.throttle(payloadLen);
    }

    return payloadLen;

主要关注点包括:客户端写数据的时候,只有最后一个数据节点需要通过verifyChunks()方法对数据进行校验,数据流管道中的其他节点不需要执行这项检查。校验发现数据有问题,会上报名字节点,并抛出异常通知调用者BlockReceiver.receivePacket();数据包如果成功接收并顺利写入磁盘后,则需要将处理的结果,通过PacketResponder.enqueue()方法,发送给PacketResponder线程,并由该线程继续后面的处理。

相关TAG标签 数据 节点 源码
上一篇:Hadoop源码分析——数据节点写数据1
下一篇:Openstackliberty创建实例快照源码分析1
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站