为什么使用kafka处理mysql binlog

作者&投稿:乐军 (若有异议请与网页底部的电邮联系)
为什么有的binlog文件不在binlog index中~

这个你可以看配置文件 启用了才有这样的记录默认是没有的 /etc/my.conf log-bin = mysqlbin 一般放在/var/lib/mysql 比如上面的设置重启数据库会生成mysqlbin.000001文件

mysql binlog 了解一下
数据产生--> kafka

在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果如果本文有讲述不详细,或者错误指出,肯请指出,谢谢对于 binlog 数据,每一次操作(INSERT/UPDATE/DELETE 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)在数据库中,有 id, name 两个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下idnameage
  1john30
  2john40
  那么你进行操作
  update table set age = 50 where name = john的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。
  下面,我们给出具体的代码,然后对代码进行分析def desirializeByte(b: (String, Array[Byte])) : (String, String) = {val binlogEntry = BinlogEntryUtil.serializeToBean(b._2) //将 Array[Byte] 数据转换成 com.meituan.data.binlog.BinlogEntry 类,相关类定义参考附录val pkeys = binlogEntry.getPrimaryKeys.asScala //获取主键,这里的 asScala 将 Java 的 List 转换为 Scala 的 Listval rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList //获取具体的信息val strRowDatas = rowDatas.map(a => { //将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面进行 Json 化val b = a.getBeforeColumns.asScala //获取 beforColumnsval c = a.getAfterColumns.asScala //获取 afterColumnsval mb = b.map(d => (d._1, d._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值val mc = c.map(c => (c._1, c._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值(mb, mc) //返回转换后的 beforeColumns 和 afterColumns})
  //下面利用 json4s 进行 Json 化
  (binlogEntry.getEventType, compact("rowdata" -> strRowDatas.map{w => List("row_data" -> ("before" -> w._1.toMap) ~ ("after" -> w._2.toMap)) //这里的两个 toMap 是必要的,不然里层会变成 List,这个地方比较疑惑的是,//w._1 按理是 Map类型,为什么还需要强制转换成 Map//而且用 strRowDatas.foreach(x => println(s"${x._1} ${x._2}")打印的结果表名是 Map}))
  desirializeByte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (String, String) 的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete 等),第二个字段为 Json 化后的数据。
  BinlogEntryUtil.serilizeToBean 是一个辅助类,将 binlog 数据转化为一个 Java bean 类。
  第 4 行,我们得到表对应的主键,第 5 行获得具体的数据第 6 行到第 12 行是 Json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。
  第 14, 15 行就是具体的 Json 工作了(使用了 json4s 包进行 Json 化)这个过程中有一点需要注意的是,在 Json 化的时候,记得为 w._1 和 w._2 加 toMap 操作,不然会变成 List(很奇怪,我将 w._1 和 w._2 打印出来看,都是 Map 类型)或者你可以在第 7,8 行的末尾加上 .toMap 操作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。
  利用上述代码,我们可以得到下面这样 Json 化之后的字符串(我进行了排版,程序返回的 Json 串是不换行的){"rowdata":
  [{"row_data":
  {"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}
  }
  }]
  }"
  到这里,基本就完成了一种将 binlog 数据 Json 化的代码。
  附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。
  public static BinlogEntryserializeToBean(byte[] input) {BinlogEntrybinlogEntry = null;
  Entryentry = deserializeFromProtoBuf(input);//从 protobuf 反序列化if(entry != null) {
  binlogEntry = serializeToBean(entry);
  }
  return binlogEntry;
  }
  public static EntrydeserializeFromProtoBuf(byte[] input) {Entryentry = null;
  try {
  entry = Entry.parseFrom(input);
  //com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成} catch (InvalidProtocolBufferExceptionvar3) {logger.error("Exception:" + var3);
  }
  return entry;
  }
  //将 Entry 解析为一个 bean 类
  public static BinlogEntryserializeToBean(Entryentry) {RowChangerowChange = null;
  try {
  rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (Exceptionvar8) {
  throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);}
  BinlogEntrybinlogEntry = new BinlogEntry();String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");String logFileNo = "000000";
  if(logFileNames.length > 1) {
  logFileNo = logFileNames[1];
  }
  binlogEntry.setBinlogFileName(logFileNo);binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());binlogEntry.setTableName(entry.getHeader().getTableName());binlogEntry.setEventType(entry.getHeader().getEventType().toString());IteratorprimaryKeysList = rowChange.getRowDatasList().iterator();while(primaryKeysList.hasNext()) {
  RowDatarowData = (RowData)primaryKeysList.next();BinlogRowrow = new BinlogRow(binlogEntry.getEventType());row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));binlogEntry.addRowData(row);
  }
  if(binlogEntry.getRowDatas().size() >= 1) {BinlogRowprimaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));} else {
  ArrayListprimaryKeysList2 = new ArrayList();binlogEntry.setPrimaryKeys(primaryKeysList2);}
  return binlogEntry;
  }
  public class BinlogEntry implements Serializable {private String binlogFileName;
  private long binlogOffset;
  private long executeTime;
  private String tableName;
  private String eventType;
  private List<String> primaryKeys;
  private List<BinlogRow> rowDatas = new ArrayList();}
  public class BinlogRow implements Serializable {public static final String EVENT_TYPE_INSERT = "INSERT";public static final String EVENT_TYPE_UPDATE = "UPDATE";public static final String EVENT_TYPE_DELETE = "DELETE";private String eventType;
  private Map<String, BinlogColumn> beforeColumns;private Map<String, BinlogColumn> afterColumns;}
  public class BinlogColumn implements Serializable {private int index;
  private String mysqlType;
  private String name;
  private boolean isKey;
  private boolean updated;
  private boolean isNull;
  private String value;
  }


为什么要用kafka?kafka适用什么样的场景?
使用kafka的理由:1.分布式,高吞吐量,速度快(kafka是直接通过磁盘存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的对象创建和垃圾回收)2.同时支持实时和离线两种解决方案(相信很多项目都有类似的需求,这也是Linkedin的官方架构,我们是一部分数据通过storm做实时计算处理...

Kafka在大数据环境中如何应用呢?
Kafka是一个分布式的、高吞吐的、基于发布\/订阅的消息系统。利用kafka技术可以在廉价PC Server上搭建起大规模的消息系统。Kafka具有消息持久化、高吞吐、分布式、实时、低耦合、多客户端支持、数据可靠等诸多特点,适合在线和离线的消息处理。互联网关采集到变化的路由信息,通过kafka的producer将归集后的信息...

Python kafka 主流库的使用
对于kafka-python,尽管它提供了Python风格的消费者迭代器,但存在可能导致死锁的bug,因此在生产环境中推荐使用confluent-kafka-python。综上,confluent-kafka-python是生产环境的理想选择,而kafka-python更适合作为开发和测试工具。

Kafka使用场景
Kafka作为一个传统的消息代理的替代品表现得非常出色。使用消息代理有各种各样的原因(将处理与数据生成器解耦,缓冲未处理的消息,等等)。与大多数消息传递系统相比,Kafka有更好的吞吐量、内置分区、复制和容错性,这使得它成为大规模消息处理应用的一个很好的解决方案。根据我们的经验,消息传递的使用通常...

kafka框架在后端开发中如何使用?
使用:1. 生产消息:通过 RdKafka 扩展构建生产者实例,配置相关参数后发送消息。2. 消费消息:创建消费者实例,指定 topic 和 partition 进行消息消费。附加问题:遇到 Kafka 服务停止时抛出的异常:ERROR Shutdown broker because all log dirs in \/tmp\/kafka-logs have failed。解决方法:清理 kafka-...

Kafka简介+Kafka Tool使用简介+使用实例
把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。 zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。有 了 消息...

Kafka,Mq和Redis作为消息队列使用
kafka是个日志处理缓冲组件,在大数据信息处理中使用。和传统的消息队列相比较简化了队列结构和功能,以流形式处理存储(持久化)消息(主要是日志)。日志数据量巨大,处理组件一般会处理不过来,所以作为缓冲层的kafka,支持巨大吞吐量。为了防止信息丢失,其消息被调用后不直接丢弃,要多存储一段时间,等...

分布式消息Kafka的原理、基础架构、使用场景
一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。2.消息系统 解耦和生产者和消费者、缓存消息等。3.用户活动跟踪 4.运营指标 Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告...

kafka的原理是什么?
1、Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流数据(ActivityStream)和运营数据处理管道(Pipeline)的基础。现在它已被多家公司作为多种类型的数据管道和消息系统使用。2、Kafka的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点...

kafka 如何脱离 zookeeper 单独使用?
Kafka 不能脱离 Zookeeper 单独使用,因为 Kafka 使用 Zookeeper 管理和协调 Kafka 的节点服务器。但是,最新的 3.0 版本中,Kafka 依然兼容 zookeeper Controller,但 Kafka Raft 元数据模式,已经可以在不依赖 zookeeper 的情况下启动 Kafka 了。如果您想尝试使用 Kafka Raft 启动 Kafka,可以在 config ...

汶川县17883237093: 为什么使用kafka处理mysql binlog -
系倩眩晕: 在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路.另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面.如果如果本文...

汶川县17883237093: 为什么要使用MySQL代理? -
系倩眩晕: 大部人都知道使用代理的好处,毕竟,随着互联网越来越普及,互联网系统越来越庞大、复杂,性能要求越来越高,为了让整个系统具有更好的扩展性、更高的性能、解藕等多种特性,在数据库层面引入代理层是目前互联网系统常见的架构设计方案.总的来说,在数据库层面引入代理会带来以下好处: 将不同类型的请求分发的不同的server以此实现读写分离、负载均衡; 来自不同客户端的请求分发到不同的server实现后端多租户数据库服务,当然,类似的原理 还可以实现分库分表、一个请求写到多个server或者不同的源端如消息队列; 监控统计客户端的请求情况,请求分布统计、请求类型等,以此来优化数据库的使用; 总之,可以实现你想要的诸多功能.

汶川县17883237093: kafka apache 使用在什么场合 -
系倩眩晕: 1、Messaging对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性来和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息...

汶川县17883237093: kafka 的实现依赖了哪些东西 -
系倩眩晕: 1. 通常来说,kafka的使用是为了消息的持久化(persistent messages)2. 吞吐量是kafka设计的主要目标3. 关于消费的状态被记录为consumer的一部分,而不是server.这点稍微解释下,这里的server还是只broker,谁消费了多少数据都记录在消费者自己手中,不存在broker中.按理说,消费记录也是一个日志,可以放在broker中,至于为什么要这么设计,我们写下去了再说.4. Kafka的分布式可以表现在producer、broker、consumer都可以分布在多台机器上.

汶川县17883237093: kafka解决了什么问题 -
系倩眩晕: 分区实际上是调优Kafka并行度的最小单元. 对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息; 而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明). 所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大.

汶川县17883237093: mysql 和innodb的区别 -
系倩眩晕: MySQL数据库有多种存储引擎:比如:MyISAM、InnoDB、MERGE、MEMORY(HEAP)、BDB(BerkeleyDB)、EXAMPLE、FEDERATED、ARCHIVE、CSV、BLACKHOLE等等,最常见的也就是MyISAM和InnoDB了,下面主要讲解下MyISAM...

汶川县17883237093: data source explorer 配置mysql的连接,该怎么处理 -
系倩眩晕: 在window----show view下打开database explorer 在左边找到MYsql选择到相应的版本(连5.0选4.1就行).之后在右边的Connection URL details里输入连接需要的相应信...

汶川县17883237093: 怎么在java中操作mysql数据库 -
系倩眩晕: 1.安装好jre环境和和jdk,设置好环境变量,很基础,网上教程很多;2.安装mysql数据库,不用设置数据源和环境变量,只是安装好就可以,具体过程见网上教程,有时候不好安装,可能是因为之前安装没有卸载干净等原因,多查一查怎么弄,...

汶川县17883237093: java客户端使用kafka时什么情况下使用kafka client和spring kafka? -
系倩眩晕: spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTemplate,封装了各种方法,方便操作 所以你使用spring的情况下,可以用spring-kafka,当然直接用kafka client也行

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网