COS视频去音频处理脚本

手写了个python+ffmpeg的视频去音轨的脚本,多线程。

  • 支持ctrl+关闭程序
  • 相对完善的统计数据

# 后台启动
nohup xxx.py &

# 统计处理量
cat cos_process_log.csv |awk ‘{print $1}’ |awk ‘BEGIN{sum=0;count=0;}{sum+=$1;count+=1;}END{print sum / 1024 / 1024 / 1024 “GB ” count}’
283.74GB 5819

# 查询处理进度
tail -f nohup.out
进度:POOL: 5919 START: 5908 FINISH: 5876

# -*- coding=utf-8

from qcloud_cos.cos_client import CosConfig, CosS3Client
import os
from concurrent.futures.thread import ThreadPoolExecutor
import threading
import datetime
from time import sleep
from threading import Thread
import io
from qcloud_cos.cos_exception import CosClientError, CosServiceError
import subprocess
import signal

bucket = 'xxxx-xxxx-111111111'
cos = CosS3Client(
    CosConfig(Region='ap-xxxx',
              Secret_id='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
              Secret_key='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'))

dir_logs = f'/data/mosaic/output/mmpeg'
dir_tmp_files = f'/data/mosaic/output/tmp'
dir_target_files = f'/data/mosaic/output/target'
os.makedirs(dir_logs, exist_ok=True)
os.makedirs(dir_tmp_files, exist_ok=True)
os.makedirs(dir_target_files, exist_ok=True)

start_task_num = 0
finish_task_num = 0
pooled_task_num = 0
task_run_lock = threading.RLock()
executor = ThreadPoolExecutor(max_workers=32,
                              thread_name_prefix='object_check_')
log_lock = threading.RLock()
cos_process_log_fi = open(dir_logs + '/cos_process_log.csv', mode='a')
cos_erro_log_fi = open(dir_logs + '/cos_erro_log.txt', mode='a')

prefix = '视频/'
prefix_output = '视频-output/'

running_lock = threading.RLock()
running = True


def list_cos_subobject(Prefix="", Marker="") -> ([], [], str, bool):
    response = cos.list_objects(Bucket=bucket,
                                Prefix=Prefix,
                                Delimiter='/',
                                Marker=Marker,
                                MaxKeys=100,
                                ContentType='text/html; charset=utf-8')
    dirs = []
    keys = []
    if 'CommonPrefixes' in response:
        for folder in response['CommonPrefixes']:
            dirs.append(folder['Prefix'])
    if 'Contents' in response:
        for content in response['Contents']:
            keys.append(content['Key'])
    isTruncated = response['IsTruncated'] == 'true'
    nextMarker = ""
    if isTruncated:
        nextMarker = response['NextMarker']
    return (dirs, keys, nextMarker, isTruncated)


def get_obj(bucket: str, key: str, tmp_file: str):
    for i in range(0, 10):
        try:
            response = cos.download_file(Bucket=bucket,
                                         Key=key,
                                         DestFilePath=tmp_file)
            break
        except CosClientError or CosServiceError as e:
            print(e)


def put_obj(bucket: str, key: str, file_path: str):
    # with open(file_path, 'rb') as fp:
    #     response = cos.put_object(Bucket=bucket, Body=fp, Key=key)
    #     print(response['ETag'])

    for i in range(0, 10):
        try:
            response = cos.upload_file(Bucket=bucket,
                                       Key=key,
                                       LocalFilePath=file_path)
            break
        except CosClientError or CosServiceError as e:
            print(e)
            log_lock.acquire()
            try:
                cos_erro_log_fi.write("%s\t%s\n" % (file_path, key))
                cos_erro_log_fi.flush()
            finally:
                log_lock.release()


def set_not_running():
    running_lock.acquire()
    try:
        global running
        running = False
    finally:
        running_lock.release()


def is_running():
    # 停止了后,不再新增加任务
    running_lock.acquire()
    try:
        global running
        return running
    finally:
        running_lock.release()


def is_not_running():
    # 停止了后,不再新增加任务
    running_lock.acquire()
    try:
        global running
        return not running
    finally:
        running_lock.release()

def get_pooled_task_num():
    task_run_lock.acquire()
    try:
        global pooled_task_num
        return pooled_task_num
    finally:
        task_run_lock.release()

def inc_pooled_task_num():
    # pooled
    task_run_lock.acquire()
    try:
        global pooled_task_num
        pooled_task_num += 1
        return pooled_task_num
    finally:
        task_run_lock.release()


def get_start_task_num():
    task_run_lock.acquire()
    try:
        global start_task_num
        return start_task_num
    finally:
        task_run_lock.release()

def inc_start_task_num():
    task_run_lock.acquire()
    try:
        global start_task_num
        start_task_num += 1
        return start_task_num
    finally:
        task_run_lock.release()


def inc_finish_task_num():
    task_run_lock.acquire()
    try:
        global finish_task_num
        finish_task_num += 1
        return finish_task_num
    finally:
        task_run_lock.release()


def process_one_key(dir: str, key: str, cur_index: int):
    if key.endswith('.mp4'):
        global dir_logs
        global dir_tmp_files
        global dir_target_files
        #-------------------------------------------------
        # if key.endswith('.jpg') and not key.endswith('-des.jpg'):
        # tmp_file = key.replace(prefix, dir_tmp_files + "/")
        # execd_file = key.replace(prefix, dir_target_files + "/")
        _, name = os.path.split(key)
        _, ext = os.path.splitext(key)

        tmp_file = "%s/%07d%s" % (dir_tmp_files, cur_index, ext)
        execd_file = "%s/%07d%s" % (dir_target_files, cur_index, ext)

        target_path = key.replace(prefix, prefix_output)
        os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
        os.makedirs(os.path.dirname(execd_file), exist_ok=True)

        if cos.object_exists(Bucket=bucket, Key=target_path):
            return

        #------------------------------------------------- download
        get_obj(bucket, key, tmp_file)
        #------------------------------------------------- 处理文件
        # 清理旧文件
        if dir_target_files in execd_file and os.path.exists(execd_file):
            os.remove(execd_file)
        # 处理文件
        # ffmpeg -i input.mp4 -vcodec copy -an output.mp4
        # cmd = "cp %s %s && ls -l %s" % (tmp_file, execd_file, execd_file)
        cmd = "ffmpeg -i %s -vcodec copy -an %s && ls -l %s" % (
            tmp_file, execd_file, execd_file)
        # cmd = "ffmpeg -i %s -vcodec copy -acodec copy %s && ls -l %s" % (tmp_file, execd_file, execd_file)
        # os.system("cp %s %s" % (tmp_file, execd_file))
        subprocess.call(cmd, shell=True)
        # 打印文件日志
        #------------------------------------------------- 上传到新cos地址
        put_obj(bucket=bucket, key=target_path, file_path=execd_file)
        #------------------------------------------------- TODO:打印处理文件的日志
        file_stats = os.stat(tmp_file)
        file_size = file_stats.st_size
        log_lock.acquire()
        try:
            cos_process_log_fi.write("%s\t%s\n" % (file_size, target_path))
            cos_process_log_fi.flush()
        finally:
            log_lock.release()
        # 清理本地文件
        if dir_tmp_files in tmp_file and os.path.exists(tmp_file):
            os.remove(tmp_file)
        if dir_target_files in execd_file and os.path.exists(execd_file):
            os.remove(execd_file)


def get_object_meta_target(dir, key):
    # 未开始的任务就不用做了
    if is_not_running():
        return
    cur_index = inc_start_task_num()
    print("开始处理:%07d %s" % (cur_index, key))
    try:
        process_one_key(dir, key, cur_index)
    except Exception as e:
        print(e)
    finally:
        finish_total = inc_finish_task_num()
        print("完成处理:%07d %s 总计:%07d" % (cur_index, key, finish_total))


def check(prefix):
    global executor

    marker = ""
    while True:
        (dirs, keys, nextMarker,
         isTruncated) = list_cos_subobject(Prefix=prefix, Marker=marker)
        # 文件
        for key in keys:
            if is_not_running():
                return
            while get_pooled_task_num() - get_start_task_num() > 10:
                stat_log_print()
                if is_not_running():
                    return
                sleep(1)
            inc_pooled_task_num()
            # 执行任务
            executor.submit(get_object_meta_target, prefix, key)
        # 目录
        for dir in dirs:
            check(dir)
        # 是否处理完
        if not isTruncated:
            break
        # 下一步
        marker = nextMarker


def onSig(signo, frame):
    set_not_running()
    pass

def stat_log_print():
    global pooled_task_num
    global start_task_num
    global finish_task_num

    print("进度:POOL: %5d START: %5d FINISH: %5d" %
              (pooled_task_num, start_task_num, finish_task_num))

if __name__ == "__main__":
    # signal.signal(signal.SIGCHLD, onSigChld)
    signal.signal(signal.SIGINT, onSig)
    signal.signal(signal.SIGTERM, onSig)

    check(prefix)
    while (True):
        # 判断是否处理完
        stat_log_print()
        pre_start_task_num = start_task_num
        if pre_start_task_num == 0:
            continue
        if pre_start_task_num == finish_task_num:
            sleep(10)
            if pre_start_task_num == start_task_num:
                break
        sleep(1)

    print("finish")



# cat cos_process_log.csv |awk '{print $1}' |awk 'BEGIN{sum=0;count=0;}{sum+=$1;count+=1;}END{print sum / 1024 / 1024 / 1024 "GB " count}'


Posted in 未分类 | Leave a comment

k8s服务访问

k8s基本网络模型

约法三单

  1. pod之间可以直接通信,无需显式使用NAT接收和地址转换
  2. node与pod之间可以直接通信,无需要明显地址转换
  3. pod可以看到自己的ip跟别人看到它所用的ip是一样的,中间不经过转换

分类

  1. underlay:与Host网络同层
  2. ovrelay:只要与Host网络不冲突,ip可以自由分配

Kubenets Service

metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 80
      targetPort: 9376

Service工作原理:

通过kube-proxy设置宿主机iptables规则来实现,kube-proxy通过观察service的创建,然后通过修改本机的iptables规则,将访问service vip的请求转发到真实的Pod上。

基于iptables规则的service的实现,宿主机上有大量Pod时,规则的不断刷新占用大量CPU资源,新的模式:ipvs,通过把规则放到内核态,降低了维护规则的代价

  1. Service的DNS记录:<myservice>.<mynamespace>.svc.cluster.local,访问这条记录时,返回的是Service的VIP或代理Pod的IP地址集合
  2. Pod的DNS记录:<pod_hostname>.<subdomain>.<mynamespace>.svc.cluster.local,注意pod的hostname和subdomain都是在Pod中定义的

集群内访问

`my-service:80 -> 192.168.11.236:9376` 

`my-service.my-namespace:80 -> 192.168.11.236:9376`

`172.18.255.16:80 -> 192.168.11.236:9376`

环境变量

TASK_BUS_SERVICE_HOST=172.18.254.152
TASK_BUS_SERVICE_PORT=80
TASK_WORKER_SERVICE_HOST=172.18.255.164
TASK_WORKER_SERVICE_PORT=80

Headless ServiceclusterIP: None不再提供虚拟IP来负载均衡

`my-service:9376 -> 192.168.11.236:9376,192.168.11.237:9376` 

`my-service.my-namespace:9376 -> 192.168.11.236:9376,192.168.11.237:9376`

集群外访问(外部宿主机访问)

  1. NodePort:外部client访问任意一台宿主机的8080端口,就是访问Servicer所代理的Pod的80端口,由接收外部请示请求的宿主机做转发,即client -> nodeIP:nodePort -> serviceVIP:port -> podIp:targetIp
  2. LoadBalance:公有云提供的k8s服务自带的loadbalancer做负载均衡和外部流量 的入口
  3. ExternalName:通过ExternalName或ExternalIp给Service挂在一个公有IP或者域名,当访问这个公有IP地址时,就会转发到Service所代理的Pod服务上。类似于软链或快捷方式
  4. ClusterIP:虚拟IP地址,外部网络无法访问,只有k8s内部访问使用。更像是一个伪造的IP网络
    • 仅仅用于Service这个对象,并由k8s管理和分配IP地址
    • 无法被Ping通,没有一个“实体网络对象”来响应
    • 只能结合Service Port组成一个具体的通信端口,单独的ClusterIP不具备通信的基础,并且它们属于k8s集群这样一个封闭的空间
    • 不同Service下的Pod节点在集群间相互访问可以通过ClusterIP

Service与Ingress

Ingress:全局的、为了代理不同后端Sercie而设置的负载均衡服务,只能工作在7层,而service工作在四层

Ingress对象:k8s项目对反向代理的一种抽象

Ingress Controller:目前支持Nginx、Haproxy、Envoy、Traefik等;nginx-ingress-controller只是被代理的Serive对象被更新,是不需要重新加载的,因为它的Lua方案实现了Nginx Upstream的动态配置。

nginx-igress-service:暴露服务出去,如果是公有云,需要创建LoadBalancer类型的Service

default-backend-service:默认规则

apiVersion: extensions/v1beat1
kind: Ingress
metadata:
  name: a-ingress
spec:
  tls:
  - hosts:
    # 标准域名格式,不能是IP地址
    - a.example.com
    secretName: a-secret
  rules:
  - hosts: a.example.com
    http:
    - path: /a1
      backend:
        serviceName: a1-svc
        servicePort: 80
    - path: /a2
      backend:
        serviceName: a2-svc
        sercvicePort: 80
Posted in Kubernetes | Tagged , | Leave a comment

k8s三层网络实现

字典

  1. 下一跳:如果ip包从主机A发送到主机B,需要经过路由设备X的中转,那么X的IP地址就应该配置为主机A的下一跳地址。
  2. Host-gw模式:将每个 Flannel 子网(Flannel Subnet,比如10.244.1.0/24)的下一跳 ,设置成该子网对应宿主机的IP地址。也就是主机(host)充当这条容器通信路径里的网关(gateway),也就是host-gw的含义;无封装,纯路由,只经过协议栈一次,性能较高
  3. 边际网关协议:BGP,Border Gateway Protocol,大规模数据中心维护不同“自治系统”之间路由信息、无中心的路由协议; 它不会在宿主上创建任何网桥设备相当于:每个边界网关上运行着一个小程序,它们会将各自的路由表信息,通过TCP传输给其它边界网关。其它边界网关上的这个小程序,则会对收到的这些数据进行分析,然后把需要的信息添加到自己的路由表。
    • CNI:与k8s对接的部分
    • Felix:DeamonSet,wmgm宿主机上插入路由规则,即写入Linux内核的FIB转发信息库,以及维护Calico所需要的网络设备等工作
    • BIRD:BGP客户端,专门负责在集群里分发路由信息
  4. 边际网关:负责把自治系统连接在一起的路由器。它的路由表中有其它自治系统里的主机路由信息;
  5. Calico:集群中所有的节点,都是边界路由器,被称为BGP Peer;默认情况下,是一个Node-to-Node Mesh的模式。随着节点的增多,会以N^2的规模快速增长。一般推荐在少于100个节点的集群中
  6. Calico Route Reflector的模式,大集群,它指定一个或几个专门节点,来负责与所有节点建立BGP连接,从而学习全局路由规则,其它节点,只需要给它交换路由信息。
  7. Calico IPIP模式:添加的路由规则10.233.2.0/24 via 192.168.2.2 tunl0,下一跳地址是Node2 IP地址,但发出去包的设备是tunl0(注意,不是flannel UDP模式的tun0,它们的功能是不同的)它是一个IP隧道设备(IP tunnel),IP包进入IP隧道后,被内核IPIP驱动接管。它会将这个IP包直接封装在一个宿主机网络的Ip包中。

Calico IPIP模式:

Flannel Host-gw模式

$ ip route
...
10.244.1.0/24 via 10.168.0.3 dev eth0

目的地址属于10.244.1.0/24网段的IP包,应该经过本机的eth0设备(dev eth0)设备发出去,并且它的下一跳(next-lop)是10.168.0.3(via 10.168.0.3)

从示意图上看,下一跳地址是目的宿主机Node2,所以IP包从应用层到链路层被封装成帧时,etho会使用下一跳地址对应的MAC地址,做为数据帧的MAC地址。

Node 2的内核网络从二层拿到IP包,就会看到这个目的IP地址:10.244.1.3,根据Node 2的路由表,会匹配到第二条路由规则,从而进入cni0网桥,进入infra-containeere-2当中。

Flannel子网和主机的信息,都是保存在Etcd中的。flanneld只需要wacth这些数据变化,然后实时更新路由表则可。性能损失在10%左右,其它基于VXLAN隧道机制的网络方案,性能损失在20%~30%左右。

Flannel host-gw限制,要求宿主机必须是二层连通;必须有其它宿主机的路由信息;

Posted in Kubernetes | Tagged , | Leave a comment

Pod生命周期

Pod容器

  1. Init Container:
    • 支持应用容器的全部字段与特性,包括资源限制,存储券和安全设置
    • 由于需要在pod就绪之前运行完成,所以不支持reaadinessProbe
    • 定义多个init容器,它们会按定义顺序执行
    • init容器失败,如果restartPolicy不为Nerver,则会不断重启,直到成功为止
    作用:
    • 具有与应用容器分离的单独镜像,可包含不建议在生产镜像中包含的实用工具
    • 应用程序镜像基于它可以分离出创建和部署的角色
    • 使用linux namespace,相对应用容器来说有不同的文件系统视图,因此有访问secret的权限,应用容器则不能
    • 可以阻塞或延迟应用容器的启动
  2. Pod Hook 如果postStart,postStop失败,会杀死容器
    • postStart:容器创建后立即执行,不保证钩子在容器EntryPoint之前运行。主要用于资源部署、环境准备等。如果运行时间过长以至不能运行或者挂起容器将无法到达Running状态
    • preStop:容器终止之前立即调用,阻塞,同步的,必须在删除容器调用发出之前完成。主要用于优雅关闭应用程序、通知其它系统等。如果执行期间被挂起,pod将永远在running状态,并不会到达failed状态
  3. 健康检查

Pod状态

PodStatus.phase字段

  1. 挂起(Pending)信息已提交集群,但没有被调度器调度到合适的节点或pod镜像正在下载
  2. 运行中(Running)已绑定到一个节点,所有容器已被创建。至少一个正在运行,或者处理启动或重启状态
  3. 成功(Successed)所有容器成功终止,并且不会重启
  4. 失败(Failed)所有容器已终止,并且至少一个容器是因为失败终止,也就是说容器以非0状态退出或被系统终止
  5. 未知(UnKnow)无法获得状态,通常是主机通信失败导致的

PodStatus.PodCondition

描述当前Status的具体原因

  1. PodScheduled:pod已调度到某节点
  2. ContainersReady:pod内所有容器已就绪
  3. Initialized:所有的init容器都已成功完成
  4. Unschedulable
  5. Ready:可以提供服务,并且应该被添加到对应服务的负载均衡池中

Pod

  1. 资源定义(CPU、内存)
  2. 调度
  3. Volume
    • Projected Volume 投射数据卷 存在不是为了存放容器中的数据,也不是容器与宿主之间数据交换,为容器提供预先定义好的数据
      • Secret
      • ConfigMap
        • 不需要加密的,应用所需要的配置文件
      • Downward API
        • 获取到Pod API对象本身的信息,进程启动之前就能确定的信息
      • SeviceAccountToken

restartPolicy

  1. Always:容器失效时,kubelet自动重启
  2. OnFailure:终止运行并且退出码不为0,由kubelet自动启动
  3. Never:不论容器运行状态如何,都不重启

ReplicationController,DaemonSet必须为always,需要保证容器持续运行

Job:onFailure或Never,确保执行完成后不再重启

探针

readinessProbe

检查结果决定这个Pod能不能通过Service的方式访问到,而不会影响Pod的生命周期。探测失败,端点控制器将从与 pod匹配的所有服务的端点列表中删除这个pod的ip地址

livenessProbe

配置参数太多,有模板:PodPresent。指示容器是否存活,如果探活失败,则kubelet会杀死容器,并且通过重启策略决定未来

startupProbe 指示容器内应用是否已经启动,提供了此探针,则所有其它控针都会被禁用,直到这个探针成功为止。

容器状态 kubectl describe pod <pod名字>

  1. Waiting:等待,仍在运行完成它启动所需要的操作,比如拉镜像、应用secret数据等
  2. Running:运行中,容器正在执行,并且没有问题产生
  3. Terminated:已终止,容器已经执行并且或者正常结束,或者因为其它原因失败

pod终止

体面退出

容器运行时会向每个容器的主进程发送TERM信号,一旦超出了体面退出终止限期,容器运行时会向所有剩余进程发送kill信号,之后pod会从api服务器上移除

例子:

  1. 使用kubectl手动删除某个pod,pod的体面终止时间默认为30s
  2. api服务中pod对象被更新,pod状态会被标记为“terminating“正在终止,超出所计算的体面终止限期时间点则认为pod dead
  3. kubelet看到pod被标记为正在终止(已设置了体面终止限期),则马上开始本地pod关闭过程
    • 检查并执行preStop回调,如果到达体面终止时间,仍在运行,则一次性给予宽限期2秒(可通过terminationGracePeriodSeconds参数配置)
    • 接下来kubelet触发容器运行时发送TERM信号给容器中的进程,如果需要容器按顺序关闭,可以考虑使用preStop回调逻辑来协调
  4. 控制器会将pod从对应的端点列表中移除,ReplicaSets与其它工作负载资源不再将关闭进程中的pod视为合法,可提供服务的副本。服务在终止宽限期开始的时候负载均衡器已经从端点列表肿移除了,所以pod即使还没有关闭完成,也无法继续处理请求
  5. 超出宽限期,kubelet会触发强制关闭,容器进行时向所有运行中容器发送sigkill信息。同时如果使用了pause容器的话,kubelet也会清理隐藏的pause容器
  6. api服务器删除pod的api对象,所有客户端之后则无法再看到这个容器

强制关闭

宽限期限--grace-period=<seconds>设置为0,并且设置--force参数,api服务器直接删除pod对象,节点侧,被终止的pod仍然会在被强制终止前获得一点点宽限时间

Posted in Kubernetes | Tagged , | Leave a comment

qcow2镜像定制指南

背景

目前网络上关于定制镜像的说明很分散,需要搜寻很多文章才能完成镜像的定制任务。所以我尝试提供一个全面而系统的指南,遵循本指南,用户可以方便的完成镜像的定制。

实施步骤

一、环境配置

1、准备软件
mac pro、VmWare fusion、CentOS-7-x86_64-DVD-1708.iso、CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
2、安装嵌套CentOs环境
由于MacOs不支持Kvm,故需要在嵌套的操作系统中安装云镜像需要的软件,使用Fusion很容易在MacOs中虚拟出一个CentOs的环境。
3、修改嵌套操作系统配置
在centos关闭的情况下打开虚拟机“处理器和内存”配置,选择“高级配置”,选中“在此虚拟机中启用虚拟化管理程序”和“在此虚拟机中启用代码分析应用程序”,如无这步操作,则在启动virt-manager时会报:“
virt-manager 报 WARNING : KVM不可用.这可能是因为没有安装KVM软件包,或者没有载入KVM内核模块.您的虚拟机可能性很差。”的错误,启动虚拟机。以下操作如无特殊说明都是在嵌套操作系统中执行。
4、安装依赖

yum install qemu-kvm qemu-img qemu-kvm-tools qemu-kvm-common
yum install libvirt-admin libvirt-client libvirt-daemon libvirt-devel libvirt
yum install libguestfs libguestfs-tools libguestfs-bash-completion

5、编译nbd内核模块(如不使用“nbd挂载方式修改镜像”则不需要安装此模块)
执行命令,出现以下报错时,说明没有nbd模块,需要自己手动安装





执行下面的命令安装hbd模块

[@xx] # cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
[@xx] # uname -a
Linux localhost 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
[] # uname -r
3.10.0-693.el7.x86_64

安装kernel组件

[@xx] sudo yum install kernel-devel kernel-headers

编译安装hbd组件

# 下载对应的内核源码包
[@xx] # wget http://vault.centos.org/7.4.1708/os/Source/SPackages/kernel-3.10.0-693.el7.src.rpm
[@xx] # rpm -ihv kernel-3.10.0-693.el7.src.rpm
[@xx] # cd /root/rpmbuild/SOURCES/
[@xx] # tar Jxvf linux-3.10.0-123.el7.tar.xz -C /usr/src/kernels/
[@xx] # cd /usr/src/kernels/
# 配置源码
[@xx] # mv $(uname -r) $(uname -r)-old
[@xx] # mv linux-3.10.0-693.el7 $(uname -r)
# 编译安装
[@xx 3.10.0-693.el7.x86_64] # cd $(uname -r)
[@xx 3.10.0-693.el7.x86_64] # make mrproper
[@xx 3.10.0-693.el7.x86_64] # cp ../$(uname -r)-old/Module.symvers ./
[@xx 3.10.0-693.el7.x86_64] # cp /boot/config-$(uname -r) ./.config
[@xx 3.10.0-693.el7.x86_64] # make oldconfig
[@xx 3.10.0-693.el7.x86_64] # make prepare
[@xx 3.10.0-693.el7.x86_64] # make scripts
[@xx 3.10.0-693.el7.x86_64] # make CONFIG_BLK_DEV_NBD=m M=drivers/block
[@xx 3.10.0-693.el7.x86_64] # cp drivers/block/nbd.ko /lib/modules/$(uname -r)/kernel/drivers/block/
[@xx 3.10.0-693.el7.x86_64] # depmod -a
# 查看nbd模块
[@xx 3.10.0-693.el7.x86_64]$ modinfo nbd
filename:       /lib/modules/3.10.0-693.el7.x86_64/kernel/drivers/block/nbd.ko
license:        GPL
description:    Network Block Device
rhelversion:    7.4
srcversion:     EDE909A294AC5FE08E81957
depends:        
vermagic:       3.10.0 SMP mod_unload modversions 
parm:           nbds_max:number of network block devices to initialize (default: 16) (int)
parm:           max_part:number of partitions per device (default: 0) (int)
parm:           debugflags:flags for controlling debug output (int)

编译安装时的错误处理
阶段:make CONFIG_BLK_DEV_NBD=m M=drivers/block

drivers/block/nbd.c: 在函数‘__nbd_ioctl’中:
drivers/block/nbd.c:619:19: 错误:‘REQ_TYPE_SPECIAL’未声明(在此函数内第一次使用)
   sreq.cmd_type = REQ_TYPE_SPECIAL;
                   ^
drivers/block/nbd.c:619:19: 附注:每个未声明的标识符在其出现的函数内只报告一次
make[1]: *** [drivers/block/nbd.o] 错误 1
make: *** [_module_drivers/block] 错误 2

处理:

[@xx 3.10.0-693.el7.x86_64] # vim include/linux/blkdev.h
# 由代码可知 REQ_TYPE_SPECIAL = 7
/*
 * request command types
 */
enum rq_cmd_type_bits {
        REQ_TYPE_FS             = 1,    /* fs request */
        REQ_TYPE_BLOCK_PC,              /* scsi command */
        REQ_TYPE_SENSE,                 /* sense request */
        REQ_TYPE_PM_SUSPEND,            /* suspend request */
        REQ_TYPE_PM_RESUME,             /* resume request */
        REQ_TYPE_PM_SHUTDOWN,           /* shutdown request */
#ifdef __GENKSYMS__
        REQ_TYPE_SPECIAL,               /* driver defined type */
#else
        REQ_TYPE_DRV_PRIV,              /* driver defined type */
#endif
        /*
         * for ATA/ATAPI devices. this really doesn"&gt;cmd[0] with the range of driver
         * private REQ_LB opcodes to differentiate what type of request this is
         */
        REQ_TYPE_ATA_TASKFILE,
        REQ_TYPE_ATA_PC,
};
# 修改nbd.c文件
[@xx 3.10.0-693.el7.x86_64] # vim drivers/block/nbd.c
# sreq.cmd_type = REQ_TYPE_SPECIAL;
sreq.cmd_type = 7;
# 重新执行命令
[@xx 3.10.0-693.el7.x86_64] # make CONFIG_BLK_DEV_NBD=m M=drivers/block

二、设置镜像共享

设置嵌套虚拟机文件夹共享
qcow2文件放置在mac本地文件夹中,嵌套虚拟机通过文件共享的方式使用qcow2文件。需要注意的是qcow2文件权限需要在macos中设置为可读写,否则在嵌套虚拟机中无法更新配置。

[mac@] # chmod 755 ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2

嵌套虚拟机中,需要要关闭SeLinux否则同样无法更新镜像内容

sudo /usr/sbin/setenforce 0

三、guestfish工具使用

1、示例程序:获取镜像ip地址

guestfish --rw -a ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
# run
# list-filesystems
/dev/sda1: xfs
# mount /dev/sda1 /
# vi /var/log/cloud-init.log

2、示例程序:配置用户访问权限

guestfish --rw -a ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
# run
# list-filesystems
/dev/sda1: xfs
# mount /dev/sda1 /
# mkdir /home/dc2-user/.ssh
# 注意guestfish与shell的区别,权限位是四位
# chown 1001 1001 /home/dc2-user/.ssh
# chmod 0700 /home/dc2-user/.ssh
# touch /home/dc2-user/.ssh/authorized_keys
# chmod 0600 /home/dc2-user/.ssh/authorized_keys
# chown 1001 1001 /home/dc2-user/.ssh/authorized_keys
# ll /home/dc2-user/.ssh/authorized_keys
-rw------- 1 1001 1001 0 Mar  1 10:05 /sysroot/home/dc2-user/.ssh/authorized_keys
# vi /home/dc2-user/.ssh/authorized_keys
# 添加公钥...
# quit

四、nbd挂载方式修改镜像(qemu-nbd)

1、确保已安装nbd模块,加载模块

# modinfo nbd
# insmod /lib/modules/3.10.0-693.el7.x86_64/kernel/drivers/block/nbd.ko max_part=8

3、建立nbd连接,挂载到目录

# qemu-nbd -c /dev/nbd0 ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
# mkdir -p /mnt/qcows/qcow0
# mount /dev/nbd0p1  /mnt/qcows/qcow0

4、执行chroot

# chroot /mnt/qcows/qcow0/

5、执行修改,比如

$ passwd dc2-user
# 或其它操作

6、修改完毕后解除挂载点,解除连接

# umount /mnt/qcows/qcow0
# qemu-nbd -d /dev/nbd0p1

五、通过virt-manager挂载虚拟机

1、执行

virt-manager

2、新建虚拟机
选择“导入现有磁盘”,“使用ISO镜像”,选择qcow2文件…
如果报:“WARNING : KVM不可用.这可能是因为没有安装KVM软件包,或者没有载入KVM内核模块.您的虚拟机可能性很差。”的警告相应的解决方案是:

  • 关闭虚拟机
  • 进入虚拟机设置(可以配置网卡,硬盘,光驱的地方)
  • 点击“处理器和内存”,勾选虚拟化Inter VT-x/EPT 或AMD-V/RVI(V)

3、登录机器,修改
IP地址和密码、公钥方式登录,在前面已说明如何操作

六、清理痕迹

1、清理/var/log/文件夹
2、删除cloud-init执行记录
cloud-init是专门为云环境的虚拟机初始化而开发的工具,通过读取相应的数据,对虚拟机进行配置。其执行完毕后会在一个叫 sem 的目录下创建信号文件,以便下次启动模块时不再重复执行语句。其目录位置为:
/var/lib/cloud/instances/实例ID/sem

sudo rm -rf /var/lib/cloud/instances
sudo rm -rf /var/lib/cloud/instance

3、清理history和.ssh目录等

rm -rf /home/xxx/.ssh
echo '' # /home/xxx/.bash_history
echo '' # /root/.bash_history
 
rm -rf /root/.oracle_jre_usage

七、去除磁盘空洞

# 创建同样大小的镜像
$ qemu-img create -f qcow2 CentOS-7-x86_64-GenericCloud-1708-20180329.qcow2 40G
$ virt-sparsify -x ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2 --convert qcow2 ./CentOS-7-x86_64-GenericCloud-1708-20180329.qcow2
$ du -sh *
7.3G	CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
5.3G	CentOS-7-x86_64-GenericCloud-1708-20180329.qcow2

Posted in 未分类 | Leave a comment

hive生命周期

功能

当前线上系统情况:

  • hadoop集群小文件数太多
  • hive的meta存储压力,有hive分区表有75W+分区
  • 释放非必要存储资源,中间层的数据较容易重新生成
  • 规范业务Hive使用,数据治理

名词:

路径不规范:

  1. 库路径无重合
  2. 库路径下不能有其它库
  3. 库路径必须在库所有者目录(/user/{Database.getOwnerName()}/…)下
  4. 表是在所在库路径下
  5. 表路径下不能有其它库
  6. 表路径下不能有其它表
  7. 表路径必须在表所有者目录(/user/{Table.getOwnerName()}/…)下

TTL: 数据保留时间(单位:天)

LEVEL:数据级别(0:永久保留;1:需要进行生命周期)

需要实现的功能:

  1. 新建表将纳入生命周期,增加库表TTL和LEVEL的设置功能
  2. 新建表未指定生命周期会使用默认值,默认生命周期会删除60天前未更新的数据,除非新建表对应的库设置了生命周期,此种情况下,新建表会继承对应库的生命周期,建议创建表时设置,避免数据误删除;配置方式:CREATE EXTERNAL TABLE guoguo.t_test_02(id string) … TBLPROPERTIES (‘LEVEL’=’1′,’TTL’=’70’)
  3. 配置了生命周期的非分区表到期会Drop表,分区表则只Drop分区
  4. 判断数据更新时间的标准是:
    • 1)表分区元数据更新时间,2)表分区对应的hdfs数据更新时间
    • 取两者的最大值做为数据更新的时间
    • 生命周期清理数据时基于数据更新时间后推ttl天清理
  5. 数据安全方案
    • 邮件:执行删除前一周会分别和库、表的所有者发送其负责的待删除的库、表的通知
    • 邮件:执行日报,每天把当天执行的情况汇总按需发给管理员,库所有者,表所有者
    • 邮件:路径不规范的库和表,不会进行处理,并每天有报警邮件,会给相关负责人发送,提醒业务整改
    • 备份:清理数据是先移动数据到每天生成的一个处理备份目录,然后再清理元数据
      • hive有内部表和外部表之分,内部表删除会影响内部表的元数据和底层数据存储,为保持处理一致,不采用只删除元数据,到期再清理数据的方案
      • 移动数据会把meta信息和权限信息一同带到备份目录
      • 备份的目录在15天后由程序删除,删除后数据无法恢复
    • 恢复:可按天恢复数据和meta信息
      • 恢复脚本随每次处理的同时由MR程序生成,保存在HDFS文件中
    • 严格的配置和管理权限,普通用户无法修改生命周期配置信息
  6. 配置权限
    • 创建库、创建表的用户权限
    • 普通用户无法修改库、表的生命周期属性,只能由配置的管理员账号修改

生命周期配置

增加生命周期默认配置项,表的参数 level=0代表永不过期,默认level=1,数据60天后删除;库的参数level和ttl做为库下面新建表的默认生命周期参数;

    METASTORE_LIFE_CYCLE_ADMINS("hive.lifecycle.admins", "admin,hadoop",
            "lifecycle admin users."),
    METASTORE_LIFE_CYCLE_DEFAULT_LEVEL("hive.lifecycle.default.level", "1",
            "lifecycle default level."),
    METASTORE_LIFE_CYCLE_DEFAULT_TTL("hive.lifecycle.default.ttl", "60",
            "lifecycle default ttl."),

增加生命周期的MetaStorePreEventListener实现,校验参数、权限检查和处理生命周期参数。配置信息保存在Database和Table的Parameters中;

@Override
  public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException, InvalidOperationException {
    // 判断操作类型
    if (context.getEventType() == PreEventContext.PreEventType.CREATE_DATABASE) {
      onPreCreateDatabase(context);
    } else if (context.getEventType() == PreEventContext.PreEventType.CREATE_TABLE) {
      onPreCreateTable(context);
    } else if (context.getEventType() == PreEventContext.PreEventType.ALTER_DATABASE) {
      onPreAlterDatabase(context);
    } else if (context.getEventType() == PreEventContext.PreEventType.ALTER_TABLE) {
      onPreAlterTable(context);
    } else if (context.getEventType() == PreEventContext.PreEventType.DROP_DATABASE) {
      onPreDropDatabase((PreDropDatabaseEvent) context);// /*do nothing*/
    } else if (context.getEventType() == PreEventContext.PreEventType.DROP_TABLE) {
      onPreDropTable((PreDropTableEvent) context);// /*do nothing*/
    }
  }

执行逻辑:

Step 0: 创建MetaStore连接池,并获取db,table列表

产出:[{dbname,tblName}]

Step 1: 查询库表信息,并检查是否规范

不规范类型:

      case HAS_OTHER_DB:
        return String.format("路径中存在其它库%s;", values);
      case OWNER_PATH_FAIL:
        return String.format("路径与库表所有者不一致%s;", values);
      case HAS_OTHER_DB_TBL:
        return String.format("路径下其它库的表%s;", values);
      case DB_PATH_FAIL:
        return String.format("库路径不规范%s;", values);
      case HAS_OTHER_TBL:
        return String.format("路径下有其它表%s;", values);
      case PART_PATH_NOT_IN_TBL:
        return String.format("分区不在表路径下%s;", values);
      case TBL_NOT_SUB_DB:
        return String.format("表路径不在库路径下%s;", values);

检查库表规范

1. 检查库规范,获取库信息,维护DeniedResion

库规范: /user/{Database.getOwnerName()}/…

  • 检查库路径是否在库所有者下
  • 检查库路径无重合

库信息DatabaseData:

  • 库名
  • 库所有者
  • 库路径是否正确
  • NoScamaLocation
  • Location(database.getLocationUri())

2. 检查视图规范(ToDo:)

3. 检查表规范,获取表信息,维护DeniedResion

表规范:/user/{table.getOwner}/…

  • 检查表是否在库路径下
  • 检查表路径下是否有其它库
  • 检查表路径下是否有其它表
  • 检查表路径是否在表所有者下

表信息TblData:

  • 库名
  • 表名
  • 表所有者(table.getOwner())
  • 表类型(table.getTableType())
  • 表分区key列表(table.getPartitionKeys().getName())
  • 表修改时间(params.get(TRANSIENT_LASTDDLTIME)*1000)
  • 表是否使用生命周期
  • 从params中获取表级别:level=0 level=1
  • 从params表过期时间:ttl 单位天
  • 表路径是否正确
  • NoScamaLocation
  • Location(database.getLocationUri())

4. 检查分区规范

分区必须在表下

产出:

  • [DatabaseData] [TblData]

Step 3: 发送库表不规范日报

使用Velocity模板,生成日报表格,发送邮件给hive管理员组

  • 库名
  • 库所有者
  • 库路径
  • 不规范原因
  • 库管理员(元数据平台管理)
  • 报表生成时间

Step 4: Start Write Source File

表和分区路径对应信息写入hdfs,为过期检查做准备

分区表:

创建jdbc连接,从数据库中查询每个规范的分区表的分区信息

    String sql = "" +
        "SELECT " +
        "     pt.PART_ID AS PART_ID," +
        "     ls.LOCATION AS LOCATION," +
        "     pt.CREATE_TIME AS CREATE_TIME," +
        "     ptp.PARAM_VALUE AS LAST_DDL_TIME" +
        " FROM " +
        " (SELECT db.DB_ID FROM DBS AS db WHERE db.name=:db_name) AS db2" +
        " INNER JOIN TBLS AS tbl ON (db2.DB_ID = tbl.DB_ID AND tbl.TBL_NAME=:tbl_name)" +
        " INNER JOIN PARTITIONS AS pt ON (pt.TBL_ID = tbl.TBL_ID)" +
        " LEFT JOIN SDS AS ls ON ( ls.SD_ID = pt.SD_ID )" +
        " LEFT JOIN PARTITION_PARAMS AS ptp ON (pt.PART_ID = ptp.PART_ID AND ptp.PARAM_KEY = 'transient_lastDdlTime')";
    MapSqlParameterSource paramSource = new MapSqlParameterSource();
    paramSource.addValue("db_name", dbName);
    paramSource.addValue("tbl_name", tblName);
    

根据PART_ID查询PART_KEY_VAL信息

          StringBuilder builder = new StringBuilder("");
          builder.append(" SELECT PART_ID,PART_KEY_VAL ");
          builder.append(" FROM PARTITION_KEY_VALS ");
          builder.append(" WHERE PART_ID IN (");
          List<Long> ids = Lists.transform(datas, new Function<PartDetailEntity, Long>() {
            public Long apply(PartDetailEntity entity) {
              return entity.getId();
            }
          });
          builder.append(StringUtils.join(ids, ','));
          builder.append(")");
          builder.append(" ORDER BY PART_ID,INTEGER_IDX");

合并结果集

partitions = [PartDetailEntity{id,location,List<String> values, createTime, lastDdlTime}]

分区信息写入hdfs,内容:

  • 库名
  • 表名
  • 表类型
  • level
  • ttl
  • partSpecs
  • 路径=partition.getLocation()
  • LocType=PARTITION
  • 规范:true
  • 最后一次修改时间:partition.getLastDdlTime()

非分区表:

信息写入HDFS,内容:

  • 库名
  • 表名
  • 表类型
  • ttl
  • level
  • PartSpecs=null
  • location=tblData.getLocation()
  • locType=TABLE
  • 规范:true
  • 库名
  • 表名
  • 表类型
  • level
  • ttl
  • partSpecs
  • 路径=partition.getLocation()
  • LocType=PARTITION
  • 规范:true
  • 最后一次修改时间:tblData.getHiveDdlTime()

Step 5: 过期检查

基于MapReduce实现,读取每条表、分区信息,检查数据更新时间

判断数据是否更新的标准是:1)表分区元数据是否有更新,2)表分区对应的hdfs数据是否有更新

  • 修改时间:Max.(fileSystem.getFileStatus().getModificationTime(), 最后一次修改时间) + 1
  • 清理时间:修改时间 + ttl * D_1

会根据数据情况分下面三类,写入不同文件中

  • 本次会清理
  • 将于近期清理
  • 本次无需要关注

Step 6: Start Gen Export And Drop Cmd

Step 7: 备份数据

备份数据基于MR实现,代码首先获取上一个MR程序生成的Export语句,然后调用hive的底层代码生成执行计划,执行;

使用这种方式在数据需要恢复时,执行恢复后数据的权限不会发生变化

Step 8: 生成清理分区和表的脚本

对于分区表,修改表结构删除分区,对于非分区表,直接删除表

Step 8: Start Exec Drop Hql Cmds

使用MapReduce程序跑清理脚本,

  • 拉取执行脚本到本地
  • hive -f 执行清理脚本
state = shellCmdExecutor.exec(
      String.format("hive -f %s", tmpFile.getAbsolutePath()),
      systemEnv, workDir, Constats.IS_DEBUG);

Step 9: 生成通知邮件

使用MapperReduce聚合处理数据

Step 10: 汇总通知

  • 库所有者
    • 执行失败的库表(忽略)
    • 本次成功处理的表、分区
    • 将要清理的表、分区
    • 不规范的表和分区
  • hive管理员邮件组
    • 文件统计
    • 执行分区统计
    • 执行表统计
    • 路径统计
Posted in 未分类 | Leave a comment

参数校验

使用场景

  1. 接口参数校验

引入依赖

<dependency>
    <groupId>javax.validation</groupId>
    <artifactId>validation-api</artifactId>
    <version>2.0.1.Final</version>
</dependency>
<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
    <version>5.4.1.Final</version>
</dependency>

验证类上添加注解

import org.hibernate.validator.constraints.NotBlank;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

public class RefundReq extends BaseSignReq {
    @NotBlank
    private String tradeOrderId;
    @Min(value = 100)
    @Max(value = 999)
    private Integer bizId;
    @NotBlank
    private String refundOrderId;

    private List<RefundDetail> refundDetails;
}

public class BaseSignReq {
    @NotBlank
    private String caller;
    @NotBlank
    private String noise;
    @NotBlank
    private String reqId;
    @NotBlank
    private String sign;
}

public class RefundDetail {
    @Min(1)
    private Integer channelType;
    @Min(1)
    private Long refundAmount;
}

工具类

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import javax.validation.groups.Default;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class ValidationUtil {
    private ValidationUtil() {/**/}

    private static Validator validator = Validation.buildDefaultValidatorFactory().getValidator();

    public static <T> ValidationResult validateEntity(T obj) {
        ValidationResult result = new ValidationResult();
        Set<ConstraintViolation<T>> set = validator.validate(obj, Default.class);
        if (!CollectionUtils.isEmpty(set)) {
            result.hasErrors = true;
            Map<String, String> errorMsg = new HashMap<String, String>();
            for (ConstraintViolation<T> cv : set) {
                errorMsg.put(cv.getPropertyPath().toString(), cv.getMessage());
            }
            result.errorMsg = errorMsg;
        }
        return result;
    }

    @Getter
    public static class ValidationResult {
        //校验结果是否有错
        private boolean hasErrors;
        //校验错误信息
        private Map<String, String> errorMsg;
    }
}

使用

 ValidationUtil.ValidationResult result = ValidationUtil.validateEntity(req);
if (result.isHasErrors()) {
      log.error("[预支付]校验入参,数据有误[{}]", result);
      return RestResult.<PrePayRsp>builder()
             .code(ResultCode.PARAM_VALIDATE_FAIL.code).msg(result.getErrorMsg().toString())
             .build();
}
Posted in 编程工具 | Leave a comment

Sqoop源码分析

Sqoop的Mysql数据导出实现分两种,一种是使用JDBC方式从Mysql中获取数据,一种是使用MysqlDump命令从MySql中获取数据,默认是 JDBC方式获取数据,如果要使用dump方式获取数据,需要添加 -direct 参数。

使用JDBC方式从Mysql中获取数据

配置语句时,需要添加 $CONDITIONS 点位符,比如:SELECT id FROM user WHERE $CONDITIONS,Sqoop在内部实现时会把它替换成需要的查询条件。

Sqoop启动后会先查询元数据,它会把 $CONDITIONS 替换为 (1=0) ,然后用得到的SQL语句查询数据表对应的Meta信息对于导出一个表的情况,Sqoop会使用这个SQL查询三次数据库,分别是: 1、获取 colInfo(最终得到columnTypes信息)2、查询ColumnNames信息3、生成QueryResult类执行 generateFields操作获取columnTypeNames时。

Sqoop会对获取的Fields做校验,列不能重复,它还会处理数据库的字段到Java属性名的转换

QueryResult类是通过构建java类文件,然后获取JavaCompiler,然后编译加载,为了提高处理性能,不是使用反射实现的,这个生成类内部处理mysql到hdfs属性值为空和分隔符的处理。

接着它会进行下面一个Sql查询操作,查询结果集为MIN(split列),MAX(split列),查询条件的处理逻辑为 $CONDITIONS 替换为(1=1),然后再添加外面SELECT查询 (举例:SELECT MIN(id), MAX(id) FROM (SELECT ID,NAME,PASSPORT WHERE (1=1) ) AS t1 ),这样就查询出来此次导出数据最大的split列值和最小的split列值。

对于为整数、布尔值、时间格式、Float等 的分区列,进行split时直接根据对应值的大小进行Split,Text文本的处理方式比较特殊,Sqoop先会对之前获取到的Min和Max的字串寻找它们最大的相同前缀子字串,然后把后面的字段转化为BigDecimal,结合时char占两个字节(65536),算法在 TextSplitter类中,比较简单,本质上就是一个进制转换。拆分好后,需要把Split的值再转换为String,然后加上相同前缀子字段,就构成了查询区间了(注意中文可能会被拆分)。

Sqoop对数据的获取是在DataDrivenDBRecordReader中,在查询时会把 $CONDITIONS 替换成 split 的范围比如 ( id >= 1) && (id<10),使用JDBC获取到结果集游标,然后移动游标处理数据。

使用MysqlDump命令从MySql中获取数据

第二种方法与第一种方式有下面的差别:

初始化元数据,它是在构建的查询语句后面添加 limit 1 ,比如:SELECT t. FROM AS t LIMIT 1,因为dump方式在查询指定获取列时使用的是 t.,当使用limit 0时,数据库不会给它返回必须的元数据信息。

dump方式在map进行数据的获取,其会构建mysqldump命令,然后使用java程序调用,获取输入输出流和错误流,其实现了 org.apache.sqoop.util.AsyncSink 抽象类,用来处理输入输出流和错误流。

优化策略:

Sqoop在执行时会执行三次相同的Sql语句用来查询无数据,可以合并查询,由于查询仅返回Meta信息很快,不需要修改这块的实现。

分区列选择对于查询元数据和导出的查询影响很大,应该对索引做调优,避免对分区列的排序操作,加快元数据查询速度和导出数据的速度,尽量选择自增加的主键ID做Split列,区分度好并且可以顺序读取数据。

导出操作的查询语句中,$CONDITIONS 会被替换为范围区间,创建索引时,要考虑做这个查询的优化。

索引建议,考虑三个规则(使查询数据集较少、减少点的查询、避免排序操作),Sqoop场景下,如果分区列不是主键(自增加)时,把分区列做为联合索引的第一个字段,其它被选择的查询条件做为索引的其它字段。

分区列的选择,要避免Split后数据不均衡。

从实现上来看-m参数是可以增加任务的并行度的,但数据库的读线程是一定的,所以 -m 过大对于数据库和hadoop集群会有压力,在Sqoop的场景下,数据库是一个影响并发的瓶颈。增加mapper数意义不大。

下面列出Sqoop目前1.4.6版本存在的两个问题。

查看Sqoop源码,发现其存在两个比较严重的问题。

问题 1、数据分片与Mapper设定不合理

Sqoop在抽取时可以指定 -m 的参数来控制mapper的数量的,sqoop根据mapper的数量,对数据库数据进行分片,每个分片交给一个mapper处理,mapper在处理的过程中会通过jdbc或mysqldump对相应分片的数据进行查询和拉取,最终产生和mapper数量相同的文件数。

当每天需求导入hdfs中的表非常大时,过小的mapper值,会导致写入hdfs中的文件过大,如果文件的格式不能被split,那么会对业务下游产生影响。过大的mapper值,在数据库读线程一定,网络带宽一定的情况下,并不能提高导入hdfs的效率,反而会对数据库造成并发读压力,同时过多的mapper也会占用hadoop集群的资源。

Sqoop做为一个数据导出框架,对数据的控制应该再细至一些,-m 只是控制mapper的数量,而数据分片数目应该由另外的参数控制。

我们为sqoop添加了 -split-per-map 参数,比如设置-m=4 -split-per-map=2,则对结果集分 8 片,每个Mapper处理两片数据,最后共产生 8 个文件。

问题 2、分片效率低

Sqoop在做分片处理时有问题,其实现会使用 “Select Max(splitKey),Min(splitKey) From ( –select参数 ) as t1” 的语句来查询分片信息,在Mysql下,这样的查询会产生一个以split-id为主键,包含需要导出的其它所有字段的临时表,如果数据量不大,临时表数据可以在内存中,处理速度还可以保证。但如果数据量很大,内存中已经存放不下时,这些数据会被保存为MyISAM表存放到磁盘文件中,如果数据量再大一些,磁盘文件已经存放不下临时表时,拆分数据会失败。

在我们场景下,由于表数据量很大,这个查询占到整个导出时间的45%(整个过程原来需要90多分钟),优化空间很大。

解决方案一:

修改所有导出脚本,分片语句自定义

解决方案二:

修改:org.apache.sqoop.mapreduce.DataDrivenImportJob的

@Contract(“null, _ -&gt; !null”)
private String buildBoundaryQuery(String col, String query)

修改代码如下

org.apache.sqoop.mapreduce.DataDrivenImportJob的
 
@Contract("null, _ -> !null") 
private String buildBoundaryQuery(String col, String query)
 
/**
   * Build the boundary query for the column of the result set created by
   * the given query.
   * @param col column name whose boundaries we're interested in.
   * @param query sub-query used to create the result set.
   * @return input boundary query as a string
   */
  private String buildBoundaryQuery(String col, String query) {
    if (col == null || options.getNumMappers() == 1) {
      return "";
    }
 
    // Replace table name with alias 't1' if column name is a fully
    // qualified name.  This is needed because "tableName"."columnName"
    // in the input boundary query causes a SQL syntax error in most dbs
    // including Oracle and MySQL.
    String alias = "t1";
    int dot = col.lastIndexOf('.');
    String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);
 
    ConnManager mgr = getContext().getConnManager();
    String ret = mgr.getInputBoundsQuery(qualifiedName, query);
    if (ret != null) {
      return ret;
    }
 
//    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
//        + "FROM (" + query + ") AS " + alias;
    return initBoundaryQuery(qualifiedName, query, alias);
  }
 
  private String initBoundaryQuery(String qualifiedName, String query, String alias) {
    StringBuilder regex = new StringBuilder();
    regex.append("(\\s[A|a][S|s][\\s][`]?");
    for (char c : qualifiedName.toCharArray()) {
      regex.append('[').append(c).append(']');
    }
    regex.append("[`|\\s|,])");
    final Matcher matcher1 = Pattern.compile(regex.toString()).matcher(query);
    final boolean asCheckOk = !matcher1.find();
    if(asCheckOk) {
      final Matcher matcher2 = Pattern.compile("(\\s[F|f][R|r][O|o][M|m]\\s)").matcher(query);
      int count = 0;
      while (matcher2.find()) {
        count++;
      }
      boolean fromCheckOk = count == 1;
      if(fromCheckOk) {
        final Matcher matcher = Pattern.compile("(\\s[F|f][R|r][O|o][M|m]\\s[\\s\\S]*)").matcher(query);
        while (matcher.find()) {
          return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
                  + matcher.group();
        }
      }
    }
    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
            + "FROM (" + query + ") AS " + alias;
  }

问题三:无法自定义特殊字符替换

解决方案一:

通过SQL的替换功能,修改脚本代价高,并且脚本可读性、可维护性都大大降低

解决文案二:

修改Sqoop实现,增加自定义特殊字符替换功能

Posted in 未分类 | Leave a comment

异常处理

应用场景

  1. 接口返回值

异常枚举类

public enum ResultCode {

    /**
     * 成功
     */
    SUCCESS(200, "接口调用成功"),

    /**/
    SIGN_ERROR(300,"验签失败"),
    INNER_SERVER_ERROR(500, "服务器内部错误"),
    UNAUTHORIZED(401, "Unauthorized"),
    /**/
    PAY_EXPIRED(1001, "超时"),
    DUPLACATE_REQ(1002, "重复提交"),
    PARAM_VALIDATE_FAIL(1003, "参数错误"),
    RESOURCE_NOT_EXISTS(1004, "资源不存在"),
    ;

    public final int code;

    public final String msg;

    public SystemException exception() {
        return new SystemException(this.code, this.msg);
    }

    public SystemException exception(String msg) {
        return new SystemException(this.code, msg);
    }

    public SystemException exception(Throwable cause) {
        return new SystemException(this.code, this.msg, cause);
    }

    public SystemException exception(String msg, Throwable cause) {
        return new SystemException(this.code, msg, cause);
    }
}

业务异常类

@Getter
public class SystemException extends RuntimeException {
    private int code = 500;

    private HashMap<String, Object> value;

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public SystemException() {
    }

    public SystemException(int code) {
        this.code = code;
    }

    public SystemException(int code, String message) {
        super(message);
        this.code = code;
    }

    public SystemException(int code, String message, Throwable cause) {
        super(message, cause);
        this.code = code;
    }

    public SystemException(Throwable cause) {
        super(cause.getMessage(), cause);
    }

    public SystemException(String pattern, Object[] args, Throwable cause) {
        super(String.format(pattern, args), cause);
    }

    public SystemException(int code, String pattern, Object[] args, Throwable cause) {
        super(String.format(pattern, args), cause);
        this.code = code;
    }

    public SystemException(int code, String pattern, Object... args) {
        super(String.format(pattern, args));
        this.code = code;
    }

    public SystemException(String pattern, Object[] args) {
        super(String.format(pattern, args));
    }

    public SystemException(String message) {
        super(message);
    }

    public SystemException(String message, Throwable cause) {
        super(message, cause);
    }

    public void setValue(HashMap<String, Object> value) {
        this.value = value;
    }
}

统一异常处理器

/**
 * 统一异常处理注册器
 */
@ControllerAdvice(annotations = RestController.class)
public class ExceptionRegister {
    private LoadingCache<String, Logger> LOGGER = CacheBuilder.newBuilder()
            .build(new CacheLoader<String, Logger>() {
                @Override
                public Logger load(String key) throws Exception {
                    return LoggerFactory.getLogger(key);
                }
            });


    @ExceptionHandler(SystemException.class)
    @ResponseStatus(HttpStatus.OK)
    @ResponseBody
    public RestResult<?> handleSystemException(
            HttpServletRequest request, HttpServletResponse response, SystemException ex) {
        getLogger(ex).warn(ex.getMessage());
        if (ex.getValue() != null) {
            return new RestResult<>(ex.getCode(), ex.getMessage(), ex.getValue());
        }
        return new RestResult<>(ex.getCode(), ex.getMessage(), Maps.newHashMap());
    }

    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.OK)
    @ResponseBody
    public RestResult<?> handleException(
            HttpServletRequest request, HttpServletResponse response, Exception ex) {
        getLogger(ex).error(ex.getMessage(), ex);
        return new RestResult<>(500, "Server Err", Maps.newHashMap());
    }

    private Logger getLogger(Exception ex) {
        StackTraceElement traceElement = ex.getStackTrace()[0];
        return LOGGER.getUnchecked(traceElement.getClassName());
    }
}

使用举例

// 抛出带返回值的业务异常
SystemException exception = ResultCode.ALREADY_PAY_SUCCESS.exception(e.getErrorMessage(), e);
HashMap<String, Object> map = Maps.newHashMap();
map.put("tradeOrderId", tradeOrderId);
exception.setValue(map);
throw exception;
// 抛出普通业务异常
throw ResultCode.REFUND_SUCCESS.exception();
Posted in 编程工具 | Leave a comment

Camus源码分析

协议:

输出文件压缩:Camus默认只支持两种压缩格式(snappy和deflate),默认是defalte,使用 StringRecordWriterProvider写入文本格式文档时,还可以指定gzip的压缩格式,扩展其它压缩格式很容易,只需要添加 两行代码就可以,建议增加lzo和lzop的压缩格式,以和我们Hive保持一致。

输出格文件类型:建议文本格式的文件

文件目录规则:(配置的目录)+ topic名 + daily|hour + (年/月/日)|(年/月/日/小时) + 数据文件,例如:/rocketmq/data/vip_ods_heartbeat/daily/2015/06/10 /vip_ods_heartbeat.broker-a.0.999.48388735.1433865600000.deflate

文件名规则:topic名+ (RocketBrokerId)|(kafka的对应分区的learder的BrokerId)+ (RocketQueueId)|(kafka分区号)+ 写入消息行数 + 最后一条消息的Offset + 编码的分区(时间 + 压缩格式后缀),例如:vip_ods_heartbeat.broker- a.0.999.48388735.1433865600000.deflate

Topic的命名规则:业务标识+数据库名+数据库表名(分表只需要BaseName就可以),例 如:vip_ods_heartbeat

消息格式:操作类型\t表名(分表的话是分表名)\t数据库名\t主键名\t唯一索引\tBinlog日志时间 \tCheckPoint字段\tDataBefore\tDataAfter,库名表名都是RockMQ中的原始数据,在生成列数据时,列中数 据如果有\t等特殊字符需要替换,例如:
insert\theartbeat\tvip_ods\tid\tname,pid\t1232132131\t120@21\t{“字段名”,”字段 值”,…}\t{“字段名”,”字段值”,…}

确定数据导入是否完成:

  • Camus中会在History的目录中存放历次消费的状态,包括开始执行的分区和它们的Offset、 执行结束位置的分区和它们的Offset,这两个文件以SequenceFile的形式存放在HDFS文件中
  • Camus在执行结束后可以把 执行信息汇总发送到Kafka的Topic中,Topic的名字为:TrackingMonitoringEvent,如果监控程序监控这个 Topic,是可以得到当前执行的情况的信息的。

Posted in 未分类 | Leave a comment