flume-ng+kafka+spark streaming实现日志收集分析 | AiTi修炼|重剑无锋,拈花微笑
Header
Header

基于flume-ng+kafka+spark streaming实现日志收集分析

基于flume-ng+kafka+spark streaming可以快速实现日志收集分析功能,在架构上可以划分为三个模块:日志收集模块、日志分发模块、日志分析模块,在设计上考虑模块之间无缝衔接,以及实现日志收集分析时需要保障体系架构具有分布式、高扩展性、高可靠性、实时性的特点。

日志收集——flume-ng

日志收集模块的功能主要由flume-ng组件实现,负责从不同的日志源收集日志数据,并转发给日志分发模块,收集汇总日志数据。

日志分发——kafka

日志分发主要正成为大数据处理的重要消息中间——kafka实现,当日志分发模块收到日志数据后,根据不同的主题把日志流进行分组,再把对应的日志流发送给订阅了该主题的日志分析模块,进行分析实时分析处理。

日志分析——spark streaming

在日志分析方面主要采用spark streaming进行实现,可以从kafka实时的消费数据,进行近实时的数据处理与分析。

整个逻辑架构如下:
flume_kafka_spark

在上述的架构中,其实spark streaming是可以直接读取flume-ng的数据进行直接数据分析,但是我们在实际生产环境中并没有采用这种方式,主要的原因是生产环境中一般数据量都比较大,如果spark streaming处理数据的效率慢于flume-ng端收集的数据的话,就会造成flume-ng端需要更多承担数据缓存,甚至面临数据丢失的风险。因此在生产环境中我们会在flume-ng与spark之间增加一个数据缓冲组件kafka,flume-ng将数据快速写入kafka的集群几点中,而spark streaming只需要从kafka中根据flume-ng生产的不同topic日志数据,订阅消费数据就可以。

不过对于加深spark的应用学习,我们有必要了解spark 如何直接从flume-ng读取数据进行处理。

spark streaming读取flume-ng的数据

  • 使用spark streaming读取flume-ng的数据(被动收取数据flume-ne ——> spark streaming)
    需要在代码中引入一个jar包,** FlumeUtils.createStream**来创建JavaReceiverInputDStream,具体引入的jar根据spark官网不同版本文档指引导入。
groupId = org.apache.spark
artifactId = spark-streaming-flume_2.10
version = 2.1.1

代码实现如下:

package sparkapp.sparkStreamig;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;

import java.lang.reflect.Array;
import java.util.Arrays;

/**
 *  Produces a count of events received from Flume.
 *
 *  This should be used in conjunction with an AvroSink in Flume. It will start
 *  an Avro server on at the request host:port address and listen for requests.
 *  Your Flume AvroSink should be pointed to this address.
 *
 *  Usage: JavaFlumeEventCount <host> <port>
 *    <host> is the host the Flume receiver will be started on - a receiver
 *           creates a server and listens for flume events.
 *    <port> is the port the Flume receiver will listen on.
 *
 *  To run this example:
 *     `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
 */
public final class JavaFlumeEventCount {
  private JavaFlumeEventCount() {
  }

  public static void main(String[] args) {
    if (args.length != 2) {
      System.err.println("Usage: JavaFlumeEventCount <host> <port>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    String host = args[0];
    int port = Integer.parseInt(args[1]);

    Duration batchInterval = new Duration(2000);
    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");

    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    /**
     * 通过FlumeUtils来创建JavaReceiverInputDStream
     */
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
    /**
     * 分割字符串
     */
    JavaDStream<String> words=flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
      @Override
      public Iterable<String> call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
        String content=new String(sparkFlumeEvent.event().getBody().array());
        return Arrays.asList(content.split(" "));
      }
    });
    /**
     *
     */
    JavaPairDStream<String,Integer> pairs= words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word,1);
      }
    });
    /**
     * 统计单词个数
     */
    JavaPairDStream<String, Integer> reduceInfo=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) throws Exception {
        return i1+i2;
      }
    });
    /**
     * 触发job
     */
    reduceInfo.print();

    ssc.start();
    ssc.awaitTermination();
  }
}

  • 使用spark streaming拉取flume-ng的数据(主动收取数据spark streaming——> flume-ne)

使用 FlumeUtils.createPollingStream(ssc,host,port)方法来创建JavaReceiverInputDStream,上述代码只需要修改获取JavaReceiverInputDStream的代码即可改变spark streaming读取数据的方式。该方式主要是考虑spark streaming自己处理多少数据就拉取多少数据,而不是由flume-ng将各个客户端收集的数据推送到spark streaming进行处理。

FlumeUtils.createPollingStream(ssc,host,port);

rscala.com版权所有,本文基于flume-ng+kafka+spark streaming实现日志收集分析转载请注明出处:http://rscala.com/index.php/461.html

该文章归档分类于 spark实践, 大数据基础

Leave a Reply

电子邮件地址不会被公开。 必填项已用*标注

*

code