반응형

10.0.0.171    kafka01 zookeeper01

10.0.0.172    kafka02 zookeeper02

10.0.0.173    kafka03 zookeeper03


192.168.30.171    kafka01 zookeeper01

192.168.30.172    kafka02 zookeeper02

192.168.30.173    kafka03 zookeeper03


## kafka vm 생성

$ openstack flavor create --id ka1 --ram 8192 --disk 160 --vcpus 2 kafka


$ openstack server create --image 7498cf9d-bd2e-4401-9ae9-ca72120272ed \

       --flavor ka1  --nic net-id=03a6de58-9693-4c41-9577-9307c8750141,v4-fixed-ip=10.0.0.171 \

       --key-name magnum-key --security-group default kafka01

$ openstack ip floating create --floating-ip-address 192.168.30.171 public

$ openstack ip floating add 192.168.30.171 kafka01





## Oracle Java 8 설치

$ sudo add-apt-repository ppa:webupd8team/java

$ sudo apt-get update

$ sudo apt-get install oracle-java8-installer


## 여러 버전을 Java 를 설치했을 때 관리

$ sudo update-alternatives --config java





## zookeeper 설치

## https://zookeeper.apache.org/doc/r3.4.9/zookeeperStarted.html

## http://apache.mirror.cdnetworks.com/zookeeper/zookeeper-3.4.9/


$ mkdir -p downloads && cd downloads

$ wget http://apache.mirror.cdnetworks.com/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz


$ sudo tar -C /usr/local -xzvf zookeeper-3.4.9.tar.gz

$ cd /usr/local

$ sudo ln -s zookeeper-3.4.9/ zookeeper


$ vi /usr/local/zookeeper/conf/zoo.cfg

tickTime=2000

dataDir=/var/lib/zookeeper

clientPort=2181

initLimit=5

syncLimit=2

server.1=zookeeper01:2888:3888

server.2=zookeeper02:2888:3888

server.3=zookeeper03:2888:3888


$ vi /usr/local/zookeeper/bin/zkEnv.sh

56     ZOO_LOG_DIR="/var/log/zookeeper"


$ sudo mkdir -p /var/log/zookeeper && sudo chown -R stack.stack /var/log/zookeeper


## zookeeper myid 는 서버마다 지정

$ sudo mkdir -p /var/lib/zookeeper && sudo chown -R stack.stack /var/lib/zookeeper

$ vi /var/lib/zookeeper/myid

1


$ vi ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-8-oracle

export ZOOKEEPER_HOME=/usr/local/zookeeper

PATH=$PATH:$ZOOKEEPER_HOME/bin


$ . ~/.bashrc

$ zkServer.sh start


## zookeeper 설치 확인

$ zkCli.sh -server zookeeper01:2181





## Kafka 설치

## https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04


## https://kafka.apache.org/downloads.html

## https://kafka.apache.org/documentation.html


$ cd downloads

$ wget http://apache.mirror.cdnetworks.com/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz

$ sudo tar -C /usr/local -xzvf kafka_2.11-0.10.0.1.tgz

$ cd /usr/local && sudo chown -R stack.stack kafka_2.11-0.10.0.1

$ sudo ln -s kafka_2.11-0.10.0.1/ kafka


## broker id 는 서버마다 고유하게 줘야 함

$ vi /usr/local/kafka/config/server.properties

20 broker.id=0

56 log.dirs=/var/lib/kafka

112 zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181

117 delete.topic.enable = true


$ sudo mkdir -p /var/lib/kafka && sudo chown -R stack.stack /var/lib/kafka

$ sudo mkdir -p /var/log/kafka && sudo chown -R stack.stack /var/log/kafka


$ vi ~/.bashrc

export KAFKA_HOME=/usr/local/kafka

PATH=$PATH:$KAFKA_HOME/bin


$ . ~/.bashrc

$ nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties > /var/log/kafka/kafka.log 2>&1 &


## kafkaT : kafka cluster 관리

$ sudo apt-get -y install ruby ruby-dev build-essential

$ sudo gem install kafkat --source https://rubygems.org --no-ri --no-rdoc

$ vi ~/.kafkatcfg

{

  "kafka_path": "/usr/local/kafka",

  "log_path": "/var/lib/kafka",

  "zk_path": "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181"

}


## kafka partition 보기

$ kafkat partitions


## kafka data 테스트

$ echo "Hello, World" | kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic TutorialTopic > /dev/null

$ kafka-console-consumer.sh --zookeeper zookeeper01:2181,zookeeper02:2181,zookeeper03:2181 --topic TutorialTopic --from-beginning


$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

$ kafka-topics.sh --list --zookeeper localhost:2181


$ kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message


$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


## Replica 3 테스트

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:

    Topic: my-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1


$ kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test message 1

my test message 2

^C


$ kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic


## 서버 한대 다운

$ kafka-server-stop.sh


$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:

    Topic: my-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,1


$ kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic


## 토픽 삭제

$ kafka-topics.sh --delete --zookeeper localhost:2181 --topic my-replicated-topic

$ kafka-topics.sh --delete --zookeeper localhost:2181 --topic TutorialTopic









반응형
Posted by seungkyua@gmail.com
,
반응형

사전에 hadoop 을 먼저 설치합니다.

http://www.ahnseungkyu.com/150


1. git source 받기

$ git clone https://git-wip-us.apache.org/repos/asf/tajo.git tajo


2. ubuntu 12.04 LTS 에서 git 으로 apache https 접속 에러시 해결

error: gnutls_handshake() failed: A TLS packet with unexpected length was received. while accessing https://git-wip-us.apache.org/repos/asf/tajo.git/info/refs

fatal: HTTP request failed


$ sudo apt-get install build-essential fakeroot dpkg-dev

$ mkdir ~/git-openssl

$ cd ~/git-openssl

$ sudo apt-get source git

$ sudo apt-get build-dep git

$ sudo apt-get install libcurl4-openssl-dev

$ sudo dpkg-source -x git_1.7.9.5-1.dsc

$ cd git-1.7.9.5

$ sudo vi debian/control


:%s/libcurl4-gnutls-dev/libcurl4-openssl-dev/g              # 참조 파일을 변경


$ sudo dpkg-buildpackage -rfakeroot -b


# 테스트 에러가 발생하면debian/rules 파일에서 Test 삭제

$ sudo vi debian/rules

TEST=test                # 해당 라인 삭제


sudo dpkg -i ../git_1.7.9.5-1_amd64.deb


3. Tajo 소스 빌드

$ cd tajo

$ mvn clean package -DskipTests -Pdist -Dtar


4. Tajo 바이너리 설치 (현재 버전은 0.9.0 임)

$ cd

$ tar xzvf /home/stack/Git/tajo/tajo-dist/target/tajo-0.9.0-SNAPSHOT.tar.gz


5. tajo-env.sh 설정

$ cd tajo-0.9.0-SNAPSHOT

$ vi conf/tajo-env.sh


export HADOOP_HOME=/home/stack/hadoop-2.4.0

export JAVA_HOME=/usr/local/jdk1.7.0_51


6. tajo 실행

$ cd bin

$ ./start-tajo.sh


7. 테스트

$ mkdir -p table1

$ cd table1

$ cat > data.csv

1|abc|1.1|a

2|def|2.3|b

3|ghi|3.4|c

4|jkl|4.5|d

5|mno|5.6|e

<CTRL + D>


# hadoop fs 에 올리기

$ hadoop fs -moveFromLocal data.csv /

$ hadoop fs -ls /

Found 1 items 

-rw-r--r--   3 stack supergroup         60 2014-06-05 17:32 /data.csv


# tajo 로 검색하기

$ cd ../bin

$ ./tsql


# 로컬파일로 테이블 생성하기

default> create external table table1 (id int, name text, score float, type text) using csv with ('csvfile.delimiter'='|') location 'file:/home/stack/tajo-0.9.0-SNAPSHOT/table1';


# hdfs 로 테이블 생성하기

default> create external table hdfs_table1 (id int, name text, score float, type text) using csv with ('csvfile.delimiter'='|') location 'hdfs://localhost:9000/data.csv';


default> \d table1

default> select * from hdfs_table1 where id > 2;



반응형
Posted by seungkyua@gmail.com
,
반응형

ubuntu 12.04 LTS 기반으로 설치


1. Java 설치

    http://www.ahnseungkyu.com/139


2. 패키지 설치

$ sudo apt-get install build-essential maven cmake libssl-dev


3. proxy 를 사용한다면 다음을 수정

$ vi /home/stack/.m2/settings.xml


<settings>

  <proxies>

    <proxy>

      <active>true</active>

      <protocol>http</protocol>

      <host>xx.xx.xx.xx</host>

      <port>8080</port>

      <nonProxyHosts>localhost|127.0.0.1|192.168.75.136|192.168.230.136|ubuntu</nonProxyHosts>

...


$ cd /Hadoop-src/hadoop-2.4.0-src/hadoop-hdfs-project/hadoop-hdfs-httpfs/downloads

$ wget http://archive.apache.org/dist/tomcat/tomcat-6/v6.0.36/bin/apache-tomcat-6.0.36.tar.gz


$ keytool -v -alias mavensrv -import \

> -file /usr/share/ca-certificates/extra/XXX.crt \

-keystore trust.jks


4. protocol buffer 소스 다운로드, 컴파일, 설치 (2.5 이상을 설치)

$ wget https://protobuf.googlecode.com/files/protobuf-2.5.0.tar.gz

$ tar xvfz protobuf-2.5.0.tar.gz

$ cd protobuf-2.5.0

$ ./configure

$ make

$ sudo make install                              # /usr/local/lib 에 관련 라이브러리가 설치됨

$ sudo ldconfig


5. Hadoop 소스 다운로드 및 패키징

$ wget http://apache.mirror.cdnetworks.com/hadoop/common/hadoop-2.4.0/hadoop-2.4.0-src.tar.gz

$ tar xvfz hadoop-2.4.0-src.tar.gz

$ cd hadoop-2.4.0-src

$ mvn package -Pdist,native -DskipTests -Dtar -X


6. 소스 파일 및 컴파일 된 바이너리 파일 찾기

$ cd ./hadoop-dist/target


$ cp -R ./hadoop-2.4.0/ ~/.


7. 하둡 환경변수 설정

$ vi ~/.bashrc


# Hadoop

export HADOOP_PREFIX="/home/stack/hadoop-2.4.0"

export PATH=$PATH:$HADOOP_PREFIX/bin

export PATH=$PATH:$HADOOP_PREFIX/sbin

export HADOOP_MAPRED_HOME=${HADOOP_PREFIX}

export HADOOP_COMMON_HOME=${HADOOP_PREFIX}

export HADOOP_HDFS_HOME=${HADOOP_PREFIX}

export YARN_HOME=${HADOOP_PREFIX}


# Native Path

export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_PREFIX}/lib/native

export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib/native"


$ source ~/.bashrc


8. local 에서 ssh 자동 접속 설정

$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys


9. hadoop-env.sh 설정

$ vi $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh


export JAVA_HOME="/usr/local/jdk1.7.0_51"

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_PREFIX/lib/native

export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib/native"


10. yarn-env.sh 설정

$ vi $HADOOP_PREFIX/etc/hadoop/yarn-env.sh


export HADOOP_YARN_USER=${HADOOP_YARN_USER:-yarn}

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_PREFIX/lib/native

export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib/native"


11. Hadoop 데이터 및 시스템 파일을 저장할 디렉토리 생성 (hdfs-site.xml, mapred-site.xml 파일 참조)

$ mkdir -p ${HADOOP_PREFIX}/hadoop/dfs/name

$ mkdir -p ${HADOOP_PREFIX}/hadoop/dfs/data

$ mkdir -p ${HADOOP_PREFIX}/hadoop/mapred/system

$ mkdir -p ${HADOOP_PREFIX}/hadoop/mapred/local


12. core-site.xml 설정

$ vi $HADOOP_PREFIX/etc/hadoop/core-site.xml


<configuration>

    <property>

        <name>fs.default.name</name>

        <value>hdfs://localhost:9000</value>

        <final>true</final>

    </property>

</configuration>


13. hdfs-site.xml 설정

$ vi $HADOOP_PREFIX/etc/hadoop/hdfs-site.xml


<configuration>

    <property>

        <name>dfs.namenode.name.dir</name>

        <value>file:/home/stack/hadoop-2.4.0/hadoop/dfs/name</value>

        <final>true</final>

    </property>


    <property>

        <name>dfs.datanode.data.dir</name>

        <value>file:/home/stack/hadoop-2.4.0/hadoop/dfs/data</value>

        <final>true</final>

    </property>


    <property>

        <name>dfs.permissions</name>

        <value>false</value>

    </property>

</configuration>


14. mapred-site.xml 설정

$ cp $HADOOP_PREFIX/etc/hadoop/mapred-site.xml.template $HADOOP_PREFIX/etc/hadoop/mapred-site.xml

$ vi $HADOOP_PREFIX/etc/hadoop/mapred-site.xml


<configuration>

    <property>

        <name>mapreduce.framework.name</name>

        <value>yarn</value>

    </property>


    <property>

        <name>mapred.system.dir</name>

        <value>file:/home/stack/hadoop-2.4.0/hadoop/mapred/system</value>

        <final>true</final>

    </property>


    <property>

        <name>mapred.local.dir</name>

        <value>file:/home/stack/hadoop-2.4.0/hadoop/mapred/local</value>

        <final>true</final>

    </property>

</configuration>


15. yarn-site.xml 설정

$ vi $HADOOP_PREFIX/etc/hadoop/yarn-site.xml


<configuration>


<!-- Site specific YARN configuration properties -->

    <property>

        <name>yarn.nodemanager.aux-services</name>

        <value>mapreduce_shuffle</value>

    </property>


    <property>

        <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

        <value>org.apache.hadoop.mapred.ShuffleHandler</value>

    </property>

</configuration>


16. NameNode 포맷

hdfs namenode -format


17. 데몬 실생

$ cd ${HADOOP_PREFIX}/sbin


# hdfs 데몬 실행

$ start-all.sh


# yarn 데몬 실행

$ start-yarn.sh


18. 하둡 데몬 확인 (Pseudo-Distributed Mode 일 때 5개가 떠 있어야 함)

$ jps


13861 NameNode                             # 네임노드

14347 SecondaryNameNode              # 세컨더리 네임노드

14070 DataNode                               # 데이터 노드

14526 ResourceManager                  # yarn 리소스 매니저 (네임노드)

14745 NodeManager                        # yarn 노드 매니저 (데이터노드)


# yarn Resource Manager 접속

http://localhost:8088


# yarn Node Manager 접속

http://localhost:8042/node


# 네임노드 접속

http://localhost:50070


# 노드 리포트

$ hdfs dfsadmin -report





반응형
Posted by seungkyua@gmail.com
,
반응형
테이블명 : tb_emprelation, 컬럼패밀리 source, target, read

hbase(main):001:0>create 'tb_emprelation', {NAME=>'source'}, {NAME=>'target'}, {NAME=>'read'}

source 컬럼패밀리에 mail 컬럼에 값을 넣을 경우
hbase(main):002:0>put 'tb_emprelation', 'source:email', 'skanddh@gmail.com'
반응형
Posted by seungkyua@gmail.com
,
반응형
HBase eclipse 환경 세팅

1. 새로운 워크스페이스를 만든다.
2.  HBaseEmptyWorkspace.tar.gz 을 다운받아 새로운 워크스페이스에  .metadata 드렉토리를 푼다.
3. 서브버전을 eclipse software install로 설치한다.  (SubClipse Site 에 것을 설치하면 됨)
4. Subclipse 를 설치하면 CollabNet Subversion 을 다운받아 JavaHL을 추가로 설치한다.
    CollabNet Subversion은 /opt/subversion 경로로 설치된다.
5. SVN Repositoty 에서 http://svn.apache.org/repos/asf/hbase/ 를 Root URL로 세팅한다.
6. eclipse 의 Project -> Build Automatically 를 해제.
7. checkout 으로 New Project Wizard 를 선택 ->  Java Project -> HBase-trunk로 프로젝트 이름을 세팅
8. 디폴트 아웃풋 폴더로 PROJECT_HOME/build/eclipse-classes 로 폴더를 생성하여 지정 

9. cmd 로 워크스페이스의 HBase-trunk 폴더로 이동하여 mvn eclipse:eclipse 를 치면 .project 파일이 다시 생성됨


Ant 스트립트

<project name="EmpRelationHadoop" default="compile">

<property name="classes.dir" value="build/classes"/>

<property name="jar.dir" value="build/jar"/>

<property name="library.dir" value="/Users/skanddh/.m2/repository"/>


<path id="libraries">

        <fileset dir="${library.dir}">

            <include name="**/*.jar"/>

        </fileset>

    </path>

    <target name="clean">

        <delete dir="build/classes"/>

    <delete dir="build/jar"/>

    </target>


    <target name="compile">

        <mkdir dir="${classes.dir}"/>

        <javac srcdir="src" destdir="${classes.dir}" classpathref="libraries"/>

    </target>


    <target name="jar" depends="compile">

        <mkdir dir="${jar.dir}"/>

        <jar destfile="${jar.dir}/EmpRelationHadoop.jar" basedir="${classes.dir}">

            <manifest>

                <attribute name="Main-Class" value="hbase.RelationCalculator"/>

            </manifest>

        </jar>

    </target>


    <!-- target name="run">

        <java jar="${jar.dir}/EmpRelationHadoop.jar" fork="true"/>

    </target -->


</project>




반응형
Posted by seungkyua@gmail.com
,
반응형
hadoop 의 구성을 보면 Namenode 와 Datanode 가 있다. Namenode 를 일종의 마스터로 보면 Datanode 는 슬레이브로 볼 수 있으며 Namenode 와 Datanode 사이에는 주기적으로 통신을 한다.

Datanode 가 파일 시스템 정보나 기타 헬스체크를 주기적으로 Namenode 로 보내는데 이 때의 통신방법은 RPC 이다.
Namenode 가 기동 될 때 RPC Server 가 같이 기동되며 Namenode 는 DatanodeProtocol 인터페이스를 구현하였기 때문에 Datanode 가 이 메소드를 호출하는 RPC 통신을 하게 된다.

RPC 통신 구현 방법은 간단한데 Datanode 가 적합한 메소드(DatanodeProtocol 메소드 중의 하나)를 호출하면 ipc.Client 클래스를 이용하여 메소드 명,  파라미터 갯수, 루프를 돌면서 (파라미터 타입, 파라미터 값) 을 보내면 -RPC.Invocation클래스를 이용하면 관련 정보를 순서대로 보낸 - Namenode 는 ipc.Server 를 통해서 메소드명, 파라미터 갯수, 루프를 돌면서 (파라미터 타입, 파라미터 값) 을 받아 - RPC.Invation클래스를 이용하면 순서대로 받을 수 있다 - Namenode 의 적합한 메소드(DatanodeProtocol 메소드 중의 하나)를 invoke 한다.
invoke 후에는 리턴 값을 받아서 ipc.Server 를 통해 리턴 클래스명, 리턴 클래스의 값을 Datanode 에 보낸다.
Datanode 는 ipc.Client 를 통해서 리턴 클래스명, 리턴 클래스의 값을 받아 처음에 호출한  메소드의 결과값으로 리턴된다.

RPC의 핵심은 Proxy 클래스와 Reflection API 를 사용한다는 것. 
반응형
Posted by seungkyua@gmail.com
,
반응형
Datanode 는 Proxy.newProxyInstance(...) 로 DatanodeProtocol 인터페이스를 구현한 Proxy 객체를 구해서
다른 데이터노드와 통신한다.

이렇게 되면 DatanodeProtocol 의 메소드를 호출 할 때 마다 Proxy 객체 생성시에 저장된 InvokerHandler.invoke 메소가 호출된다. 결국 중요한 것은  InvokerHandler.invoke  메소드이다. (실제 작업하게 되는 메소드)

ObjectWritable value = (ObjectWritable) CLIENT.call(new Invocation(method, args), address);
return value.get();

Invocation 클래스는 대충 호출할 메소드와 파라미터를 저장한 것이라고 하면 CLIENT 클래스가 중요하다.

CLIENT 가 call 을 호출하면 내부적으로는 다음과 같은 진행을 한다.
1. 쓰레드 타입의 Connection 객체 생성하여 재사용을 위해 Hashtable 에 저장하고 쓰레드를 시작한다.
    Connection 객체에는 InetSocketAddr을 이용하여 DataInputStream 과 DataOutputStream 을 가지고 있다.
2. Call(param) 으로 call 객체를 파라미터만 저장하여 생성
3. call 객체를 Hashtable 에 저장하고 out.writeInt(call.id); call.param.write(out) 와 같이 파라미터를 보낸다.
4. Connection 쓰레드의 run 이 끝나서 결과를 받으르 때까지 타임아웃만큼 기다린다.  call.wait(timeout);
5. Connection 쓰레드의 run 메소드에서 id = in.readInt(); 후 Hashtable 의 call 객체를 remove 한다.
6. in.readBoolean(); 으로 에러여부를 읽고 value.readFields(in); value 값을 읽어 들인다.
7. call 객체에 value 를 세팅하고 call 객체의 notify() 를 호출한 후 Connection 쓰레드를 Hashtable에서
    제거한 후 쓰레드를 종료한다.
반응형
Posted by seungkyua@gmail.com
,
반응형
configuration 의 "dfs.data.dir" 의 값으로 DataNode와 그에 속한 쓰레드를 실행하며 값은space 나 comma 로 분리되고 분리된 갯수 만큼 DataNode를 실행한다. (ServerSocket 을 전달함)

DataNode 생성자에서 서버소켓을 실행, 서버 포트는 "dfs.datanode.port" 의 값으로 시작되며 디폴트는 50010 이고 서버가 갯수 만큼 실행 될 때 +1 씩 포트가 증가한다.

DataNode 가 갖는 쓰레드
1. 자체 Thread : Namenode에 Heartbeat, blockReport 를 보냄
2. DataXceiveServer : ServerSocket 을 전달받아 클라이언트 접속이 있으며 DataXceiver 스레드를 만든다.
3. DataXceiver  : 클라이언트가 보낸 데이터를 읽는다. (in : InputStream, reply : OutputStream)
    1. in.read() : 1 byte 를 읽어서 operation 을 알아낸다.
    operation 이 write block 일 경우
    2. in.readBoolean() : 리포트를 해야할 블럭인지 boolean 값을 읽어낸다.
    3. block.readFields(in) : block id와 block의 길이 len 을 읽는다.
    4. in.readInt() : block 을 복사할 타켓의 갯수를 읽는다.
    5. DatanodeInfo.readFields(in) : 데이터 노드명, 용량, 남은용량, 마지막 업데이트 값을 타겟수만큼 읽는다.
    6. in.read() : 인코딩 타입을 1 byte 읽는다.
    7. in.readLong() : 이후 읽어야할 실제 데이터 길이 len 을 읽는다.

    현재 데이터노드 서버에 파일을 생성하여 outputstream 을 만든다. 
    첫번째는 자신의 데이터 노드 이므로 다음 타겟인 데이터노드와 소켓으로 접속하여 out2 와 in2 를 생성한다.
    1. out2.write(OP_WRITE_BLOCK) : 1 byte 값이 block write operation을 보낸다.
    2. out2.writeBoolean(shouldReportBlock) 으로 리포트 해야할 블럭인지를 보낸다.
    3. block.wirte(out2) : block id와 block의 길이 len 을 쓴다.
    4. out2.writeInt(target.length -1) : 복사해야할 남은 타겟의 갯수를 쓴다.
    5. DatanodeInfo.write(out2) : 루프를 돌면서 남은 데이터노드의 정보를 보낸다.
    6. out2.write(encodingType) : 1 byte 의 인코딩 타입값을 쓴다.
    7. out2.writeLong(len) : 데이터 길이 len 을 쓴다.

    8. in.read(buf, 0, len) : 루프를 돌면서 데이터를 다 읽는다.
    9. out.write(buf,0, len), out2.write(buf, 0, len) : 루프를 돌면서 로컬 파일(out) 과 out2 에 데이터를 쓴다.

    10. out2.flush() : 복사하는 타겟 out2 를 flush 한다.
    11. in2.readLong() : long 값의 WRITE_COMPLETE 를 읽는다. 복사된 타겟이 잘 전달됐는지 확인
    12. LocatedBlock.readFields(in2) : block 을 읽고 DatanodeInfo의 갯수를 읽고 루프돌면서 DatanodeInfo 읽음
                                                     이것으로 복사된 타겟의 정보를 얻을 수 있음
    13. data.finalizeBlock(b) : DataNode 의 속성인 FSDataset 의 dirTree 에 block과 파일을 저장
    14. receivedBlockList.add(b) : Namenode에 보고할 block을 리스트에 저장한다. 
                                               리스트에 저장되면 Datanode 자체쓰레드에서 관련 정보를 Namenode에 보냄
    15. reply.writeLong(WRITE_COMPLETE) : 클라이언트에게 complete 메세지를 보냄
    16. LocatedBlock.write(reply) : block, DatanodeInfo 갯수, 루프를 돌면서 DatanodeInfo.write 를 보낸다.
반응형
Posted by seungkyua@gmail.com
,
반응형
DatanodeInfo 속성은?
1. UTF8 name : <host>:<port> 로 구성된 값
2. long capacityBytes : 총 수용 가능한 bytes 수
3. long remainingBytes : 수용 가능한 남은 bytes 수
4. long lastUpdate : capacityBytes, remainingBytes 값을 세팅한 시간 System.currentTimeMillis()
5. TreeSet blocks : Block 타입의 데이터를 저장

Block 타입의 속성은?
1. long blkid : "blk_" 시작하는 파일명 뒤의 long 값을 저장
2. len : block 파일의 크기

Block 을 write 할 때의 순서
1. out.writeLong(blkid); 
2. out.writeLong(len);

DatanodeInfo 를 write 할 때의 순서
1. name.write(out);
2. out.writeLong(capacityBytes);
3. out.writeLong(remainingBytes);
4. out.writeLong(lastUpdate);

여기서 UTF8 은 어떻게 write 할까?

UTF8 타입의 속성?
1. byte[] bytes : byte 값
2. length : String 을 utf8 bytes 로 변환할 때 char 가 1,2,3 byte 로 변환될 수 있으므로 변환한 String의 길이

UTF8 write 순서
1. out.writeShort(length);
2. out.write(bytes, 0, length);
반응형
Posted by seungkyua@gmail.com
,
반응형
UTF8 타입에는 byte[] 와 length 를 가지고 있다.

1. set(String string) 을 사용하여 string -> byte[] 로 저장할 때 DataOutputBuffer 사용
    String -> charAt(i) 루프를 돌면서 utf8 byte 로 변환하여 DataOutputBuffer 의 byte[] 에 저장 ->
    System.arraycopy로 UTF8 byte[] 에 복사

2. toString() 으로 byte[] -> String 으로 리턴할 때 DataInputBuffer 와 DataOutputBuffer 사용
    UTF8 byte[] -> DataInputBuffer 의 byte[] 에 저장 -> 
    DataInputBuffer를 argument로  DataOutputBuffer 의 byte[] 에 저장 (out.write(in)) -> 
    DataOutputBuffer의 byte[] 를 StringBuffer에 append 하여 리턴

여기서.. 의문.. 왜 2 번에서 UTF8 byte[] 를 DataOutputBuffer byte[] 에 arraycopy를 하지 않을까?

간단함.. 그냥 그런 api 를 만들지 않았음.. byte[] 를 받아서 저장하는 것은 DataInputBuffer 에만 존재
DataOutputBuffer 는 DataInputBuffer 를 이용해서 byte[] 를 저장.

중요한 것은 DataOuputBuffer 는 DataOutputStream 을 상속받았지만 DataOutputStream 에는 없는 buffer 기능을 추가하였음. (내부 클래스로 ByteArrayOutputStream 을 상속받은 Buffer 클래스가 존재)

DataInputBuffer 도 DataInputStream 을 상속받있지만 똑같이 buffer 기능을 추가.

반응형
Posted by seungkyua@gmail.com
,