Flink 与rabbitmq集成 并开启checkpoint

作者&投稿:关学 (若有异议请与网页底部的电邮联系)
~ 如果不开启checkpoint机制,flink job 在运行时如果遇到异常整个job就会停止。

如果开启了checkpoint机制,就会根据恢复点进行数据重试,这个是一个非常复杂的机制,需要单独的文章进行解析。

所以开启checkpoint是必然要做的配置。

在与rabbitmq集成时有个点必须要注意,就是mq 发送消息时候,必须要带上CorrelationId。我们看一下flink的官方文档。

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setCheckpointInterval(10000);

DataStream dataStreamSource = env.addSource(new RMQSource(connectionConfig,

" kgraph",true,new SimpleStringSchema()) );

dataStreamSource.print();

上面是构造RMQSource(…)的参数,如下

queueName: The RabbitMQ queue name.

usesCorrelationId : true when correlation ids should be used, false otherwise (default is false).

deserializationScehma : Deserialization schema to turn messages into Java objects.

根据参数不同,有如下三种模式

Exactly-once (when checkpointed) with RabbitMQ transactions and messages with unique correlation IDs.

At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism (correlation id is not set).

No strong delivery guarantees ( without checkpointing) with RabbitMQ auto-commit mode.

那么 开启 exactly-once 确保消费一次的特性,就必须在传递 mq消息的时候带上 correlationId。

correlation Id 是 mq 消息的一个基本属性,可以用来标识消息的唯一id,通常是mq实现rpc调用时使用,flink 利用其唯一id的特性来做 exactly once的消费。所以在发送mq消息时 加上 correlation_id 的properties 就不会有问题了。

如果使用 spring 结合 rabbitmq 作为客户端,需要对 correlationId 做一个特别的处理

就是需要自己手动设置correaltionId, rabbittemplate 没有自动的封装这个属性, convertAndSend这个方法非常让人confuse,

里面支持传入correlationData字段,但是这个是写入到消息头的,而不是correlation_id,flink那边永远无法读取到。

public void sendMsg(KgraphUpdateMessage kgraphUpdateMessage)

{

CorrelationData correlationId =new CorrelationData(UUID.randomUUID().toString());

ObjectMapper jsonReader =new ObjectMapper();

try

    {

MessageProperties properties =new MessageProperties();

properties.setCorrelationId(correlationId.getId().getBytes());

Message message =new Message(jsonReader.writeValueAsBytes(kgraphUpdateMessage), properties);

rabbitTemplate. convertAndSend (KgraphMqConfig.KGRAPH_EXCHANGE, KgraphMqConfig.KGRAPH_TOPIC_EVENT, message,correlationId);

}catch (JsonProcessingException e)

{

e.printStackTrace();

}

}


永宁县18036364447: Spring整合RabbitMQ 一直收到重复消息怎么处理 -
永咽安宫: 下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制.客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了.假如客户端在发送ack之前意外死掉...

永宁县18036364447: 怎样连接虚拟机中的rabbitmq -
永咽安宫: 2.1安装MQ2.2修改rabbitMQ的配置文件2.3开启后台管理插件:2.4 开启对外端口两个,一个时服务中心的,一个是程序访问的(或者直接关闭防火墙):2.5启动服务器

永宁县18036364447: 如何在windows下突破Rabbitmq的socket限制 -
永咽安宫: 如何在windows下突破Rabbitmq的socket限制 利用Rabbitmq进行各类业务系统(如SAP、OA、EHR、KMS、访客系统、AD等)的集成和接口,已有很长一段时间了,初步建立了企业服务总线(ESB).随着新业务系统的不断接入到ESB中,...

永宁县18036364447: rabbitmq与kafka到底用哪个好 -
永咽安宫: Kafka和RabbitMq一样是通用意图消息代理,他们都是以分布式部署为目的.但是他们对消息语义模型的定义的假设是非常不同的.我对"AMQP 更成熟"这个论点是持怀疑态度的.让我们用事实说话来看看用什么解决方案来解决你的问题....

永宁县18036364447: win 7 rabbitmq 需要安装哪些
永咽安宫: 安装 1、Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang. 注意: 1.现在先别装最新的 3.6.3 ,本人在安装完最新的版本,queue 队列有问题,降到了 3.6.2 就解决了. 2.默认安装的Rabbit MQ 监听端口是...

永宁县18036364447: 如何连接 本地 rabbitmq 服务 -
永咽安宫: 1、安装 在Mac下安装RabbitMQ是非常简单的,一般默认复RabbitMQ服务器依赖的Erlang已经安装,只需要用下面两个命令就可以完成RabbitMQ的安装(前提是homebrew已经被安装):?brew update brew install rabbitmq 安装完成后需要将/...

永宁县18036364447: java 怎么通过rabbitmq 实现多channel -
永咽安宫: 展开全部1; version> org.springframework.amqp:pom文件需要加入spring集成rabbitMq的依赖,都测试通过本文代码样例都是在spring集成环境下写的.6;groupId> spring-rabbit

永宁县18036364447: 怎么在Ubuntu linux系统上安装和使用RabbitMQ -
永咽安宫: 首先保证软件包正确,然后解压后依次按下面步骤执行: 打开一个终端,su -成root用户 1>su root 正确切换到root用户,然后进入软件包解压路径 2>cd 软件解压包路径 执行软件解压包路径里的软件安装脚本 3> ./安装脚本

永宁县18036364447: 安装rabbitmq一定要安装erlang吗 -
永咽安宫: 在Mac下安装RabbitMQ是非常简单的,一般默认RabbitMQ服务器依赖的Erlang已经安装

永宁县18036364447: linux中rabbitMQ怎么设置占用内存大小 -
永咽安宫: 在linux中,程序的加载,涉及到两个工具,linker 和loader.Linker主要涉及动态链接库的使用,loader主要涉及软件的加载. exec执行一个程序 elf为现在非常流行的可执行文件的格式,它为程序运行划分了两个段,一个段是可以执行的代码段...

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网