Spark-submit消费kafka数据生产案例

package com.sm.stream


import org.apache.log4j.Level

import org.apache.log4j.Logger

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Seconds

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.kafka.KafkaUtils

import java.text.SimpleDateFormat

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.Put

import com.alibaba.fastjson.JSON

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.hbase.client.ConnectionFactory

import org.apache.hadoop.hbase.TableName

import java.util.Date

import org.apache.hadoop.hbase.client.Table

import java.util.ArrayList

import com.alibaba.fastjson.JSONObject

import org.apache.hadoop.hbase.HConstants

import org.apache.hadoop.hbase.client.Connection


/**
 * Spark Streaming job that consumes JSON messages from Kafka (receiver-based
 * `KafkaUtils.createStream`) and writes each message as one row into an HBase table
 * selected by the Kafka message key.
 *
 * Each message is expected to be a JSON object carrying `timestamp`, `addr`, `dataType`,
 * `meterType` and `order` (used to build the row key) plus arbitrary measurement fields
 * that become HBase columns. NOTE(review): this shape is inferred from the fields read
 * below — confirm against the producers.
 */
object Consumer {

  /** Kafka message key -> fully-qualified HBase table (`namespace.table`). */
  private val TopicTables: Map[String, String] = Map(
    "safeclound_smoke" -> "safeclound.tb_smoke",
    "safeclound_co1" -> "safeclound.tb_co",
    "safeclound_ray" -> "safeclound.tb_ray",
    "safeclound_atmos" -> "safeclound.tb_atmos",
    "safeclound_air" -> "safeclound.tb_air",
    "safeclound_water" -> "safeclound.tb_water",
    "safeclound_hum" -> "safeclound.tb_hum",
    "safeclound_elec_power_facter" -> "safeclound.tb_elec_powerfactor",
    "safeclound_elec_power" -> "safeclound.tb_elec_power",
    "safeclound_elec_phase_v" -> "safeclound.tb_elec_phasev",
    "safeclound_elec_phase_i" -> "safeclound.tb_elec_phasei",
    "safeclound_elec_line_v" -> "safeclound.tb_elec_linev",
    "safeclound_elec_freq" -> "safeclound.tb_elec_freq",
    "safeclound_water_njyc" -> "safeclound.tb_water_njyc",
    "safeclound_ammeter" -> "safeclound.tb_ammeter",
    "safeclound_harmonic_i" -> "safeclound.tb_harmonic_i",
    "safeclound_harmonic_v" -> "safeclound.tb_harmonic_v",
    "safeclound_rate_i" -> "safeclound.tb_rate_i",
    "safeclound_rate_v" -> "safeclound.tb_rate_v",
    "safeclound_phase_v_rate" -> "safeclound.tb_phase_v_rate",
    "safeclound_freq_rate" -> "safeclound.tb_freq_rate",
    "safeclound_door_ctrl" -> "safeclound.tb_door_ctrl",
    "safeclound_demand" -> "safeclound.tb_demand",
    "safeclound_heartbeat" -> "safeclound.tb_heartbeat",
    "safeclound_pump_controller" -> "safeclound.tb_pump_controller",
    "safeclound_ggj_controller" -> "safeclound.tb_ggj_controller",
    "safeclound_compensation_module" -> "safeclound.tb_compensation_module",
    "safeclound_hcho" -> "safeclound.tb_hcho",
    "safeclound_tvoc" -> "safeclound.tb_tvoc",
    "safeclound_co2" -> "safeclound.tb_co2",
    "safeclound_largestDemand" -> "safeclound.tb_largestDemand"
  )

  /** Fields consumed to build the row key (`timestamp` via its formatted date); never stored as columns. */
  private val RowKeyFields: Set[String] = Set("addr", "dataType", "meterType", "order", "timestamp")

  /** The one table whose columns are split across the `info1`/`info2` families instead of `info`. */
  private val CompensationModuleTable = "safeclound.tb_compensation_module"

  /** Column name -> column family for [[CompensationModuleTable]]. */
  private val CompensationModuleFamilies: Map[String, String] = {
    val info1 = Seq("bid", "cid", "ITHDa", "ITHDb", "ITHDc", "Ia", "Ib", "Ic",
      "T1", "T2", "T3", "T4", "T5", "Vab", "Vbc", "Vca")
    val info2 = Seq("fanStatus", "recoverTimesLeft", "runStatus", "sAntiTimeLimitP",
      "sLowVoltage", "sOverVoltage", "sQuickBreak", "sSimulate", "sStopPhase",
      "sT1", "sT2", "sT3", "sT4", "sT5", "sTHD", "switchTimes")
    info1.map(_ -> "info1").toMap ++ info2.map(_ -> "info2").toMap
  }

  /**
   * Entry point.
   *
   * @param args `zkQuorum zkHbase group topics numThreads`:
   *             Kafka ZooKeeper quorum, HBase ZooKeeper quorum, consumer group,
   *             comma-separated topic list, receiver threads per topic.
   *             Extra trailing arguments are ignored.
   */
  def main(args: Array[String]): Unit = {
    println("version:0.0.4")

    if (args.length < 5) {
      System.err.println("Usage: Consumer <zkQuorum> <zkHbase> <group> <topics> <numThreads>")
      System.exit(1)
    }

    Logger.getRootLogger.setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // take(5): a plain `val Array(...) = args` throws MatchError when extra args are passed.
    val Array(zkQuorum, zkHbase, group, topics, numThreads) = args.take(5)

    // Values set on SparkConf override spark-submit flags, so only default the master
    // when none was supplied (e.g. when launched from an IDE).
    val sparkConf = new SparkConf()
      .setAppName("Consumer")
      .setIfMissing("spark.master", "local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map(topic => (topic.trim, numThreads.toInt)).toMap
    // createStream yields (messageKey, messageValue) pairs.
    val keyValues = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // SimpleDateFormat is not thread-safe; it is captured by the task closure, so each
    // task deserializes its own copy. NOTE(review): formats in the executor's default time zone.
    val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

    keyValues.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Opening an HBase connection is expensive; skip it for empty partitions.
        if (records.hasNext) {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, zkHbase)
          val conn = ConnectionFactory.createConnection(conf)
          try {
            records.foreach { record =>
              try {
                // Dispatch on the Kafka message *key*. NOTE(review): the keys look like
                // topic names — confirm producers set the key accordingly.
                Option(record._1).flatMap(TopicTables.get) match {
                  case Some(table) => save(conn, table, record, format)
                  case None        => println(s"the other is:${record._1}")
                }
              } catch {
                // One bad record must not abort the rest of the partition.
                case e: Exception => e.printStackTrace()
              }
            }
          } finally {
            conn.close()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one message and writes it as a single row into `table`.
   *
   * The row key is `addr,dataType,meterType,order,yyyyMMddHHmmssSSS(timestamp)`. If any of
   * those parts is missing, the parts are printed and nothing is written.
   *
   * @param conn   open HBase connection (not closed here)
   * @param table  fully-qualified HBase table name
   * @param record (message key, JSON message body)
   * @param format formatter used to render `timestamp` (epoch millis) in the row key
   */
  def save(conn: Connection, table: String, record: (String, String), format: SimpleDateFormat): Unit = {
    val obj = JSON.parseObject(record._2)

    // A missing timestamp is reported like the other missing parts instead of throwing.
    val rawTimestamp = obj.getString("timestamp")
    val date: String = if (rawTimestamp == null) null else format.format(new Date(rawTimestamp.toLong))
    val addr = obj.getString("addr")
    val dataType = obj.getString("dataType")
    val meterType = obj.getString("meterType")
    val order = obj.getString("order")

    val incomplete = date == null || addr == null || dataType == null || meterType == null || order == null
    if (incomplete) {
      println(s"date:$date,addr:$addr,dataType:$dataType,meterType:$meterType,order:$order")
    } else {
      val rowkey = Array(addr, dataType, meterType, order, date).mkString(",")
      val put = new Put(Bytes.toBytes(rowkey))
      analysis(table, obj, put)

      val tab = conn.getTable(TableName.valueOf(table))
      try tab.put(put) finally tab.close()
    }
  }

  /**
   * Adds every non-row-key field of `json` to `put`, flattening nested JSON objects.
   *
   * String and BigDecimal values are written via [[putRowData]]; nested objects are recursed
   * into. JSON nulls and any other value type are logged and skipped.
   * NOTE(review): integral JSON numbers (e.g. Integer/Long) and booleans therefore are not
   * written — confirm this is intended.
   *
   * @param table target table (selects the column-family layout)
   * @param json  object whose fields are written
   * @param put   mutation being built for the current row
   */
  def analysis(table: String, json: JSONObject, put: Put): Unit = {
    val keys = json.keySet().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      if (!RowKeyFields.contains(key)) {
        val vObj = json.get(key)
        // Handle special characters: the column name "value" is stored as "value_".
        val column = if ("value".equals(key)) "value_" else key
        vObj match {
          case null =>
            // Previously an NPE at getClass that dropped the whole record.
            println(s"null value,the key:$column")
          case _: String | _: java.math.BigDecimal =>
            putRowData(table, put, column, json.getString(key))
          case nested: JSONObject =>
            analysis(table, nested, put)
          case other =>
            val clazz = other.getClass
            println(s"unknow type:$clazz,the key value:($column,$other)")
        }
      }
    }
  }

  /**
   * Adds one column to `put`, substituting "0" for a null or empty value.
   *
   * Columns go to the `info` family, except for [[CompensationModuleTable]] whose columns are
   * routed through [[CompensationModuleFamilies]].
   *
   * @throws NoSuchElementException if `table` is the compensation-module table and `key` has
   *                                no mapped column family
   */
  def putRowData(table: String, put: Put, key: String, value: String): Unit = {
    val columnFamily =
      if (CompensationModuleTable.equals(table)) {
        CompensationModuleFamilies.getOrElse(key,
          throw new NoSuchElementException(s"no column family mapped for column $key in $table"))
      } else {
        "info"
      }
    val realVal = if (value == null || value.isEmpty) "0" else value
    // Bytes.toBytes encodes UTF-8 (matching the row key); String.getBytes would use the
    // platform default charset.
    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(key), Bytes.toBytes(realVal))
  }

}


评论
© Saxon | Powered by LOFTER
上一篇 下一篇