COS视频去音频处理脚本

手写了个python+ffmpeg的视频去音轨的脚本,多线程。

  • 支持ctrl+关闭程序
  • 相对完善的统计数据

# 后台启动
nohup xxx.py &

# 统计处理量
cat cos_process_log.csv |awk ‘{print $1}’ |awk ‘BEGIN{sum=0;count=0;}{sum+=$1;count+=1;}END{print sum / 1024 / 1024 / 1024 “GB ” count}’
283.74GB 5819

# 查询处理进度
tail -f nohup.out
进度:POOL: 5919 START: 5908 FINISH: 5876

# -*- coding=utf-8

from qcloud_cos.cos_client import CosConfig, CosS3Client
import os
from concurrent.futures.thread import ThreadPoolExecutor
import threading
import datetime
from time import sleep
from threading import Thread
import io
from qcloud_cos.cos_exception import CosClientError, CosServiceError
import subprocess
import signal

bucket = 'xxxx-xxxx-111111111'
cos = CosS3Client(
    CosConfig(Region='ap-xxxx',
              Secret_id='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
              Secret_key='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'))

dir_logs = f'/data/mosaic/output/mmpeg'
dir_tmp_files = f'/data/mosaic/output/tmp'
dir_target_files = f'/data/mosaic/output/target'
os.makedirs(dir_logs, exist_ok=True)
os.makedirs(dir_tmp_files, exist_ok=True)
os.makedirs(dir_target_files, exist_ok=True)

start_task_num = 0
finish_task_num = 0
pooled_task_num = 0
task_run_lock = threading.RLock()
executor = ThreadPoolExecutor(max_workers=32,
                              thread_name_prefix='object_check_')
log_lock = threading.RLock()
cos_process_log_fi = open(dir_logs + '/cos_process_log.csv', mode='a')
cos_erro_log_fi = open(dir_logs + '/cos_erro_log.txt', mode='a')

prefix = '视频/'
prefix_output = '视频-output/'

running_lock = threading.RLock()
running = True


def list_cos_subobject(Prefix="", Marker="") -> ([], [], str, bool):
    response = cos.list_objects(Bucket=bucket,
                                Prefix=Prefix,
                                Delimiter='/',
                                Marker=Marker,
                                MaxKeys=100,
                                ContentType='text/html; charset=utf-8')
    dirs = []
    keys = []
    if 'CommonPrefixes' in response:
        for folder in response['CommonPrefixes']:
            dirs.append(folder['Prefix'])
    if 'Contents' in response:
        for content in response['Contents']:
            keys.append(content['Key'])
    isTruncated = response['IsTruncated'] == 'true'
    nextMarker = ""
    if isTruncated:
        nextMarker = response['NextMarker']
    return (dirs, keys, nextMarker, isTruncated)


def get_obj(bucket: str, key: str, tmp_file: str):
    for i in range(0, 10):
        try:
            response = cos.download_file(Bucket=bucket,
                                         Key=key,
                                         DestFilePath=tmp_file)
            break
        except CosClientError or CosServiceError as e:
            print(e)


def put_obj(bucket: str, key: str, file_path: str):
    # with open(file_path, 'rb') as fp:
    #     response = cos.put_object(Bucket=bucket, Body=fp, Key=key)
    #     print(response['ETag'])

    for i in range(0, 10):
        try:
            response = cos.upload_file(Bucket=bucket,
                                       Key=key,
                                       LocalFilePath=file_path)
            break
        except CosClientError or CosServiceError as e:
            print(e)
            log_lock.acquire()
            try:
                cos_erro_log_fi.write("%s\t%s\n" % (file_path, key))
                cos_erro_log_fi.flush()
            finally:
                log_lock.release()


def set_not_running():
    running_lock.acquire()
    try:
        global running
        running = False
    finally:
        running_lock.release()


def is_running():
    # 停止了后,不再新增加任务
    running_lock.acquire()
    try:
        global running
        return running
    finally:
        running_lock.release()


def is_not_running():
    # 停止了后,不再新增加任务
    running_lock.acquire()
    try:
        global running
        return not running
    finally:
        running_lock.release()

def get_pooled_task_num():
    task_run_lock.acquire()
    try:
        global pooled_task_num
        return pooled_task_num
    finally:
        task_run_lock.release()

def inc_pooled_task_num():
    # pooled
    task_run_lock.acquire()
    try:
        global pooled_task_num
        pooled_task_num += 1
        return pooled_task_num
    finally:
        task_run_lock.release()


def get_start_task_num():
    task_run_lock.acquire()
    try:
        global start_task_num
        return start_task_num
    finally:
        task_run_lock.release()

def inc_start_task_num():
    task_run_lock.acquire()
    try:
        global start_task_num
        start_task_num += 1
        return start_task_num
    finally:
        task_run_lock.release()


def inc_finish_task_num():
    task_run_lock.acquire()
    try:
        global finish_task_num
        finish_task_num += 1
        return finish_task_num
    finally:
        task_run_lock.release()


def process_one_key(dir: str, key: str, cur_index: int):
    if key.endswith('.mp4'):
        global dir_logs
        global dir_tmp_files
        global dir_target_files
        #-------------------------------------------------
        # if key.endswith('.jpg') and not key.endswith('-des.jpg'):
        # tmp_file = key.replace(prefix, dir_tmp_files + "/")
        # execd_file = key.replace(prefix, dir_target_files + "/")
        _, name = os.path.split(key)
        _, ext = os.path.splitext(key)

        tmp_file = "%s/%07d%s" % (dir_tmp_files, cur_index, ext)
        execd_file = "%s/%07d%s" % (dir_target_files, cur_index, ext)

        target_path = key.replace(prefix, prefix_output)
        os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
        os.makedirs(os.path.dirname(execd_file), exist_ok=True)

        if cos.object_exists(Bucket=bucket, Key=target_path):
            return

        #------------------------------------------------- download
        get_obj(bucket, key, tmp_file)
        #------------------------------------------------- 处理文件
        # 清理旧文件
        if dir_target_files in execd_file and os.path.exists(execd_file):
            os.remove(execd_file)
        # 处理文件
        # ffmpeg -i input.mp4 -vcodec copy -an output.mp4
        # cmd = "cp %s %s && ls -l %s" % (tmp_file, execd_file, execd_file)
        cmd = "ffmpeg -i %s -vcodec copy -an %s && ls -l %s" % (
            tmp_file, execd_file, execd_file)
        # cmd = "ffmpeg -i %s -vcodec copy -acodec copy %s && ls -l %s" % (tmp_file, execd_file, execd_file)
        # os.system("cp %s %s" % (tmp_file, execd_file))
        subprocess.call(cmd, shell=True)
        # 打印文件日志
        #------------------------------------------------- 上传到新cos地址
        put_obj(bucket=bucket, key=target_path, file_path=execd_file)
        #------------------------------------------------- TODO:打印处理文件的日志
        file_stats = os.stat(tmp_file)
        file_size = file_stats.st_size
        log_lock.acquire()
        try:
            cos_process_log_fi.write("%s\t%s\n" % (file_size, target_path))
            cos_process_log_fi.flush()
        finally:
            log_lock.release()
        # 清理本地文件
        if dir_tmp_files in tmp_file and os.path.exists(tmp_file):
            os.remove(tmp_file)
        if dir_target_files in execd_file and os.path.exists(execd_file):
            os.remove(execd_file)


def get_object_meta_target(dir, key):
    # 未开始的任务就不用做了
    if is_not_running():
        return
    cur_index = inc_start_task_num()
    print("开始处理:%07d %s" % (cur_index, key))
    try:
        process_one_key(dir, key, cur_index)
    except Exception as e:
        print(e)
    finally:
        finish_total = inc_finish_task_num()
        print("完成处理:%07d %s 总计:%07d" % (cur_index, key, finish_total))


def check(prefix):
    global executor

    marker = ""
    while True:
        (dirs, keys, nextMarker,
         isTruncated) = list_cos_subobject(Prefix=prefix, Marker=marker)
        # 文件
        for key in keys:
            if is_not_running():
                return
            while get_pooled_task_num() - get_start_task_num() > 10:
                stat_log_print()
                if is_not_running():
                    return
                sleep(1)
            inc_pooled_task_num()
            # 执行任务
            executor.submit(get_object_meta_target, prefix, key)
        # 目录
        for dir in dirs:
            check(dir)
        # 是否处理完
        if not isTruncated:
            break
        # 下一步
        marker = nextMarker


def onSig(signo, frame):
    set_not_running()
    pass

def stat_log_print():
    global pooled_task_num
    global start_task_num
    global finish_task_num

    print("进度:POOL: %5d START: %5d FINISH: %5d" %
              (pooled_task_num, start_task_num, finish_task_num))

if __name__ == "__main__":
    # signal.signal(signal.SIGCHLD, onSigChld)
    signal.signal(signal.SIGINT, onSig)
    signal.signal(signal.SIGTERM, onSig)

    check(prefix)
    while (True):
        # 判断是否处理完
        stat_log_print()
        pre_start_task_num = start_task_num
        if pre_start_task_num == 0:
            continue
        if pre_start_task_num == finish_task_num:
            sleep(10)
            if pre_start_task_num == start_task_num:
                break
        sleep(1)

    print("finish")



# cat cos_process_log.csv |awk '{print $1}' |awk 'BEGIN{sum=0;count=0;}{sum+=$1;count+=1;}END{print sum / 1024 / 1024 / 1024 "GB " count}'


Posted in 未分类 | Leave a comment

containerd启动容器报错

报错信息

**Failed to create pod sandbox: rpc error: code = Unknown desc = failed to create containerd task: failed to create shim task: OCI runtime create failed: unable to retrieve OCI runtime error (open /run/containerd/io.containerd.runtime.v2.task/k8s.io/ed17cbdc31099314dc8fd609d52b0dfbd6fdf772b78aa26fbc9149ab089c6807/log.json: no such file or directory): runc did not terminate successfully: exit status 127: unknown**

系统

Linux k8s-node01 5.18.15-1.el7.elrepo.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Jul 28 09:26:15 EDT 2022 x86_64 x86_64 x86_64 GNU/Linux

原因

centos7默认的libseccomp的版本为2.3.1,不满足containerd的需求

解决

# rpm -e libseccomp-2.3.1-4.el7.x86_64
错误:依赖检测失败:
        libseccomp.so.2()(64bit) 被 (已安裝) chrony-3.4-1.el7.x86_64 需要
# rpm -e libseccomp-2.3.1-4.el7.x86_64 --nodeps

#### 下载libseccomp-2.5.1-1.el8.x86_64.rpm
#### 地址:https://vault.centos.org/centos/8/AppStream/x86_64/os/Packages/libseccomp-devel-2.5.1-1.el8.x86_64.rpm
# 安装
# rpm -ivh libseccomp-2.5.1-1.el8.x86_64.rpm

#### 重启containerd
# systemctl restart containerd
#### 查看执行结果
# kubectl  get pod -A
NAMESPACE     NAME                                       READY   STATUS    RESTARTS       AGE
kube-system   calico-kube-controllers-56cdb7c587-k8grb   1/1     Running   1 (91s ago)    18m
kube-system   calico-node-42qs2                          1/1     Running   2 (56s ago)    18m
kube-system   calico-node-sk6sg                          1/1     Running   1 (110s ago)   18m
kube-system   calico-node-vtq9d                          1/1     Running   1 (85s ago)    18m
kube-system   calico-typha-6775694657-24mzh              1/1     Running   1 (110s ago)   18m
Posted in 小技巧 | Leave a comment

vmware虚拟机配置共享

环境:

  • OS:CentOS-7-x86_64-Minimal-2009.iso
  • 内核版本:kernel-ml-5.18.15-1.el7.elrepo.x86_64
  • 虚拟机平台:VMware® Workstation 16 Pro
  • 虚拟机平台版本:16.2.3 build-19376536
# yum search vm-tools
已加载插件:fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.tuna.tsinghua.edu.cn
 * elrepo: mirrors.tuna.tsinghua.edu.cn
 * extras: mirrors.tuna.tsinghua.edu.cn
 * updates: mirrors.tuna.tsinghua.edu.cn
elrepo                                                                                                            | 3.0 kB  00:00:00     
elrepo/primary_db                                                                                                 | 396 kB  00:00:00     
========================================================= N/S matched: vm-tools =========================================================
open-vm-tools.x86_64 : Open Virtual Machine Tools for virtual machines hosted on VMware
open-vm-tools-desktop.x86_64 : User experience components for Open Virtual Machine Tools
open-vm-tools-devel.x86_64 : Development libraries for Open Virtual Machine Tools
open-vm-tools-test.x86_64 : Test utilities for Open Virtual Machine Tools
qemu-kvm-tools.x86_64 : KVM debugging and diagnostics tools

  名称和简介匹配 only,使用“search all”试试。

# yum install open-vm-tools
# vmware-hgfsclient
sharedir
# mkdir /mnt/hgfs/
# vmhgfs-fuse .host:/sharedir /mnt/hgfs
# cp /etc/fstab /etc/fstab.bak
##### /etc/fstab 中添加一行
.host:/sharedir /mnt/hgfs fuse.vmhgfs-fuse allow_other,defaults 0 0

# reboot

注意:不需要安装vmware-tools,vmware-tools不兼容kernel,无法配置目录共享

Posted in 小技巧 | Leave a comment

SSH登录Vmware虚拟机Linux服务器慢问题

现象:

每次通过ssh登录机器都特别慢,大约30秒,甚至更长

环境:

  • OS:CentOS-7-x86_64-Minimal-2009.iso
  • 虚拟机平台:VMware® Workstation 16 Pro
  • 虚拟机平台版本:16.2.3 build-19376536

解决:

vim /etc/ssh/sshd_config
# 找到UseDNS yes,打开注释,并设置为no
UseDNS no
# 重启sshd_server
systemctl restart sshd
Posted in 小技巧 | Leave a comment

移动终端网络接入

1.1 终端接入网络

1.1.1 移动终端接入网络有如下的几种情况

  • 终端设备在属地,终端设备通过基站,接入属地的网络
  • 终端设备在国内漫游时,移动设备会漫游连接到当地的网络,联通和电信则需要漫游回属地的网络
  • 终端设备在国外漫游时,需要漫游回属地的网络

1.1.2 基站接入分析

移动终端通过基站接入移动运营商网络,终端与基站之间是数据链路层,不涉及网络层(IP)和传输层(TCP),终端设备的IP地址是由运营商分配的,在切换基站时一般不会引起IP地址的变化

有以下几种情况:

  • 当设备进行重启、飞行模式切换等时,设备会重新发起接入,这时IP地址会发生改变
  • 设备在同一区域内切换基站的过程中,如果没有发生断网情况下,即没有重新接入,IP地址是不会变化的
  • 设备在区域间切换基站,比如联通设备从北京到河北,接入由北京联通变成河北联通,IP地址会发生变化

终端设备切换基站一般情况下可在50ms~200ms完成,TCP是基于连接的协议,连接状态由状态机来维护,连接完毕后,双方都会处于established状态,它们之间的连接由各自的IP和TCP的端口唯一标识,即使这个连接没有任何数据,但仍是保持连接状态。TCP的KeepAlive机制用于检测连接死活,一般时间为 7200 s,失败后重试 10 次,每次超时时间 75 s,以释放无效链接。这个时间比切换基站时间要大的多,因此TCP通道在切换基站时,其IP地址一般没有变化,所以基于IP和端口的已建立的TCP连接不会失效。

1.1.3 DNS解析

当前移动 DNS 的现状:

  • 运营商 LocalDNS 出口根据权威 DNS 目标 IP 地址进行 NAT,或将解析请求转发到其他DNS 服务器,导致权威 DNS 无法正确识别运营商的 LocalDNS IP,引发域名解析错误、流量跨网。
  • 域名被劫持的后果:网站无法访问(无法连接服务器)、访问到钓鱼网站等。
  • 解析结果跨域、跨省、跨运营商、国家的后果:网站访问缓慢甚至无法访问。

为了解决这些问题,通常TCP网关的地址可以通过HttpDNS技术获取,以避免DNS解析异常、域名劫持的问题。客户端直接访问HTTPDNS接口,获取服务最优IP,返回给客户端,客户拿到IP地址后,直接使用此IP地址进行连接。

1.2 接入层

接入层最靠近客户端,接入层一般使用LVS(DR模式)+VIP+HAProxy来实现,如果使用公有云也可以使用云服务提供的负载均衡服务,如使用腾讯云的CLB,阿里云的ALB,配置按TCP转发;有矿的话可以使用F5硬件来做接入层;保留这一层有如下好处:

  • 负载均衡:均衡客户端连接,尽量保证连接在连接服务器上均衡
  • 真实服务不需要公网IP,因为它不需要对外暴露IP地址,更安全
  • 会话保持

1.3 长连接服务器

长连接服务部署的机器关注以下几个配置项

nf_conntrack_max
nf_conntrack_max 决定连接跟踪表的大小,当nf_conntrack模块被装置且服务器上连接超过这个设定的值时,系统会主动丢掉新连接包,直到连接小于此设置值才会恢复。
Backlognet.core.somaxconn排队等待接受的最大连接数

net.core.netdev_max_backlog数据包在发送给cpu之ueej被网卡缓冲的速率,增加可以提高有高带宽机器的性能
文件描述符sys.fs.file-max允许的最大文件描述符 /proc/sys/fs/file-max

nofile应用层面允许的最大文件描述数 /etc/security/limits.conf
portsnet.ipv4.ip_local_port_range端口范围

net.ipv4.tcp_tw_reuse端口复用,允许time wait的socket重新用于新的连接,默认为0,关闭短连接设置为1

net.ipv4.tcp_tw_recycletcp连接中的time wait的sockets快速回收,默认为0,表示关闭

1.4 服务实现

1.4.1 认证:

验证终端身份,确保只有合法的终端才能够使用服务,流程如下

  1. 服务端生成设备私钥、公钥;私钥执久化到设备上,公钥保存在服务端
  2. 握手:客户端发起TCP连接,TCP连接建立成功后,服务端生成256字符随机字串 randomMsg,返回客户端
  3. 客户端登录:客户端拿出token+randomMsg,使用其私钥签名得到 secretChap,并把token、secretChap 通过TCP通道上报服务端
  4. 服务端验证:服务端使用设备公钥验证签名,并调用 Passport 服务验证 token,拿到用户信息;服务端配置用户、设备的路由信息
  5. 协商对称密钥:服务端验证后,生成并返回对称密钥 secureKey,返回的对称密钥 secureKey 使用终端的公钥加密,只有使用设备的私钥才可以解密;客户端解密对称密钥 secureKey,至此服务端和终端完成密钥协商,之后可以愉快并安全的通信了。

模拟代码:

import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.RandomStringUtils;

import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;

public class AuthClientTest {
    private static final byte[] PARAM_IV;

    static {
        String PARAM_IV_CONFIG = Base64.encodeBase64String(new SecureRandom().generateSeed(32));
        PARAM_IV = Base64.decodeBase64(PARAM_IV_CONFIG);
    }

    private static class ClientStore {
        private static final byte[] PARAM_IV = AuthClientTest.PARAM_IV;
        String privateKey;
        String token;// 用户标识信息

        String randomMsg;
        // 对称密钥
        byte[] secretKey;
    }

    private static class ServerStore {
        private static final String SECRET_KEY = RandomStringUtils.random(32);
        private static final byte[] PARAM_IV = AuthClientTest.PARAM_IV;

        // 对称密钥
        byte[] secretKey = new SecureRandom().generateSeed(32);

        String publicKey;
        String randomMsg;
    }

    public static void main(String[] args) throws Exception {
        // s0 初始化,生成设备的公私钥
        SHA256SignUtil.RsaKeys rsaKeys = SHA256SignUtil.generateKeyBytes();
        ClientStore clientStore = new ClientStore();
        ServerStore serverStore = new ServerStore();
        // 发送私钥到客户端
        clientStore.privateKey = Base64.encodeBase64String(rsaKeys.getPrivateKey());
        // 公钥保存在服务器端
        serverStore.publicKey = Base64.encodeBase64String(rsaKeys.getPublicKey());
        // s1: 握手
        // s1.1 客户端连接服务端 app -> server
        serverStore.randomMsg = RandomStringUtils.randomAlphanumeric(256);
        // s1.2 TCP建立成功后 server -(randomMsg)-> app
        clientStore.randomMsg = serverStore.randomMsg;
        // s2 端侧签名登录 app -(token,secretChap:privateKey签名)-> server
        clientStore.token = genToken(1000L);
        Map<String, Object> data = new HashMap<>();
        data.put("token", clientStore.token);
        PrivateKey privateKey = SHA256SignUtil.restorePrivateKey(Base64.decodeBase64(clientStore.privateKey));
        byte[] secretChap = SecretChapUtils.createSecretChap(data, clientStore.randomMsg, privateKey);
        // s3 服务端验证登录
        PublicKey publicKey = SHA256SignUtil.restorePublicKey(Base64.decodeBase64(serverStore.publicKey));
        boolean verify = SecretChapUtils.verifySecretChap(data, serverStore.randomMsg, secretChap, publicKey);
        System.out.println(verify);
        // s4 服务端下发对称密钥 server -(secretKey:publicKey加密)-> app
        byte[] secureKey = SHA256SignUtil.encryptByPublicKey(serverStore.secretKey, publicKey.getEncoded());
        // s5 端侧解密对称密钥并存储
        clientStore.secretKey = SHA256SignUtil.decryptByPrivateKey(secureKey, privateKey.getEncoded());
        // s6 正常加密传输数据
        byte[] encrypt = AesUtil.encrypt("田加国是好人".getBytes(StandardCharsets.UTF_8), clientStore.secretKey, ClientStore.PARAM_IV);
        byte[] decrypt = AesUtil.decrypt(encrypt, serverStore.secretKey, ServerStore.PARAM_IV);
        String sourceData = new String(decrypt, StandardCharsets.UTF_8);
        System.out.println(sourceData);
    }

    private static String genToken(long uid) {
        HashMap<String, Object> claims = new HashMap<>();
        claims.put("iss", "user.tianjiaguo.com");
        claims.put("expire", System.currentTimeMillis() + 24 * 60 * 60 * 1000L * 30);
        claims.put("uid", uid);
        claims.put("type", 2);
        return Jwts.builder().setClaims(claims)
                .signWith(SignatureAlgorithm.HS256, ServerStore.SECRET_KEY.getBytes())
                .compact();
    }
}

1.4.2 连接保持

心跳机制+自适应心跳:

  1. 端侧定时发送心跳包,服务端重置心跳检查点
  2. 端侧会根据业务数据,决定是否、何时上报心跳包
  3. 心跳包频率可控

附录:

示例架构图

未完

Posted in 未分类 | Leave a comment

用detectron2做识别

config.yaml

VERSION: 2

OUTPUT_DIR: /data/mooc/data_kiti/src/output
#coding=utf-8
# conda activate leastereo
import glob
import os
import torch

from detectron2.structures import BoxMode
from utils import writexml
import cv2
import numpy as np

from detectron2.engine import DefaultTrainer
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.data import build_detection_train_loader
from detectron2.utils.logger import setup_logger
from detectron2.utils.visualizer import Visualizer

setup_logger()

DETECTRON2_REPO_PATH = "/data/mooc/workspace/detectron2/"

KEYS = ['Truck', 'Cyclist', 'Car', 'DontCare', 'Tram', 'Pedestrian', 'Person_sitting', 'Van', 'Misc']
KEYS_DICS = {}
index = 0
for key in KEYS:
    KEYS_DICS[key] = index
    index += 1

def get_tl_dicts(data_dir):
    cates = {}
    dataset_dicts = []
    list_anno_files = glob.glob(data_dir + "label_2/*")
    max_num = 100
    for file_path in list_anno_files:
        with open(file_path) as f:
            max_num = max_num - 1
            if max_num < 0:
                break
            file_name = os.path.basename(file_path)  # 000000.txt
            simple_name, ext = os.path.splitext(file_name)  # 000000 .txt
            image_path = data_dir + "image_2/" + simple_name + ".png"
            height, width = cv2.imread(image_path).shape[:2]

            record = {}
            record["file_name"] = image_path
            record["height"] = height
            record["width"] = width
            objs = []
            anno_items = f.readlines()
            for anno_item in anno_items:
                anno_infos = anno_item.split(" ")
                if anno_infos[0] == "Misc" or anno_infos[1] == "DontCare":
                    continue
                category = anno_infos[0]
                xmin = int(float(anno_infos[4]))
                ymin = int(float(anno_infos[5]))
                xmax = int(float(anno_infos[6]))
                ymax = int(float(anno_infos[7]))
                obj = {
                    "bbox": [xmin, ymin, xmax, ymax],
                    "bbox_mode": BoxMode.XYXY_ABS,
                    "category_id": KEYS_DICS[category],
                    "iscrowd": 0
                }
                cates[category] = KEYS_DICS[category]
                objs.append(obj)
            record["annotations"] = objs
            dataset_dicts.append(record)
        # print(cates)
    return dataset_dicts


# 打印类别
# {'Truck', 'Cyclist', 'Car', 'DontCare', 'Tram', 'Pedestrian', 'Person_sitting', 'Van', 'Misc'}
# print(get_tl_dicts("/data/mooc/data_kiti/image/training/"))

def register_dataset():
    d = "training"
    # 注册数据集
    DatasetCatalog.register("training_data", lambda d="training":get_tl_dicts("/data/mooc/data_kiti/image/training/"))
    # 数据集添加元信息,主要是类别名,用于可视化
    MetadataCatalog.get("training_data").set(thing_classes=KEYS) # ["Car"...]
    # 添加数据集评估指标,采用coco评测准则
    MetadataCatalog.get("training_data").evaluator_type = "coco"
    tl_metadata = MetadataCatalog.get("training_data")
    return tl_metadata

def train_kiti_data():
    # 训练
    cfg = get_cfg()
    cfg.merge_from_file(DETECTRON2_REPO_PATH + "configs/COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml")
    cfg.DATASETS.TRAIN = ("training_data",) # 使用数据集
    # train_loader = build_detection_train_loader(cfg, mapper=None)
    # no metrics implemented for this dataset
    # no metrics implemented for this dataset
    cfg.DATASETS.TEST = ()
    cfg.DATALOADER.NUM_WORKERS = 2
    # 从 Model Zoo 中获取预训练模型
    # cfg.MODEL.WEIGHTS = "https://dl.fbaipublicfiles.com/detectron2/COCO-Detection/faster_rcnn_R_50_FPN_3x/137849458/model_final_280758.pkl"
    # cfg.MODEL.WEIGHTS = "/data/mooc/workspace/detectron2/models/model_final_280758.pkl"
    cfg.MODEL.WEIGHTS = "./output/model_final.pth"  # initialize from model zoo
    cfg.SOLVER.IMS_PER_BATCH = 2
    cfg.SOLVER.BASE_LR = 0.01  # 学习率
    cfg.SOLVER.MAX_ITER = 300  # 最大迭代次数
    cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128
    cfg.MODEL.ROI_HEADS.NUM_CLASSES = 9

    os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)
    trainer = DefaultTrainer(cfg)
    trainer.resume_or_load(resume=False)
    trainer.train()  # 开始训练

def get_config():
    cfg = get_cfg()
    cfg.merge_from_file(DETECTRON2_REPO_PATH + "configs/COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml")
    cfg.merge_from_file("/data/mooc/data_kiti/src/config.yaml")
    print(cfg.OUTPUT_DIR)
    cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
    cfg.MODEL.DEVICE = "cpu"
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.8   # set the testing threshold for this model
    cfg.MODEL.ROI_HEADS.NUM_CLASSES = 9
    return cfg

def get_predictor(cfg):
    predictor = DefaultPredictor(cfg)
    return predictor

def testing_kiti_data(cfg, predictor, file_path, tl_metadata):
    print(file_path)
    
    # cfg = get_cfg()
    # cfg.merge_from_file(DETECTRON2_REPO_PATH + "configs/COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml")
    # cfg.merge_from_file("/data/mooc/data_kiti/src/config.yaml")
    # print(cfg.OUTPUT_DIR)
    # cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
    # cfg.MODEL.DEVICE = "cpu"
    # cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.8   # set the testing threshold for this model
    # cfg.MODEL.ROI_HEADS.NUM_CLASSES = 9

    # predictor = DefaultPredictor(cfg)
    im = cv2.imread(file_path)
    outputs = predictor(im)
    v = Visualizer(im[:, :, ::-1],
                   metadata=tl_metadata, 
                   scale=0.8,
    )
    v = v.draw_instance_predictions(outputs["instances"].to("cpu"))
    ret_m = v.get_image()

    file_name = os.path.basename(file_path)  # 000000.txt
    simple_name, ext = os.path.splitext(file_name)  # 000000 .txt
    cv2.imencode(".png",ret_m)[1].tofile(cfg.OUTPUT_DIR + "/" + file_name)
    cv2.waitKey()
    print(outputs)


def im2bytes(im: np.ndarray, ext='.jpg') -> bytes:
    return cv2.imencode(ext, im)[1].tobytes()


def bytes2im(buf: bytes) -> np.ndarray:
    return cv2.imdecode(np.frombuffer(buf, np.uint8), cv2.IMREAD_COLOR)

    
if __name__ == '__main__':
    metadata = register_dataset()
    print("*"*100)
    # train_kiti_data()
    print("*"*100)
    cfg = get_config()
    predictor = get_predictor(cfg)
    testing_kiti_data(cfg, predictor, "/data/mooc/data_kiti/image/testing/image_2/000001.png", metadata)
    testing_kiti_data(cfg, predictor, "/data/mooc/data_kiti/image/testing/image_2/000314.png", metadata)
    testing_kiti_data(cfg, predictor, "/data/mooc/data_kiti/image/testing/image_2/000045.png", metadata)
    
    # print("*"*100)
    # cfg = get_cfg()
    # model = torch.load(os.path.join(cfg.OUTPUT_DIR, "model_final.pth"))
    # print(model)
    
    
Posted in 未分类 | Leave a comment

博客网站搭建

最近把网站迁移到国内的云服务厂商上,之前几次迁移部署都是采用编译安装的方式,非常的不方便,此次采用的是docker方式搭建环境的配置,前端也不再采用nginx,采用了能够自动生成和替换证书的ca#ddy来实现的。

安装workpress

安装docker和docker-compose

# 安装docker
...
# 安装docker-compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

编写wordpress的docker-compose.yml文件

version: "3.9"
    
services:
  db:
    image: mysql:8.0.25
    volumes:
      - ~/docker/mysql/data:/var/lib/mysql
    # docker的重启策略:在容器退出时总是重启容器,但是不考虑在Docker守护进程启动时就已经停止了的容器
    restart: unless-stopped
    command:
      # MySQL8的密码验证方式默认是 caching_sha2_password,但是很多的连接工具还不支持该方式
      # 就需要手动设置下mysql的密码认证方式为以前的 mysql_native_password 方式
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_general_ci
    environment:
      MYSQL_ROOT_PASSWORD: {数据库ROOT密码}
      MYSQL_DATABASE: {数据库名称}
      MYSQL_USER: {数据库普通用户名,可用来做wordpress的用户}
      MYSQL_PASSWORD: {数据库普通用户的密码}
      TZ: Asia/Shanghai
    ports:
      - 3306:3306
    volumes:
      - ~/docker/mysql/data:/var/lib/mysql
      - ~/docker/mysql/conf:/etc/mysql/conf.d
      - ~/docker/mysql/logs:/logs
    networks:
      - app-network
    
  wordpress:
    depends_on:
      - db
    image: wordpress:latest
    ports:
      - "8000:80"
    restart: always
    environment:
      WORDPRESS_DB_HOST: db:3306
      WORDPRESS_DB_NAME: {数据库名称}
      WORDPRESS_DB_USER: {数据库普通用户名,可用来做wordpress的用户}
      WORDPRESS_DB_PASSWORD: {数据库普通用户的密码}
    volumes:
      - ~/docker/wordpress:/var/www/html
    networks:
      - app-network

networks:
  app-network:
    driver: bridge

本地需要创建mysql和wordpress的目录,docker会把本地创建的目录挂载到docker镜像中,这样mysql的数据和wordpress的配置才会在docker销毁时不会丢失。

mkdir mysql
mkdir mysql/conf
mkdir mysql/data
mkdir mysql/log
mkdir wordpress

启动docker镜像

docker-compose up

然后临时打云厂商的防火墙的8000端口,在本地看是docker是否能到安装配置页面,如果可以正常打开,说明wordpress已经安装配置完成,可以使用 docker-compose up -d 转到后台执行,我们现在需要把云厂商的防火墙关闭。

安装配置ca#ddy

编写执行脚本

#!/usr/bin/env sh

VERSION=2.4.1
IMAGE=caddy:${VERSION}
CMD=""

docker run -itd --restart=always \
  --name caddy \
  --net=host \
  -v $PWD/conf/Caddyfile:/etc/Caddyfile \
  -v $PWD/data:/data/caddy \
  -v $PWD/www:/opt/www \
  ${IMAGE} caddy run -config /etc/Caddyfile

编写Ca#ddy的配置文件

https://www.{域名}.com https://{域名}.com http://www.{域名}.com http://{域名}.com {

    tls internal {
        on_demand
    }

    route /* {
        reverse_proxy * localhost:8000 {
            header_up -Origin
        }
        encode * {
            gzip
            zstd
        }
    }

    log {
        output stdout
    }

}

预创建目录

mkdir caddy
mkdir caddy/conf
mkdir caddy/data
mkdir caddy/www

执行脚本启动服务器,服务启动后我们在本地机器上可以通过配置hosts的方式,连接上去,看是否能打开wordpress的安装页面。

配置证书

我们使用cloudflare的证书,需要先到其官方网站上注册 https://dash.cloudflare.com/login 账号。

接下来我们需要在网站上添加我们的网站

然后到自己的域名服务商修改域名服务器为cloudflare的,等cloudflare检测通过

# 域名服务器
braden.ns.cloudflare.com
shubhi.ns.cloudflare.com

接下来需要拿到我们的api key,点击cloudflare右上角 “我的个人资料” – “API令牌“

配置ca#ddy

在服务器 caddy/conf 文件夹下编写文件 caddy.service 文件内容为:

[Service]
Environment=CLOUDFLARE_EMAIL={你的邮箱地址}
Environment=CLOUDFLARE_API_KEY={刚拿到的cloudflare密钥}

接下来重启ca#ddy

docker restart caddy

此时用chrome打开网站,查看地址栏左侧是否提示锁的图标,如果不是也不用着急,有可能是域名解析还没有完全切换到新的域名服务器,这需要一定的时间。

Posted in 未分类 | Leave a comment

redis数据结构

数据结构在新版本的Redis中做了优化

Posted in 未分类 | Leave a comment

k8s服务访问

k8s基本网络模型

约法三单

  1. pod之间可以直接通信,无需显式使用NAT接收和地址转换
  2. node与pod之间可以直接通信,无需要明显地址转换
  3. pod可以看到自己的ip跟别人看到它所用的ip是一样的,中间不经过转换

分类

  1. underlay:与Host网络同层
  2. ovrelay:只要与Host网络不冲突,ip可以自由分配

Kubenets Service

metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 80
      targetPort: 9376

Service工作原理:

通过kube-proxy设置宿主机iptables规则来实现,kube-proxy通过观察service的创建,然后通过修改本机的iptables规则,将访问service vip的请求转发到真实的Pod上。

基于iptables规则的service的实现,宿主机上有大量Pod时,规则的不断刷新占用大量CPU资源,新的模式:ipvs,通过把规则放到内核态,降低了维护规则的代价

  1. Service的DNS记录:<myservice>.<mynamespace>.svc.cluster.local,访问这条记录时,返回的是Service的VIP或代理Pod的IP地址集合
  2. Pod的DNS记录:<pod_hostname>.<subdomain>.<mynamespace>.svc.cluster.local,注意pod的hostname和subdomain都是在Pod中定义的

集群内访问

`my-service:80 -> 192.168.11.236:9376` 

`my-service.my-namespace:80 -> 192.168.11.236:9376`

`172.18.255.16:80 -> 192.168.11.236:9376`

环境变量

TASK_BUS_SERVICE_HOST=172.18.254.152
TASK_BUS_SERVICE_PORT=80
TASK_WORKER_SERVICE_HOST=172.18.255.164
TASK_WORKER_SERVICE_PORT=80

Headless ServiceclusterIP: None不再提供虚拟IP来负载均衡

`my-service:9376 -> 192.168.11.236:9376,192.168.11.237:9376` 

`my-service.my-namespace:9376 -> 192.168.11.236:9376,192.168.11.237:9376`

集群外访问(外部宿主机访问)

  1. NodePort:外部client访问任意一台宿主机的8080端口,就是访问Servicer所代理的Pod的80端口,由接收外部请示请求的宿主机做转发,即client -> nodeIP:nodePort -> serviceVIP:port -> podIp:targetIp
  2. LoadBalance:公有云提供的k8s服务自带的loadbalancer做负载均衡和外部流量 的入口
  3. ExternalName:通过ExternalName或ExternalIp给Service挂在一个公有IP或者域名,当访问这个公有IP地址时,就会转发到Service所代理的Pod服务上。类似于软链或快捷方式
  4. ClusterIP:虚拟IP地址,外部网络无法访问,只有k8s内部访问使用。更像是一个伪造的IP网络
    • 仅仅用于Service这个对象,并由k8s管理和分配IP地址
    • 无法被Ping通,没有一个“实体网络对象”来响应
    • 只能结合Service Port组成一个具体的通信端口,单独的ClusterIP不具备通信的基础,并且它们属于k8s集群这样一个封闭的空间
    • 不同Service下的Pod节点在集群间相互访问可以通过ClusterIP

Service与Ingress

Ingress:全局的、为了代理不同后端Sercie而设置的负载均衡服务,只能工作在7层,而service工作在四层

Ingress对象:k8s项目对反向代理的一种抽象

Ingress Controller:目前支持Nginx、Haproxy、Envoy、Traefik等;nginx-ingress-controller只是被代理的Serive对象被更新,是不需要重新加载的,因为它的Lua方案实现了Nginx Upstream的动态配置。

nginx-igress-service:暴露服务出去,如果是公有云,需要创建LoadBalancer类型的Service

default-backend-service:默认规则

apiVersion: extensions/v1beat1
kind: Ingress
metadata:
  name: a-ingress
spec:
  tls:
  - hosts:
    # 标准域名格式,不能是IP地址
    - a.example.com
    secretName: a-secret
  rules:
  - hosts: a.example.com
    http:
    - path: /a1
      backend:
        serviceName: a1-svc
        servicePort: 80
    - path: /a2
      backend:
        serviceName: a2-svc
        sercvicePort: 80
Posted in Kubernetes | Tagged , | Leave a comment

k8s三层网络实现

字典

  1. 下一跳:如果ip包从主机A发送到主机B,需要经过路由设备X的中转,那么X的IP地址就应该配置为主机A的下一跳地址。
  2. Host-gw模式:将每个 Flannel 子网(Flannel Subnet,比如10.244.1.0/24)的下一跳 ,设置成该子网对应宿主机的IP地址。也就是主机(host)充当这条容器通信路径里的网关(gateway),也就是host-gw的含义;无封装,纯路由,只经过协议栈一次,性能较高
  3. 边际网关协议:BGP,Border Gateway Protocol,大规模数据中心维护不同“自治系统”之间路由信息、无中心的路由协议; 它不会在宿主上创建任何网桥设备相当于:每个边界网关上运行着一个小程序,它们会将各自的路由表信息,通过TCP传输给其它边界网关。其它边界网关上的这个小程序,则会对收到的这些数据进行分析,然后把需要的信息添加到自己的路由表。
    • CNI:与k8s对接的部分
    • Felix:DeamonSet,wmgm宿主机上插入路由规则,即写入Linux内核的FIB转发信息库,以及维护Calico所需要的网络设备等工作
    • BIRD:BGP客户端,专门负责在集群里分发路由信息
  4. 边际网关:负责把自治系统连接在一起的路由器。它的路由表中有其它自治系统里的主机路由信息;
  5. Calico:集群中所有的节点,都是边界路由器,被称为BGP Peer;默认情况下,是一个Node-to-Node Mesh的模式。随着节点的增多,会以N^2的规模快速增长。一般推荐在少于100个节点的集群中
  6. Calico Route Reflector的模式,大集群,它指定一个或几个专门节点,来负责与所有节点建立BGP连接,从而学习全局路由规则,其它节点,只需要给它交换路由信息。
  7. Calico IPIP模式:添加的路由规则10.233.2.0/24 via 192.168.2.2 tunl0,下一跳地址是Node2 IP地址,但发出去包的设备是tunl0(注意,不是flannel UDP模式的tun0,它们的功能是不同的)它是一个IP隧道设备(IP tunnel),IP包进入IP隧道后,被内核IPIP驱动接管。它会将这个IP包直接封装在一个宿主机网络的Ip包中。

Calico IPIP模式:

Flannel Host-gw模式

$ ip route
...
10.244.1.0/24 via 10.168.0.3 dev eth0

目的地址属于10.244.1.0/24网段的IP包,应该经过本机的eth0设备(dev eth0)设备发出去,并且它的下一跳(next-lop)是10.168.0.3(via 10.168.0.3)

从示意图上看,下一跳地址是目的宿主机Node2,所以IP包从应用层到链路层被封装成帧时,etho会使用下一跳地址对应的MAC地址,做为数据帧的MAC地址。

Node 2的内核网络从二层拿到IP包,就会看到这个目的IP地址:10.244.1.3,根据Node 2的路由表,会匹配到第二条路由规则,从而进入cni0网桥,进入infra-containeere-2当中。

Flannel子网和主机的信息,都是保存在Etcd中的。flanneld只需要wacth这些数据变化,然后实时更新路由表则可。性能损失在10%左右,其它基于VXLAN隧道机制的网络方案,性能损失在20%~30%左右。

Flannel host-gw限制,要求宿主机必须是二层连通;必须有其它宿主机的路由信息;

Posted in Kubernetes | Tagged , | Leave a comment