服务器

版本:24.03 LTS SP2

UNT用户指南

简介

特性介绍

Spark、Hive、Flink等大数据引擎中提供的Function有限,往往不能够满足客户需求,需要由客户自定义一些UDF来满足自己的业务需求。

以Flink DataSream为例,用户希望在享受开源标准带来的兼容性和灵活性的同时,获得更高的性能和更低的成本,当前的Flink引擎优化主要是基于开源的Flink Java工程进行改进,性能提升存在天花板,我们需要基于Native化引擎更大程度地提升系统性能,突破现有的性能瓶颈,同时保持对Flink的兼容性。Spark已经实现了类似的优化,但Flink尚未有类似的进展。Flink流处理业务非常广泛,其中80%以上的业务场景需要使用UDF,因此,我们面向Flink native引擎提供了一套UDF自动native化管理系统,该系统能够将用户加载的UDF字节码自动转换为native二进制并自动化替代原始UDF运行于Flink等大数据引擎中。

UNT特性

  • 实现了将业务jar包字节码自动转换为IR代码,而后根据大数据引擎UDF规则自动提取UDF代码,并解析UDF外部依赖。
  • 实现了从内存对象自动管理、硬件亲和加速等维度对UDF IR进行优化。
    • 建立UDF对象声明周期管理规则,基于该规则自动插入内存对象引用及释放代码。
    • 建立基础类对象池,将基础类内存申请接口自动替换为对象池对象获取接口。
    • 自动根据执行环境硬件匹配亲和库,并在IR中自动替换为硬件亲和库调用。
  • 实现了针对Java转c++的UDF翻译,自动将用户的原始java程序翻译成c++代码并编译。

约束与限制

在特性配置之前,请先了解UNT特性的使用限制。

  1. 整体规格约束

    native翻译UDF Function类型约束:支持Function类型白名单内的UDF native翻译。

    native翻译UDF成员方法语法约束

    text
    - 支持Java类型翻译白名单内的类型native翻译。
    - 支持Java语句翻译白名单内的语句native翻译。
    - 支持Java关键字翻译白名单内的关键字native翻译。

    native翻译UDF成员对象类型约束:native翻译UDF成员对象运行时类型必须与静态定义类型完全一致,否则UDF native翻译失败,回退至原生UDF。

    native翻译jar包打包约束:nativa翻译的输入jar包必须是包含所有依赖的胖包。

    native翻译接口返回值约束:父子类相同接口的返回值属性需相同(接口返回值属性为0代表返回值为空、基础类型、集合类元素或类对象成员,接口返回值属性为1代表其他情况)。

    native扫描UDF约束:暂不支持扫描lambda形式的UDF。

  2. Function类型白名单

    支持的Function类型:

    • FlatMapFunction
    • KeySelector
    • MapFunction
    • ReduceFunction
    • RichFilterFunction
    • RichFlatMapFunction
  3. Java关键字翻译白名单

    支持的Java关键字:

    • abstract
    • boolean
    • break
    • byte
    • case
    • char
    • class
    • continue
    • default
    • do
    • while
    • double
    • if
    • else
    • for
    • extends
    • float
    • final
    • int
    • implements
    • import
    • interface
    • instanceof
    • long
    • new
    • package
    • private
    • protected
    • public
    • return
    • short
    • static
    • switch
    • this
    • void
    • volatile
  4. Java类型翻译白名单

    支持的Java类型如下:

    • boolean
    • byte
    • char
    • short
    • int
    • long
    • double
    • float
    • Array
    • null
    • void
    • Class
  5. Java语句翻译白名单

    • InvokeStmt(函数/方法调用语句,不支持dynamicinvoke)

    例子:

    java
    public class DemoClass{
        public void print(int x){
            int a = increment(x);
            System.out.println(a);
            a = increment(x);
            System.out.println(a);
        }
        public int increment(int x){
            return x+1;
        }
    }
    • IdentityStmt(this成员赋值)

    例子:

    java
    public class DemoClass{
        private int counter;
        public void DemoClass(int counter){
            this.counter = counter;
        }
    }
    • AssignStmt(赋值语句)

    例子:

    java
    public class DemoClass{
        private int counter = 0;
        public int updateCounter(){
            counter = counter + 1;
            return counter;
        }
    }
    • IfStmt(if语句)

    例子:

    java
    public class DemoClass{
        public static void sampleMethod(int x){
            if(x % 2 == 0){
                System.out.println("Even");
            }else{
                System.out.println("Odd");
            }
        }
    }
    • Switch(switch语句)

    例子:

    java
    public class DemoClass{
        public void switchExample(int x){
            switch(x){
                case 1:
                    System.out.println("Input1");
                    break;
                case 2:
                    System.out.println("Input2");
                    break;
                default:
                   System.out.println("Input more than 2");
                   break; 
            }
        }
    }
    • ReturnStmt(return语句)

    例子:

    java
    public class DemoClass{
        public int increment(int x){
            return x + 1;
        }
    }
    • GotoStmt(goto语句)

    例子:

    java
    public class DemoClass{
        public static void sampleMethod(){
            for(int i = 0; i < 5; i++){
                if(i == 3){
                    break;
                }
            }
        }
    }

安装与部署

软件要求

  • jdk1.8
  • python3
  • maven3.6.3

硬件要求

  • aarch64架构
  • x86_64架构

安装软件

UNT使用rpm方式安装部署。

shell
rpm -ivh UNT-1.0-5.oe2403sp2.noarch.rpm

安装完成后会在/opt/udf-trans-opt目录下生成文件夹udf-translator,该目录即为UNT的工作目录。

text
#目录结构
bin:执行脚本所在目录
conf:配置文件目录
lib:依赖所在目录
cpp:翻译完成的cpp源文件所在的目录,下级不同的jar包对应不同的目录
log:翻译生成的日志记录
output:翻译完成后编译生成的so所在目录,下级不同的jar包对应不同的jar目录

同时会在/usr/bin下面生成native_udf.py文件,用于查看翻译相关信息

使用方法

UNT的翻译依赖于配置文件。

配置文件及简介如下

text
conf/depend_class.properties:java侧与native侧类名映射关系配置
conf/depend_include.properties:头文件路径配置
conf/depend_interface.config:依赖接口配置

其中conf为相对目录,基目录配置说明可查看第4小节修改UNT用户配置

使用步骤如下:

  1. 扫描缺失接口

    使用native_udf.py depend_info ${job_jar}命令扫描缺失接口。

    扫描结果示例:

    text
    java.lang.String
    Methods:
        int length()

    示例显示缺失Stringlength()函数接口。

  2. 实现缺失接口

    需要根据基础库编写规范实现native侧相关接口,如用户可以按如下方式声明String中的length()接口,用户可根据接口自行进行函数实现。

    cpp
    //String头文件
    class String : public Object 
    {
    public:
        int32_t length() const;
    private:
        std::string inner;
    }
    cpp
    //String cpp文件
    int32_t String::length() const
    {
        return static_cast<int32_t>(inner.size());
    }

    实现完成后需要编译为libbasictypes.a文件,注意,名字必须是libbasictypes.a

  3. 增加接口配置文件

    实现接口后需要增加相应的接口配置文件。

    • 在depend_class.properties文件中增加java到native的类名映射。
    text
    java.lang.String=String

    该配置的key表示java中的String类,value表示native侧对应的类名。

    • 在depend_include.properties文件中增加头文件路径配置。
    text
    java.lang.String=basictypes/String.h

    该配置的key表示java中的String类,value表示native侧头文件所在的相对路径,基目录配置说明可查看第4小节修改UNT用户配置

    • 在depend_interface.config文件中增加依赖接口配置。
    text
    <java.lang.String: int length()>, 0

    该配置的第一个元素表示java中对应函数的签名,value表示其内存语义,内存语义相关信息请参考自开发native规范中内存语义规范相关内容。

  4. 修改UNT用户配置

    该配置文件默认在/opt/udf-trans-opt/udf-translator/conf/udf_tune.properties目录下。

    配置文件内容为:

    text
    basic_lib_path=/opt/udf-trans-opt/libbasictypes
    tune_level=0
    regex_lib_type=1
    regex_lib_path=/usr/local/ksl/lib/libKHSEL_ops.a
    compile_option=

    basic_lib_path用于配置基础库的基目录,该目录下存在三个子目录。

    • conf目录:用于存放配置文件,此目录为配置文件的基目录。
    • include目录:用于存放第2小节中实现的native头文件,此目录为头文件的基目录。
    • lib目录:用于存放用户编译出来的.a静态依赖文件。

    tune_level用于配置优化级别,各优化级别说明如下。

    • level:0代表基础优化,即内存自动释放基础优化,后续的优化都会以此为基础。
    • level:1代表硬件加速优化,此时会读取硬件加速基础库接口配置,对接硬件加速基础库接口。
    • level:2代表内存申请释放加速优化。
    • level:4代表AI4C加速优化。

    上述优化中,除了基础优化默认与其他优化叠加,其余优化level数值相加即代表多个优化手段的叠加(每个level的数值必须为2的幂和)。

    regex_lib_type用于配置是否进行正则库优化。

    配置为1表示开启正则库优化,配置为0表示不开启正则库优化,该配置在tune_level=1的条件下生效。

    regex_lib_path用于配置正则库的链接路径。

    配置正则库优化的路径。

    compile_option用于用户自定义编译选项,用户需要保证编译选项的正确性。

  5. 使用翻译命令生成native源文件及二进制文件

    text
    bash /opt/udf-trans-opt/udf-translator/bin/udf_translate.sh {jar包路径} flink

    执行完成后会在cpp目录下生成源文件,在output目录下生成so文件,在log目录下生成日志。

  6. 查询翻译信息

    • native_udf.py source_info $

    支持用户指定job_jar查看其native源码文件位置。

    • native_udf.py list $

    支持用户指定job_jar查看native成功能UDF标识及其二进制文件信息。

    二进制文件生成在UNT安装路径下的output目录,子目录hash值可以通过source_info获取。

    • native_udf.py depend_info $

    支持用户指定job_jar查看依赖库接口信息。

    当前不支持lambda表达式的扫描。

    • native_udf.py fail_info $

    支持用户指定job_jar查看其native失败原因:如依赖接口缺失信息。

    • native_udf.py tune_level $

    支持用户指定udf native优化级别。

自开发native规范

unt工具支持用户自行开发部分native代码,为了使自行开发的代码能够自动嵌入到翻译的代码中,请遵守以下规范。

  1. 内存语义规范

    自行开发的代码需要保证除输出对象外,其余对象均已释放。除此之外,需要在depend_interface.config依赖接口配置文件中配置其内存语义,如果return的对象为“新创建”的,请在配置文件中标识该方法签名为“1”,否则标识该方法签名为“0”。

    例子1:

    java
    int32_t String::length() const
    {
        return static_cast<int32_t>(inner.size());
    }

    该length方法返回基础类型,不涉及到新创建的对象,因此配置为"0"。

    text
    <java.lang.String: int length()>, 0

    例子2:

    java
    String *String::substring(const int32_t idx) const
    {
        std::string s = this->inner.substr(idx);
        return new String(std::move(s));
    }

    该substring方法返回String类型,且return的为新创建的对象,因此配置为"1"。

    text
    <java.lang.String: java.lang.String substring(int)>, 1
  2. 继承规范

    所有类最终都应该继承自Object。

    Object类描述如下:

    Object头文件:

    cpp
    class Object 
    {
    public:
        Object();
        
        Object(nlohmann::json jsonObj);
        
        virtual ~Object();
        
        virtual int hashCode();
        
        virtual bool equals(Object *obj);
        
        virtual std::string toString();
        
        virtual Object *clone();
        
        Object(const Object &obj);
        
        Object(Object &&obj);
        
        Object &operator=(const Object &obj);
        
        Object &operator=(Object &&obj);
        
        void putRefCount();
        
        void getRefCount();
        
        void setRefCount(uint32_t count);
        
        bool isCloned();
        
        uint32_t getRefCountNumber();
        
    public:
        std::recursive_mutex mutex;
        bool isClone = false;
        bool isPool = false;
        uint32_t refCount = 1;
    }

    Object cpp文件:

    cpp
    Object::Object() = default;
    
    Object::Object(nlohmann::json jsonObj)
    {
        return;
    }
    
    Object::~Object() = default;
    
    int Object::hashCode()
    {
        return 0;
    }
    
    bool Object::equals(Object *obj)
    {
        return false;
    }
    
    std::string Object::toString()
    {
        return std::string();
    }
    
    Object * Object::clone()
    {
        return nullptr;
    }
    
    Object::Object(const Object &obj)
    {
        this->refCount = obj.refCount;
        this->isClone = obj.isClone;
    }
    
    Object::Object(const Object &&obj)
    {
        this->refCount = obj.refCount;
        this->isClone = obj.isClone;
    }
    
    Object &Object::operator=(const Object &obj)
    {
        this->refCount = obj.refCount;
        this->isClone = obj.isClone;
    }
    
    Object &Object::operator=(Object &&obj)
    {
        this->refCount = obj.refCount;
        this->isClone = obj.isClone;
    }
    
    void Object::putRefCount()
    {
        if (__builtin_expect(--refCount != 0, true))
        {
            return;
        }
        delete this;
    }
    
    void Object::getRefCount()
    {
        ++refCount;
    }
    
    void Object::setRefCount(uint32_t count)
    {
        refCount = count;
    }
    
    bool Object::isCloned()
    {
        return isClone;
    }
    
    uint32_t Object::getRefCountNumber()
    {
        return refCount;
    }