大数据系统-数据采集架构及开发流程 置顶!

  |   0 评论   |   0 浏览

数据采集 整体开发文档


集群规模设计

如何确定集群规模

确定集群数据来源

  1. 前端埋点产生的日志数据
  2. 业务系统日常进行业务产生的业务数据

集群的数据规模

根据日活用户量确定每日产生日志数据规模
如果当前项目整体用户注册量为2000w左右,那么如果是非活跃项目类型。那么日活用户应该在100W左右。

以每个人每天产生100条日志数据,一条数据按1k计算。

100W * 100 * 1k / 1024 / 1024 = 95.36G

每天产生的数据即为95G左右。
根据日活确定业务数据量
日活用户100W的话,计算当日产生业务比如下单的人数大概为10W,每次业务产生10条数据。

10W * 10 *1 k /1024/1024 = 0.953G

计算数仓要存储的数据量
日志数据每日 95.36G + 业务数据 0.953 G = 	96.313
取一个冗余值100G,但是数据在进入数仓的时候会进行压缩。

ODS层进行LZO压缩,压缩率为10%左右。ODS层只需要10G

DWD层进行LZO压缩和parquet列式存储 ,10%左右压缩率,DWD层也只要10G

DWS+DWT层,不会进行压缩,但是数据经过了处理过滤。会减少。 应该有50G左右。

HDFS的备份策略,会将数据备份三份。因此,总数据量要*3

因此,保存1年的数据 就是 365 * (10+10+50)G /1024 *3= 74. 8T 数据

再留下30%的存储冗余 74.8 /0.7 = 106.9T

计算Kafka需要的存储量

日志数据每日100G,kafka设定保存3天数据量 300G

kafka有副本机制 留一个副本 300G*2 =600G

再留30%冗余 600G/0.7 = 1T

服务器性能指标

20核CPU 40线程 128G内存 16T硬盘

服务器数量规划

106.9T 数据 + 1T / 16T ≈ 7 台

1G内存可算128M的数据 1一个线程。

100G数据需要800G的内存。 800/128 ≈ 7

1G数据是8个128M,8个计算块。 100G数据 800个块需要800个线程 800/40 = 20台

从内存和存储上看,需要7台服务器。

从CPU上看,需要20台。但是如果yarn配置每个线程虚拟为2个核心的话。只需要10台。

那么就按10台来进行集群的设置。

集群节点安排

1(204)2(205)3(206)4(207)5(208)6(209)7(210)8(211)9(212)10(213)11(214)12(215)
------------
NameNodeNameNodeDataNodeDataNodeDataNodeDataNodeDataNodeDataNodeDataNodeDataNode
DNDNRMRMNMNMNMNMNMNM
NMNMNMNM
ZKZKZK
KAFKAKAFKAKAFKA
FLUMEFLUMEFLUME
HbaseHbaseHbase
HIVEHIVE
SPARKSPARK
MysqlMysql MysqlFlume
AzkabanZabbixSqoop 业务数据库日志数据库

Hive,Spark,Mysql这种对外暴露接口的客户端放在一起。

ZK,Kafka,Flume这种之间IO操作比较多的放在一起

消耗内存的分开存放比如Hbase不和其他组件放一起

架构设计

采集架构设计

image-20200907205714195

实际上测试开发没有用两个日志服务器

数据采集目标

前端埋点

代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据

可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名

全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

主要取的埋点数据为:

曝光
启动
异常
页面
事件

埋点数据结构

普通页面日志

结构如下,每条日志包含了,当前页面的页面信息,所有事件(动作)、所有曝光信息以及错误信息。除此之外,还包含了一系列公共信息,包括设备信息,地理位置,应用信息等,即下边的common字段。

{
 "common": {         -- 公共信息
  "ar": "230000",       -- 地区编码
  "ba": "iPhone",       -- 手机品牌
  "ch": "Appstore",      -- 渠道
  "md": "iPhone 8",      -- 手机型号
  "mid": "YXfhjAYH6As2z9Iq", -- 设备id
  "os": "iOS 13.2.9",      -- 操作系统
  "uid": "485",         -- 会员id
  "vc": "v2.1.134"       -- app版本号
 },

"actions": [           --动作(事件) 
  {
   "action_id": "favor_add",  --动作id
   "item": "3",          --目标id
   "item_type": "sku_id",    --目标类型
   "ts": 1585744376605      --动作时间戳
  }
 ],

 "displays": [
  {
   "displayType": "query",    -- 曝光类型
   "item": "3",           -- 曝光对象id
   "item_type": "sku_id",     -- 曝光对象类型
   "order": 1             --出现顺序
  },

  {
   "displayType": "promotion",
   "item": "6",
   "item_type": "sku_id",
   "order": 2
  },

  {
   "displayType": "promotion",
   "item": "9",
   "item_type": "sku_id",
   "order": 3
  },

  {
   "displayType": "recommend",
   "item": "6",
   "item_type": "sku_id",
   "order": 4
  },
  {
   "displayType": "query ",
   "item": "6",
   "item_type": "sku_id",
   "order": 5
  }
 ],
 "page": {            --页面信息
  "during_time": 7648,    -- 持续时间毫秒
  "item": "3",         -- 目标id
  "item_type": "sku_id",    -- 目标类型
  "last_page_id": "login",   -- 上页类型
  "page_id": "good_detail",  -- 页面ID
  "sourceType": "promotion"  -- 来源类型
 },
"err":{           --错误
"error_code": "1234",   --错误码
  "msg": "***********"    --错误信息
},

 "ts": 1585744374423 --跳入时间戳
}

启动日志结构

相对简单,主要包含公共信息,启动信息和错误信息

{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {   
    "entry": "icon",         --icon手机图标  notice 通知   install 安装后启动
    "loading_time": 18803,  --启动加载时间
    "open_ad_id": 7,        --广告页ID
    "open_ad_ms": 3449,    -- 广告总共播放时间
    "open_ad_skip_ms": 1989   --  用户跳过广告时点
  },
"err":{                     --错误
"error_code": "1234",      --错误码
    "msg": "***********"       --错误信息
},
  "ts": 1585744304000
}

集群架设

阿里云服务器购买

为了省钱,放弃惠普4W一台的物理主机,选择阿里云的虚拟机

选择阿里云的 云服务器ECS

10台 共享标准型s6, 4CPU,16G内存,80G硬盘。按照指标运算,一次最大处理数据量是10G

10台账号root 密码********

image-20200910145409992

内网ip从 204 -213 一共十台

集群基础环境设置

Windows开发机 hosts文件修改

39.XXX.XXX.XXX hadoop204
39.XXX.XXX.XXX hadoop205
39.XXX.XXX.XXX hadoop206
39.XXX.XXX.XXX hadoop207
39.XXX.XXX.XXX hadoop208
39.XXX.XXX.XXX hadoop209
39.XXX.XXX.XXX hadoop210
39.XXX.XXX.XXX hadoop211
39.XXX.XXX.XXX hadoop212
39.XXX.XXX.XXX hadoop213

Linux主机名修改,host映射修改

windows使用xshell连接阿里云主机,主机名每个都要修改

image-20200910153058006

##打开文件,将对应的名字写入 比如hadoop204
vim /etc/hostname

修改映射

sudo vim /etc/hosts
#写入
172.27.223.204  hadoop204  hadoop204
172.27.223.205  hadoop205  hadoop205
172.27.223.206  hadoop206  hadoop206
172.27.223.207  hadoop207  hadoop207
172.27.223.208  hadoop208  hadoop208
172.27.223.209  hadoop209  hadoop209
172.27.223.210  hadoop210  hadoop210
172.27.223.211  hadoop211  hadoop211
172.27.223.212  hadoop212  hadoop212
172.27.223.213  hadoop213  hadoop213
172.27.223.214  logdata214  logdata214
172.27.223.215  logdata215  logdata215
#然后wq

Linux创建大数据集群用户

先在204创建大数据集群用户

sudo useradd sunp
sudo passwd  sunp
sudo vim /etc/sudoers
## Allow root to run any commands anywhere
root    ALL=(ALL)     ALL
sunp   ALL=(ALL)     NOPASSWD:ALL

编写各种用到的脚本

#创建root的脚本库,两个库里脚本存一份
mkdir /binRoot
创建SSH复制分发秘钥脚本
vim sshcopy
#!/bin/bash

# 判断id_rsa密钥文件是否存在
if [ ! -f ~/.ssh/id_rsa ];then
 ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa
else
 echo "id_rsa has created ..."
fi

#分发到各个节点,这里分发到host文件中的主机中.
while read line
  do
    user=`echo $line | cut -d " " -f 2`
    ip=`echo $line | cut -d " " -f 1`
    passwd=`echo $line | cut -d " " -f 3`
  
    expect <<EOF
      set timeout 10
      spawn ssh-copy-id $user@$ip
      expect {
        "yes/no" { send "yes\n";exp_continue }
        "password" { send "$passwd\n" }
      }
     expect "password" { send "$passwd\n" }
EOF
  done <  $@

#chmod +x sshcopy
创建hosts文件
vim hosts
172.27.223.204 root Sq8851770
172.27.223.205 root Sq8851770
172.27.223.206 root Sq8851770
172.27.223.207 root Sq8851770
172.27.223.208 root Sq8851770
172.27.223.209 root Sq8851770
172.27.223.210 root Sq8851770
172.27.223.211 root Sq8851770
172.27.223.212 root Sq8851770
172.27.223.213 root Sq8851770
#执行
sshcopy  hosts
#使用需要安装
yum install -y expect
创建远程执行命令脚本
vim /home/sunp/bin/exeAll
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi

if [ $# -gt 1 ]
then
  echo '参数过多!一次只能执行一个命令'
  exit;
fi

#2. 遍历集群所有机器
for host in  hadoop205 hadoop206 hadoop207 hadoop208 hadoop209 hadoop210 hadoop211 hadoop212 hadoop213 hadoop204
do
  echo ====================  $host  ====================
      ssh $host "$@"
done
分发脚本
vim /home/sunp/bin/xsync
-----
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in hadoop204 hadoop205 hadoop206 hadoop207 hadoop208 hadoop209 hadoop210 hadoop211 hadoop212 hadoop213
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

#添加权限 chmod +x xsync
定点分发脚本
vim /home/sunp/bin/xsync-any
-------------
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 2 ]
then
  echo "参数不够,最少要两个参数,第一个是分发文件,第二个是分发节点"
  exit;
fi
#2. 遍历集群所有机器
for host in ${@:2}
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $1
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

#添加权限 chmod +x xsync
修改主机名的脚本

vim /home/sunp/bin/changeHostName

#!/bin/bash
#获取ip,分组获取第四个,也就是204,205
lastip=`hostname -i | cut -d. -f4`
ipname='hadoop'$lastip
echo $ipname
#先将拼接好的hostname放到文件后边
sed -i "1a $ipname" /etc/hostname
#然后删除旧的
sed -i '1d' /etc/hostname
查看集群所有节点进程脚本
vim /home/sunp/bin/ajps

#! /bin/bash

for i in hadoop204 hadoop205 hadoop206 hadoop207 hadoop208 hadoop209 hadoop210 hadoop211 hadoop212 hadoop213
do
    echo --------- $i ----------
    ssh $i "jps"
done
创建ZK启动脚本
vim zk.sh
#! /bin/bash

case $1 in
"start"){
	for i in hadoop211 hadoop212 hadoop213
	do
        echo ---------- zookeeper $i 启动 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop211 hadoop212 hadoop213
	do
        echo ---------- zookeeper $i 停止 ------------  
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop211 hadoop212 hadoop213
	do
        echo ---------- zookeeper $i 状态 ------------  
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
};;
esac

# zk.sh start/stop/status
创建kafka启动脚本
vim kafka.sh 
#启动kafka必须先启动zk
#! /bin/bash

case $1 in
"start"){
	for i in hadoop211 hadoop212 hadoop213
	do
        echo ---------- kafka $i 启动 ------------
		ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
	done
};;
"stop"){
	for i in hadoop211 hadoop212 hadoop213
	do
        echo ---------- kafka $i 停止 ------------  
		ssh $i '/opt/module/kafka/bin/kafka-server-stop.sh -daemon /opt/module/kafka/config/server.properties'
	done
};;
"status"){
	for i in hadoop211 hadoop212 hadoop213
	do
        echo ---------- kafka $i 状态 ------------  
		ssh $i '/opt/module/kafka/bin/kafka-server-status.sh -daemon /opt/module/kafka/config/server.properties'
	done
};;
esac
编写整个采集通道的启停脚本
vim cluster.sh
 #! /bin/bash

case $1 in
"start"){
	echo " -------- 启动 集群 -------"

	echo " -------- 启动 hadoop集群 -------"
	/opt/module/hadoop-3.1.3/sbin/start-dfs.sh 
	ssh hadoop206 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"

	#启动 Zookeeper集群
	zk.sh start

sleep 6s;

	#启动 Flume采集集群 这个必须做连接215,215的ssh
	f1.sh start

	#启动 Kafka采集集群
	kafka.sh start

sleep 6s;

	#启动 Flume消费集群
	f2.sh start

	};;
"stop"){
    echo " -------- 停止 集群 -------"


    #停止 Flume消费集群
	f2.sh stop

	#停止 Kafka采集集群
	kafka.sh stop

    sleep 6s;

	#停止 Flume采集集群
	f1.sh stop

	#停止 Zookeeper集群
	zk.sh stop

	echo " -------- 停止 hadoop集群 -------"
	ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
	/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh 
};;
esac
#加权限
 chmod 777 cluster.sh
 cluster.sh start
 cluster.sh stop

使用脚本进行基础配置

以下全部使用root用户,在/binRoot路径下进行

  • 处理SSH免密登陆
./sshcopy hosts
  • 配置各个节点的用户
#使用exeAll脚本
./exeAll 'sudo useradd sunp'
./exeAll 'sudo passwd  sunp'
  • 将用户配置文件分发
./xsync /etc/sudoers
  • 将修改主机名脚本分发
./xsync changeHostName
  • 分发hosts配置
./xsync /etc/hosts
  • 在所有节点安装expect
exeAll 'yum install -y expect'
  • 使用exeAll脚本更改所有主机的主机名
./exeAll 'sh /binRoot/changeHostName'
  • 重启所有主机
./exeAll 'reboot'
  • 安装所有必备软件
./exeAll 'sudo yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop'
  • 创建文件夹 soft module
./exeAll 'sudo mkdir /opt/soft  /opt/module'
  • 修改soft moudle的所有者
./exeAll 'sudo chown sunp:sunp /opt/module /opt/soft'

接下来登陆 204 sunp用户

#创建用户的脚本库
mkdir /home/sunp/bin
#将root下的脚本拷贝过来
cp -r /binRoot/* /home/sunp/bin 
#复制一份hosts
cp hosts sunphosts
#将hosts里的用户名密码修改一下
vim sunphosts
172.27.223.204 sunp sunp
172.27.223.205 sunp sunp
172.27.223.206 sunp sunp
172.27.223.207 sunp sunp
172.27.223.208 sunp sunp
172.27.223.209 sunp sunp
172.27.223.210 sunp sunp
172.27.223.211 sunp sunp
172.27.223.212 sunp sunp
172.27.223.213 sunp sunp
  • 配置sunp用户的ssh免密登陆
sshcopy sunphosts
  • 将sunphosts和脚本分发到所有节点
xsync /home/sunp/bin
  • 用远程执行命令将所有节点ssh建立好
exeAll 'sh /home/sunp/bin/sshcopy /home/sunp/bin/sunphosts'

基础组件安装

安装JDK

使用jdk1.8版本,jdk是每台机器都要安装的。

阿里云没有旧版本的java,所以不需要删除旧版本。

  • 直接上传JDK就可以。使用 xftp上传 jdk-8u212-linux-x64.tar到 /opt/module
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
  • 配置环境变量
sudo vim /etc/profile.d/my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

#刷新
source /etc/profile

#分发jdk和配置
用root传 sh /binRoot/xsync /etc/profile.d/my_env.sh
xsync /opt/module/jdk1.8.0_212/

#刷新
sh /binRoot/exeAll 'source /etc/profile'

安装Hadoop

使用hadoop-3.1.3.tar.gz。hadoop也是每台机器都要安装的。

  • 因为是阿里云的服务器,时间同步暂不做设置
  • 上传到204,解压,安装hadoop
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
  • 配置hadoop环境变量
sudo vim /etc/profile.d/my_env.sh
在profile文件末尾添加hadoop路径:(shitf+g)
##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
  • 分发配置文件
#使用root分发配置文件
sh /binRoot/xsync /etc/profile.d/my_env.sh
  • 在204上配置hadoop的xml文件
  • 配置core.xml
cd /opt/module/hadoop-3.1.3/etc/hadoop/
vim core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop204:8020</value>
    </property>

  
    <property>
        <name>hadoop.data.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>

    <property>
        <name>hadoop.proxyuser.sunp.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.sunp.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>sunp</value>
    </property>
</configuration>
  • 配置HDFS文件
vim hdfs-site.xml

<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file://${hadoop.data.dir}/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file://${hadoop.data.dir}/data</value>
  </property>
    <property>
    <name>dfs.namenode.checkpoint.dir</name>
    <value>file://${hadoop.data.dir}/namesecondary</value>
  </property>
    <property>
    <name>dfs.client.datanode-restart.timeout</name>
    <value>30</value>
  </property>

	<!-备份nm->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop205:9868</value>
    </property>
</configuration>
  • 配置 yarn
vim yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop206</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    <!--单个容器可申请的最小与最大内存 涉及规整因子的计算-->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>512</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>
  
    <!--每个节点可用的最大内存-->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>8192</value>
    </property>
  
    <!--虚拟内存率-->
    <property>
    	<name>yarn.nodemanager.vmem-pmem-ratio</name>
    	<value>2.1</value>
    </property>
  
    <!--日志聚集-->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <property>  
        <name>yarn.log.server.url</name>  
        <value>http://hadoop204:19888/jobhistory/logs</value>  
    </property> 
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
	</property>


</configuration>

image-20200911165736572

yarn.nodemanager.vmem-pmem-ratio: 物理内存 与 虚拟内存的比率,每用1M物理内存,默认使用2.1M虚拟内存,(建议调大);

或是将 yarn.nodemanager.vmem-check-enabled 虚拟内存的检查false掉,这一点很重要!

  • 配置mapred-site & 历史服务器
vi mapred-site.xml

<!-- 历史服务器端地址 -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop204:10020</value>
</property>

<!-- 历史服务器web端地址 -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop204:19888</value>
</property>

<!--配置资源调用组件使用yarn-->
<property>
	<name>mapreduce.framework.name</name>
	<value>yarn</value>
</property>
  • 配置workers
vim /opt/module/hadoop-3.1.3/etc/hadoop/workers

#在文件内添加以下内容
hadoop204
hadoop205
hadoop206
hadoop207
hadoop208
hadoop209
hadoop210
hadoop211
hadoop212
hadoop213
  • 分发xsync
xsync /opt/module/hadoop-3.1.3/
  • 群起集群
  • 如果集群是第一次启动,需要在hadoop204节点格式化NameNode(注意格式化之前,一定要先停止上次启动的所有namenode和datanode进程,然后再删除data和log数据)
[sunp@hadoop204 hadoop-3.1.3]$ bin/hdfs namenode -format
#报错就检查是哪里出了问题
  • 启动HDFS
[sunp@hadoop204 hadoop-3.1.3]$ sbin/start-dfs.sh
  • 在配置了RM的节点(206)启动YARN
[sunp@hadoop204 hadoop-3.1.3]$ sbin/start-yarn.sh
  • web前端查看HDFS的web页面

http://hadoop204:9870/

  • web端查看sencondaryNameNode

http://hadoop205:9868/status.html

  • HDFS基本测试
[sunp@hadoop204 hadoop-3.1.3] hadoop fs -mkdir -p /user/sunp/input
#写一个wc的文件
[sunp@hadoop204 hadoop-3.1.3] hadoop fs -put /opt/module/hadoop-3.1.3/wcinput/wc.input /user/sunp/input
#做一个wordcount的测试
[sunp@hadoop204 hadoop-3.1.3] hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /user/sunp/input /user/sunp/output
  • 集群的启动停止
#各个服务组件逐一启动/停止
hdfs --daemon start/stop namenode/datanode/secondarynamenode
yarn --daemon start/stop  resourcemanager/nodemanager

#各个模块分开启动/停止(配置ssh是前提)常用
start-dfs.sh/stop-dfs.sh
start-yarn.sh/stop-yarn.sh
  • 暂不考虑HDFS多目录情况
  • Hadoop基准测试
#测试HDFS写性能 向HDFS集群写10个128M的文件
[sunp@hadoop204 hadoop-3.1.3] hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

image-20200911161459391

#测试HDFS读性能 读取HDFS集群10个128M的文件
[sunp@hadoop204 hadoop-3.1.3] hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
#删除生成数据
[sunp@hadoop204 hadoop-3.1.3]hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

image-20200911161741029

#使用Sort程序评测MapReduce
#使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数
[sunp@hadoop204 hadoop-3.1.3] hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data
#执行Sort程序
[sunp@hadoop204 hadoop-3.1.3] hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data
#验证数据是否真正排好序了
[sunp@hadoop204 hadoop-3.1.3] hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

#评测需要大量硬盘空间。。硬盘空间不足会卡死。慎重
配置Hadoop使用LZO压缩

lzo的jar包是需要编译的

  • 编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/
  • 同步jar包到所有节点
xsync hadoop-lzo-0.4.20.jar
  • 配置core-site.xml文件,以支持lzo压缩
<property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>

    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
  • 分发core-site.xml

xsync core-site.xml

一些优化
  • HDFS参数调优hdfs-site.xml

dfs.namenode.handler.count= image-20200911180653917 比如集群规模为8台时,此参数设置为41 为10台式。参数设置为46

#NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。
#对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。
<property>
    <name>dfs.namenode.handler.count</name>
    <value>46</value>
</property>

#分发,并重启hdfs,yarn

安装Zookeeper

  • 上传apache-zookeeper-3.5.7-bin.tar到211 soft
  • 解压Zookeeper安装包到/opt/module/目录下
[sunp@hadoop211 soft]tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

#修改名字
[sunp@hadoop211 module]mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
  • 配置zoo.cfg文件
#重命名/opt/module/zookeeper-3.5.7/conf这个目录下的zoo_sample.cfg为zoo.cfg
[sunp@hadoop211 conf]$ mv zoo_sample.cfg zoo.cfg
[sunp@hadoop211 conf]$ vim zoo.cfg
#修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.5.7/zkData
#增加如下配置
#######################cluster##########################
server.2=hadoop211:2888:3888
server.3=hadoop212:2888:3888
server.4=hadoop213:2888:3888
  • 配置服务器编号
#在/opt/module/zookeeper-3.5.7/这个目录下创建zkData
[sunp@hadoop211 zookeeper-3.5.7]$ mkdir zkData
#在/opt/module/zookeeper-3.5.7/zkData目录下创建一个myid的文件
[sunp@hadoop211 zkData]$ vim myid
  • 分发到212,213
xsync-any /opt/module/zookeeper-3.5.7 hadoop212 hadoop213
  • 在212,213修改/opt/module/zookeeper-3.5.7/zkData/myid 内的服务器编码

212 =3 213=4 对应上班zoo.cfg的配置

zk.sh start 
#启动查看status

安装Kafka

  • 上传kafka_2.11-2.4.1.tgz 到211 soft。并解压到module
[sunp@hadoop211 soft]tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
#修改名字
[sunp@hadoop211 module]$ mv kafka_2.11-2.4.1/ kafka
#在/opt/module/kafka目录下创建logs文件夹
[sunp@hadoop211 kafka]$ mkdir logs
#修改配置文件
[sunp@hadoop211 kafka]$ cd config/
[sunp@hadoop211 config]$ vim server.properties 

#修改或者增加以下内容:
#broker的全局唯一编号,不能重复 212改为1,213改为2
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop211:2181,hadoop212:2181,hadoop213:2181/kafka
  • 配置环境变量
[sunp@hadoop211 config]$ sudo vim /etc/profile.d/my_env.sh 
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

#分发环境变量文本
[sunp@hadoop211 config]$ xsync-any /etc/profile.d/my_env.sh hadoop212 hadoop213
  • 分发kafka,修改配置
[sunp@hadoop211 module]xsync-any kafka/ hadoop212 hadoop213
#分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties
broker.id=1 / 2
  • 群起kafka,必须先群起zk。
zk.sh start
kafka.sh
  • 一些命令行操作
#创建topic
 kafka-topics.sh --create --bootstrap-server hadoop211:9092 --topic first --partitions 2 --replication-factor 2
 #选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数
#查看当前服务器中的所有topic
kafka-topics.sh --list --bootstrap-server hadoop211:9092
#删除topic
kafka-topics.sh --zookeeper hadoop211:2181/kafka --delete --topic first
#需要server.properties中设置delete.topic.enable=true否则只是标记删除。

#发送消息
kafka-console-producer.sh --broker-list hadoop211:9092 --topic first
>hello world
>sunp sunp

#消费消息
kafka-console-consumer.sh --bootstrap-server hadoop211:9092 --from-beginning --topic first

kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning:会把主题中以往所有的数据都读取出来。

#查看某个topic详情
kafka-topics.sh --bootstrap-server hadoop211:9092 --describe --topic first

#修改分区数
kafka-topics.sh --zookeeper hadoop211:2181/kafka --alter --topic first --partitions 6
  • 压力测试
#生产者测试
kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop211:9092,hadoop212:9092,hadoop213:9092

record-size是一条信息有多大,单位是字节。
num-records是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量

#消费者测试
kafka-consumer-perf-test.sh --broker-list hadoop211:9092,hadoop212:9092,hadoop213:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
参数说明:
--zookeeper 指定zookeeper的链接信息
--topic 指定topic的名称
--fetch-size 指定每次fetch的数据的大小
--messages 总共要消费的消息个数

生产测试数据

image-20200911220600346

生产吞吐11.1M/s

Kafka机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1

消费测试数据

image-20200911221101309

消费9.5367mb数据,0.8865mb/s , 消费10W条,9295条/s

Kafak分区数 = 期望吞吐量/ min(生产吞吐,消费吞吐)

安装日志服务器上的Flume

Flume需要安装几个地方,一个是日志服务器,一个是集群上的flume集群

因此,重新开一台2核8g的日志服务器。一台业务数据库服务器

image-20200912105309377

  • 连接 ip 215的服务器
  • 按 ‘安装JDK’ 一节安装jdk
  • 按配置host映射,配置集群的host
  • 文档下载

(1) Flume官网地址:http://flume.apache.org/

(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html

(3)下载地址:http://archive.apache.org/dist/flume/

上传apache-flume-1.9.0-bin.tar到日志服务器215 soft

  • 解压到module
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  • 修改文件夹名字
[root@logdata215 module]$ mv apache-flume-1.9.0-bin flume
  • 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3(日志服务器不用删除)
rm /opt/module/flume/lib/guava-11.0.2.jar
注意:删除guava-11.0.2.jar的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
  • 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
[root@logdata215 conf]$ mv flume-env.sh.template flume-env.sh
[root@logdata215 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
  • 编写配置脚本
[root@logdata215 conf]vim file-flume-kafka.conf

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
#指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控tail多个目录中的文件)
a1.sources.r1.filegroups = f1
#// 指定监控的文件目录f1,匹配的文件信息
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
# positionFile 检查点文件路径
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
#a1.sources.r1.interceptors =  i1
#a1.sources.r1.interceptors.i1.type = com.sunp.flume.interceptor.LogInterceptor$Builder

#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop211:9092,hadoop212:9092,hadoop213:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
#注意:com.sunp.flume.interceptor.LogInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。
#在此我就不做拦截器了
  • 上传mock数据生成脚本到/opt/module/applog/
  • 编写日志生成启动脚本
vim lg.sh
#!/bin/bash
for i in logdata215; do
    echo "========== $i =========="
    cd /opt/module/applog/; java -jar gmall2020-mock-log-2020-04-01.jar >/dev/null 2>&1 &
done 
#注意这个脚本生成的log存在 applog下的log文件夹里。命名为当天的app+当天系统时间。但是内容是按照配置文件的时间生成的。所以。如果想要生成配置文件的app+配置文件日期。那么要把系统时间修改为那天
#加权限
chmod 777 lg.sh

#生成前记得修改application.propites
#生成数据脚本
lg.sh
  • 日志服务器Flume采集启动,停止脚本
vim f1.sh
#! /bin/bash

case $1 in
"start"){
        for i in logdata215
        do
                echo " --------启动 $i 采集flume-------"
                nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &
        done
};;
"stop"){
        for i in logdata215
        do
                echo " --------停止 $i 采集flume-------"
                ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print $2}' | xargs -n1 kill -9
        done

};;
esac

#加权限
chmod 777 f1.sh
f1.sh start
f1.sh stop

安装集群上消费kafka的flume

上传apache-flume-1.9.0-bin.tar到日志服务器215 soft

  • 解压到module
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  • 修改文件夹名字
[root@logdata211 module]$ mv apache-flume-1.9.0-bin flume
  • 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3(日志服务器不用删除)
rm /opt/module/flume/lib/guava-11.0.2.jar
注意:删除guava-11.0.2.jar的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
  • 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
[root@logdata211 conf]$ mv flume-env.sh.template flume-env.sh
[root@logdata211 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
  • 编写配置脚本
[sunp@hadoop211 conf]$ vim kafka-flume-hdfs.conf

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop211:9092,hadoop212:9092,hadoop213:9092
a1.sources.r1.kafka.topics=topic_log


## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop204:8020/origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
  • 将flume分发到212,213
创建消费kafka的flume的启停脚本
[sunp@hadoop211 bin]$ vim f2.sh
#! /bin/bash

case $1 in
"start"){
        for i in hadoop211 hadoop212 hadoop213
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop211 hadoop212 hadoop213
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
        done

};;
esac
#增加权限
chmod 777 f2.sh

#启动
f2.sh start
f2.sh stop
  • *** 重要!! 必须整个通道启动了,然后生成日志,才会将kafka 中的数据写入hdfs,而且要注意。HDFS上生成需要一定时间,根据上边配置的数据生成时间变化 a1.sinks.k1.hdfs.rollInterval = 120 ***

安装Mysql业务数据库

  • 在logdata214上安装
  • 先处理jdk,机器名,hosts,ssh等问题
  • 查看MySQL是否安装
rpm -qa|grep -i mysql
#如果有,就卸载
rpm -e --nodeps mysql-libs-5.1.73-7.el6.x86_64
  • 删除阿里云原有MySql依赖
yum remove mysql-libs
  • 安装MySQL
[root@logdata214 soft] yum install libaio
[root@logdata214 soft] yum -y install autoconf
[root@logdata214 soft] wget https://downloads.mysql.com/archives/get/p/23/file/MySQL-shared-compat-5.6.24-1.el6.x86_64.rpm
[root@logdata214 soft] wget https://downloads.mysql.com/archives/get/p/23/file/MySQL-shared-5.6.24-1.el7.x86_64.rpm
[root@logdata214 soft] rpm -ivh MySQL-shared-5.6.24-1.el7.x86_64.rpm
[root@logdata214 soft] rpm -ivh MySQL-shared-compat-5.6.24-1.el6.x86_64.rpm
  • 上传mysql-libs.zip到hadoop214的/opt/soft目录,并解压文件到当前目录
yum install unzip
unzip mysql-libs.zip
#进入mysql-libs目录
rpm -ivh MySQL-server-5.6.24-1.el6.x86_64.rpm
#安装MySQL客户端
rpm -ivh MySQL-client-5.6.24-1.el6.x86_64.rpm
  • 查看产生的随机密码
cat /root/.mysql_secret
  • 启动MySQL
service mysql start
  • 使用随机密码登陆mysql
mysql -uroot -pDNeXS8qha2CbqBZ2
 #修改密码
 mysql>SET PASSWORD=PASSWORD('000000');
 #配置任何主机都可登陆mysql
 mysql> use mysql;
 mysql>desc user;
 mysql>select User, Host, Password from user;
 mysql>update user set host='%' where host='localhost';
 mysql> delete from user where host!='%';
 mysql> flush privileges;
 mysql> quit;
  • 使用图形工具连接mysql,使用准备好的sql生成业务表
  • 生成业务数据,将mock包传到214上的/opt/module/db_log/下
  • 修改application.properties
logging.level.root=info


spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://logdata214:3306/gmall?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=000000

logging.pattern.console=%m%n


mybatis-plus.global-config.db-config.field-strategy=not_null


#业务日期
mock.date=2020-09-12
#是否重置
mock.clear=1

#生成新用户数量
mock.user.count=1000
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20

#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100

#购物车数量
mock.cart.count=30
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3
#购物车来源  用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10

#用户下单比例
mock.order.user-rate=95
#用户从购物中购买商品比例
mock.order.sku-rate=70
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=1000

#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10


#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50

#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5
  • 在db_log目录下执行命令生成设定日的数据
java -jar gmall2020-mock-db-2020-04-01.jar

安装Sqoop

  • 下载解压

下载地址:http://mirrors.hust.edu.cn/apache/sqoop/1.4.6/

  • 上传安装包sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz到hadoop211的/opt/soft路径中
  • 解压安装包
tar -zxf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/module/
  • 进入到/opt/module/sqoop/conf目录,重命名配置文件
[root@logdata214 conf]# mv sqoop-env-template.sh sqoop-env.sh
#修改配置文件
vim sqoop-env.sh
#增加以下内容
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HIVE_HOME=/opt/module/hive
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export ZOOCFGDIR=/opt/module/zookeeper-3.5.7/conf
  • 将mysql-connector-java-5.1.48.jar 上传到/opt/soft路径
  • 进入到/opt/soft路径,拷贝jdbc驱动到sqoop的lib目录下
  • 可以通过某一个command来验证sqoop配置是否正确
[root@hadoop211 sqoop]# bin/sqoop help

image-20200912225107845

  • 检测是否可以连接数据库
[root@hadoop211 sqoop]bin/sqoop list-databases --connect jdbc:mysql://logdata214:3306/ --username root --password 000000

image-20200912225159319

同步策略

image-20200912225453820

  • 全量同步策略
  • 新增同步策略
  • 新增及变化
  • 特殊同步

**客观世界维度 **

没变化的客观世界的维度(比如性别,地区,民族,政治成分,鞋子尺码)可以只存一份固定值

日期维度

日期维度可以一次性导入一年或若干年的数据。

地区维度

省份表、地区表

编写同步脚本
  • 脚本放在home/sunp/bin下边

[sunp@hadoop211 bin]$ vim gmall_mysql_to_hdfs.sh

#! /bin/bash

sqoop=/opt/module/sqoop/bin/sqoop

if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d '-1 day' +%F`
fi

import_data(){
$sqoop import \
--connect jdbc:mysql://logdata214:3306/gmall \
--username root \
--password 000000 \
--target-dir hdfs://hadoop204:8020/origin_data/gmall/db/$1/$do_date \
--delete-target-dir \
--query "$2 and  \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
#直接做了lzo压缩,并生成index
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/gmall/db/$1/$do_date
}

import_order_info(){
  import_data order_info "select
                            id, 
                            final_total_amount, 
                            order_status, 
                            user_id, 
                            out_trade_no, 
                            create_time, 
                            operate_time,
                            province_id,
                            benefit_reduce_amount,
                            original_total_amount,
                            feight_fee  
                        from order_info
                        where (date_format(create_time,'%Y-%m-%d')='$do_date' 
                        or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}

import_coupon_use(){
  import_data coupon_use "select
                          id,
                          coupon_id,
                          user_id,
                          order_id,
                          coupon_status,
                          get_time,
                          using_time,
                          used_time
                        from coupon_use
                        where (date_format(get_time,'%Y-%m-%d')='$do_date'
                        or date_format(using_time,'%Y-%m-%d')='$do_date'
                        or date_format(used_time,'%Y-%m-%d')='$do_date')"
}

import_order_status_log(){
  import_data order_status_log "select
                                  id,
                                  order_id,
                                  order_status,
                                  operate_time
                                from order_status_log
                                where date_format(operate_time,'%Y-%m-%d')='$do_date'"
}

import_activity_order(){
  import_data activity_order "select
                                id,
                                activity_id,
                                order_id,
                                create_time
                              from activity_order
                              where date_format(create_time,'%Y-%m-%d')='$do_date'"
}

import_user_info(){
  import_data "user_info" "select 
                            id,
                            name,
                            birthday,
                            gender,
                            email,
                            user_level, 
                            create_time,
                            operate_time
                          from user_info 
                          where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date' 
                          or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
}

import_order_detail(){
  import_data order_detail "select 
                              od.id,
                              order_id, 
                              user_id, 
                              sku_id,
                              sku_name,
                              order_price,
                              sku_num, 
                              od.create_time,
                              source_type,
                              source_id  
                            from order_detail od
                            join order_info oi
                            on od.order_id=oi.id
                            where DATE_FORMAT(od.create_time,'%Y-%m-%d')='$do_date'"
}

import_payment_info(){
  import_data "payment_info"  "select 
                                id,  
                                out_trade_no, 
                                order_id, 
                                user_id, 
                                alipay_trade_no, 
                                total_amount,  
                                subject, 
                                payment_type, 
                                payment_time 
                              from payment_info 
                              where DATE_FORMAT(payment_time,'%Y-%m-%d')='$do_date'"
}

import_comment_info(){
  import_data comment_info "select
                              id,
                              user_id,
                              sku_id,
                              spu_id,
                              order_id,
                              appraise,
                              comment_txt,
                              create_time
                            from comment_info
                            where date_format(create_time,'%Y-%m-%d')='$do_date'"
}

import_order_refund_info(){
  import_data order_refund_info "select
                                id,
                                user_id,
                                order_id,
                                sku_id,
                                refund_type,
                                refund_num,
                                refund_amount,
                                refund_reason_type,
                                create_time
                              from order_refund_info
                              where date_format(create_time,'%Y-%m-%d')='$do_date'"
}

import_sku_info(){
  import_data sku_info "select 
                          id,
                          spu_id,
                          price,
                          sku_name,
                          sku_desc,
                          weight,
                          tm_id,
                          category3_id,
                          create_time
                        from sku_info where 1=1"
}

import_base_category1(){
  import_data "base_category1" "select 
                                  id,
                                  name 
                                from base_category1 where 1=1"
}

import_base_category2(){
  import_data "base_category2" "select
                                  id,
                                  name,
                                  category1_id 
                                from base_category2 where 1=1"
}

import_base_category3(){
  import_data "base_category3" "select
                                  id,
                                  name,
                                  category2_id
                                from base_category3 where 1=1"
}

import_base_province(){
  import_data base_province "select
                              id,
                              name,
                              region_id,
                              area_code,
                              iso_code
                            from base_province
                            where 1=1"
}

import_base_region(){
  import_data base_region "select
                              id,
                              region_name
                            from base_region
                            where 1=1"
}

import_base_trademark(){
  import_data base_trademark "select
                                tm_id,
                                tm_name
                              from base_trademark
                              where 1=1"
}

import_spu_info(){
  import_data spu_info "select
                            id,
                            spu_name,
                            category3_id,
                            tm_id
                          from spu_info
                          where 1=1"
}

import_favor_info(){
  import_data favor_info "select
                          id,
                          user_id,
                          sku_id,
                          spu_id,
                          is_cancel,
                          create_time,
                          cancel_time
                        from favor_info
                        where 1=1"
}

import_cart_info(){
  import_data cart_info "select
                        id,
                        user_id,
                        sku_id,
                        cart_price,
                        sku_num,
                        sku_name,
                        create_time,
                        operate_time,
                        is_ordered,
                        order_time,
                        source_type,
                        source_id
                      from cart_info
                      where 1=1"
}

import_coupon_info(){
  import_data coupon_info "select
                          id,
                          coupon_name,
                          coupon_type,
                          condition_amount,
                          condition_num,
                          activity_id,
                          benefit_amount,
                          benefit_discount,
                          create_time,
                          range_type,
                          spu_id,
                          tm_id,
                          category3_id,
                          limit_num,
                          operate_time,
                          expire_time
                        from coupon_info
                        where 1=1"
}

import_activity_info(){
  import_data activity_info "select
                              id,
                              activity_name,
                              activity_type,
                              start_time,
                              end_time,
                              create_time
                            from activity_info
                            where 1=1"
}

import_activity_rule(){
    import_data activity_rule "select
                                    id,
                                    activity_id,
                                    condition_amount,
                                    condition_num,
                                    benefit_amount,
                                    benefit_discount,
                                    benefit_level
                                from activity_rule
                                where 1=1"
}

import_base_dic(){
    import_data base_dic "select
                            dic_code,
                            dic_name,
                            parent_code,
                            create_time,
                            operate_time
                          from base_dic
                          where 1=1"
}

case $1 in
  "order_info")
     import_order_info
;;
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
  "base_province")
     import_base_province
;;
  "base_region")
     import_base_region
;;
  "base_trademark")
     import_base_trademark
;;
  "activity_info")
      import_activity_info
;;
  "activity_order")
      import_activity_order
;;
  "cart_info")
      import_cart_info
;;
  "comment_info")
      import_comment_info
;;
  "coupon_info")
      import_coupon_info
;;
  "coupon_use")
      import_coupon_use
;;
  "favor_info")
      import_favor_info
;;
  "order_refund_info")
      import_order_refund_info
;;
  "order_status_log")
      import_order_status_log
;;
  "spu_info")
      import_spu_info
;;
  "activity_rule")
      import_activity_rule
;;
  "base_dic")
      import_base_dic
;;

"first")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
   import_base_province
   import_base_region
   import_base_trademark
   import_activity_info
   import_activity_order
   import_cart_info
   import_comment_info
   import_coupon_use
   import_coupon_info
   import_favor_info
   import_order_refund_info
   import_order_status_log
   import_spu_info
   import_activity_rule
   import_base_dic
;;
"all")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
   import_base_trademark
   import_activity_info
   import_activity_order
   import_cart_info
   import_comment_info
   import_coupon_use
   import_coupon_info
   import_favor_info
   import_order_refund_info
   import_order_status_log
   import_spu_info
   import_activity_rule
   import_base_dic
;;
esac
  • 脚本加权限
初次导入
[sunp@hadoop211 bin]$ gmall_mysql_to_hdfs.sh first 2020-09-12
#每日导入
[sunp@hadoop211 bin]$ gmall_mysql_to_hdfs.sh all 2020-09-12

至此,数据导入工作结束。下一个文档,开始数仓过程


作者:大鹏
转载请标注来源:大鹏的技术博客