频道栏目
首页 > 网络 > 云计算 > 正文

Flink keyBy 分流与 Window 操作解析

2018-04-11 04:20:13         来源:whr_yy的博客  
收藏   我要投稿

Flink keyBy 分流与 Window 操作解析。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.CachingTokenFilter;
import redis.clients.jedis.JedisCluster;
public class MySelfSourceTest01 {
    static Logger logger = Logger.getLogger(MySelfSourceTest01.class);
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource> redisDS = env.addSource(new SourceFunction>() {
            @Override
            public void run(SourceContext> ctx) throws Exception {
                Random random = new Random();
                while (true) {
                    ctx.collect(new Tuple2( "1", "A"));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {

            }
        });
        DataStreamSource> redisDSB = env.addSource(new SourceFunction>() {
            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2("2", "B"));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {

            }
        });
        DataStreamSource> redisDSC = env.addSource(new SourceFunction>() {
            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2("3", "C"));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });

        DataStream> union = redisDS.union(redisDSB).union(redisDSC);
        KeyedStream, Integer> tuple2IntegerKeyedStream = union.keyBy(new KeySelector, Integer>() {
            @Override
            public Integer getKey(Tuple2 value) throws Exception {
                return Integer.parseInt(value.f0) % 1;
            }
        });
        tuple2IntegerKeyedStream.timeWindow(Time.seconds(2)).apply(new WindowFunction, String, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow window, Iterable> input, Collector out) throws Exception {
                StringBuffer stringBuffer = new StringBuffer();
                input.forEach(t -> {
                    stringBuffer.append(t.toString()).append("  ");
                });
                System.out.println(stringBuffer.toString());
                System.out.println();
            }
        });


//                .timeWindow(Time.seconds(2)).apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() {
//            @Override
//            public void apply(Tuple tuple, TimeWindow window, Iterable> input, Collector> out) throws Exception {
//                StringBuffer stringBuffer = new StringBuffer();
//                input.forEach(t -> {
//                    stringBuffer.append(t.toString()).append("  ");
//                });
//                System.out.println(stringBuffer.toString());
//                System.out.println();
//            }
//        });
//        union.timeWindowAll(Time.seconds(2)).apply(new AllWindowFunction, String, TimeWindow>() {
//            @Override
//            public void apply(TimeWindow window, Iterable> values, Collector out) throws Exception {
//                StringBuffer stringBuffer = new StringBuffer();
//                values.forEach(t -> {
//                    stringBuffer.append(t.toString()).append("  ");
//                });
//                System.out.println(stringBuffer.toString());
//                System.out.println();
//            }
//        });


        try {
            env.execute("a");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

keyBy 按所选的 key 对数据流进行分流。如果直接使用 timeWindowAll,所有数据只进入同一个分流,并行度为 1;如果 keyBy 使用自定义的 key(如本例中 key 为 1、2、3),可以在 keyBy 中对 key 取模来决定分流,但这样有可能产生数据倾斜。注意:示例代码中取模的除数是 1,因此所有记录的 key 都是 0;要真正分成多路,需要使用大于 1 的除数。

上一篇:azkaban upload 报错:Error initializing project id: 4 version: 2
下一篇:大数据技术学习笔记之网站流量日志分析项目:数据采集层的实现
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站