卢安娜的飓风 - Flink Real-Time Dimension Table Joins: A Collection of Ways to Pick Up Dimension Updates in Real Time
Background
In our daily work we frequently get real-time requirements, and they are usually widening (denormalization) requirements that feed the real-time warehouse for OLAP and ad-hoc queries.

But some of the dimension tables we work with do change over time, possibly as slowly changing dimensions. In that case the Flink job has to connect to another data source for the lookups, and the first idea that comes to mind is to query once for every incoming record, which certainly works.
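To make that baseline concrete, here is a minimal sketch of the look-up-per-record approach. It assumes a hypothetical Redis hash `country2area` keyed directly by country code (the demo later in this post stores the mapping the other way around, region to country list), so treat it as an illustration of the pattern rather than part of the demo:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

// Naive baseline: one synchronous Redis round trip per record.
// "country2area" is a hypothetical hash assumed to map countryCode -> areaCode.
class LookupPerRecordFunction extends RichMapFunction<String, String> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = new Jedis("hadoop01", 6379); // one connection per subtask
    }

    @Override
    public String map(String countryCode) throws Exception {
        // Blocking call: the subtask stalls for a full round trip on every element.
        return jedis.hget("country2area", countryCode);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
    }
}
```

With one blocking round trip per element, throughput is capped by Redis latency, which is exactly the cost the options below try to avoid.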
In a big-data scenario, though, doesn't that feel a bit slow? Is there a better solution? When I write code I sometimes stop to wonder whether there is a better way, but since the work has to be delivered to users there is rarely much time to ponder: the product team is always chasing you.

So let's look at how many solutions there are.
Assume the figure above is a real-time architecture diagram. Our company has already introduced ClickHouse, so a real-time warehouse by itself is nothing we chase anymore, but that doesn't get in the way of this requirement. Let's look at the data.
{"dt":"2019-11-19 20:33:39","countryCode":"TW","data": [{"type":"s1","score":0.8,"level":"D"},{"type":"s2","score":0.1,"level":"B"}]} {"dt":"2019-11-19 20:33:41","countryCode":"KW","data": [{"type":"s2","score":0.2,"level":"A"},{"type":"s1","score":0.2,"level":"D"}]} {"dt":"2019-11-19 20:33:43","countryCode":"HK","data": [{"type":"s5","score":0.5,"level":"C"},{"type":"s2","score":0.8,"level":"B"}]} {"dt":"2019-11-19 20:33:39","countryCode":"TW","data": [{"type":"s1","score":0.8,"level":"D"},{"type":"s2","score":0.1,"level":"B"}]}当然之上是我们的模仿数据,接下来我们看看 营业人员需要什么数据
"dt":"2019-11-19 20:33:39","countryCode":"AREA_CT","type":"s1","score":0.8,"level":"D" "dt":"2019-11-19 20:33:39","countryCode":"AREA_CT","type":"s2","score":0.1,"level":"B"那么那个时候我们能够发现了,其实就是把国度 换成大区,如许入仓之后能够停止 大区的olap实时的一些阐发,例照实时的绩效查核等。还有一些营销活动等,我们就不细细考量了,因为究竟结果都是假数据。
Comparing the raw data with the result, two things happen. First, records are split apart: a single input record carries multiple `type` entries (the live-streaming platforms), and the result splits it into two rows. Isn't that exactly a UDTF? Second, the country code is converted into a region code. Now suppose the region codes can change, because of an organizational restructuring or an evolving org chart, and think about how many solutions we have.
Solutions

Option 1: query the store directly, with a scheduled refresh

Each subtask loads the whole dimension table from Redis into local memory on startup, then refreshes it on a fixed schedule:

```java
static class SimpleFlatMapFunction extends RichFlatMapFunction<String, OutData> {

    // Local snapshot of the dimension table: countryCode -> areaCode.
    private transient volatile ConcurrentHashMap<String, String> hashMap;
    private transient ScheduledExecutorService scheduledExecutorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Load synchronously once so flatMap never sees an empty map,
        // then refresh every 3 seconds in the background.
        refreshCache();
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(this::refreshCache, 3, 3, TimeUnit.SECONDS);
    }

    private void refreshCache() {
        System.out.println("refreshing cache");
        Jedis jedis = RedisFactory.getJedisCluster();
        ScanResult<Map.Entry<String, String>> areas = jedis.hscan("areas", "0");
        ConcurrentHashMap<String, String> newMap = new ConcurrentHashMap<>();
        // The hash stores region -> "country1,country2,..."; invert it to country -> region.
        for (Map.Entry<String, String> entry : areas.getResult()) {
            for (String country : entry.getValue().split(",")) {
                newMap.put(country, entry.getKey());
            }
        }
        hashMap = newMap;
        jedis.close();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
        }
    }

    @Override
    public void flatMap(String s, Collector<OutData> collector) throws Exception {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        String areaCode = hashMap.get(originData.countryCode);
        // One input record carries several entries: emit one row per entry (UDTF-style).
        for (Data datum : originData.data) {
            collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
        }
    }
}
```

Option 2: async I/O

Every record triggers a non-blocking Redis lookup through the Vert.x Redis client, so the subtask is never stalled waiting for a response:

```java
static class SimpaleAsyncIoFunction extends RichAsyncFunction<String, OutData> {

    private transient RedisClient redisClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RedisOptions config = new RedisOptions();
        config.setHost("hadoop01");
        config.setPort(6379);
        VertxOptions vo = new VertxOptions();
        vo.setEventLoopPoolSize(10);
        vo.setWorkerPoolSize(20);
        Vertx vertx = Vertx.vertx(vo);
        redisClient = RedisClient.create(vertx, config);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (redisClient != null) {
            redisClient.close(null);
        }
    }

    @Override
    public void asyncInvoke(String s, ResultFuture<OutData> resultFuture) throws Exception {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        String countryCode = originData.countryCode;
        redisClient.hscan("areas", "0", ScanOptions.NONE, new Handler<AsyncResult<JsonArray>>() {
            @Override
            public void handle(AsyncResult<JsonArray> result) {
                if (result.failed() || result.result() == null) {
                    resultFuture.complete(Collections.emptyList());
                    return;
                }
                // HSCAN returns [cursor, [field1, value1, field2, value2, ...]], e.g.
                // ["AREA_US","US","AREA_CT","TW,HK","AREA_AR","PK,KW,SA,XX","AREA_IN","IN"]
                JsonArray jsonArray = result.result().getJsonArray(1);
                HashMap<String, String> countryToArea = new HashMap<>();
                for (int i = 0; i + 1 < jsonArray.size(); i += 2) {
                    String area = jsonArray.getString(i);
                    for (String country : jsonArray.getString(i + 1).split(",")) {
                        countryToArea.put(country, area);
                    }
                }
                String area = countryToArea.get(countryCode);
                // complete() may only be called once, so collect all rows first.
                List<OutData> out = new ArrayList<>();
                for (Data datum : originData.data) {
                    out.add(OutData.of(originData.dt, area, datum.type, datum.score, datum.level));
                }
                resultFuture.complete(out);
            }
        });
    }
}
```

Option 3: broadcast

A dedicated source periodically reads the dimension table and broadcasts it as a JSON snapshot to every subtask of the main stream (the connect/process wiring is in the complete code below):

```java
static class BroadcastSourceFunction extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            Jedis jedis = RedisFactory.getJedisCluster();
            ScanResult<Map.Entry<String, String>> areas = jedis.hscan("areas", "0");
            HashMap<String, String> hashMap = new HashMap<>();
            for (Map.Entry<String, String> entry : areas.getResult()) {
                for (String country : entry.getValue().split(",")) {
                    hashMap.put(country, entry.getKey());
                }
            }
            // Broadcast the full country -> region snapshot every 3 seconds.
            sourceContext.collect(JSON.toJSONString(hashMap));
            jedis.close();
            TimeUnit.SECONDS.sleep(3);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Option 4: async I/O combined with a cache

I trust that anyone who followed the options above can manage this one. It's simple, so I haven't written it out; I'm a bit tired and may add it when I have time. It comes down to picking a cache-eviction policy: on a cache hit, use the cached mapping; on a miss, fetch it from Redis asynchronously. A sketch follows.
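Since that variant is left as an exercise above, here is a minimal sketch of what it could look like, not the author's implementation. It reuses the Vert.x client setup from the async I/O option and fronts it with a Guava `Cache` (requires `com.google.common.cache.Cache` / `CacheBuilder`), so hot country codes skip the Redis round trip entirely; the class name, cache size, and the 3-second TTL are illustrative assumptions:

```java
// Sketch only: Option 2's async lookup fronted by a local Guava cache.
static class CachedAsyncIoFunction extends RichAsyncFunction<String, OutData> {

    private transient RedisClient redisClient;
    private transient Cache<String, String> areaCache; // countryCode -> areaCode

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RedisOptions config = new RedisOptions();
        config.setHost("hadoop01");
        config.setPort(6379);
        redisClient = RedisClient.create(Vertx.vertx(), config);
        // Expire entries after 3 seconds so region changes are picked up quickly.
        areaCache = CacheBuilder.newBuilder()
                .maximumSize(10_000)
                .expireAfterWrite(3, TimeUnit.SECONDS)
                .build();
    }

    @Override
    public void asyncInvoke(String s, ResultFuture<OutData> resultFuture) {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        String cached = areaCache.getIfPresent(originData.countryCode);
        if (cached != null) {
            // Cache hit: answer immediately, no Redis round trip.
            resultFuture.complete(toRows(originData, cached));
            return;
        }
        // Cache miss: load the hash asynchronously, then fill the cache.
        redisClient.hscan("areas", "0", ScanOptions.NONE, result -> {
            if (result.failed() || result.result() == null) {
                resultFuture.complete(Collections.emptyList());
                return;
            }
            JsonArray fieldsAndValues = result.result().getJsonArray(1);
            for (int i = 0; i + 1 < fieldsAndValues.size(); i += 2) {
                String area = fieldsAndValues.getString(i);
                for (String country : fieldsAndValues.getString(i + 1).split(",")) {
                    areaCache.put(country, area);
                }
            }
            resultFuture.complete(toRows(originData, areaCache.getIfPresent(originData.countryCode)));
        });
    }

    private static List<OutData> toRows(OriginData in, String area) {
        List<OutData> out = new ArrayList<>();
        for (Data datum : in.data) {
            out.add(OutData.of(in.dt, area, datum.type, datum.score, datum.level));
        }
        return out;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (redisClient != null) {
            redisClient.close(null);
        }
    }
}
```

On a hit the future completes synchronously on the calling thread, which Flink's async I/O permits; only misses pay the Redis latency, and the short TTL bounds how stale a region assignment can get.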
Complete code

```java
package com.bigdata.dim.join;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import io.vertx.redis.op.ScanOptions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Copyright (c) 2019 bigdata ALL Rights Reserved
 * Project: learning
 * Package: com.bigdata.dim.join
 * Version: 1.0
 *
 * @author qingzhi.wu 2020/11/8 11:12
 */
public class Main {
    private static final int RESTART_ATTEMPTS = 5;
    private static final int RESTART_INTERVAL = 20;
    private static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Checkpointing configuration.
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        env.enableCheckpointing(5000L);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setCheckpointTimeout(100000L);
        checkpointConfig.setFailOnCheckpointingErrors(true);
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // No state backend needed in a test environment.
        // FsStateBackend fsStateBackend = new FsStateBackend(CheckpointUtils.getCheckpointDir());
        // env.setStateBackend(fsStateBackend);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                RESTART_ATTEMPTS,   // number of restart attempts
                org.apache.flink.api.common.time.Time.of(RESTART_INTERVAL, TimeUnit.SECONDS)
        ));

        // Custom source generating mock data.
        DataStreamSource<String> dataStreamSource = env.addSource(new DataSource());

        // Enable exactly one of the three variants below.

        // 1. Query Redis directly with a scheduled refresh.
        // SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator =
        //         dataStreamSource.flatMap(new SimpleFlatMapFunction());

        // 2. Async I/O.
        // SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator =
        //         AsyncDataStream.unorderedWait(dataStreamSource, new SimpaleAsyncIoFunction(), 2000, TimeUnit.MILLISECONDS);

        // 3. Broadcast state.
        final MapStateDescriptor<String, String> broadcastDes = new MapStateDescriptor<>(
                "broadcast", String.class, String.class);
        BroadcastStream<String> broadcast =
                env.addSource(new BroadcastSourceFunction()).broadcast(broadcastDes);
        SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator = dataStreamSource
                .connect(broadcast)
                .process(new BroadcastProcessFunction<String, String, OutData>() {
                    @Override
                    public void processElement(String s, ReadOnlyContext readOnlyContext,
                                               Collector<OutData> collector) throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState =
                                readOnlyContext.getBroadcastState(broadcastDes);
                        String snapshot = broadcastState.get("broadcastState");
                        if (snapshot == null) {
                            // No broadcast snapshot has arrived yet.
                            return;
                        }
                        HashMap<String, String> data = JSONObject.parseObject(snapshot, HashMap.class);
                        OriginData originData = JSONObject.parseObject(s, OriginData.class);
                        String areaCode = data.get(originData.countryCode);
                        for (Data datum : originData.data) {
                            collector.collect(OutData.of(originData.dt, areaCode,
                                    datum.type, datum.score, datum.level));
                        }
                    }

                    @Override
                    public void processBroadcastElement(String s, Context context,
                                                        Collector<OutData> collector) throws Exception {
                        // put() overwrites the previous snapshot under the same key.
                        BroadcastState<String, String> broadcastState = context.getBroadcastState(broadcastDes);
                        broadcastState.put("broadcastState", s);
                    }
                });

        SingleOutputStreamOperator<String> map =
                outDataSingleOutputStreamOperator.map(new MapFunction<OutData, String>() {
                    @Override
                    public String map(OutData outData) throws Exception {
                        return JSON.toJSONString(outData);
                    }
                });
        map.print();
        env.execute();
    }

    // Option 1: each subtask caches the dimension table locally and refreshes it on a timer.
    static class SimpleFlatMapFunction extends RichFlatMapFunction<String, OutData> {

        // Local snapshot of the dimension table: countryCode -> areaCode.
        private transient volatile ConcurrentHashMap<String, String> hashMap;
        private transient ScheduledExecutorService scheduledExecutorService;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Load synchronously once so flatMap never sees an empty map,
            // then refresh every 3 seconds in the background.
            refreshCache();
            scheduledExecutorService = Executors.newScheduledThreadPool(1);
            scheduledExecutorService.scheduleAtFixedRate(this::refreshCache, 3, 3, TimeUnit.SECONDS);
        }

        private void refreshCache() {
            System.out.println("refreshing cache");
            Jedis jedis = RedisFactory.getJedisCluster();
            ScanResult<Map.Entry<String, String>> areas = jedis.hscan("areas", "0");
            ConcurrentHashMap<String, String> newMap = new ConcurrentHashMap<>();
            // The hash stores region -> "country1,country2,..."; invert it to country -> region.
            for (Map.Entry<String, String> entry : areas.getResult()) {
                for (String country : entry.getValue().split(",")) {
                    newMap.put(country, entry.getKey());
                }
            }
            hashMap = newMap;
            jedis.close();
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
        }

        @Override
        public void flatMap(String s, Collector<OutData> collector) throws Exception {
            OriginData originData = JSONObject.parseObject(s, OriginData.class);
            String areaCode = hashMap.get(originData.countryCode);
            // One input record carries several entries: emit one row per entry (UDTF-style).
            for (Data datum : originData.data) {
                collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
            }
        }
    }

    // Option 2: per-record non-blocking lookups through the Vert.x Redis client.
    static class SimpaleAsyncIoFunction extends RichAsyncFunction<String, OutData> {

        private transient RedisClient redisClient;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            RedisOptions config = new RedisOptions();
            config.setHost("hadoop01");
            config.setPort(6379);
            VertxOptions vo = new VertxOptions();
            vo.setEventLoopPoolSize(10);
            vo.setWorkerPoolSize(20);
            Vertx vertx = Vertx.vertx(vo);
            redisClient = RedisClient.create(vertx, config);
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (redisClient != null) {
                redisClient.close(null);
            }
        }

        @Override
        public void asyncInvoke(String s, ResultFuture<OutData> resultFuture) throws Exception {
            OriginData originData = JSONObject.parseObject(s, OriginData.class);
            String countryCode = originData.countryCode;
            redisClient.hscan("areas", "0", ScanOptions.NONE, new Handler<AsyncResult<JsonArray>>() {
                @Override
                public void handle(AsyncResult<JsonArray> result) {
                    if (result.failed() || result.result() == null) {
                        resultFuture.complete(Collections.emptyList());
                        return;
                    }
                    // HSCAN returns [cursor, [field1, value1, field2, value2, ...]], e.g.
                    // ["AREA_US","US","AREA_CT","TW,HK","AREA_AR","PK,KW,SA,XX","AREA_IN","IN"]
                    JsonArray jsonArray = result.result().getJsonArray(1);
                    HashMap<String, String> countryToArea = new HashMap<>();
                    for (int i = 0; i + 1 < jsonArray.size(); i += 2) {
                        String area = jsonArray.getString(i);
                        for (String country : jsonArray.getString(i + 1).split(",")) {
                            countryToArea.put(country, area);
                        }
                    }
                    String area = countryToArea.get(countryCode);
                    // complete() may only be called once, so collect all rows first.
                    List<OutData> out = new ArrayList<>();
                    for (Data datum : originData.data) {
                        out.add(OutData.of(originData.dt, area, datum.type, datum.score, datum.level));
                    }
                    resultFuture.complete(out);
                }
            });
        }
    }

    // Option 3: periodically broadcast the inverted dimension table as a JSON snapshot.
    static class BroadcastSourceFunction extends RichSourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (running) {
                Jedis jedis = RedisFactory.getJedisCluster();
                ScanResult<Map.Entry<String, String>> areas = jedis.hscan("areas", "0");
                HashMap<String, String> hashMap = new HashMap<>();
                for (Map.Entry<String, String> entry : areas.getResult()) {
                    for (String country : entry.getValue().split(",")) {
                        hashMap.put(country, entry.getKey());
                    }
                }
                sourceContext.collect(JSON.toJSONString(hashMap));
                jedis.close();
                TimeUnit.SECONDS.sleep(3);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // NOTE: despite the name, this hands out a fresh single-node connection per call.
    static class RedisFactory {
        private static Jedis jedisCluster = null;

        private RedisFactory() {
        }

        public static Jedis getJedisCluster() {
            jedisCluster = new Jedis(new HostAndPort("hadoop01", Integer.parseInt("6379")));
            return jedisCluster;
        }
    }

    static class OriginData {
        public String dt;
        public String countryCode;
        public ArrayList<Data> data;

        public OriginData() {
        }

        public OriginData(String dt, String countryCode, ArrayList<Data> data) {
            this.dt = dt;
            this.countryCode = countryCode;
            this.data = data;
        }

        public static OriginData of(String dt, String countryCode, ArrayList<Data> data) {
            return new OriginData(dt, countryCode, data);
        }
    }

    static class Data {
        public String type;
        public Double score;
        public String level;

        public Data() {
        }

        public Data(String type, Double score, String level) {
            this.type = type;
            this.score = score;
            this.level = level;
        }

        public static Data of(String type, Double score, String level) {
            return new Data(type, score, level);
        }
    }

    static class OutData {
        public String dt;
        public String countryCode;
        public String type;
        public Double score;
        public String level;

        public OutData() {
        }

        public OutData(String dt, String countryCode, String type, Double score, String level) {
            this.dt = dt;
            this.countryCode = countryCode;
            this.type = type;
            this.score = score;
            this.level = level;
        }

        public static OutData of(String dt, String countryCode, String type, Double score, String level) {
            return new OutData(dt, countryCode, type, score, level);
        }
    }

    // Mock source: one OriginData record per second, two entries per record.
    static class DataSource extends RichSourceFunction<String> {
        private static final String YYYYMMDDHHMMSS = "yyyy-MM-dd HH:mm:ss";
        private static Random random = new Random();
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYYMMDDHHMMSS);
        private static String[] countryCodes = {"US", "TW", "HK", "PK", "KW", "SA", "XX", "IN"};
        private static String[] users = {"s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8",
                "s9", "s10", "s11", "s12", "s13", "s14", "s15", "s16"};
        private static String[] levels = {"A", "B", "C", "D"};

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (true) {
                int i = random.nextInt(4);
                long time = System.currentTimeMillis() + 1000 * i;
                String resDate = simpleDateFormat.format(time);

                i = random.nextInt(users.length);
                String user1 = users[i];
                Double score1 = Double.valueOf(String.format("%.1f", random.nextDouble()));
                String countCode1 = countryCodes[i % countryCodes.length];
                String level1 = levels[i % levels.length];

                i = random.nextInt(users.length);
                String user2 = users[i];
                String countCode2 = countCode1;
                String level2 = levels[i % levels.length];
                Double score2 = Double.valueOf(String.format("%.1f", random.nextDouble()));

                Data data1 = Data.of(user1, score1, level1);
                Data data2 = Data.of(user2, score2, level2);
                ArrayList<Data> datas = new ArrayList<>();
                datas.add(data1);
                datas.add(data2);
                OriginData originData = OriginData.of(resDate, countCode1, datas);
                sourceContext.collect(JSON.toJSONString(originData));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
        }
    }
}
```
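One practical note: every variant assumes the `areas` hash already exists in Redis. Going by the HSCAN comment in the async code (`["AREA_US","US","AREA_CT","TW,HK",...]`), it maps region code to a comma-separated list of country codes, so seeding it for a local test could look like the following sketch (host and values are taken from the demo code above; the helper class itself is mine, not the author's):

```java
import redis.clients.jedis.Jedis;

// One-off helper to seed the "areas" dimension hash the demo reads:
// region code -> comma-separated country codes.
public class SeedAreas {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("hadoop01", 6379)) {
            jedis.hset("areas", "AREA_US", "US");
            jedis.hset("areas", "AREA_CT", "TW,HK");
            jedis.hset("areas", "AREA_AR", "PK,KW,SA,XX");
            jedis.hset("areas", "AREA_IN", "IN");
        }
    }
}
```

Editing any of these fields later, for example moving a country to a different region after a re-org, is picked up by the running jobs within a few seconds, since all three variants re-read the hash periodically.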
Welcome to follow 《大数据成神之路》.