Kafka versions
ZooKeeper version: zookeeper-3.4.14.tar.gz
Kafka version: kafka_2.12-2.0.0.tgz
pom.xml dependencies:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
Kafka's built-in serialization
Kafka always sends and receives messages as byte[] arrays. In practice, though, we work not only with byte[] but also with int, short, long, float, double, String, and other types. This works because Kafka converts those types to byte[] using the serializer and deserializer we specify, before sending and after receiving. So when producing or consuming messages we normally specify a serialization method for both the message key and the message value, for example:
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
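Put together, a minimal producer configuration might look like the sketch below. The broker address `localhost:9092` is a placeholder, not something defined in this article; only the two serializer entries are the point here.

```java
import java.util.Properties;

// A minimal sketch of a producer configuration. "localhost:9092" is a
// placeholder broker address; replace it with your own cluster.
public class ProducerProps {
    public static Properties build() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        // Both key and value are serialized as UTF-8 strings.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("key.serializer"));
    }
}
```

The same `Properties` object would then be passed to `new KafkaProducer<>(config)`.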
Kafka serializers

Serializer | Java type | Notes
---|---|---
org.apache.kafka.common.serialization.ByteArraySerializer | byte[] | raw bytes
org.apache.kafka.common.serialization.ByteBufferSerializer | ByteBuffer | java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerSerializer | Integer |
org.apache.kafka.common.serialization.ShortSerializer | Short |
org.apache.kafka.common.serialization.LongSerializer | Long |
org.apache.kafka.common.serialization.FloatSerializer | Float |
org.apache.kafka.common.serialization.DoubleSerializer | Double |
org.apache.kafka.common.serialization.StringSerializer | String |
Kafka deserializers

Deserializer | Java type | Notes
---|---|---
org.apache.kafka.common.serialization.ByteArrayDeserializer | byte[] | raw bytes
org.apache.kafka.common.serialization.ByteBufferDeserializer | ByteBuffer | java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerDeserializer | Integer |
org.apache.kafka.common.serialization.ShortDeserializer | Short |
org.apache.kafka.common.serialization.LongDeserializer | Long |
org.apache.kafka.common.serialization.FloatDeserializer | Float |
org.apache.kafka.common.serialization.DoubleDeserializer | Double |
org.apache.kafka.common.serialization.StringDeserializer | String |
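On the consumer side the configuration mirrors the producer: pick the deserializer that matches whatever serializer produced the bytes. A sketch, with placeholder broker address and group id:

```java
import java.util.Properties;

// A minimal sketch of a consumer configuration. The broker address and
// group id are placeholders; the deserializers must match the producer's
// serializers, otherwise deserialization will fail or produce garbage.
public class ConsumerProps {
    public static Properties build() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "demo-group");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("value.deserializer"));
    }
}
```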
Kafka's built-in serializer implementations
Using String as an example, let's look at how Kafka implements serialization and deserialization.
String serialization
See the org.apache.kafka.common.serialization.StringSerializer class:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
 * value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}
StringSerializer implements the Serializer<String> interface. configure() resolves the encoding from key.serializer.encoding, value.serializer.encoding, or serializer.encoding (defaulting to UTF8), and serialize() converts the String to byte[] with String.getBytes(encoding).
String deserialization
See the org.apache.kafka.common.serialization.StringDeserializer class:
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
 * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
 */
public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
}
StringDeserializer implements the Deserializer<String> interface; deserialize() rebuilds the String from the byte[] with new String(data, encoding), using the same encoding-resolution logic as the serializer.
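The pair of classes above boils down to String.getBytes(encoding) on one side and new String(bytes, encoding) on the other. A standalone round trip of that same logic (no Kafka on the classpath) looks like this:

```java
import java.io.UnsupportedEncodingException;

// Replicates the core of StringSerializer/StringDeserializer with the
// default "UTF8" encoding; null passes through unchanged, as in Kafka.
public class StringRoundTrip {
    static byte[] serialize(String data) {
        try {
            return data == null ? null : data.getBytes("UTF8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // Kafka wraps this in SerializationException
        }
    }

    static String deserialize(byte[] data) {
        try {
            return data == null ? null : new String(data, "UTF8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String original = "kafka message 消息";
        System.out.println(deserialize(serialize(original)).equals(original)); // true
    }
}
```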
Float serialization
Next, let's look at how a float is serialized; see the org.apache.kafka.common.serialization.FloatSerializer class:
package org.apache.kafka.common.serialization;

public class FloatSerializer implements Serializer<Float> {
    @Override
    public byte[] serialize(final String topic, final Float data) {
        if (data == null)
            return null;

        long bits = Float.floatToRawIntBits(data);
        return new byte[] {
            (byte) (bits >>> 24),
            (byte) (bits >>> 16),
            (byte) (bits >>> 8),
            (byte) bits
        };
    }
}
As you can see, FloatSerializer implements the Serializer<Float> interface: it takes the float's raw IEEE 754 bit pattern via Float.floatToRawIntBits and writes the four bytes in big-endian order (most significant byte first).
Float deserialization
See the org.apache.kafka.common.serialization.FloatDeserializer class:
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

public class FloatDeserializer implements Deserializer<Float> {
    @Override
    public Float deserialize(final String topic, final byte[] data) {
        if (data == null)
            return null;
        if (data.length != 4) {
            throw new SerializationException("Size of data received by Deserializer is not 4");
        }

        int value = 0;
        for (byte b : data) {
            value <<= 8;
            value |= b & 0xFF;
        }
        return Float.intBitsToFloat(value);
    }
}
Likewise, FloatDeserializer implements the Deserializer<Float> interface: it first checks that exactly four bytes were received, reassembles the big-endian int, and converts it back to a float with Float.intBitsToFloat.
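The two float classes are exact inverses of each other: four big-endian bytes of the IEEE 754 bit pattern. The round trip can be verified without Kafka at all:

```java
// Replicates the core of FloatSerializer/FloatDeserializer: the float's
// IEEE 754 bit pattern is written as four big-endian bytes and reassembled
// on the way back.
public class FloatRoundTrip {
    static byte[] serialize(float f) {
        int bits = Float.floatToRawIntBits(f);
        return new byte[] {
            (byte) (bits >>> 24),
            (byte) (bits >>> 16),
            (byte) (bits >>> 8),
            (byte) bits
        };
    }

    static float deserialize(byte[] data) {
        int value = 0;
        for (byte b : data) {
            value <<= 8;       // shift previous bytes up
            value |= b & 0xFF; // append the next byte
        }
        return Float.intBitsToFloat(value);
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize(3.14f)) == 3.14f); // true
    }
}
```

Because the bit pattern is preserved exactly, the round trip is lossless for every float value, including NaN and infinities.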
From this we can conclude that serialization in Kafka simply means converting data to a byte[] for transport, and on the consumer side the matching deserializer parses that byte[] back into data. Next, let's define our own serialization classes.
Custom Kafka serializers
Serialization with fastjson
Add the dependency to pom.xml:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.37</version>
</dependency>
Student entity
/**
 * Title: Student.java
 * Description: entity used to demonstrate custom serialization
 *
 * @author MrChen
 * @date 2019-12-05 11:22:38
 */
public class Student {
    int id;
    String name;
    int age;
    boolean sex;
    float[] score;

    public Student(int id, String name, int age, boolean sex, float[] score) {
        super();
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.score = score;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public boolean isSex() {
        return sex;
    }

    public void setSex(boolean sex) {
        this.sex = sex;
    }

    public float[] getScore() {
        return score;
    }

    public void setScore(float[] score) {
        this.score = score;
    }
}
Serializer implementation: StudentSerializer
import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Title: StudentSerializer.java
 * Description: serializes a Student to JSON bytes with fastjson
 *
 * @author MrChen
 * @date 2019-12-04 17:13:41
 */
public class StudentSerializer implements Serializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Student data) {
        return JSON.toJSONBytes(data);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
Deserializer implementation: StudentDeserializer
import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Title: StudentDeserializer.java
 * Description: deserializes JSON bytes back into a Student with fastjson
 *
 * @author MrChen
 * @date 2019-12-04 17:13:11
 */
public class StudentDeserializer implements Deserializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public Student deserialize(String topic, byte[] data) {
        return JSON.parseObject(data, Student.class);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
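Wiring the custom classes into a client is just a configuration change: pass their fully qualified names instead of the built-in ones. In the sketch below, the package name com.example.kafka is an assumption; use whatever package your classes actually live in.

```java
import java.util.Properties;

// Producer configuration using the custom Student value serializer.
// "com.example.kafka" is a hypothetical package name, and "localhost:9092"
// is a placeholder broker address.
public class StudentProducerProps {
    public static Properties build() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        // Keys stay plain strings; values go through our custom class.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "com.example.kafka.StudentSerializer");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("value.serializer"));
    }
}
```

The consumer would symmetrically set value.deserializer to the fully qualified name of StudentDeserializer.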
Serialization with protostuff
Utility class ProtostuffUtil:
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Created by shirukai on 2018/8/14
 * protostuff serialization/deserialization utility
 */
public class ProtostuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * Serialize an object to bytes.
     *
     * @param message object to serialize
     * @param tClass  its class
     * @param <T>     type
     * @return byte[]
     */
    public static <T> byte[] serializer(T message, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        return ProtostuffIOUtil.toByteArray(message, schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
    }

    /**
     * Deserialize bytes back into an object.
     *
     * @param bytes  serialized bytes
     * @param tClass target class
     * @param <T>    type
     * @return T
     */
    public static <T> T deserializer(byte[] bytes, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, message, schema);
        return message;
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> tClass) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(tClass);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(tClass);
            cachedSchema.put(tClass, schema);
        }
        return schema;
    }
}
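One small note on getSchema: the get-then-put sequence is not atomic, so two threads can both build a schema for the same class. That is harmless here (only one result stays cached), but ConcurrentHashMap.computeIfAbsent expresses the same cache atomically. A sketch of that idiom, using a hypothetical string-producing factory as a stand-in for RuntimeSchema.createFrom (protostuff is not on the classpath in this snippet):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Thread-safe per-class cache using computeIfAbsent. The factory below is
// a stand-in for RuntimeSchema.createFrom, which needs the protostuff jar.
public class SchemaCache<S> {
    private final Map<Class<?>, S> cache = new ConcurrentHashMap<>();
    private final Function<Class<?>, S> factory;

    public SchemaCache(Function<Class<?>, S> factory) {
        this.factory = factory;
    }

    public S get(Class<?> tClass) {
        // Atomic: the factory runs at most once per class.
        return cache.computeIfAbsent(tClass, factory);
    }

    public static void main(String[] args) {
        SchemaCache<String> schemas = new SchemaCache<>(c -> "schema:" + c.getName());
        System.out.println(schemas.get(String.class)); // schema:java.lang.String
        // Repeated lookups return the same cached instance.
        System.out.println(schemas.get(String.class) == schemas.get(String.class)); // true
    }
}
```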
Serializer implementation: StudentProtostuffSerializer
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class StudentProtostuffSerializer implements Serializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Student data) {
        return ProtostuffUtil.serializer(data, Student.class);
    }

    @Override
    public void close() {
    }
}
Deserializer implementation: StudentProtostuffDeserializer
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class StudentProtostuffDeserializer implements Deserializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public Student deserialize(String topic, byte[] data) {
        return ProtostuffUtil.deserializer(data, Student.class);
    }

    @Override
    public void close() {
    }
}
Summary
Custom serialization in Kafka comes down to this: on the producer side, convert your data into a byte[] that suits your needs; on the consumer side, convert that byte[] back into the original data with the matching deserializer.