您现在的位置:首页 >> 选购知识

大数据职业培训Flink 常见的维表 Join 方案

发布时间:2025/10/24 12:17    来源:苍南家居装修网

ql,redis或者hbase)的相连,利用api 通过orderId查找

// 最后积体电路资料格式 就是join所得

collector.collect(null)

}

}

}

.addSink(

new FlinkKafkaProducer[String](

"",

"",

new SimpleStringSchema()

env.execute("")

异步

AsyncIO 可以即刻地由此可知决问题多个允诺,很小相对上减少了对 subtask 线程的阻塞。

def analyses(): Unit ={

val env: StreamExecutionEnvironment = FlinkStreamEnv.get()

val source: DataStream[String] = KafkaSourceEnv.getKafkaSourceStream(env, List("test"))

.map(JSON.parseObject(_))

.filter(_ != null)

.flatMap(

new FlatMapFunction[JSONObject, String] {

override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = {

// 如果topic就一张请注意,须要区分,如果多张请注意,可以通过database 与 table 区分,放上下一步去由此可知决问题

// 请注意的人名

val databaseName: String = jSONObject.getString("database")

// 请注意的人名

val tableName: String = jSONObject.getString("table")

// 资料操作多种类型 INSERT UPDATE DELETE

val operationType: String = jSONObject.getString("type")

// ;大体资料

val tableData: JSONArray = jSONObject.getJSONArray("data")

// old 数值

val old: JSONArray = jSONObject.getJSONArray("old")

// canal json 可能普遍存在预由此可知决问题出现data资料多条

for (i

val data: String = tableData.get(i).toString

collector.collect(data)

}

}

}

AsyncDataStream.unorderedWait(

source,

new RichAsyncFunction[String,String] {//自表述的资料源异步由此可知决问题类

override def open(parameters: Configuration): Unit = {

// 初始化

}

override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {

// 将资料搜集

resultFuture.complete(null)

}

override def close(): Unit = {

// 关闭

}

},

1000,//异步超时等待时间

TimeUnit.MILLISECONDS,//等待时间单位

100)//仅次于异步即刻允诺数量

.addSink(

new FlinkKafkaProducer[String](

"",

"",

new SimpleStringSchema()

env.execute("")

}

稳定状态脚本语言,预查找资料到稳定状态当中,按需取

首先把维请注意资料初始化到state当中,所设好改版等待时间,定时去把维请注意。

特性:flink 自己维护稳定状态资料,"荣辱与共",不只能频繁关键字结构性资料源,达到由此可知微。

缺点:不适宜大的维请注意和波动大的维请注意。

.keyBy(_._1)

.process(

new KeyedProcessFunction[String,(String,String,String,String,String), String]{

private var mapState:MapState[String,Map[String,String]] = _

private var first: Boolean = true

override def open(parameters: Configuration): Unit = {

val config: StateTtlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.minutes(5))

.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build()

val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]])

join.enableTimeToLive(config)

mapState = getRuntimeContext.getMapState(join)

}

override def processElement(

in: (String, String, String, String, String),

context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context,

collector: Collector[String]): Unit = {

// 查找维请注意

if(first){

first = false

val time: Long = System.currentTimeMillis()

getSmallDimTableInfo()

// 所设好改版等待时间,定时去把维请注意

context.timerService().registerProcessingTimeTimer(time + 86400000)

}

// 应用软件,过来一条条资料,然后按照自己的业务部门逻辑说是维请注意的资料方可

// 然后积体电路 放上collect当中

collector.collect(null)

}

override def onTimer(

timestamp: Long,

ctx: KeyedProcessFunction[String, (String, String, String, String, String),String]#OnTimerContext,

out: Collector[String]): Unit = {

println("触发器执讫")

mapState.clear()

getSmallDimTableInfo()

println(mapState)

ctx.timerService().registerProcessingTimeTimer(timestamp + 86400000)

}

def getSmallDimTableInfo(): Unit ={

// 查找 词典资料

val select_dictionary="select dic_code,pre_dictionary_id,dic_name from xxxx"

val dictionary: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(select_dictionary, null)

dictionary.foreach(item=>{

mapState.put("dic_dictionary_"+item.get("pre_dictionary_id").toString,item)

})

}

}

.filter(_!=null)

.addSink(

new FlinkKafkaProducer[String](

"",

"",

new SimpleStringSchema()

env.execute("")

思考下:必要表述一个Map集合这样的优缺点是什么?可以留言说谎自己的看法?

和气热资料

学说:先去稳定状态说是,如果没有,去结构性查询,同时去存到稳定状态里面。StateTtlConfig 的过期等待时间可以所设短点。

特性:当中庸取数值由此可知决方案,热备常用资料到寄存器,也避免了资料join相对极少结构性资料源。

缺点:也不能关基团在于由此可知决某些问题,热备资料极少,【追捧尚能硅谷,轻松习IT】或者和气资料过大,都才会对state 或者 结构性资料库造成担忧。

.filter(_._1 != null)

.keyBy(_._1)

.process(

new KeyedProcessFunction[String,(String,String,String,String,String), String]{

private var mapState:MapState[String,Map[String,String]] = _

private var first: Boolean = true

override def open(parameters: Configuration): Unit = {

val config: StateTtlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.days(1))

.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build()

val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]])

join.enableTimeToLive(config)

mapState = getRuntimeContext.getMapState(join)

}

override def processElement(

in: (String, String, String, String, String),

context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context,

collector: Collector[String]): Unit = {

// 应用软件,过来一条条资料,然后按照自己的业务部门逻辑先去mapState去找,如果没有再去 结构性去找

if (mapState.contains("xx_id")){

// 如果普遍存在就取

}else{

// 如果不普遍存在去结构性拿,然后放上mapState当中

val dim_sql="select dic_code,pre_dictionary_id,dic_name from xxxx where id=xx_id"

val dim: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(dim_sql, null)

mapState.put("xx_id",null)

}

// 然后积体电路 放上collect当中

collector.collect(null)

}

}

电视广播维请注意

比如纸片提到的词典请注意,每一个Task都只能这份资料,那么只能join这份资料的时候就可以用作电视广播维请注意。

val dimStream=env.addSource(MysqlSource)

// 电视广播稳定状态

val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcaststate", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(Long.class, Dim.class))

// 电视广播源

val broadStream=dimStream.broadcast()

// ;大资料源

val mainConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaConfig)

val mainStream=env.addSource(mainConsumer)

// 电视广播稳定状态与阶数请注意联合点

val connectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1)

connectedStream.process(new KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String] {

override def processElement(value: User, ctx: KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String]#ReadOnlyContext, out: Collector[String]): Unit = {

// 取到资料就可以美好的玩耍了

val state=ctx.getBroadcastState(broadcastStateDesc)

xxxxxx

}

}

「思考:」 如果把维请注意源也通过实时监控binlog到kafka,当阶数资料发生波动时,改版放上稳定状态当中,这种方式则,究竟更加具有一般性呢?

(1)通过canal把修改binlog方式则发送到kafka当中。

(2)资料源表述成为电视广播源,电视广播到资料到;大资料源当中。

(3)表述一个电视广播稳定状态存储器资料,在;大资料进讫查找匹配,符合要求则join成功。

Temporal Table Join(FlinkSQL与Flink Table API)

由于维请注意是一张迅速波动的请注意(线性请注意只是动态请注意的一种特例)。那如何 JOIN 一张迅速波动的请注意呢?如果用传统的 JOIN 字词来请注意达维请注意 JOIN,是不完整的。因为维请注意是一直在改版波动的,如果用这个字词那么联合点上的是哪个当下的维请注意呢?【追捧尚能硅谷,轻松习IT】我们是不知道的,结果是不确定的。所以 Flink SQL 的维请注意 JOIN 字词引入了Temporal Table 的标准字词,用来声明联合点的是维请注意哪个当下的null。

普通联合点才会一直保留联合点双侧的资料,资料也就才会一直增大,直到翻爆寄存器避免任务失败,Temporal Join则可以每星期清理过期资料,在合理的寄存器配置下方可避免寄存器溢出。

Event Time Temporal Join

字词

SELECT [column_list]

FROM table1 [AS ]

[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS ]

ON table1.column-name1 = table2.column-name1

用作事件等待时间属性(即讫等待时间属性),可以参考资料依然某个等待时间点的基团数值。这受限制在一个联合的等待时间点相连两个请注意。

简而言之

假设我们有一个订单请注意,每个订单都有各有不同债券的价格。为了将此请注意正确地规范化为单一债券,每个订单都只能与下订单时的适当债券兑换率借助于。

CREATE TABLE orders (

order_id STRING,

price DECIMAL(32,2),

currency STRING,

order_time TIMESTAMP(3),

WATERMARK FOR order_time AS order_time

) WITH (/* ... */);

CREATE TABLE currency_rates (

currency STRING,

conversion_rate DECIMAL(32, 2),

update_time TIMESTAMP(3) METADATA FROM 于大values.source.timestamp于大 VIRTUAL

WATERMARK FOR update_time AS update_time,

PRIMARY KEY(currency) NOT ENFORCED

) WITH (

'connector' = 'upsert-kafka',

/* ... */

);

event-time temporal join只能temporal join前提条件的等价前提条件当中包含的;大基团

SELECT

order_id,

price,

currency,

conversion_rate,

order_time,

FROM orders

LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time

ON orders.currency = currency_rates.currency

Processing Time Temporal Join

由此可知决问题等待时间时态请注意相连用作由此可知决问题等待时间属性将讫与结构性新版本请注意当中基团的近期新版本相联合点。

根据表述,用作processing-time属性,相连将始终返回给定基团的近期数值。可以将查找请注意视为是一个直观的HashMap,它存储器来自借助于端的所有记录下来。这种相连的强劲之处在于,当在Flink当中能够将请注意具体化为动态请注意时,它受限制Flink必要针对结构性系统工作。

用作FOR SYSTEM_TIME AS OF table1.proctime请注意示当左边请注意的记录下来与右边的维请注意join时,只匹配也就是说由此可知决问题等待时间维请注意所相异的的null资料。

Lookup Table Join

Lookup Join 通常常用通过相连结构性请注意(阶数请注意)补充反馈,要求一个请注意具有由此可知决问题等待时间属性,另一个请注意使 Lookup Source Connector。

JDBC 相连器可以用在时态请注意联合点当中作为一个可 lookup 的 source (又被称作维请注意)。中用的字词是 Temporal Joins 的字词。

s"""

CREATE TABLE users(

id int,

name string,

PRIMARY KEY (id) NOT ENFORCED

WITH (

'connector' = 'jdbc',

'url' = 'xxxx',

'driver'='$DRIVER_CLASS_NAME',

'table-name'='$tableName',

'lookup.cache.max-rows'='100',

'lookup.cache.ttl'='30s'

""".stripMargin

s"""

CREATE TABLE car(

于大id于大 bigint ,

于大user_id于大 bigint,

于大proctime于大 as PROCTIME()

WITH (

'connector' = 'kafka',

'topic' = '$topic',

'scan.startup.mode' = 'latest-offset',

'properties.bootstrap.servers' = '$KAFKA_SERVICE',

'properties.group.id' = 'indicator',

'format' = 'canal-json'

""".stripMargin

SELECT

mc.user_id user_id,

count(1) AS 于大value于大

FROM car mc

inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=s.id

group by mc.user_id

阐述

总体来讲,联合点维请注意有四个基础的方式则:

(1)查找结构性资料源联合点

(2)预查找维请注意联合点(寄存器,稳定状态)

(3)和气热资料储备(不算1和2的结合用作)

(4)维请注意修改日志联合点(电视广播也好,其他方式则的源联合点也好)

「同时考虑:」 吞吐量,一般性,结构性资料源的扭矩,寄存器教育资源,由此可知微性等等总体。

四种join方式则不普遍存在也就是说的关基团在于,更加多的是针对业务部门场景在各指标上的各种因素取舍,因此看官只能结合场景来选取适宜的。

推荐阅读:

大资料开发之Flink sql 的基础用法

大资料职业培训面试Flink八股文个人

大资料Flink职业培训:维请注意Join/双源Join的方法

大资料职业培训Flink 源码由此可知出Async IO

广州男科病治疗费用
轻微中暑头疼怎么缓解
成都看甲状腺哪里比较好
银川前列腺炎治疗哪家好
南昌看男科去哪家医院
肾病内科
孩子嗓子痒总咳嗽怎么办
健脾胃小孩吃什么健脾
心率失常
肚子疼拉肚子

上一篇: 浙江大学回应停用知网:近千万续订费太贵 将用万方和维普替代

下一篇: 罗永浩回应无勇气再做手机引热议:复出科技界要做AR产品

友情链接