Streaming data from a VM port to the IDEA console with Flink (Scala) and Flume

  • Configure the Flume agent file (save it in Flume's conf directory; step 4 below uses the name ceshi01.conf)
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#source
a1.sources.r1.type=exec
a1.sources.r1.command=nc 0.0.0.0 7777
#sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=flinktestTopic
a1.sinks.k1.kafka.bootstrap.servers=xueai:9092
a1.sinks.k1.kafka.producer.acks=1
#channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
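
The exec source above simply runs nc as a client against the nc -lk 7777 listener started in the steps below. Flume also ships a built-in netcat source that listens on the port itself, which removes the dependency on an external nc process. An untested alternative for the #source block:

a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=7777

With this variant you would send data with nc <flume-host> 7777 instead of starting your own listener.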
  • Flink code (Scala version)

    • Maven dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>
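
Note: flink-connector-kafka-0.10 speaks the Kafka 0.10 client protocol, which a 2.0 broker still accepts, but Flink 1.10 also ships a "universal" Kafka connector for newer brokers. A possible drop-in alternative (you would then use FlinkKafkaConsumer instead of FlinkKafkaConsumer010):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.10.2</version>
        </dependency>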
    • Flink code

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object test01 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consumer properties; bootstrap.servers must match the address used in
    // the Flume Kafka sink above (xueai:9092)
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "xueai:9092")
    prop.setProperty("group.id", "flink-test") // consumer group for offset tracking

    // Source: the Kafka topic that Flume writes to
    val inputStream = env.addSource(
      new FlinkKafkaConsumer010[String]("flinktestTopic", new SimpleStringSchema(), prop))

    // Sink: print every record to the IDEA console
    inputStream.print()

    env.execute("flume-kafka-flink-demo")
  }
}
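
The pom above also pulls in Bahir's Redis connector, so the same stream could be mirrored into Redis. A minimal sketch, assuming a Redis instance reachable on localhost:6379; the host, port, and list key are illustrative, not part of the original setup:

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Maps each incoming line to an LPUSH onto one Redis list (hypothetical key)
class LineRedisMapper extends RedisMapper[String] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.LPUSH)
  override def getKeyFromData(line: String): String = "flink:lines" // hypothetical list key
  override def getValueFromData(line: String): String = line
}

// Inside main, after inputStream is defined:
// val jedisConf = new FlinkJedisPoolConfig.Builder()
//   .setHost("localhost").setPort(6379).build() // assumed local Redis
// inputStream.addSink(new RedisSink[String](jedisConf, new LineRedisMapper))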
  • Steps

    • Step 1: run the Flink job first
    • Step 2: start the cluster as usual (the Kafka broker must be up, and the target topic has to exist; see the topic-creation sketch after this list)
    • Step 3: open the listening port that will feed in the data: nc -lk 7777
    • Step 4: start Flume (absolute paths are used here because the environment variables are not configured):
      • flume-ng agent -c /home/hduser/bigdata/flume-1.7.0/conf -f /home/hduser/bigdata/flume-1.7.0/conf/ceshi01.conf -n a1 -Dflume.root.logger=INFO,console
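
If flinktestTopic does not exist yet, create it before starting the data flow. A sketch with the Kafka 2.0 CLI (the ZooKeeper address xueai:2181 is an assumption; adjust it to your cluster):

kafka-topics.sh --create --zookeeper xueai:2181 --replication-factor 1 --partitions 1 --topic flinktestTopic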


  • Final result: each line typed into the nc session flows through Flume and Kafka and is printed in the IDEA console (print() prefixes each record with the subtask index, e.g. 2> hello flink)
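
If nothing shows up in IDEA, a quick check is whether the messages reach Kafka at all, using the broker address from the Flume config:

kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic flinktestTopic --from-beginning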