本文档所描述的内容属于神策分析的高级使用功能,涉及较多技术细节,适用于对相关功能有经验的用户参考。

神策分析设计为开放的架构,让用户可以订阅实时数据来满足更多使用场景。服务端接到一条 SDK 发来的数据后,会对数据做一些预处理并将数据写入到消息队列 Kafka 供下游各类计算模块使用。本文将介绍订阅 Kafka 数据的方法。

订阅要求

订阅数据需要满足以下要求:

  • 仅私有部署版支持通过 Kafka 订阅数据;

  • 启动订阅的机器需与部署神策分析的机器在同一个内网,且必须可以解析神策分析服务器的 host;

  • 如果在自己的机器上订阅数据,需要先在自己机器上 hosts 中配上神策的服务器 hostname 和订阅 IP 的映射关系,订阅的时候填写所有机器的 hostname;

  • 建议使用与服务端保持一致的客户端版本,大部分客户的服务端版本大于等于 2.0.0,因此,建议使用的客户端版本也大于等于 2.0.0,具体情况可登录神策服务器查看或咨询神策值班同学。

获取 Kafka 参数

  • 登录任意的神策服务器
  • 切换至 sa_cluster 账户
su - sa_cluster
CODE
  • 使用以下命令获取地址
aradmin config get client -m kafka -p sp
BASH

      例如输出是:

{
    "broker_list": [
        "hostname1:9092",
        "hostname2:9092",
        "hostname3:9092"
    ],
    "channel_callback_partition_count": 3,
    "channel_callback_topic_name": "channel_topic",
    "extra_data_topic_name": "extra_data_topic",
    "item_partition_count": 3,
    "item_topic_name": "item_topic",
    "partitions_count": 10,
    "topic_name": "event_topic"
}
BASH

如果命令输出中包含 "profile_topic_name": "profile_topic" 字段,则代表神策环境安装了 SDF 模块,神策环境是否安装 SDF 模块也可以咨询神策值班同学获取相关信息,Kafka 相关的参数说明如下

参数名称参数值说明
topicitem_topic订阅神策 Items 表的数据
event_topic订阅神策 events 和 users 表的数据
partition单机 3 个/集群 10 个对应 partitions_count 里的值
brokerhostname1:9092,hostname2:9092,hostname3:9092对应 broker_list 里的值,集群有多个 hostname,用英文逗号分隔




参数名称参数值说明
topicitem_topic订阅神策 Items 表的数据
event_topic订阅神策 events 表的数据
profile_topic订阅神策 users 表的数据
partition单机 3 个/集群 10 个对应 partitions_count 里的值
brokerhostname1:9092,hostname2:9092,hostname3:9092对应 broker_list 里的值,集群有多个 hostname,用英文逗号分隔



订阅数据

订阅有多种方式,可以选择一种适合使用场景的方式。

下面给出两种启动订阅的示例

使用 Kafka Console Consumer

可以使用 Kafka 自带的 Kafka Console Consumer 通过命令行方式订阅,例如从最新数据开始订阅:

bin/kafka-console-consumer.sh --bootstrap-server <bootstrap-server>:9092 --topic event_topic
BASH

以上命令是需要您本地订阅的服务器安装 Kafka 客户端,然后进入 Kafka 路径下去执行

Java 代码订阅样例

请参考下面的 github 链接中的样例实现

Kafka 订阅样例 github 链接

数据格式

订阅的数据的格式与数据格式基本一致。

Java SDK 上报的数据示例

{
    "_track_id": -1302294273, 
    "lib": {
        "$lib": "Java", 
        "$lib_method": "code", 
        "$lib_version": "3.1.15", 
        "$lib_detail": "com.sensorsdata.analytics.javasdk.TestSA##track##TestSA.java##91"
    }, 
    "distinct_id": "test0932", 
    "type": "track", 
    "event": "Order", 
    "properties": {
        "$lib": "Java", 
        "isLogin": false, 
        "order_id": "real_1272", 
        "$lib_version": "3.1.15"
    }
}


订阅出来的数据示例

{
    "_track_id": -1302294273, 
    "lib": {
        "$lib": "Java", 
        "$lib_method": "code", 
        "$lib_version": "3.1.15", 
        "$lib_detail": "com.sensorsdata.analytics.javasdk.TestSA##track##TestSA.java##91"
    }, 
    "distinct_id": "test0932", 
    "type": "track", 
    "event": "Order", 
    "properties": {
        "$lib": "Java", 
        "isLogin": false, 
        "order_id": "real_1272", 
        "$lib_version": "3.1.15", 
        "$ip": "10.90.28.102", 
        "$is_login_id": false
    }, 
    "time": 1600400230612, 
    "project": "default", 
    "token": "super", 
    "extractor": {
        "f": null, 
        "o": 0, 
        "n": null, 
        "s": 25, 
        "c": 25, 
        "e": "debugboxcreate1038.sa-DebugService"
    }, 
    "recv_time": 1600400230612, 
    "ngx_ip": "10.90.28.102", 
    "process_time": 1600400230612, 
    "map_id": "zjj-0932", 
    "user_id": 81311457452485460, 
    "project_id": 24, 
    "ver": 2
}


神策 kafka 订阅出来的数据会加上部分神策内部字段,例如 extractor 相关的进度信息、project_id 项目 ID 等,您这边无需关注这些字段,按照您这边需要的字段去解析获取即可,一些可能需要的内部字段说明如下

参数名称说明
_track_id前端 SDK track 的时候上报的随机值,用于去重判断,并不会写入 events 表
project项目英文名,通过该字段判断神策对应的项目
token数据导入 token,参考:数据导入常见问题 中第 7 条说明
recv_time等于 events 表里的 $receive_time 的值,数据接收的时间
user_id等于 events 表里的 users_id 和 users 表里的 id,参考:标识用户

distinct_id、type、event、properties、time 字段的说明可以参考:数据格式

常见问题

从 Kafka 消费数据能否订阅历史数据

可以,神策的 kafka 单机默认保留一年的数据,集群默认保留一周的数据,如果是使用 Kafka 自带的 Kafka Console Consumer 通过命令行方式订阅,可以加上 --from-beginning 订阅历史数据

bin/kafka-console-consumer.sh --bootstrap-server hostname:9092 --topic event_topic --from-beginning
BASH

Kafka 订阅不到数据可能原因

  • 网络问题:神策默认通过 hostname 来订阅 Kafka 数据,需要在订阅 Kafka 的服务器的 hosts 文件了配上神策服务器的 hostname 和 IP 的映射关系,确认下通过 telnet hostname 9092 是否能正常访问神策服务器
  • Kafka 版本兼容问题:高版本服务端兼容低版本客户端,因此订阅的客户端版本建议不大于神策服务端的 Kafka 版
  • 如果是测试用的 debug 不入库的模式上报数据,是不会在 Kafka event_topic 下面订阅到数据的

能指定项目订阅 Kafka 数据吗

不能,通过对应 topic 订阅出来的数据是全部项目的,需要通过 JSON 数据里的 project 参数去区分不同的项目

能按组订阅 Kafka 数据吗

可以,通过代码或者工具订阅 Kafka 数据的时候可以自定义 group.id 参数,用来区分不同的消费组,具体定义可以参考 Kafka 官网

通过 profile_topic 订阅出事件数据正常吗

正常的,profile_topic 是订阅所有 users 表的数据,如果事件数据触发用户关联,去写 users 的 first_id,second_id ,是会在 profile_topic 里被订阅到,用户关联逻辑参考:标识用户