Personalized Recommendation System Source Code: Collecting User Behavior Data
<div style="color: black; text-align: left; margin-bottom: 10px;"><img src="https://p3-sign.toutiaoimg.com/pgc-image/087e024ef0e24303bfc5663037d81c81~noop.image?_iz=58558&from=article.pc_detail&lk3s=953192f4&x-expires=1725853021&x-signature=VxM06cdwIB7MJItJUE0ZDiG7Xh0%3D" style="width: 50%; margin-bottom: 20px;"></div>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Another indispensable input to a recommendation system is user behavior data. It is fair to call it the cornerstone of the system: even the best cook cannot make a meal without rice. So the next step is to sync users' behavior data into the recommendation system's database.</span></p>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">In an article recommendation system, user behaviors include exposure, click, read (dwell), favorite, share, and so on. The behavior record defined here therefore has these fields: action time (actionTime), reading duration (readTime), channel ID (channelId), event name (action), user ID (userId), article ID (articleId), and algorithm ID (algorithmCombine). Records are stored as JSON, as shown below.</span></p><span style="color: black;"># Exposure parameters</span>
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:35"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">""</span>,<span style="color: black;">"channelId"</span>:0,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"exposure"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"2"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">""</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
<span style="color: black;"># Parameters for actions triggered on an article</span>
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:36"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">""</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"click"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"2"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"18577"</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:38"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">"1621"</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"read"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"2"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"18577"</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:39"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">""</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"click"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"1"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"14299"</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:39"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">""</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"click"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"2"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"14299"</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:41"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">"914"</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"read"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"2"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"14299"</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 18:15:47"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">"7256"</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"read"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"1"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"14299"</span>,<span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
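<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">As a rough illustration of the record layout above, the following shell sketch assembles one log line from its fields. The function name make_event and its argument order are assumptions made for this example and are not part of the original project. It also assumes that field values contain no double quotes, since it does no JSON escaping.</span></p>

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original project): build one behavior
# log line in the JSON layout shown above.
# Arguments: action_time read_time channel_id action user_id article_id algorithm
# Assumption: values contain no double quotes; no JSON escaping is performed.
make_event() {
  printf '{"actionTime":"%s","readTime":"%s","channelId":%s,"param":{"action": "%s", "userId": "%s", "articleId": "%s", "algorithmCombine": "%s"}}\n' \
    "$1" "$2" "$3" "$4" "$5" "$6" "$7"
}

# Example: a click by user 2 on article 18577 in channel 18.
make_event "2019-04-10 18:15:36" "" 18 click 2 18577 C2
```

<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Appending the output of such a helper to the application log would produce lines in the same shape as the samples above.</span></p>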
<h1 style="color: black; text-align: left; margin-bottom: 10px;"><strong style="color: blue;"><span style="color: black;">Offline User Behavior Data</span></strong></h1>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Because user behavior data is large, it is usually updated once a day for offline computation. First, create the behavior database profile and the behavior table user_action in Hive, partition the table by date, and use a SerDe that matches the JSON format.</span></p><span style="color: black;">-- Create the user behavior database</span>
<span style="color: black;">create</span> <span style="color: black;">database</span> <span style="color: black;">if</span> <span style="color: black;">not</span> <span style="color: black;">exists</span> profile <span style="color: black;">comment</span> <span style="color: black;">"user action"</span> location <span style="color: black;">'/user/hive/warehouse/profile.db/'</span>;
<span style="color: black;">-- Create the user behavior table</span>
<span style="color: black;">create</span> <span style="color: black;">table</span> user_action
(
actionTime <span style="color: black;">STRING</span> <span style="color: black;">comment</span> <span style="color: black;">"user actions time"</span>,
readTime <span style="color: black;">STRING</span> <span style="color: black;">comment</span> <span style="color: black;">"user reading time"</span>,
channelId <span style="color: black;">INT</span> <span style="color: black;">comment</span> <span style="color: black;">"article channel id"</span>,
param <span style="color: black;">map</span>&lt;string, string&gt; <span style="color: black;">comment</span> <span style="color: black;">"action parameter"</span>
)
<span style="color: black;">COMMENT</span> <span style="color: black;">"user primitive action"</span>
PARTITIONED <span style="color: black;">BY</span> (dt <span style="color: black;">STRING</span>) <span style="color: black;">-- partition by date</span>
<span style="color: black;">ROW</span> <span style="color: black;">FORMAT</span> SERDE <span style="color: black;">'org.apache.hive.hcatalog.data.JsonSerDe'</span> <span style="color: black;">-- match the JSON format</span>
LOCATION <span style="color: black;">'/user/hive/warehouse/profile.db/user_action'</span>;
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Behavior data is usually stored in log files on the application server. We can use Flume to watch those log files and collect the behavior data into the HDFS directory that backs Hive's user_action table. The Flume configuration is as follows.</span></p><span style="color: black;">a1.sources</span> = s1
<span style="color: black;">a1.sinks</span> = k1
<span style="color: black;">a1.channels</span> = c1
<span style="color: black;">a1.sources.s1.channels</span>= c1
<span style="color: black;">a1.sources.s1.type</span> = exec
<span style="color: black;">a1.sources.s1.command</span> = tail -F /root/logs/userClick.log
<span style="color: black;">a1.sources.s1.interceptors</span>=i1 i2
<span style="color: black;">a1.sources.s1.interceptors.i1.type</span>=regex_filter
<span style="color: black;">a1.sources.s1.interceptors.i1.regex</span>=\\{.*\\}
<span style="color: black;">a1.sources.s1.interceptors.i2.type</span>=timestamp
<span style="color: black;"># c1</span>
<span style="color: black;">a1.channels.c1.type</span>=memory
<span style="color: black;">a1.channels.c1.capacity</span>=<span style="color: black;">30000</span>
<span style="color: black;">a1.channels.c1.transactionCapacity</span>=<span style="color: black;">1000</span>
<span style="color: black;"># k1</span>
<span style="color: black;">a1.sinks.k1.type</span>=hdfs
<span style="color: black;">a1.sinks.k1.channel</span>=c1
<span style="color: black;">a1.sinks.k1.hdfs.path</span>=hdfs://<span style="color: black;">192.168</span>.<span style="color: black;">19.137</span>:<span style="color: black;">9000</span>/user/hive/warehouse/profile.db/user_action/%Y-%m-%d
<span style="color: black;">a1.sinks.k1.hdfs.useLocalTimeStamp</span> = <span style="color: black;">true</span>
<span style="color: black;">a1.sinks.k1.hdfs.fileType</span>=DataStream
<span style="color: black;">a1.sinks.k1.hdfs.writeFormat</span>=Text
<span style="color: black;">a1.sinks.k1.hdfs.rollInterval</span>=<span style="color: black;">0</span>
<span style="color: black;">a1.sinks.k1.hdfs.rollSize</span>=<span style="color: black;">10240</span>
<span style="color: black;">a1.sinks.k1.hdfs.rollCount</span>=<span style="color: black;">0</span>
<span style="color: black;">a1.sinks.k1.hdfs.idleTimeout</span>=<span style="color: black;">60</span>
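<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">To get a feel for what the regex_filter interceptor in this configuration lets through, the sketch below approximates it locally. It assumes the interceptor keeps events whose body contains a match for the pattern (the default behavior when events are not excluded) and uses grep -E as a stand-in for Flume's Java regex engine. The function name filter_events is made up for this example.</span></p>

```shell
#!/usr/bin/env bash
# Approximation of the regex_filter interceptor configured above:
# keep only lines that contain a {...} span, drop everything else.
# grep -E stands in for the Java regex engine; this is a local sketch,
# not Flume itself.
filter_events() {
  # "|| true" keeps the function from failing when no line matches.
  grep -E '\{.*\}' || true
}

# Example input: one plain log line and one JSON behavior record.
printf '%s\n' \
  'INFO server started' \
  '{"actionTime":"2019-04-10 18:15:36","readTime":"","channelId":18,"param":{"action": "click"}}' \
  | filter_events
```

<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Only the JSON line survives, which is the point of the filter: other lines written to the same log file never reach HDFS.</span></p>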
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Write the Flume startup script collect_click.sh</span></p><span style="color: black;">#!/usr/bin/env bash</span>
<span style="color: black;">export</span> JAVA_HOME=/root/bigdata/jdk
<span style="color: black;">export</span> HADOOP_HOME=/root/bigdata/hadoop
<span style="color: black;">export</span> PATH=<span style="color: black;">$PATH</span>:<span style="color: black;">$JAVA_HOME</span>/bin:<span style="color: black;">$HADOOP_HOME</span>/bin
/root/bigdata/flume/bin/flume-ng agent -c /root/bigdata/flume/conf -f /root/bigdata/flume/conf/collect_click.conf -Dflume.root.logger=INFO,console -name a1
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Flume creates the date directories automatically, but each directory has to be attached to a Hive partition manually before its data can be queried.</span></p><span style="color: black;">alter</span> <span style="color: black;">table</span> user_action <span style="color: black;">add</span> <span style="color: black;">partition</span> (dt=<span style="color: black;">'2019-11-11'</span>) location <span style="color: black;">"/user/hive/warehouse/profile.db/user_action/2019-11-11/"</span>;
<h1 style="color: black; text-align: left; margin-bottom: 10px;"><strong style="color: blue;"><span style="color: black;">Real-Time User Behavior Data</span></strong></h1>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">To make recommendations more timely, we also need to collect users' real-time behavior data for online computation. Here Flume ships the logs to Kafka, and online jobs read the real-time behavior data from Kafka. First, start ZooKeeper as a daemon.</span></p>
/root/bigdata/kafka/bin/zookeeper-server-start.sh -daemon /root/bigdata/kafka/<span style="color: black;">config</span>/zookeeper.properties
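<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Since a partition has to be registered for every new date directory, the statement can be generated rather than typed by hand. The sketch below only prints the statement for a given day. The function name partition_ddl is hypothetical, the base path is the table location used in this article, and the "if not exists" clause is an addition so that the statement can be re-run safely.</span></p>

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the ALTER TABLE statement that registers the
# directory Flume wrote for one day (YYYY-MM-DD) as a Hive partition.
partition_ddl() {
  dt="$1"
  base="/user/hive/warehouse/profile.db/user_action"
  # Reject anything that is not shaped like YYYY-MM-DD.
  case "$dt" in
    [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]) ;;
    *) echo "invalid date: $dt" >&2; return 1 ;;
  esac
  printf "alter table user_action add if not exists partition (dt='%s') location '%s/%s/';\n" \
    "$dt" "$base" "$dt"
}

# Example: the statement for 2019-11-11.
partition_ddl "2019-11-11"

# To apply it, the statement could be passed to the Hive CLI, for example
# from a daily scheduled job (assumes the hive command is on PATH):
#   hive --database profile -e "$(partition_ddl "$(date +%F)")"
```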
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Start Kafka</span></p>
<span style="color: black;">/root/bigdata/kafka/bin/kafka-server-start.sh</span> <span style="color: black;">/root/bigdata/kafka/config/server.properties</span>
<span style="color: black;"># Start a console producer</span>
<span style="color: black;">/root/bigdata/kafka/bin/kafka-console-producer.sh</span> <span style="color: black;">--broker-list</span> <span style="color: black;">192.168.19.137:9092</span> <span style="color: black;">--sync</span> <span style="color: black;">--topic</span> <span style="color: black;">click-trace</span>
<span style="color: black;"># Start a console consumer</span>
<span style="color: black;">/root/bigdata/kafka/bin/kafka-console-consumer.sh</span> <span style="color: black;">--bootstrap-server</span> <span style="color: black;">192.168</span><span style="color: black;">.19</span><span style="color: black;">.137</span><span style="color: black;">:9092</span> <span style="color: black;">--topic</span> <span style="color: black;">click-trace</span>
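<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Modify the Flume log collection configuration by adding a second channel c2 and a second sink k2, so that the log data is also delivered to Kafka.</span></p>
a1.sources = s1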
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.s1.channels= c1 c2
a1.sources.s1.<span style="color: black;">type</span> = exec
a1.sources.s1.command = tail -F /root/logs/userClick.<span style="color: black;">log</span>
a1.sources.s1.interceptors=i1 i2
a1.sources.s1.interceptors.i1.<span style="color: black;">type</span>=regex_filter
a1.sources.s1.interceptors.i1.regex=\\{.*\\}
a1.sources.s1.interceptors.i2.<span style="color: black;">type</span>=timestamp
# c1
a1.channels.c1.<span style="color: black;">type</span>=memory
a1.channels.c1.capacity=<span style="color: black;">30000</span>
a1.channels.c1.transactionCapacity=<span style="color: black;">1000</span>
# c2
a1.channels.c2.<span style="color: black;">type</span>=memory
a1.channels.c2.capacity=<span style="color: black;">30000</span>
a1.channels.c2.transactionCapacity=<span style="color: black;">1000</span>
# k1
a1.sinks.k1.<span style="color: black;">type</span>=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.<span style="color: black;">path</span>=hdfs://<span style="color: black;">192.168</span><span style="color: black;">.19</span><span style="color: black;">.137</span>:<span style="color: black;">9000</span>/user/hive/warehouse/profile.db/user_action/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = <span style="color: black;">true</span>
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=<span style="color: black;">0</span>
a1.sinks.k1.hdfs.rollSize=<span style="color: black;">10240</span>
a1.sinks.k1.hdfs.rollCount=<span style="color: black;">0</span>
a1.sinks.k1.hdfs.idleTimeout=<span style="color: black;">60</span>
# k2
a1.sinks.k2.channel=c2
a1.sinks.k2.<span style="color: black;">type</span>=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=<span style="color: black;">192.168</span><span style="color: black;">.19</span><span style="color: black;">.137</span>:<span style="color: black;">9092</span>
a1.sinks.k2.kafka.topic=click-trace
a1.sinks.k2.kafka.batchSize=<span style="color: black;">20</span>
a1.sinks.k2.kafka.producer.requiredAcks=<span style="color: black;">1</span>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Write the Kafka startup script start_kafka.sh</span></p><span style="color: black;">#!/usr/bin/env bash</span>
<span style="color: black;"># Start ZooKeeper</span>
/root/bigdata/kafka/bin/zookeeper-server-start.sh -daemon /root/bigdata/kafka/config/zookeeper.properties
<span style="color: black;"># Start Kafka</span>
/root/bigdata/kafka/bin/kafka-server-start.sh /root/bigdata/kafka/config/server.properties
<span style="color: black;"># Create the topic</span>
/root/bigdata/kafka/bin/kafka-topics.sh --zookeeper 192.168.19.137:2181 --create --replication-factor 1 --topic click-trace --partitions 1
<h1 style="color: black; text-align: left; margin-bottom: 10px;"><strong style="color: blue;"><span style="color: black;">Process Management</span></strong></h1>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Here we use Supervisor for process management, so that a process is restarted automatically when it exits abnormally. The Flume process is configured as follows.</span></p><span style="color: black;"></span>
<span style="color: black;">command</span>=/bin/bash /root/toutiao_project/scripts/collect_click.sh
<span style="color: black;">user</span>=root
<span style="color: black;">autorestart</span>=<span style="color: black;">true</span>
<span style="color: black;">redirect_stderr</span>=<span style="color: black;">true</span>
<span style="color: black;">stdout_logfile</span>=/root/logs/collect.log
<span style="color: black;">loglevel</span>=info
<span style="color: black;">stopsignal</span>=KILL
<span style="color: black;">stopasgroup</span>=<span style="color: black;">true</span>
<span style="color: black;">killasgroup</span>=<span style="color: black;">true</span>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">The Kafka process is configured as follows.</span></p><span style="color: black;"></span>
<span style="color: black;">command</span>=/bin/bash /root/toutiao_project/scripts/start_kafka.sh
<span style="color: black;">user</span>=root
<span style="color: black;">autorestart</span>=<span style="color: black;">true</span>
<span style="color: black;">redirect_stderr</span>=<span style="color: black;">true</span>
<span style="color: black;">stdout_logfile</span>=/root/logs/kafka.log
<span style="color: black;">loglevel</span>=info
<span style="color: black;">stopsignal</span>=KILL
<span style="color: black;">stopasgroup</span>=<span style="color: black;">true</span>
<span style="color: black;">killasgroup</span>=<span style="color: black;">true</span>
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Start Supervisor</span></p>
<span style="color: black;">supervisord</span> -c /etc/supervisord.conf
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">Start a Kafka consumer, then write a log record to the application server's log file. The Kafka consumer should receive the real-time behavior data.</span></p><span style="color: black;"># Start the Kafka consumer</span>
/root/bigdata/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.137:9092 --topic click-trace
<span style="color: black;"># Write a log record</span>
<span style="color: black;">echo</span> '{"actionTime":"2019-04-10 21:04:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "14299", "algorithmCombine": "C2"}}' >> /root/logs/userClick.log
<span style="color: black;"># The consumer receives the log record</span>
{<span style="color: black;">"actionTime"</span>:<span style="color: black;">"2019-04-10 21:04:39"</span>,<span style="color: black;">"readTime"</span>:<span style="color: black;">""</span>,<span style="color: black;">"channelId"</span>:18,<span style="color: black;">"param"</span>:{<span style="color: black;">"action"</span>: <span style="color: black;">"click"</span>, <span style="color: black;">"userId"</span>: <span style="color: black;">"2"</span>, <span style="color: black;">"articleId"</span>: <span style="color: black;">"14299"</span>, <span style="color: black;">"algorithmCombine"</span>: <span style="color: black;">"C2"</span>}}
<p style="font-size: 16px; color: black; line-height: 40px; text-align: left; margin-bottom: 15px;"><span style="color: black;">supervisorctl is the command-line client for managing Supervisor. Common commands are as follows.</span></p>supervisorctl
<span style="color: black;">></span><span style="color: black;"> status <span style="color: black;"># show program status</span></span>
<span style="color: black;">></span><span style="color: black;"> start apscheduler <span style="color: black;"># start the single program apscheduler</span></span>
<span style="color: black;">></span><span style="color: black;"> stop toutiao:* <span style="color: black;"># stop all programs in the toutiao group</span></span>
<span style="color: black;">></span><span style="color: black;"> start toutiao:* <span style="color: black;"># start all programs in the toutiao group</span></span>
<span style="color: black;">></span><span style="color: black;"> restart toutiao:* <span style="color: black;"># restart all programs in the toutiao group</span></span>
<span style="color: black;">></span><span style="color: black;"> update <span style="color: black;"># reload the config and restart programs whose configuration changed</span></span>