Kafka custom serialization classes

Versions used

ZooKeeper version: zookeeper-3.4.14.tar.gz

Kafka version: kafka_2.12-2.0.0.tgz

pom.xml dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

Kafka's built-in serialization

On the wire, Kafka always sends and receives messages as byte[] arrays. In everyday use, however, we are not limited to byte[]: we can also work with int, short, long, float, double, String and other data types, because Kafka converts these types to and from byte[] using the serializer and deserializer we specify before sending or after receiving. That is why, whenever we produce or consume messages with Kafka, we need to specify the serialization for both the message key and the message value, for example:

config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka serializers

Serializer class                                             Java type   Notes
org.apache.kafka.common.serialization.ByteArraySerializer   byte[]      raw bytes, passed through unchanged
org.apache.kafka.common.serialization.ByteBufferSerializer  ByteBuffer  java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerSerializer     Integer
org.apache.kafka.common.serialization.ShortSerializer       Short
org.apache.kafka.common.serialization.LongSerializer        Long
org.apache.kafka.common.serialization.FloatSerializer       Float
org.apache.kafka.common.serialization.DoubleSerializer      Double
org.apache.kafka.common.serialization.StringSerializer      String

Kafka deserializers

Deserializer class                                             Java type   Notes
org.apache.kafka.common.serialization.ByteArrayDeserializer   byte[]      raw bytes, passed through unchanged
org.apache.kafka.common.serialization.ByteBufferDeserializer  ByteBuffer  java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerDeserializer     Integer
org.apache.kafka.common.serialization.ShortDeserializer       Short
org.apache.kafka.common.serialization.LongDeserializer        Long
org.apache.kafka.common.serialization.FloatDeserializer       Float
org.apache.kafka.common.serialization.DoubleDeserializer      Double
org.apache.kafka.common.serialization.StringDeserializer      String
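
On the consuming side, the deserializer must mirror whatever serializer the producer used (for example, records written with StringSerializer are read back with StringDeserializer). A minimal consumer sketch; the broker address, group id and topic name are placeholder assumptions:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StringConsumerDemo {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "demo-group");
        config.put("auto.offset.reset", "earliest");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // Poll once and print whatever has been received so far.
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1000))) {
                System.out.println(record.key() + " -> " + record.value());
            }
        }
    }
}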

How Kafka's built-in serialization is implemented

Taking String as an example, let's look at how Kafka implements serialization and deserialization.

String serialization

See the org.apache.kafka.common.serialization.StringSerializer class:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
* String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
* value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
*/
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

StringSerializer implements the Serializer interface with String as its type parameter and provides the interface's configure() and serialize() methods. The important logic is in serialize(): it simply turns the String we pass in into bytes via data.getBytes(encoding), using UTF-8 unless another encoding has been configured.
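
As a quick illustration (not part of Kafka's own code), the serializer can also be used on its own; a small sketch:

import java.util.Collections;
import org.apache.kafka.common.serialization.StringSerializer;

public class StringSerializerDemo {
    public static void main(String[] args) {
        StringSerializer serializer = new StringSerializer();
        // configure() is where the encoding could be overridden; here we keep the UTF-8 default.
        serializer.configure(Collections.emptyMap(), false);
        byte[] bytes = serializer.serialize("any-topic", "hello");
        System.out.println(bytes.length); // 5 - one byte per ASCII character
    }
}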

String deserialization

See the org.apache.kafka.common.serialization.StringDeserializer class:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
* String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
* value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
*/
public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
}

StringDeserializer implements the Deserializer interface with String as its type parameter and provides the interface's configure() and deserialize() methods. The important logic is in deserialize(): it simply converts the incoming byte[] back into a String via new String(data, encoding).
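
The encoding really is picked up from configure(); a small round-trip sketch (not Kafka code) using a non-default encoding:

import java.util.Collections;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StringRoundTripDemo {
    public static void main(String[] args) {
        // "value.serializer.encoding" / "value.deserializer.encoding" are the property
        // names documented in the two classes above; GBK is just an example encoding.
        StringSerializer serializer = new StringSerializer();
        serializer.configure(Collections.singletonMap("value.serializer.encoding", "GBK"), false);

        StringDeserializer deserializer = new StringDeserializer();
        deserializer.configure(Collections.singletonMap("value.deserializer.encoding", "GBK"), false);

        byte[] bytes = serializer.serialize("any-topic", "custom serialization");
        System.out.println(deserializer.deserialize("any-topic", bytes)); // prints the original string
    }
}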

Float serialization

Next, let's look at how float data is serialized. See the org.apache.kafka.common.serialization.FloatSerializer class:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

public class FloatSerializer implements Serializer<Float> {
    @Override
    public byte[] serialize(final String topic, final Float data) {
        if (data == null)
            return null;

        long bits = Float.floatToRawIntBits(data);
        return new byte[] {
            (byte) (bits >>> 24),
            (byte) (bits >>> 16),
            (byte) (bits >>> 8),
            (byte) bits
        };
    }
}

As you can see, FloatSerializer implements the Serializer interface with Float as its type parameter and provides serialize(), which takes the float's IEEE 754 bit pattern (Float.floatToRawIntBits) and packs it into a 4-byte, big-endian array.
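
As a concrete sanity check (a sketch, not Kafka code): 1.0f has the IEEE 754 bit pattern 0x3F800000, so the serializer emits the big-endian bytes 3F 80 00 00, exactly what java.nio.ByteBuffer produces as well:

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.kafka.common.serialization.FloatSerializer;

public class FloatBytesDemo {
    public static void main(String[] args) {
        byte[] kafkaBytes = new FloatSerializer().serialize("any-topic", 1.0f);
        byte[] nioBytes = ByteBuffer.allocate(4).putFloat(1.0f).array();
        // Both print [63, -128, 0, 0], i.e. 0x3F 0x80 0x00 0x00 in big-endian order.
        System.out.println(Arrays.toString(kafkaBytes));
        System.out.println(Arrays.toString(nioBytes));
    }
}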

Float deserialization

See the org.apache.kafka.common.serialization.FloatDeserializer class:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

public class FloatDeserializer implements Deserializer<Float> {
    @Override
    public Float deserialize(final String topic, final byte[] data) {
        if (data == null)
            return null;
        if (data.length != 4) {
            throw new SerializationException("Size of data received by Deserializer is not 4");
        }

        int value = 0;
        for (byte b : data) {
            value <<= 8;
            value |= b & 0xFF;
        }
        return Float.intBitsToFloat(value);
    }
}

As you can see, FloatDeserializer implements the Deserializer interface with Float as its type parameter and provides deserialize(), which rebuilds the 32-bit pattern from the 4-byte array and converts it back to a Float via Float.intBitsToFloat.
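
A round trip through the two classes shows the value surviving the byte[] conversion; a small sketch:

import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;

public class FloatRoundTripDemo {
    public static void main(String[] args) {
        byte[] bytes = new FloatSerializer().serialize("any-topic", 3.14f);
        Float restored = new FloatDeserializer().deserialize("any-topic", bytes);
        System.out.println(restored); // 3.14
    }
}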

From this we can conclude that serialization in Kafka is simply a matter of converting data into a byte[] for transport, with a deserializer on the consumer side parsing that byte[] back into data. With that in mind, let's define our own serialization classes.

Writing custom Kafka serialization classes

Serialization based on fastjson

Add the fastjson dependency to pom.xml:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.37</version>
</dependency>

The Student entity

/**
 *
 * Title: Student.java
 * Description: sample entity used as the Kafka message value
 *
 * @author MrChen
 * @date 2019-12-05 11:22:38
 */
public class Student {
    int id;
    String name;
    int age;
    boolean sex;
    float[] score;

    // No-argument constructor: fastjson (and protostuff's runtime schema) need it
    // to instantiate the object during deserialization.
    public Student() {
    }

    public Student(int id, String name, int age, boolean sex, float[] score) {
        super();
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.score = score;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public boolean isSex() {
        return sex;
    }

    public void setSex(boolean sex) {
        this.sex = sex;
    }

    public float[] getScore() {
        return score;
    }

    public void setScore(float[] score) {
        this.score = score;
    }

}
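
Before wrapping it in a Kafka serializer, it helps to see the fastjson round trip on its own; a small sketch using the dependency added above:

import com.alibaba.fastjson.JSON;

public class FastjsonRoundTripDemo {
    public static void main(String[] args) {
        Student student = new Student(1, "Tom", 18, true, new float[]{90.5f, 87.0f});
        // Object -> JSON bytes -> object again.
        byte[] bytes = JSON.toJSONBytes(student);
        Student restored = JSON.parseObject(bytes, Student.class);
        System.out.println(restored.getName()); // Tom
    }
}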

Serializer implementation: StudentSerializer


import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.alibaba.fastjson.JSON;

/**
 *
 * Title: StudentSerializer.java
 * Description: serializer for the Student entity, based on fastjson
 *
 * @author MrChen
 * @date 2019-12-04 17:13:41
 */
public class StudentSerializer implements Serializer<Student> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, Student data) {
        // let fastjson turn the object into its JSON representation as bytes
        return JSON.toJSONBytes(data);
    }

    @Override
    public void close() {
        // nothing to close
    }

}

Deserializer implementation: StudentDeserializer


import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.alibaba.fastjson.JSON;

/**
 *
 * Title: StudentDeserializer.java
 * Description: deserializer for the Student entity, based on fastjson
 *
 * @author MrChen
 * @date 2019-12-04 17:13:11
 */
public class StudentDeserializer implements Deserializer<Student> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Student deserialize(String topic, byte[] data) {
        // parse the JSON bytes back into a Student object
        return JSON.parseObject(data, Student.class);
    }

    @Override
    public void close() {
        // nothing to close
    }

}
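
With both classes in place, they are plugged in the same way as the built-in ones, by fully qualified class name. A minimal sketch follows; the broker address, topic, group id and the com.example package are placeholder assumptions, so substitute the package your classes actually live in:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StudentKafkaDemo {
    public static void main(String[] args) {
        // Producer side: String keys, Student values handled by our custom serializer.
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", "localhost:9092");
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put("value.serializer", "com.example.kafka.serialization.StudentSerializer");

        try (KafkaProducer<String, Student> producer = new KafkaProducer<>(producerConfig)) {
            Student student = new Student(1, "Tom", 18, true, new float[]{90.5f, 87.0f});
            producer.send(new ProducerRecord<>("student-topic", "s-1", student));
        }

        // Consumer side: the value deserializer must be the matching StudentDeserializer.
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", "localhost:9092");
        consumerConfig.put("group.id", "student-group");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("value.deserializer", "com.example.kafka.serialization.StudentDeserializer");

        try (KafkaConsumer<String, Student> consumer = new KafkaConsumer<>(consumerConfig)) {
            consumer.subscribe(Collections.singletonList("student-topic"));
            for (ConsumerRecord<String, Student> record : consumer.poll(Duration.ofMillis(1000))) {
                System.out.println(record.key() + " -> " + record.value().getName());
            }
        }
    }
}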

Serialization based on protostuff
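
The utility class below relies on the protostuff-core and protostuff-runtime artifacts, which also need to be declared in pom.xml; the version here is an assumption, so use whichever one your project already standardizes on:

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.1.3</version>
</dependency>

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.1.3</version>
</dependency>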

Utility class ProtostuffUtil

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by shirukai on 2018/8/14
 * protostuff serialization/deserialization utility class
 */
public class ProtostuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * Serialize
     *
     * @param message the object to serialize
     * @param tClass  the object's class
     * @param <T>     the object's type
     * @return byte[]
     */
    public static <T> byte[] serializer(T message, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        return ProtostuffIOUtil.toByteArray(message, schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
    }

    /**
     * Deserialize
     *
     * @param bytes  the serialized bytes
     * @param tClass the target class
     * @param <T>    the target type
     * @return T
     */
    public static <T> T deserializer(byte[] bytes, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, message, schema);
        return message;
    }

    private static <T> Schema<T> getSchema(Class<T> tClass) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(tClass);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(tClass);
            cachedSchema.put(tClass, schema);
        }
        return schema;
    }
}
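
A quick round trip through the utility class (a sketch, reusing the Student entity defined earlier):

public class ProtostuffRoundTripDemo {
    public static void main(String[] args) {
        Student student = new Student(1, "Tom", 18, true, new float[]{90.5f, 87.0f});
        byte[] bytes = ProtostuffUtil.serializer(student, Student.class);
        Student restored = ProtostuffUtil.deserializer(bytes, Student.class);
        System.out.println(restored.getName()); // Tom
    }
}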

Serializer implementation: StudentProtostuffSerializer

import com.springboot.demo.kafka.entity.Student;
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class StudentProtostuffSerializer implements Serializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, Student data) {
        // serialize the Student with the protostuff utility class
        return ProtostuffUtil.serializer(data, Student.class);
    }

    @Override
    public void close() {
        // nothing to close
    }
}

Deserializer implementation: StudentProtostuffDeserializer

import com.springboot.demo.kafka.entity.Student;
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class StudentProtostuffDeserializer implements Deserializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Student deserialize(String topic, byte[] data) {
        // deserialize the bytes back into a Student with the protostuff utility class
        return ProtostuffUtil.deserializer(data, Student.class);
    }

    @Override
    public void close() {
        // nothing to close
    }
}
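
Switching from the fastjson-based pair to the protostuff-based pair is purely a configuration change. Reusing the Properties objects from the earlier sketch (the package name is again a placeholder):

// Only the value serializer/deserializer class names change;
// the rest of the producer and consumer configuration stays the same.
producerConfig.put("value.serializer", "com.example.kafka.serialization.StudentProtostuffSerializer");
consumerConfig.put("value.deserializer", "com.example.kafka.serialization.StudentProtostuffDeserializer");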

Summary

To customize Kafka message serialization for your own needs, convert the data into a byte[] on the producing side and convert the byte[] back into the original data on the deserializing side; fastjson and protostuff are just two convenient ways of doing that conversion.
