Categories
程式開發

用户增长实战


一.介绍

早期很多公司,基于经验和实践,靠本能的无意识做出决策,从心里学上讲,我们每天无意识暗示自己做很多事情,并没有大脑深层次神经元进行思考,这就跟动物的习性相类似,这种情况,其实是不怎么严密的数学分析,当然不要过于以数据为导向,滥用数据容易导致局部最优化,数据是检验假设的工具,再结合人类的反思,不断闭环,可以得出当时那场景的最好方案。

这就是用户增长平台的初衷,制定完善的数据评估和优化体系,确保用户增长规模和用户增长成本达到一个良性的水平。

这里面涉及六大商业模式,公司规模,市场等等原因,很复杂,可以生成不同种类框架,对产品生命周期,有不同的具现化,我以当初医疗管理考培产品为例,串联一条线路分析,围绕数据驱动涉及四个环节——采集、建模、分析、指标讲述如何将数据驱动方案落地。

二.背景

我这篇文章结合做过的一个实例,从一个侧面了解下我当时的所思所想,公司是做互联网医疗的,产品方向做关于护理人员的考试、问卷、活动、培训、评价方面,产品端有Web,App,小程序,由于时间久了,有些数据不便公开,请朋友们谅解。

三.商业模式的区别

3.1 背景

具我了解这个领域当初在国内2015兴起,无论是用第三方平台友盟,还是自研平台,找对方法,都会对整个团队有所增益,这个领域不断在演化,下面介绍比较有名几个概念。

3.2 海盗指标

“海盗指标”这个术语由风险投资人戴夫·麦克卢尔创造,得名于五个成功创业关键元素的首字母缩写。将公司最需要关注的指标分为五大类:获取用户(Acquisition)、提高活跃度(Activation)、提高留存率(Retention)、获取营收(Revenue)和自传播(Referral),简称AARRR。

用户增长实战 1

这五个指标并不一定遵循严格的先后顺序,这是因为跟当时产品,公司发展,市场情况决定,以我的经验,最初是关注第一关键指标,然后围绕这个指标建模分析。不用关注太多,关注太多,生成决策混乱,所有这个找对方向效果非常明显。

3.3 增长引擎

《精益数据分析》中提到 3 种增长引擎:黏着式增长引擎、病毒式增长引擎、付费式增长引擎。埃里克 · 莱斯提出了驱动创业增长的三大引擎,它们都有各自对应的关键绩效指标(KPI)。

用户增长实战 2

黏着式增长引擎

黏着式增长引擎的重点是让用户成为回头客,并且持续使用你的产品,它和海盗指标中的提高留存率阶段这个概念类似。如果你的用户粘性不大,流失率会很高,用户留存不理想。

衡量粘性最重要的KPI就是客户留存率,除此之外,流失率和使用频率也是非常重要的指标。长期黏性往往来自用户在使用产品过程中为自身所创造的价值。比如百度网盘里面的内容,存储着用户所有资料,同样游戏账号也是如此。

黏着性也不能全看留存率,它还和频率有关,以我当初的场景为例,产品面向用户是全国的护士,这些用户不同地区,不同医院,跟政策有关,才会用产品某些功能的(考试,测试,活动,试卷,题库等等),它使用频率不相同,当初建立几个维度,地区,医院,时间,来分析留存率。

病毒式增长引擎

所谓病毒式传播归根结底就是一件事情:让声名传播出去。病毒式传播之所以吸引人,在于它的指数性本质:如果每个用户能带来1.5个新用户,那么用户数将会无限制地增长直到饱和。当然事情肯定不能这么简单。用户流失、竞品和其他因素决定了它不可能真的无限制增长。

仅考虑病毒式传播系数还不够,你还需要衡量哪些用户行为形成了一个病毒传播周期(循环)。这就涉及到用户分级别,以我当初见到场景为例,跟你的设定划分用户,比如老用户,新用户,试用期用户,医院管理员,科室管理,根据时间维度,付费情况划分,得出增长率。

付费式增长引擎

重点是让已有用户进行付费,这一阶段的前置条件是前两者增长引擎已经得到了一些良好的成效,有了一些忠诚的用户,也有了一定的口碑,那么是时候赚点钱了。但赚钱本身并不能保证用户的持续增长,这里需要进行再投资,并使投资获得更多的回报。衡量增长的关键因素在于客户终生价值和用户获取成本,以及回收成本所需要的时间。

例如,当初医疗产品,一个用户年费180,根据用户衍生出路径分析,通过什么渠道获取(地推,管理员绑定),人力成本,这里面涉及边际成本,这样划分出一条付费路径。最好在一段时间内只关注某一个引擎。

3.4 第一关键指标

第一关键指标(OMTM),这个非常重要,这个取决假设,不同的阶段,这个关键指标不一样。

解释现阶段最重要的问题。促使得出初始基线并建立清晰的目标。抽象出整个公司层面的健康。不断假设,测量,认知循环。

3.5 商业模式

《精益数据分析》中提到六种商业模式。

电子商务

电子商务公司的主营业务就是让访客在其网店上买东西。这大概是一种最常见的在线生意模式了,并且绝对是大多传统统计工具的主要分析对象。

早期的电子商务模式由一个相对简单的“漏斗”构成:访客在网站浏览了一系列网页后,驻足于某件商品并点击“购买”按钮,然后提供相应的支付信息,并完成了此次交易。这就是经典的“转化漏斗”。

电子商务流程

用户增长实战 3

尽管转化率、重复购买率以及购物车大小十分重要,但真正起作用的指标是它们三者的积,即平均每位客户营收。双边市场和电子商务很相像,二者均关注在买家与卖家间促成交易,以及客户的忠诚度。

SaaS

SaaS指按需提供软件的公司,通常以网站的形式出现。比如一些云平台,软件供应商等等。

大部分的SaaS提供商以月费或年费的形式获取收益。一些提供商会按实际硬件消耗,即存储空间的使用量、占用的带宽或计算能力收费,然而这种模式目前在很大程度上受到基础设施即服务(Infrastructure as a Service, IaaS)和平台即服务(Platform as a Service, PaaS)云计算公司的限制。

很多SaaS提供商选择将其服务分层出售,月费也随所提供功能的不同而不同。这种差异可以是项目管理工具中项目数量的上限,也可以是客户关系管理应用中客户数量的上限。找寻层级和价格的最佳组合一直以来都是一大挑战,SaaS公司投入相当大的精力以寻找向已有客户追加销售的方法,从而升级至更高且获利更多的层级。

对于SaaS公司而言,增加一个客户的边际成本几乎可以忽略不计。因此很多SaaS提供商都选择通过免费增值模式来获取客户。提供商一方面允许客户在一开始免费使用一个受限的版本,另一方面又期望他们能够达到免费容量上限并开始支付费用。

SaaS流程

用户增长实战 4

流失率高于一切 ,如果忠实用户的形成速度要高过用户流失速度,你就可以生存下来,需要用户转换成付费客户以前便衡量其参与度,并赶在客户流失以前对其活动进行分析,采取先见性措施。SaaS公司和移动应用公司具有很多相同点,两种商业模式均非常关注客户的流失以及再发性营收,并努力提高用户参与度以使其付费。

移动应用

大部分营收来自于一小部分用户,应该将该部分用户单独划归一组进行分析处理。虽然关键指标是平均每位用户营收,但最好同时跟踪平均每位付费用户营收,因为“鲸鱼”玩家和其他玩家行为想法相差甚远。

移动公司与SaaS公司十分相似,二者均希望提高用户参与度,不断从用户身上牟利并减少流失率。

媒体网站

媒体网站需要库存(访客的浏览)以及合意性,而这均源于可吸引广告商期待群体访问来访的内容。很难在优质的网站内容和足量的付费广告间取得平衡。广告收入意味着一切。但广告具有多种形式,其中包括赞助商展示广告、按印象收费广告、按点击收费广告以及提成广告,从而使得营收统计变得十分复杂。

媒体网站流程

用户增长实战 5

用户生成内容

许多用户选择潜水,一些会贡献些许内容,其余用户则会专注于内容生成,这种80/20的定律存在于你希望用户完成的所有活动当中。提高用户回访率并保持较高的参与度,需要各种策略,把用户贡献看的比什么都重要,但大多时候却需要利用广告来维持收支平衡。

生成内容流程

用户增长实战 6

双边市场

双边市场是传统电商的一个变种,公司通过帮助买家和卖家在网上达成交易来盈利(C2C)两种其他模型结合:电子商务和用户生成内容。很多双边市场均采用佣金制来获取利益,但此外还可采取其他手段来盈利,如帮助卖家推广商品或收取一次性的商品上架费等等。

双边市场流程

用户增长实战 7

3.6 小结

介绍了一些常用的指标,分析方法,你可以分析下当下公司的商业模式,第一关键指标,怎么样演进?

当初医疗考试培训产品,在最初时的状态,很多事情没有明确,数据基础架构没有构建,商业模式Saas+移动应用,管理平台中角色:医院管理员,科室管理员。功能:设置活动,问卷,试卷,题库,考试,测试,课件,课程,评价体系。

最初时第一关键指标是用户留存,那是产品已经生产三年了,很多功能迭代出来了,当时的最核心的功能考试方面,当时题库有五百多万道,当时产品设计功能导向,很不成熟,功能选择性太大,反馈性不好,比如导题,有如下几种,Excel,Word,页面上传等方式,选择性太多,而且有的功能复杂性高,效率不高,就算是设计好模板,成功率也会总出现问题,直接影响用户使用。围绕第一关键指标,改善核心功能。

四.用户演进的阶段

分析阶段

移情:你需要深入目标市场,着手解决人们关心的问题,从而促使消费者愿意为你的商品买单。可以用漏斗模型,看模式转化率,例如管理员进入,题库-》试卷-》考试转化率,调查问卷等等黏性:你需要了解自己能否找到已发现问题的解决方案。如果产品糟糕到访客一看到就会厌恶地离开的程度,那么再大力的推广也是毫无意义的。病毒性:在保证产品或服务的黏性后,即可开始口碑营销。营收:该阶段应着手盈利事宜,但并不意味着此前不存在收费行为。规模化:盈利后,公司即可从自身发展模式切换至市场扩张模式。多级分销,地推。

五.底线

你要知道目标在哪里,哪些指标最重要,比如以下几点

增长率留存率获客成本病毒性可用性和可靠性

5.1 增长率

各个公司定的增长率标准不一样,比如些基准指标,每周能做到5%~7%算是好的增长率,如果1%的话,基本可以算没有想好想要干什么,当然如果公司在营收,那么公司增长率就是按照营收来计算的,如果没有收取费用,那么其增长率则根据活跃用户进行计算。

最开始肯定关注增长率,但是过早的扩张有些问题,容易激化产品质量,资金和用户留存方面的问题,把握公司产品与市场契合点,所以定制合理增长率标准。

5.2 留存率

指标细化,把按照时间维度的用户,不断缩减到一定指标,比如30%的用户每月用下应用,10%的用户每天都在用,这样目标让更多人每天用,并将这些指标与你的商业模式的预测进行对比,形成一个基准。当然可以引入价格方面形成线性关系。

注意

这里还可以细化出来,观察不同页面的差异,不要差异太大,这样可能出现很多问题,功能问题,用户成分单一等等。

5.3 获客成本

尽管无法告诉你获取一位新用户的成本是多少,但可以用客户终身价值的一个比例来定义它。客户终身价值是一位客户与你存在合作关系的这段时间内,为你带来的总收入。你计算出的客户终身价值可能是错的。任何商业模式都有不确定因素。在客户生命周期中,你能从他身上获得多少收入实际上是你猜出来的。如果你刚刚起步,那你可能花了过多的钱来获取这位客户,过了很长时间才能发现是否低估了流失率或高估了客户收益。“按照我的经验,流失率对于客户终身价值的影响最大,可惜,流失率是一个滞后的指标。

客户生命周期

最简单的公式是:

(客户收入*客户生命周期)— 获客和维护成本

HubSpot 用的计算公式是:

平均购买价值*平均购买频率*平均客户寿命

David Skok 的算法是:

(每个用户的平均 MRR *毛利率)/收入流失率

客户生命周期价值是可以衡量的最重要的 SaaS 指标之一,甚至是最重要的。这是因为,它不仅可以使你了解客户现在的价值,还能预测客户将来的业务价值,从而帮你制定长远的客户战略。

注意

当你获取一位客户时,你的花费不要超过能从他(以及受他邀请加入的客户)身上获得的收入的1/3,除非你有充分的理由这样做。

5.4 病毒性

病毒性其实有两个指标:每个现有用户成功邀请了多少新用户(即病毒式传播系数)和他花多长时间才会邀请用户(即病毒传播周期)。病毒性没有所谓“正常”的标准。两个指标都依赖于产品的性质,以及市场饱和度。

病毒性没有“典型值”。如果病毒式传播系数低于1,它仍然会帮你减小你的客户获取成本;如果它高于1,你会不断增长。如果你的病毒式传播系数超过0.75就是一个好现象。要试着在产品中加入内在病毒性,跟踪这个指标并与你的商业模式进行对比。将人工病毒式传播与客户获取同等看待,用它带来的客户贡献的价值来对其进行划分。

5.5 可用性和可靠性

互联网时代赋予服务端一大特征跟桌面端不同的是,可用性,可靠性,获得99.95%以上的可用性也是代价高昂的,这意味着每年你只能宕机4.4小时。如果你的用户非常忠诚也非常活跃,他们就可以忍受短时间的宕机。如果你能在社交网络上公开宕机信息,让用户知情,那就更好了。

5.6 小结

你应该已经有一些正在追踪(或想要追踪)的关键指标了。你的比较结果如何?哪个指标最差?

我当初首先的版本赋予关于留存指标,因为留存的差异性太大,本身产品,给管理员培训的时间特别长(1个月左右),然后有的功能性问题太大。

六.基础平台建设

基本一些概念介绍完毕,在如今的市场环境中,品牌营销的转化效率逐渐降低。传统营销方式的失效,让企业需要从根本上进行调整革新。你要了解客户的真实需求,知道他们在想什么,想要什么,下面我把当时的一些思考罗列下来。

6.1 荒芜期

刚开始接触做增长平台,在自研平台还是第三方面平台犹豫过,后来拍板定下自研系统,因为当时产品不是很成熟,刚起步,谈不到盈利,很多人才方面很欠缺,思维方面转变不过来,很多需求呢拍脑袋定的,造就很多纠纷,因为无法量化出来的东西,就投入生产,那就相当于所有用户再做回归测试,很是要命。

最初环境方面大数据集群,熟悉的技术工具,处理一些计算游刃有余,计算引擎Spark,语言Java/Scala。

6.2 开辟期

环境基本建设完成,当时先跟一些人员沟通好,因为平台得埋点,会设计到一些应用端开发的人员调配,埋点方式分为客户端埋点和服务端埋点两种。客户端埋点适用于用户界面行为的上报,服务端埋点适用于业务操作的上报。

用户增长实战 8

维度:日期,科室,医院,角色。一些基本指标,增长率,活跃度,环比,同比等等。当时这些基本指标出来后,不怎么清楚定位如何,但是就可以形成年报,月报,当时的商业模式,注定不是互联网那么快节奏,跟市场结合,半年出份报告就可以,分析出很多东西。

6.3 探索期

在开辟期做的一些基础性指标,现在看来,有几点帮助,做些新需求,模块一些使用情况。这个阶段已经明白第一关键指标关注留存,因为当时是虚假繁荣,每天很忙,计划不断,但是反馈信息不是很好,访问量会按阶段持续增长,但是留存量的增长效率不好,还有功能分布情况不理想。然后以如下指标为例,留存率,路径分析,漏斗转化率,热点图,故障分析。路径分析,用的是唯一值,串联起来用户一次访问的各个功能使用情况。

6.4 增长期

到了这个阶段很多成果出来了,功能规划,市场方面,用户反馈层面,一些积攒的疑难杂症也解决了。然后理解加深,提出用户分层,新用户=新普通用户+新管理员,新用户的定义是在这个时间阶段内第一次使用模块功能,关注管理员的行为,期望新用户在周期内增长。

根据这个情况提供闭环模型,改变模块,提升新用户增长。根据这个流程,实现整理出概况数据指标,有活跃医院,各个模块的新老用户,对应资源库,新老医院等等。目前来看优点:1.对于模块用户有了更好的认知,以便后续加深挖掘。2.产品行为更好的量化起来。3.更好的重构模块。

最后我的目光不断从底层,慢慢前移面向市场,听听市场讲课。然后研究出新的方案,公司产品刚开始分试用期,试用期管理员分层,其实普通用户对于pc端影响不大。关注管理员带动能更好的增长模式使用情况。现在由于app端,pc端的使用情况,其实在二三线城市不断在下移,它的多样性更加丰富,总结特点,发现相似性,提出更好决策,帮助市场,反哺产品。

七.架构体系

7.1 架构

用户增长实战 9

这个是用户增长架构图,包含以下几个特点:

数据源中日志,服务输出日志,Nginx等等,通过ELK传输出来,一部分从库处理业务数据副本。数据仓库,用的星型模式,维度方面,科室,医院,角色,日期。存储在Hive。计算引擎Spark任务调度Azkaban实时数据Kafka,Elasticsearch,Hbase。Lambda架构,这个在以前的Beam文章有介绍,不再赘述。

 

7.2 数据交换

数据交换的作用处理异库数据导入导出操作,当然这也是数据中台那几个常见的中间件,当时用的Spark做的一套,也可以用别的,比如以前讲的Beam(https://gitbook.cn/new/gitchat/activity/5dad728e7c3fea79dbc619a4“),都可以看具体应用情形。配置文件略,演示几种处理流程。

数据样例类

object Data {
abstract class Data
abstract class tables
/**
* hive
* @param warehouseLocation 数据存储位置
* @param dbName 数据库名称
* @param tableName 表名称
* @param year 年
* @param month 月
* @param day 日
*/
case class Hive1(warehouseLocation:String,dbName:String,tableName:String,year:String,month:String,day:String)

/**
* hive
* @param warehouseLocation 数据位置
*/
case class Hive2(warehouseLocation:String,dbName:String,tableName:String) extends Data

/**
* hive
* @param warehouseLocation 数据位置
*/
case class Hive3(warehouseLocation:String,sparkSession:SparkSession) extends Data
/**
* mysql
* @param url 路径带用户密码库名称
* @param sql sql
*/
case class MySql1(driver:String,url:String,tableName:String,sql:String) extends Data

case class MySqlByTable(tableName:String,columns: String,values:Array[Row]) extends tables

/**
* mysql
* @param url url
* @param tableName 表名
*/
case class MySql2(driver:String,url:String,tableName:String) extends Data

def callCase(f:Data): Any =f match {
case Hive3(a,b)=>
//SparkSession
val appName:String="export_"+TimeUtils.getCertainDayTime(0)
val sparkSession:SparkSession = SparkUtil.hiveSpark(appName,a)
Hive3(a,sparkSession)

case MySql1(a,b,c,d)=>SparkUtil.relationDatabase(a,b)
case MySql2(a,b,c)=>SparkUtil.relationDatabase(a,b

这个的作用有以下几点:

操作路由(导入/导出)配置信息

工具类

object HiveUtil {
/**
* 禁止严格模式 查询
* @param sqlContext
* @param sqlModel sql
*/
def queryByHive(sqlContext:SQLContext,sqlModel:SqlModel2): DataFrame ={
sqlContext.sql(s"USE ${sqlModel.db}")
sqlContext.sql("SET hive.mapred.mode=nonstrict")
sqlContext.sql(s"select * from ${sqlModel.tableName}").drop("year","month","day")
}

/**
* 查询分区表
* @param sqlContext
* @param sqlModel sql
*/
def queryByHiveToPartition(sqlContext:SQLContext,sqlModel:SqlModel1): DataFrame ={
sqlContext.sql(s"USE ${sqlModel.db}")
sqlContext.sql("SET hive.exec.dynamic.partition=true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
val sqlStringBuilder:StringBuilder=new StringBuilder
sqlStringBuilder.append(s"select * from ${sqlModel.tableName} where year=${sqlModel.year} and month=${sqlModel.month} ")

if(sqlModel.day.nonEmpty){
sqlStringBuilder.append(s" and day=${sqlModel.day}")
sqlContext.sql(sqlStringBuilder.toString()).drop("year","month","day")
}else{
sqlContext.sql(sqlStringBuilder.toString()).drop("year","month")
}
}

/**
* 插入hive,分区
* @param sql
* @param sqlModel
* @param view
*/
def addHive(sql:SQLContext,sqlModel:SqlModel1,view:String): Unit ={
val year=sqlModel.year
val month=sqlModel.month
val day=sqlModel.day

sql.sql(s"USE ${sqlModel.db}")
sql.sql("SET hive.exec.dynamic.partition=true")
sql.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
sql.sql(s"INSERT OVERWRITE TABLE ${sqlModel.tableName} PARTITION (year ='$year',month ='$month',day ='$day') SELECT * FROM $view")
}

/**
* 全量插入hive
* @param sql
* @param sqlModel
* @param view
*/
def addHive(sql:SQLContext,sqlModel:SqlModel2,view:String): Unit ={

sql.sql(s"USE ${sqlModel.db}")
sql.sql("SET hive.exec.dynamic.partition=true")
sql.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
sql.sql(s"INSERT OVERWRITE TABLE ${sqlModel.tableName} SELECT * FROM $view")
}

/**
* 查看表详情
*/
def tableDetails(sql:SQLContext,sqlModel:SqlModel2): Array[Row] ={
sql.sql(s"USE ${sqlModel.db}")
sql.sql("SET hive.exec.dynamic.partition=true")
sql.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
sql.sql(s"desc ${sqlModel.tableName}").collect()
}

/**
* 转换数据
* @param desc hive表详情
* @param dataList mysql数据map集合
* @param sql
* @param view
*/
def changeData(desc:Array[Row], dataList:List[Map[String, Any]], sql:SQLContext, view:String): Unit ={

val fields=desc.filter(f=>FilterUtils.isValidatePartitionLine(f.getString(0)))
val fields2=fields.map(x=>{ DataTypes.createStructField(x.getString(0), DataTypes.StringType, false)})
val newData = new util.ArrayList[Row]

//安照hive数据顺序
dataList.foreach(f1 => {
val value=fields.map(f2 => {
var rows:String="null"
val columnName = f2.getString(0)
if (f1.get(columnName).mkString.isEmpty) {
rows ="null"
} else {
rows =f1.get(columnName).mkString
}
rows
})
newData.add(Row.fromSeq(value))
})

sql.createDataFrame(newData,DataTypes.createStructType(fields2)).createOrReplaceTempView(view)

}

}
case class SqlModel1(db:String,tableName:String,year:String,month:String,day:String)
case class SqlModel2(db:String,tabl

 

MySQL导入到Hive

object ToHive {
val logger: Logger = LoggerFactory.getLogger(ToHive.getClass)

/**
* 全量导入hive
* @param hive
* @param mySql
*/
def add(hive:Hive2,mySql:MySql1): Unit ={
Data.callCase(mySql)
println("全量:hive"+hive+" mySql:"+mySql)
val hiveSpark:SparkSession=Data.callCase(Hive3(hive.warehouseLocation,null)).asInstanceOf[Hive3].sparkSession
val hiveSql=hiveSpark.sqlContext
import hiveSql.implicits._
import hiveSpark.implicits._

//mysql视图
val datas=MySqlUtils.queryAll(mySql.sql)
val desc=HiveUtil.tableDetails(hiveSql,SqlModel2(hive.dbName,hive.tableName))

//清洗数据
HiveUtil.changeData(desc,datas,hiveSql,"model")
HiveUtil.addHive(hiveSql,SqlModel2(hive.dbName,hive.tableName),"model")

ConnectionPool.closeAll
hiveSpark.stop()
}

/**
*
* hive分区批量增加
* @param hive
* @param mySql
*/
def add(hive:Hive1,mySql:MySql1): Unit ={
Data.callCase(mySql)
println("分区增量:hive"+hive+" mySql:"+mySql)
val hiveSpark:SparkSession=Data.callCase(Hive3(hive.warehouseLocation,null)).asInstanceOf[Hive3].sparkSession
val hiveSql=hiveSpark.sqlContext
import hiveSpark.implicits._
import hiveSql.implicits._

//mysql视图
val datas=MySqlUtils.queryAll(mySql.sql)
val desc=HiveUtil.tableDetails(hiveSql,SqlModel2(hive.dbName,hive.tableName))

HiveUtil.changeData(desc,datas,hiveSql,"model")
HiveUtil.addHive(hiveSql,SqlModel1(hive.dbName,hive.tableName,hive.year,hive.month,hive.day),"model")

ConnectionPool.closeAll
hiveSpark.st

Hive导出

object ToRelation {
val logger: Logger = LoggerFactory.getLogger(ToRelation.getClass)

/**
* 增量
* @param hive
* @param mySql
*/
def incrementByTime(hive:Hive1,mySql:MySql2): Unit ={
Data.callCase(mySql)
println("incrementByTime:hive"+hive+" mySql:"+mySql)
val hiveSpark:SparkSession=Data.callCase(Hive3(hive.warehouseLocation,null)).asInstanceOf[Hive3].sparkSession
val hiveSql=hiveSpark.sqlContext

val hiveDbName=hive.dbName
val hiveTableName=hive.tableName
val mySqlTableName=mySql.tableName

val hivedF=HiveUtil.queryByHiveToPartition(hiveSql,SqlModel1(hiveDbName,hiveTableName,hive.year,hive.month,hive.day))
val columns=hivedF.columns.filter(f=>FilterUtils.isValidatePartitionLine(f.toString)).mkString(",")
MySqlUtils.batchSave(MySqlByTable(mySqlTableName,columns,hivedF.collect()))

logger.info("incrementByTime:hive"+hive+" mySql:"+mySql)
ConnectionPool.closeAll
hiveSpark.stop()
}

/**
* 批量增加所有数据
* @param hive
* @param mySql
*/
def add(hive:Hive2,mySql:MySql2): Unit ={
Data.callCase(mySql)
println("add:hive"+hive+" mySql:"+mySql)
val hiveSpark:SparkSession=Data.callCase(Hive3(hive.warehouseLocation,null)).asInstanceOf[Hive3].sparkSession
val hiveSql=hiveSpark.sqlContext

val hiveDbName=hive.dbName
val hiveTableName=hive.tableName
val mySqlTableName=mySql.tableName

val hivedF=HiveUtil.queryByHive(hiveSql,SqlModel2(hiveDbName,hiveTableName))
val columns=hivedF.columns.filter(f=>FilterUtils.isValidatePartitionLine(f.toString)).mkString(",")
MySqlUtils.batchSave(MySqlByTable(mySqlTableName,columns,hivedF.collect()))

logger.info("add:hive"+hive+" mySql:"+mySql)
ConnectionPool.closeAll
hiveSpark.stop()

 

这几常用的方法,可以类似做个模式匹配,把各种数据交换需求中转一下。

7.3 数据仓库

数据仓库,英文名称为Data Warehouse,可简写为DW或DWH。数据仓库,是为企业所有级别的决策制定过程,提供所有类型数据支持的战略集合。它出于分析性报告和决策支持目的而创建。 为需要业务智能的企业,提供指导业务流程改进、监视时间、成本、质量以及控制。

用户增长实战 10

模型架构

这个模型架构当时按照这种方式处理,下面介绍一部分表设计,仅供参考。

用户增长实战 11

一部分Hive设计

类似数据仓库表设计,最重要不是两头,ods和dw两类表,而是中间的运转,有的借助kafka,有的设计一些,集合表,分布式计算分解时容易计算,比如把用户Id,请求Id,聚合出来,两种集合,加上一些方法名,类名,把结果聚合出来,这样后续不论是一些指标需求,还是计算路径都可以用到。

 

7.4 任务处理流程

用户增长实战 12

处理流程

这个流程做好自动化,因为这种任务很多。收集日志,为后续异常处理。

7.5 指标

这个计算得出指标,具体搞清楚以下几个问题:

监控对象监控那些指标哪些维度监控

7.6 小结

上述这些是我当初一些架构流程,你可以思考下,你们打算做类似的平台,流程怎么运转,监控哪些指标,哪些维度?

再抽象演进下,如果要监控微服务,要考虑指标(7.5)的几个问题,如何设计?

八.基础指标

这个阶段处理一些,基础指标,比如,增长率,流失率,留存率,试卷排名,其实如果对数据敏感度高的人看基础指标,可以分析出很多问题。

处理请求日志

/**
* 127.0.0.1 - - [06/Jun/2018:00:00:00 +0800] "GET /icu-system/mobile/courseware/query/onlinecourseware/list?page=3&pagesize=10&session_id=f600b1750c2c1ffde0ae23728d7eff6d7b40f8385c289353d4d5027e6a36c48c HTTP/1.0" 200 1495
*
* @author limeng
*
**/
object AccessLog {
val logger = LoggerFactory.getLogger(classOf[AccessLog])
//日志正则
val PARTTERN: Regex =
"""^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] "(S+) (S+) (S+)" (d{3}) (d+)""".r

/**
* 验证数据是否符合格式
*/
def isValidateLogLine(line: String): Boolean = {
val options = PARTTERN.findFirstMatchIn(line)

if(options.isEmpty){
false
}else{
true
}
}

/**
* 解析输入日志数据
*/
def parseLogLine(line:String):AccessLog={
if(!isValidateLogLine(line)){
logger.error("参数格式异常")
throw new IllegalArgumentException("参数格式异常")
}

// 从line中获取匹配的数据
val options = PARTTERN.findFirstMatchIn(line)

val matcher=options.get

// 构建返回值
AccessLog(
matcher.group(1), // 获取匹配字符串中第一个小括号中的值
matcher.group(2),
matcher.group(3),
matcher.group(4),
matcher.group(5),
matcher.group(6),
matcher.group(7),
matcher.group(8),
matcher.group(9)
)
}

}

case class AccessLog(ipAddress: String, // IP地址
clientId: String, // 客户端唯一标识符
userId: String, // 用户唯一标识符
serverTime: String, // 服务器时间
method: String, // 请求类型/方式
endpoint: String, // 请求的资源
protocol: String, // 请求的协议名称
responseCode: String, // 请求返回值:比如:200、401
contentSize: String // 返回的结果数据大小
)

object AccessCutting {
def cuttingFile(accessFile:String): Unit ={
val hiveSpark=SparkUtil.hiveSpark("AccessCutting")
val sc=hiveSpark.sparkContext

val path=accessFile

val fileRdd=sc.textFile(path)
val fileName=path.split("/").last.filter(f=>StringUtils.isSymbol(f.toString))

//过滤数据,对数据转换操作
val accessLog=fileRdd.filter(line=>AccessLog.isValidateLogLine(line))
.map(line=>{
AccessLog.parseLogLine(line)
})

accessLog.cache()

val dataFrame=hiveSpark.createDataFrame(accessLog)
dataFrame.createOrReplaceTempView("url_access")

//年月日
//val yearMonthDay=TimeUtils.getYearMonthDay(-1)
val fileDate=TimeUtils(TimeUtils.strToDate(fileName))
val fileYearMonthDay=TimeUtils.getYearMonthDay(0)
val year=fileYearMonthDay.year
val month=fileYearMonthDay.month
val day=fileYearMonthDay.day

hiveSpark.sql("USE hushijie_dw")
hiveSpark.sql("SET hive.exec.dynamic.partition=true")
hiveSpark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
hiveSpark.sql(s"INSERT OVERWRITE TABLE ods_url_access PARTITION (year ='$year',month ='$month',day ='$day') SELECT * FROM url_access")

accessLog.unpersist()
sc.stop()
hiveSp

医院活跃度

import com.icu.bigdata.model.NewOldUsers
import com.icu.bigdata.utils._
import org.slf4j.LoggerFactory
import scalikejdbc._

import scala.collection.mutable.ListBuffer

object Hospital {
val logger = LoggerFactory.getLogger(Hospital.getClass)
/**
* 增加活跃度
*/
def addActivity(): Unit ={
val hiveSpark=SparkUtil.hiveSpark("addActivity")

val sql=hiveSpark.sqlContext
import sql.implicits._

val fileYearMonthDay=TimeUtils.getYearMonthDay(-1)
val year=fileYearMonthDay.year
val month=fileYearMonthDay.month
val day=fileYearMonthDay.day

sql.sql("USE hushijie_dw")
sql.sql(s"select * from dw_method_total where year ='$year' and month ='$month' and day ='$day' " +
s"and methodname in('appLogin','hospitalLogin') limit 10").createOrReplaceTempView("methods")

//sql.sql(s"select * from dw_method_total where year ='$year' and month ='$month' and day ='$day' " +
// s"and methodname ='doFilter' and classname='c.i.f.HttpRequestCharacterFilter' ").createOrReplaceTempView("methods")

sql.cacheTable("methods")

//获取用户id集合,去除null
val accountids=sql.sql("select accountids from methods").collect().flatMap(x=>{
x.mkString.split(",").filter(line=>NewOldUsers.isValidateAccountids(line))
})

sql.uncacheTable("methods")

MysqlConnectionUtils.formMysql1()
var activity=new ListBuffer[Int]

//取出医院id
accountids.foreach(t=>{
val id=t.toInt
//println(id)
val activitySql="select a.hospital from account a join hospital h on a.hospital=h.id " +
" where 1=1 and a.id="+id+" and h.`level`13"
val value = DB.readOnly(implicit session =>
SQL(activitySql).map({
rs => rs.intOpt("hospital").getOrElse(0)
}).list().apply().mkString)
if(!value.isEmpty){
activity.append(value.toInt)
}
})
val newDate=TimeUtils.getCertainDayTimeStr(0)
val beforeDate=TimeUtils.getCertainDayTimeStr(-1)
val totalSql=s"""select count(1) as num from hospital where 1=1 and to_days(ctime)
SQL(totalSql).map({
rs => rs.int("num")
}).list().apply().mkString)

ConnectionPool.closeAll

//活跃医院总数
val currTotal=activity.distinct.size
sql.createDataset(Seq((total,currTotal.toString,beforeDate))).createOrReplaceTempView("activitys")

val sqlModel=SqlModel("hushijie_dw","dw_hospital_activity","activitys")
SqlUtil.addHive(sql,sqlModel,fileYearMonthDay)

logger.info("成功----"+newDate)
hiveSpar

统计所有方法中间表

/**
* 统计所有方法
*/
def addMethodTotal(): Unit ={
val hiveSpark=SparkUtil.hiveSpark("addMethodTotal")

import hiveSpark.implicits._
val sql=hiveSpark.sqlContext
import sql.implicits._

val fileYearMonthDay=TimeUtils.getYearMonthDay(-1)
val year=fileYearMonthDay.year
val month=fileYearMonthDay.month
val day=fileYearMonthDay.day

sql.sql("USE hushijie_dw")
sql.sql("set hive.fetch.task.conversion=none")
sql.sql("set hive.exec.orc.split.strategy=BI")
sql.sql(s"select classname,methodname,accountid,requestid,result" +
s" from ods_url_logstash where year ='$year' and month ='$month' and day ='$day'").createOrReplaceTempView("logs")
sql.cacheTable("logs")

sql.sql("select classname,methodname,concat_ws(',',collect_set(accountid)) as accountids," +
"concat_ws(',',collect_set(requestid)) as requestids, concat_ws(',',collect_set(result)) as results " +
"from logs group by classname,methodname ").createOrReplaceTempView("methods")

sql.uncacheTable("logs")

val sqlModel=SqlModel("hushijie_dw","dw_method_total","methods")
SqlUtil.addHive(sql,sqlModel,fileYearMonthDay)
logger.info("addMethodTotal成功-----")

hiveSpark

这个上文提到聚合成集合表,可以分解处理各个流程,便于统计。

九.闭环体系

所谓闭环体系,其实是把各个岗位的目标明确化,以数据驱动,把指标明细化,转换成一个流程循环改善运转,比如:

用户增长实战 13

闭环

这样进行A/B测试,根据不同版本,筛选出不同指标,筛选出不同的用户角色,把整个体系运转出来,这样就可知道,类似功能,根据不同设计,可以看出用户指标情况,以后调整后续决策。

比如反馈出问题,题库功能,难点特别大,题目重复性多,导入题库操作性太强等等。

十.用户分级

用户分级是因为,ToB产品的特殊性,一开始,他们仅有的少数早期客户把他们看成咨询顾问,而后的阶段中,他们的客户则要接受更加标准和通用的产品或服务。

后来眼光不断上移,关注用户分级情况,推动市场的一些决策,比如,地推,付费率,回归出收益的具体情况。

用户增长实战 14

用户分级

十.一些周边应用

这个体系构建完成后,基本可以满足一些周边,比如:

因为产品涉及到考试,会有作弊情况,通过爬虫,测试库,可以去比对考试题情况,这个方式一般两种,一种系统端,做些策略,一种业务上做些保护措施,比如加密,通过用户的一些成绩详细信息列入可疑范围名单。题库去重,当时的策略,海明距离。预测收益,行为。标签库。

海明距离

传统的hash 算法只负责将原始内容尽量均匀随机地映射为一个签名值,原理上相当于伪随机数产生算法,进行比对,计算效率慢。Google 的 Simhash 算法产生的签名,可以用来比较原始内容的相似度时,计算效率快,这个最开始应用Google页面去重的算法,有以下几个步骤:

分词hash加权合并降维拆分比对

具体看算法源码

public class Simhash implements Serializable {
private static final long serialVersionUID = 4117064662596213588L;
private JiebaSegmenter segmenter = new JiebaSegmenterChild(); // 分词 名词动词形容词
private List> storage = new ArrayList>();// 按照分段存储simhash,查找更快速
private int bitNum = 64;
private int fracCount = 4; // 默认按照4段进行simhash存储
private int fracBitNum = bitNum / fracCount;//分组16位为一组
private int hammingThresh = 3;// 汉明距离的衡量标准

public Simhash() {
for (int i = 0; i < fracCount; i++) storage.add(new HashMap());
}

public Simhash(int fracCount, int hammingThresh) {
this.fracCount = fracCount;
fracBitNum = bitNum / fracCount;
this.hammingThresh = hammingThresh;
for (int i = 0; i < fracCount; i++) storage.add(new HashMap());
}

/**
* 指定文本计算simhash值
*
* @param content
* @return Long
*/
public Long calSimhash(String content) {
// p{Punct} 标点符号:!"#$%&'()*+,-./:;[email protected][]^_`{|}~
// p{Space} 空白字符:[ tnx0Bfr]
String filterContent = content.trim().replaceAll("\p{Punct}|\p{Space}", "");
// 切词
List lsegStr = segmenter.process(filterContent, JiebaSegmenter.SegMode.SEARCH);

// 按照词语的hash值,计算simHashWeight(低位对齐)
Integer[] weight = new Integer[bitNum];
Arrays.fill(weight, 0);
for (SegToken st : lsegStr) {
long wordHash = Murmur3.hash64(st.word.getBytes());
for (int i = 0; i > i) & 1) == 1) weight[i] += 1;
else weight[i] -= 1;
}
}

// 计算得到Simhash值
StringBuilder sb = new StringBuilder();
for (int i = 0; i 0) sb.append(1);
else sb.append(0);
}

return new BigInteger(sb.toString(), 2).longValue();
}

/**
* 判断文本是否重复
*
* @param content
* @return
*/
public boolean isDuplicate(String content,Long oldSimhash) {
Long simhash=null;
if(oldSimhash == null){
simhash = calSimhash(content);
}else{
simhash = oldSimhash;
}

List lFrac = splitSimhash(simhash);
int dis = 0;
for (int i = 0; i < fracCount; i++) { String frac = lFrac.get(i); Map fracMap = storage.get(i);
if (fracMap.containsKey(frac)) {
for (Long simhash2 : fracMap.get(frac)) {
if (hamming(simhash, simhash2) < hammingThresh) return true; } } } return false; } /** * 按照(frac, )索引进行存储 * * @param simhash * @param content */ public void store(Long simhash, String content) { List lFrac = splitSimhash(simhash); for (int i = 0; i < fracCount; i++) { String frac = lFrac.get(i); Map fracMap = storage.get(i);
if (fracMap.containsKey(frac)) fracMap.get(frac).add(simhash);
else {
List ls = new ArrayList();
ls.add(simhash);
fracMap.put(frac, ls);
}
}

}

// 计算汉明距离
private int hamming(Long s1, Long s2) {
int dis = 0;
for (int i = 0; i > i & 1) != (s2 >> i & 1)) dis++;
}
return dis;
}

private int hamming(String s1, String s2) {
if (s1.length() != s2.length()) return 0;
int dis = 0;
for (int i = 0; i < s1.length(); i++) { if (s1.charAt(i) != s2.charAt(i)) dis++; } return dis; } // 将simhash分成n段 private List splitSimhash(Long simhash) { List ls = new ArrayList(); StringBuilder sb = new StringBuilder(); for (int i = 0; i > i & 1);
if ((i + 1) % fracBitNum == 0) {
ls.add(sb.toString());
sb.setLength(0);
}
}
return ls;
}
}
//hash算法
public class Murmur3 implements Serializable {

// Constants for 32 bit variant
private static final int C1_32 = 0xcc9e2d51;
private static final int C2_32 = 0x1b873593;
private static final int R1_32 = 15;
private static final int R2_32 = 13;
private static final int M_32 = 5;
private static final int N_32 = 0xe6546b64;

// Constants for 128 bit variant
private static final long C1 = 0x87c37b91114253d5L;
private static final long C2 = 0x4cf5ad432745937fL;
private static final int R1 = 31;
private static final int R2 = 27;
private static final int R3 = 33;
private static final int M = 5;
private static final int N1 = 0x52dce729;
private static final int N2 = 0x38495ab5;

private static final int DEFAULT_SEED = 0;
private static final long serialVersionUID = -6219781538366994038L;

/**
* Murmur3 32-bit variant.
*
* @param data - input byte array
* @return - hashcode
*/
public static int hash32(byte[] data) {
return hash32(data, data.length, DEFAULT_SEED);
}

/**
* Murmur3 32-bit variant.
*
* @param data - input byte array
* @param length - length of array
* @param seed - seed. (default 0)
* @return - hashcode
*/
public static int hash32(byte[] data, int length, int seed) {
int hash = seed;
final int nblocks = length >> 2;

// body
for (int i = 0; i < nblocks; i++) { int i_4 = i << 2; int k = (data[i_4] & 0xff) | ((data[i_4 + 1] & 0xff) << 8) | ((data[i_4 + 2] & 0xff) << 16) | ((data[i_4 + 3] & 0xff) << 24); // mix functions k *= C1_32; k = Integer.rotateLeft(k, R1_32); k *= C2_32; hash ^= k; hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32; } // tail int idx = nblocks << 2; int k1 = 0; switch (length - idx) { case 3: k1 ^= data[idx + 2] << 16; case 2: k1 ^= data[idx + 1] <>> 16);
hash *= 0x85ebca6b;
hash ^= (hash >>> 13);
hash *= 0xc2b2ae35;
hash ^= (hash >>> 16);

return hash;
}

/**
* Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
*
* @param data - input byte array
* @return - hashcode
*/
public static long hash64(byte[] data) {
return hash64(data, data.length, DEFAULT_SEED);
}

/**
* Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
*
* @param data - input byte array
* @param length - length of array
* @param seed - seed. (default is 0)
* @return - hashcode
*/
public static long hash64(byte[] data, int length, int seed) {
long hash = seed;
final int nblocks = length >> 3;

// body
for (int i = 0; i < nblocks; i++) { final int i8 = i << 3; long k = ((long) data[i8] & 0xff) | (((long) data[i8 + 1] & 0xff) << 8) | (((long) data[i8 + 2] & 0xff) << 16) | (((long) data[i8 + 3] & 0xff) << 24) | (((long) data[i8 + 4] & 0xff) << 32) | (((long) data[i8 + 5] & 0xff) << 40) | (((long) data[i8 + 6] & 0xff) << 48) | (((long) data[i8 + 7] & 0xff) << 56); // mix functions k *= C1; k = Long.rotateLeft(k, R1); k *= C2; hash ^= k; hash = Long.rotateLeft(hash, R2) * M + N1; } // tail long k1 = 0; int tailStart = nblocks << 3; switch (length - tailStart) { case 7: k1 ^= ((long) data[tailStart + 6] & 0xff) << 48; case 6: k1 ^= ((long) data[tailStart + 5] & 0xff) << 40; case 5: k1 ^= ((long) data[tailStart + 4] & 0xff) << 32; case 4: k1 ^= ((long) data[tailStart + 3] & 0xff) << 24; case 3: k1 ^= ((long) data[tailStart + 2] & 0xff) << 16; case 2: k1 ^= ((long) data[tailStart + 1] & 0xff) <> 4;

// body
for (int i = 0; i < nblocks; i++) { final int i16 = i << 4; long k1 = ((long) data[i16] & 0xff) | (((long) data[i16 + 1] & 0xff) << 8) | (((long) data[i16 + 2] & 0xff) << 16) | (((long) data[i16 + 3] & 0xff) << 24) | (((long) data[i16 + 4] & 0xff) << 32) | (((long) data[i16 + 5] & 0xff) << 40) | (((long) data[i16 + 6] & 0xff) << 48) | (((long) data[i16 + 7] & 0xff) << 56); long k2 = ((long) data[i16 + 8] & 0xff) | (((long) data[i16 + 9] & 0xff) << 8) | (((long) data[i16 + 10] & 0xff) << 16) | (((long) data[i16 + 11] & 0xff) << 24) | (((long) data[i16 + 12] & 0xff) << 32) | (((long) data[i16 + 13] & 0xff) << 40) | (((long) data[i16 + 14] & 0xff) << 48) | (((long) data[i16 + 15] & 0xff) << 56); // mix functions for k1 k1 *= C1; k1 = Long.rotateLeft(k1, R1); k1 *= C2; h1 ^= k1; h1 = Long.rotateLeft(h1, R2); h1 += h2; h1 = h1 * M + N1; // mix functions for k2 k2 *= C2; k2 = Long.rotateLeft(k2, R3); k2 *= C1; h2 ^= k2; h2 = Long.rotateLeft(h2, R1); h2 += h1; h2 = h2 * M + N2; } // tail long k1 = 0; long k2 = 0; int tailStart = nblocks << 4; switch (length - tailStart) { case 15: k2 ^= (long) (data[tailStart + 14] & 0xff) << 48; case 14: k2 ^= (long) (data[tailStart + 13] & 0xff) << 40; case 13: k2 ^= (long) (data[tailStart + 12] & 0xff) << 32; case 12: k2 ^= (long) (data[tailStart + 11] & 0xff) << 24; case 11: k2 ^= (long) (data[tailStart + 10] & 0xff) << 16; case 10: k2 ^= (long) (data[tailStart + 9] & 0xff) << 8; case 9: k2 ^= (long) (data[tailStart + 8] & 0xff); k2 *= C2; k2 = Long.rotateLeft(k2, R3); k2 *= C1; h2 ^= k2; case 8: k1 ^= (long) (data[tailStart + 7] & 0xff) << 56; case 7: k1 ^= (long) (data[tailStart + 6] & 0xff) << 48; case 6: k1 ^= (long) (data[tailStart + 5] & 0xff) << 40; case 5: k1 ^= (long) (data[tailStart + 4] & 0xff) << 32; case 4: k1 ^= (long) (data[tailStart + 3] & 0xff) << 24; case 3: k1 ^= (long) (data[tailStart + 2] & 0xff) << 16; case 2: k1 ^= (long) (data[tailStart + 1] & 0xff) <>> 33);
h *= 0xff51afd7ed558ccdL;
h ^= (h >>> 33);
h *= 0xc4ceb9fe1a85ec53L;
h ^= (h >>> 33);
return h;
}
}
//测试
public class Main {
public static void main(String[] args) throws IOException {
List ls = FileUtils.readLines(new File(""));
Simhash simhash = new Simhash(4, 3);
for (String content : ls) {
Long simhashVal = simhash.calSimhash(content);
System.out.println(Long.toBinaryString(simhashVal));
System.out.println(simhash.isDuplicate(content,simhashVal));
simhash.s

十二.总结

到此也就结束本章的讲解了,我也在每个大标题下的小结中,提出很多问题,有兴趣的思考下,留言给我互相交流。用户增长是个大学问,我也是不断的参悟、不断的放下、不断的完善、慢慢会有所修为。

十三.作者介绍

李孟,目前就职于知因智慧数据科技有限公司,负责数据中台数据引擎基础架构设计和中间件开发,专注云计算大数据方向。

博客:https://blog.csdn.net/qq_19968255"