
Producing messages

As before, start from MySqlConnectorTask.poll(); what ultimately gets called is AbstractReader.poll():

MySqlConnectorTask.poll()-->ChainedReader.poll()-->AbstractReader.poll()

The AbstractReader can be either a SnapshotReader or a BinlogReader: the full snapshot is handled by SnapshotReader, and incremental reading from the binlog by BinlogReader.
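For orientation, here is a minimal sketch of how a chained reader can delegate poll() to whichever reader is currently active and fall through to the next one once it finishes. The names and logic are simplified illustrations, not the actual Debezium classes.

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;

// Simplified sketch only: the real ChainedReader/AbstractReader carry much more state handling.
interface Reader {
    List<SourceRecord> poll() throws InterruptedException;
    boolean isFinished();
}

class SimpleChainedReader implements Reader {
    private final Deque<Reader> readers; // e.g. [snapshot reader, binlog reader]

    SimpleChainedReader(List<Reader> ordered) {
        this.readers = new ArrayDeque<>(ordered);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        Reader current = readers.peekFirst();
        if (current == null) {
            return Collections.emptyList();   // nothing left to read
        }
        if (current.isFinished()) {           // snapshot done -> move on to the binlog reader
            readers.pollFirst();
            return poll();
        }
        return current.poll();
    }

    @Override
    public boolean isFinished() {
        return readers.isEmpty();
    }
}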

The main logic of poll():

logger.trace("Polling for next batch of records");
List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
    // body elided: the full implementation pauses briefly here and re-checks for failures
}
return batch;

In other words, each poll drains up to maxBatchSize SourceRecords from the records queue; records is a LinkedBlockingDeque:

 this.records = new LinkedBlockingDeque<>(context.maxQueueSize());
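As a quick illustration of the queue semantics (a standalone sketch, not Debezium code): put() blocks when the bounded deque is full, which gives natural backpressure against the reader, while drainTo() moves whatever is currently available into the batch without blocking.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

public class QueueDemo {
    public static void main(String[] args) throws InterruptedException {
        int maxQueueSize = 8192;   // analogous to context.maxQueueSize()
        int maxBatchSize = 2048;   // analogous to maxBatchSize in poll()
        LinkedBlockingDeque<String> records = new LinkedBlockingDeque<>(maxQueueSize);

        // Producer side (what enqueueRecord() does): put() blocks if the deque is full.
        records.put("record-1");
        records.put("record-2");

        // Consumer side (what poll() does): drain up to maxBatchSize elements at once.
        List<String> batch = new ArrayList<>(maxBatchSize);
        int drained = records.drainTo(batch, maxBatchSize);
        System.out.println("drained " + drained + " records: " + batch);
    }
}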

Since it is a BlockingDeque, let's go straight to the enqueue side. The put into the queue is in AbstractReader.enqueueRecord():

protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}

This enqueueRecord() is called from quite a few places:

BinlogReader.handleInsert(Event event)
BinlogReader.handleUpdate(Event event)
BinlogReader.handleDelete(Event event)
SnapshotReader.handleQueryEvent(Event event)
SnapshotReader.execute() 
SnapshotReader.enqueueSchemaChanges

Now let's look at how insert, update, and delete events are handled, using handleUpdate() as the example (most of the logging code is omitted):

protected void handleUpdate(Event event) throws InterruptedException {
    UpdateRowsEventData update = unwrapData(event);
    long tableNumber = update.getTableId();
    BitSet includedColumns = update.getIncludedColumns();
    // BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();
    RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);
    if (recordMaker != null) {
        List<Entry<Serializable[], Serializable[]>> rows = update.getRows();
        Long ts = context.clock().currentTimeInMillis();
        int count = 0;
        int numRows = rows.size();
        if (startingRowNumber < numRows) {
            for (int row = startingRowNumber; row != numRows; ++row) {
                Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
                Serializable[] before = changes.getKey();
                Serializable[] after = changes.getValue();
                count += recordMaker.update(before, after, ts, row, numRows);
            }

        } else {
            // All rows were previously processed ...
            logger.debug("Skipping previously processed update event: {}", event);
        }
    } else {
        logger.debug("Skipping update row event: {}", event);
    }
    startingRowNumber = 0;
}

The two important lines are:

RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);
count += recordMaker.update(before, after, ts, row, numRows);
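The detail that matters in the first line is the super::enqueueRecord method reference: it is handed in as the consumer, so every SourceRecord the converter later builds is pushed straight back into the reader's blocking queue. Below is a standalone sketch of that wiring; the types are simplified stand-ins (in Debezium the consumer is a BlockingConsumer<SourceRecord>, as the Converter signature further down shows).

import java.util.concurrent.LinkedBlockingDeque;

public class CallbackSketch {

    @FunctionalInterface
    interface BlockingConsumer<T> {
        void accept(T t) throws InterruptedException;
    }

    private final LinkedBlockingDeque<String> records = new LinkedBlockingDeque<>(1024);

    // Plays the role of AbstractReader.enqueueRecord(SourceRecord).
    private void enqueueRecord(String record) throws InterruptedException {
        records.put(record);
    }

    // Plays the role of recordMaker.update(...): every record it produces goes to the consumer.
    private int update(String before, String after, BlockingConsumer<String> consumer)
            throws InterruptedException {
        consumer.accept("update(" + before + " -> " + after + ")");
        return 1;
    }

    public void handleUpdateSketch() throws InterruptedException {
        // Same idea as recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord):
        // the method reference ties the converter's output directly to the reader's queue.
        BlockingConsumer<String> consumer = this::enqueueRecord;
        update("old", "new", consumer);
        System.out.println("queued: " + records);
    }

    public static void main(String[] args) throws InterruptedException {
        new CallbackSketch().handleUpdateSketch();
    }
}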

recordMaker — RecordsForTable.update() simply delegates to the converter:

public int update(Object[] before, Object[] after, long ts, int rowNumber, int numberOfRows) throws InterruptedException {
    return converter.update(source, before, after, rowNumber, numberOfRows, includedColumns, ts, consumer);
}

Converter — its update() is where the SourceRecords are actually built:

public int update(SourceInfo source, Object[] before, Object[] after, int rowNumber, int numberOfRows, BitSet includedColumns,
                  long ts,
                  BlockingConsumer<SourceRecord> consumer)
        throws InterruptedException {
    int count = 0;
    Object key = tableSchema.keyFromColumnData(after);
    Struct valueAfter = tableSchema.valueFromColumnData(after);
    logger.info("valueAfter: {},key {}",valueAfter,key);
    if (valueAfter != null || key != null) {
        Object oldKey = tableSchema.keyFromColumnData(before);
        Struct valueBefore = tableSchema.valueFromColumnData(before);
        Schema keySchema = tableSchema.keySchema();
        Map<String, ?> partition = source.partition();
        Map<String, ?> offset = source.offsetForRow(rowNumber, numberOfRows);
        Struct origin = source.struct(id);
        if (key != null && !Objects.equals(key, oldKey)) {
            // The key has changed, so we need to deal with both the new key and old key.
            // Consumers may push the events into a system that won't allow both records to exist at the same time,
            // so we first want to send the delete event for the old key... 
            logger.info("update record oldkey: {},key: {} "+oldKey,key);
            SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
                    keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));
            consumer.accept(record);
            ++count;

            // Next send a tombstone event for the old key ...
            record = new SourceRecord(partition, offset, topicName, partitionNum, keySchema, oldKey, null, null);
            consumer.accept(record);
            ++count;

            // And finally send the create event ...
            record = new SourceRecord(partition, offset, topicName, partitionNum,
                    keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
            consumer.accept(record);
            ++count;
        } else {
            // The key has not changed, so a simple update is fine ...
            SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
                    keySchema, key, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));
            logger.info("update record {}",record.toString());
            consumer.accept(record);
            ++count;
        }
    }
    return count;
}

Note the logic here: when the key has changed, multiple SourceRecords are generated, essentially a delete for the old key, then a tombstone, then a create for the new key; see the sketch right below. After that, here is a concrete example of the ordinary path, an update that does not change the key.
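The middle record with a null value and null schema is a Kafka tombstone: on a log-compacted topic it is what eventually lets the broker discard the records stored under the old key. A minimal standalone producer sketch of that three-record sequence follows; the bootstrap address and the string payloads are placeholders, not taken from this setup.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1) delete event for the old key, 2) tombstone (null value) so compaction can
            //    drop the old key entirely, 3) create event for the new key
            producer.send(new ProducerRecord<>("b555533.test2.test", "oldKey", "delete-envelope"));
            producer.send(new ProducerRecord<>("b555533.test2.test", "oldKey", null));
            producer.send(new ProducerRecord<>("b555533.test2.test", "newKey", "create-envelope"));
        }
    }
}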

The table definition:

CREATE TABLE `test` (
  `ID` int(11) NOT NULL AUTO_INCREMENT,
  `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8;

insert into test(id,x) values(null,now());

The SQL session:

mysql> select * from test where id=100;
+-----+---------------------+
| ID  | X                   |
+-----+---------------------+
| 100 | 2017-09-15 00:34:20 |
+-----+---------------------+
1 row in set (0.03 sec)
mysql> update test set x=now() where id =100;
Query OK, 1 row affected (0.02 sec)
mysql> select * from test where id =100;
+-----+---------------------+
| ID  | X                   |
+-----+---------------------+
| 100 | 2017-09-15 00:47:43 |
+-----+---------------------+
1 row in set (0.00 sec)

The event parsed from the binlog is clearly 8 hours off; presumably a UTC issue:

{before=[100, 2017-09-14T16:47:05+08:00[PRC]], after=[100, 2017-09-14T16:47:43+08:00[PRC]]}
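A quick way to see the 8-hour gap (a standalone java.time sketch; Asia/Shanghai stands in for the PRC zone shown in the log): MySQL stores TIMESTAMP values in UTC, and the parsed value above looks like that UTC wall-clock time tagged with the +08:00 session zone, which shifts the instant back by 8 hours.

import java.time.Duration;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class TimestampCheck {
    public static void main(String[] args) {
        // What the mysql client showed (session time zone +08:00):
        ZonedDateTime shownInClient =
                ZonedDateTime.of(2017, 9, 15, 0, 47, 43, 0, ZoneId.of("Asia/Shanghai"));
        // What the parsed binlog event contained:
        ZonedDateTime parsedFromBinlog =
                ZonedDateTime.of(2017, 9, 14, 16, 47, 43, 0, ZoneId.of("Asia/Shanghai"));

        // Same wall-clock as the UTC form of the client value, but 8 hours apart as instants:
        System.out.println(shownInClient.withZoneSameInstant(ZoneOffset.UTC)); // 2017-09-14T16:47:43Z
        System.out.println(Duration.between(parsedFromBinlog, shownInClient).toHours()); // 8
    }
}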

The key: here ID is the primary key, so the key is 100:

valueAfter: Struct{ID=100,X=2017-09-14T16:47:43+08:00},key Struct{ID=100} 

The generated SourceRecord:

[2017-09-15 00:47:53,078] INFO update record SourceRecord{sourcePartition={server=b555533}, sourceOffset={ts_sec=1505407663, file=mysql-bin.000022, pos=26544, row=1, server_id=2, event=2}} ConnectRecord{topic='b555533.test2.test', kafkaPartition=null, key=Struct{ID=100}, value=Struct{before=Struct{ID=100,X=2017-09-14T16:47:05+08:00},after=Struct{ID=100,X=2017-09-14T16:47:43+08:00},source=Struct{name=b555533,server_id=2,ts_sec=1505407663,file=mysql-bin.000022,pos=26666,row=0,thread=694,db=test2,table=test},op=u,ts_ms=1505407673078}, timestamp=null} (io.debezium.connector.mysql.RecordMakers:269)