// Spark-submit 消费 Kafka 数据生产案例
package com.sm.stream
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.TableName
import java.util.Date
import org.apache.hadoop.hbase.client.Table
import java.util.ArrayList
import com.alibaba.fastjson.JSONObject
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.Connection
/**
 * Spark Streaming job that reads (key, message) records from Kafka and writes each
 * JSON message as one row into an HBase table selected by the record's first element.
 *
 * Row key layout: `addr,dataType,meterType,order,yyyyMMddHHmmssSSS`.
 * All remaining JSON fields (nested objects are flattened) become columns.
 */
object Consumer {
  import scala.util.control.NonFatal

  /**
   * Target HBase table for each routing key (the first element of a stream record,
   * which is the Kafka message key per the `KafkaUtils.createStream` contract).
   * The names are intentionally irregular in places (e.g. `co1` -> `tb_co`); do not normalize.
   */
  private val TableByMessageKey: Map[String, String] = Map(
    "safeclound_smoke" -> "safeclound.tb_smoke",
    "safeclound_co1" -> "safeclound.tb_co",
    "safeclound_ray" -> "safeclound.tb_ray",
    "safeclound_atmos" -> "safeclound.tb_atmos",
    "safeclound_air" -> "safeclound.tb_air",
    "safeclound_water" -> "safeclound.tb_water",
    "safeclound_hum" -> "safeclound.tb_hum",
    "safeclound_elec_power_facter" -> "safeclound.tb_elec_powerfactor",
    "safeclound_elec_power" -> "safeclound.tb_elec_power",
    "safeclound_elec_phase_v" -> "safeclound.tb_elec_phasev",
    "safeclound_elec_phase_i" -> "safeclound.tb_elec_phasei",
    "safeclound_elec_line_v" -> "safeclound.tb_elec_linev",
    "safeclound_elec_freq" -> "safeclound.tb_elec_freq",
    "safeclound_water_njyc" -> "safeclound.tb_water_njyc",
    "safeclound_ammeter" -> "safeclound.tb_ammeter",
    "safeclound_harmonic_i" -> "safeclound.tb_harmonic_i",
    "safeclound_harmonic_v" -> "safeclound.tb_harmonic_v",
    "safeclound_rate_i" -> "safeclound.tb_rate_i",
    "safeclound_rate_v" -> "safeclound.tb_rate_v",
    "safeclound_phase_v_rate" -> "safeclound.tb_phase_v_rate",
    "safeclound_freq_rate" -> "safeclound.tb_freq_rate",
    "safeclound_door_ctrl" -> "safeclound.tb_door_ctrl",
    "safeclound_demand" -> "safeclound.tb_demand",
    "safeclound_heartbeat" -> "safeclound.tb_heartbeat",
    "safeclound_pump_controller" -> "safeclound.tb_pump_controller",
    "safeclound_ggj_controller" -> "safeclound.tb_ggj_controller",
    "safeclound_compensation_module" -> "safeclound.tb_compensation_module",
    "safeclound_hcho" -> "safeclound.tb_hcho",
    "safeclound_tvoc" -> "safeclound.tb_tvoc",
    "safeclound_co2" -> "safeclound.tb_co2",
    "safeclound_largestDemand" -> "safeclound.tb_largestDemand"
  )

  /** JSON fields that form the row key and therefore are never written as columns. */
  private val RowKeyFields: Set[String] = Set("addr", "dataType", "meterType", "order", "timestamp")

  /** Column family used by every table except the compensation-module table. */
  private val DefaultColumnFamily = "info"

  /** The only table whose columns are spread over several column families. */
  private val CompensationModuleTable = "safeclound.tb_compensation_module"

  /** Column -> column family for [[CompensationModuleTable]]; built once instead of per cell. */
  private val CompensationModuleFamilies: Map[String, String] = {
    val info1 = Seq("bid", "cid", "ITHDa", "ITHDb", "ITHDc", "Ia", "Ib", "Ic",
      "T1", "T2", "T3", "T4", "T5", "Vab", "Vbc", "Vca")
    val info2 = Seq("fanStatus", "recoverTimesLeft", "runStatus", "sAntiTimeLimitP",
      "sLowVoltage", "sOverVoltage", "sQuickBreak", "sSimulate", "sStopPhase",
      "sT1", "sT2", "sT3", "sT4", "sT5", "sTHD", "switchTimes")
    (info1.map(_ -> "info1") ++ info2.map(_ -> "info2")).toMap
  }

  /**
   * Entry point.
   *
   * @param args `zkQuorum zkHbase group topics numThreads`, where `topics` is a
   *             comma-separated list and `numThreads` is the number of consumer threads
   *             per topic. Arguments beyond the fifth are ignored.
   */
  def main(args: Array[String]): Unit = {
    println("version:0.0.4")
    if (args.length < 5) {
      System.err.println("Usage: Consumer <zkQuorum>,<zkHbase> ,<group>, <topics>, <numThreads>")
      System.exit(1)
    }
    Logger.getRootLogger.setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // take(5): the extractor needs exactly five elements; without it any extra
    // argument would fail with a MatchError even though the guard above accepted it.
    val Array(zkQuorum, zkHbase, group, topics, numThreads) = args.take(5)

    // NOTE(review): the master is hard-coded, which takes precedence over the
    // `--master` option of spark-submit. Confirm this is intended before cluster use.
    val sparkConf = new SparkConf().setAppName("Consumer").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("checkpoint")

    val threadsPerTopic = numThreads.toInt
    val topicMap = topics.split(",").map(topic => (topic.trim, threadsPerTopic)).toMap
    val keyvalues = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

    keyvalues.foreachRDD { rdd =>
      rdd.foreachPartition { recordPart =>
        // Skip the connection setup entirely when the partition has nothing to write.
        if (recordPart.hasNext) {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, zkHbase)
          val conn = ConnectionFactory.createConnection(conf)
          try {
            recordPart.foreach { record =>
              // A bad record must not abort the rest of the partition: log and continue.
              try {
                Option(record._1).flatMap(TableByMessageKey.get) match {
                  case Some(table) => save(conn, table, record, format)
                  case None => println(s"the other is:${record._1}")
                }
              } catch {
                case NonFatal(e) => e.printStackTrace()
              }
            }
          } finally {
            // Always release the connection, even if iterating the partition fails.
            conn.close()
          }
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one JSON message and writes it as a single row to `table`.
   *
   * The row is written only when `timestamp`, `addr`, `dataType`, `meterType` and
   * `order` are all present; otherwise the available values are printed and the
   * message is skipped.
   *
   * @param conn   open HBase connection; not closed by this method
   * @param table  fully qualified HBase table name
   * @param record stream record; only the second element (the JSON text) is read
   * @param format formatter that turns the epoch-millisecond `timestamp` into the row key suffix
   * @throws NumberFormatException if `timestamp` is present but is not a valid long
   */
  def save(conn: Connection, table: String, record: (String, String), format: SimpleDateFormat): Unit = {
    val obj = JSON.parseObject(record._2)
    if (obj == null) {
      // parseObject yields null for a null/empty payload; nothing to store.
      println(s"empty message skipped, table:$table")
    } else {
      // A missing timestamp is treated like any other missing key field instead of
      // failing inside toLong.
      val date = Option(obj.getString("timestamp")).map(ts => format.format(new Date(ts.toLong)))
      val addr = Option(obj.getString("addr"))
      val dataType = Option(obj.getString("dataType"))
      val meterType = Option(obj.getString("meterType"))
      val order = Option(obj.getString("order"))

      val rowkey = for {
        a <- addr
        d <- dataType
        m <- meterType
        o <- order
        ts <- date
      } yield Seq(a, d, m, o, ts).mkString(",")

      rowkey match {
        case Some(key) =>
          val put = new Put(Bytes.toBytes(key))
          analysis(table, obj, put)
          val tabConn = conn.getTable(TableName.valueOf(table))
          try tabConn.put(put)
          finally tabConn.close()
        case None =>
          println(s"date:${date.orNull},addr:${addr.orNull},dataType:${dataType.orNull}," +
            s"meterType:${meterType.orNull},order:${order.orNull}")
      }
    }
  }

  /**
   * Copies every non-row-key field of `json` into `put`, recursing into nested JSON
   * objects so that their fields are written as columns of the same row.
   *
   * String and BigDecimal values are stored; JSON nulls and values of any other type
   * are logged and skipped. The field name `value` is stored as column `value_`.
   *
   * @param table target table name, used to pick the column family
   * @param json  object whose fields are written
   * @param put   mutation that receives the columns
   */
  def analysis(table: String, json: JSONObject, put: Put): Unit = {
    val keys = json.keySet().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      if (!RowKeyFields.contains(key)) {
        // Rename the special field name "value".
        val decodeKey = if ("value".equals(key)) "value_" else key
        json.get(key) match {
          case null =>
            // Previously this hit getClass on null and dropped the whole record.
            println(s"null value skipped, the key:$decodeKey")
          case _: String | _: java.math.BigDecimal =>
            putRowData(table, put, decodeKey, json.getString(key))
          case nested: JSONObject =>
            analysis(table, nested, put)
          case other =>
            // NOTE(review): integer and boolean JSON values end up here and are not
            // stored; confirm whether that is intended.
            println(s"unknow type:${other.getClass},the key value:($decodeKey,$other)")
        }
      }
    }
  }

  /**
   * Adds one cell to `put`.
   *
   * The column family is `info`, except for the compensation-module table where it is
   * looked up per column. A column of that table with no mapped family is logged and
   * skipped so that the remaining columns of the row can still be written.
   * A null or empty `value` is stored as `"0"`. All bytes are UTF-8 encoded, matching
   * the row key encoding.
   *
   * @param table target table name
   * @param put   mutation that receives the cell
   * @param key   column qualifier
   * @param value cell value; null or empty is replaced by `"0"`
   */
  def putRowData(table: String, put: Put, key: String, value: String): Unit = {
    val columnFamily: Option[String] =
      if (CompensationModuleTable == table) CompensationModuleFamilies.get(key)
      else Some(DefaultColumnFamily)

    columnFamily match {
      case Some(family) =>
        val realVal = if (value == null || value.isEmpty) "0" else value
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(key), Bytes.toBytes(realVal))
      case None =>
        println(s"no column family mapped, column skipped, table:$table,colname:$key")
    }
  }
}