Kafka versions

ZooKeeper version: zookeeper-3.4.14.tar.gz

Kafka version: kafka_2.12-2.0.0.tgz

pom.xml dependencies:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

Kafka's built-in serialization

Kafka sends and receives every message as a byte[] array. In practice, though, we don't hand it byte[] directly: we also send int, short, long, float, double, String and other types, and Kafka converts them to byte[] (and back) using the serializer and deserializer we specify. So whenever we produce or consume messages we normally declare how the message key and value are serialized, for example:

        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka serializers

Serializer class                                              Java type    Notes
org.apache.kafka.common.serialization.ByteArraySerializer    byte[]       raw bytes, no conversion
org.apache.kafka.common.serialization.ByteBufferSerializer   ByteBuffer   see java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerSerializer      Integer
org.apache.kafka.common.serialization.ShortSerializer        Short
org.apache.kafka.common.serialization.LongSerializer         Long
org.apache.kafka.common.serialization.FloatSerializer        Float
org.apache.kafka.common.serialization.DoubleSerializer       Double
org.apache.kafka.common.serialization.StringSerializer       String

Kafka deserializers

Deserializer class                                              Java type    Notes
org.apache.kafka.common.serialization.ByteArrayDeserializer    byte[]       raw bytes, no conversion
org.apache.kafka.common.serialization.ByteBufferDeserializer   ByteBuffer   see java.nio.ByteBuffer
org.apache.kafka.common.serialization.IntegerDeserializer      Integer
org.apache.kafka.common.serialization.ShortDeserializer        Short
org.apache.kafka.common.serialization.LongDeserializer         Long
org.apache.kafka.common.serialization.FloatDeserializer        Float
org.apache.kafka.common.serialization.DoubleDeserializer       Double
org.apache.kafka.common.serialization.StringDeserializer       String
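
The key and the value may use different (de)serializers, as long as the consumer mirrors what the producer used. A small sketch of only the relevant settings (producerConfig and consumerConfig stand for the Properties objects passed to KafkaProducer and KafkaConsumer):

// producer side: String key, Long value
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");

// consumer side: must match the producer, otherwise the bytes cannot be decoded
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");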

How the built-in serializers are implemented

Taking String as an example, let's look at how Kafka implements serialization and deserialization.

String serialization

See the org.apache.kafka.common.serialization.StringSerializer class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 *  String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
 *  value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

StringSerializer implements the Serializer interface with String as its type parameter and provides the configure() and serialize() methods. The interesting part is serialize(): the String we pass in is simply converted to bytes with data.getBytes(encoding), where the encoding defaults to UTF8 unless overridden via configure().
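
A quick way to see configure() and serialize() in action is to call the class directly; the GBK encoding below is only an example of overriding the UTF8 default:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;

public class StringSerializerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // overrides the UTF8 default, but only for the value serializer
        configs.put("value.serializer.encoding", "GBK");

        StringSerializer serializer = new StringSerializer();
        serializer.configure(configs, false);   // isKey = false, so value.serializer.encoding applies
        byte[] bytes = serializer.serialize("test", "hello kafka");
        System.out.println(bytes.length);       // number of GBK-encoded bytes
    }
}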

String deserialization

See the org.apache.kafka.common.serialization.StringDeserializer class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 *  String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
 *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
 */
public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
}

StringDeserializer implements the Deserializer interface with String as its type parameter and provides the configure() and deserialize() methods. The interesting part is deserialize(): the byte[] we receive is converted back into a String with new String(data, encoding).
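
A minimal round trip confirms that the two classes are mirror images of each other (a sketch, calling them outside of Kafka just to illustrate):

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StringRoundTripDemo {
    public static void main(String[] args) {
        StringSerializer serializer = new StringSerializer();
        StringDeserializer deserializer = new StringDeserializer();

        byte[] bytes = serializer.serialize("test", "hello");    // UTF8 bytes of "hello"
        String restored = deserializer.deserialize("test", bytes);
        System.out.println(restored);                             // hello
    }
}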

Float serialization

Next let's look at how a float is serialized; see the org.apache.kafka.common.serialization.FloatSerializer class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

public class FloatSerializer implements Serializer<Float> {
    @Override
    public byte[] serialize(final String topic, final Float data) {
        if (data == null)
            return null;

        long bits = Float.floatToRawIntBits(data);
        return new byte[] {
            (byte) (bits >>> 24),
            (byte) (bits >>> 16),
            (byte) (bits >>> 8),
            (byte) bits
        };
    }
}

As we can see, FloatSerializer implements the Serializer interface with Float as its type parameter and provides the serialize() method: it takes the IEEE 754 bit pattern of the float (Float.floatToRawIntBits) and writes its four bytes in big-endian order.
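
The shift sequence is just a hand-rolled big-endian encoding of the 32-bit pattern; a small sketch comparing it with ByteBuffer (whose default byte order is also big-endian) makes that visible:

import java.nio.ByteBuffer;
import java.util.Arrays;

public class FloatBytesDemo {
    public static void main(String[] args) {
        float f = 1.5f;
        int bits = Float.floatToRawIntBits(f);    // IEEE 754 bit pattern, 0x3FC00000 for 1.5f
        byte[] manual = {
            (byte) (bits >>> 24), (byte) (bits >>> 16), (byte) (bits >>> 8), (byte) bits
        };
        byte[] viaBuffer = ByteBuffer.allocate(4).putFloat(f).array();
        System.out.println(Arrays.equals(manual, viaBuffer));     // true
    }
}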

Float deserialization

See the org.apache.kafka.common.serialization.FloatDeserializer class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

public class FloatDeserializer implements Deserializer<Float> {
    @Override
    public Float deserialize(final String topic, final byte[] data) {
        if (data == null)
            return null;
        if (data.length != 4) {
            throw new SerializationException("Size of data received by Deserializer is not 4");
        }

        int value = 0;
        for (byte b : data) {
            value <<= 8;
            value |= b & 0xFF;
        }
        return Float.intBitsToFloat(value);
    }
}

As we can see, FloatDeserializer implements the Deserializer interface with Float as its type parameter and provides the deserialize() method: it checks that exactly four bytes were received, reassembles them (big-endian) into an int, and converts that bit pattern back into a float with Float.intBitsToFloat().
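
The round trip works just like the String case (a sketch; the exact float value survives because the raw bit pattern, not a decimal rendering, is what travels):

import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;

public class FloatRoundTripDemo {
    public static void main(String[] args) {
        FloatSerializer serializer = new FloatSerializer();
        FloatDeserializer deserializer = new FloatDeserializer();

        byte[] bytes = serializer.serialize("test", 3.14f);      // 4-byte big-endian bit pattern
        Float restored = deserializer.deserialize("test", bytes);
        System.out.println(restored);                             // 3.14
    }
}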

So we can conclude that serialization in Kafka always comes down to turning the data into a byte[] for transport, and the consumer turns that byte[] back into the original data with the matching deserializer. With that established, let's write our own serializer.

Custom Kafka serializers

Serialization based on fastjson

Add the dependency to pom.xml:

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.37</version>
        </dependency>

The Student entity

/**
 * 
 * Title: Student.java 
 * Description: Student entity
 *
 * @author MrChen
 * @date 2019-12-05 11:22:38
 */
public class Student {
	int id;
	String name;
	int age;
	boolean sex;
	float[] sorce;
	
	// no-arg constructor so that fastjson / protostuff can instantiate the bean reflectively
	public Student() {
	}

	public Student(int id, String name, int age, boolean sex, float[] sorce) {
		super();
		this.id = id;
		this.name = name;
		this.age = age;
		this.sex = sex;
		this.sorce = sorce;
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public boolean isSex() {
		return sex;
	}

	public void setSex(boolean sex) {
		this.sex = sex;
	}

	public float[] getSorce() {
		return sorce;
	}

	public void setSorce(float[] sorce) {
		this.sorce = sorce;
	}
	
}

Serializer implementation: StudentSerializer


import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.alibaba.fastjson.JSON;

/**
 * 
 * Title: StudentSerializer.java 
 * Description: Serializer for Student, backed by fastjson
 *
 * @author MrChen
 * @date 2019-12-04 17:13:41
 */
public class StudentSerializer implements Serializer<Student> {

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		// nothing to configure
	}

	@Override
	public byte[] serialize(String topic, Student data) {
		// fastjson renders the bean as JSON and returns its UTF-8 bytes
		return JSON.toJSONBytes(data);
	}

	@Override
	public void close() {
		// nothing to release
	}

}
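
Registering the custom serializer works exactly like registering StringSerializer, only with its fully qualified class name; a sketch, assuming StudentSerializer lives in a package called com.example.kafka and the topic is named student-topic:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StudentProducerDemo {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the fastjson-based serializer handles the value
        config.put("value.serializer", "com.example.kafka.StudentSerializer");

        KafkaProducer<String, Student> producer = new KafkaProducer<>(config);
        producer.send(new ProducerRecord<>("student-topic", "s-1",
                new Student(1, "Tom", 18, true, new float[]{90.5f, 88f})));
        producer.close();
    }
}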

Deserializer implementation: StudentDeserializer


import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.alibaba.fastjson.JSON;

/**
 * 
 * Title: StudentDeserializer.java 
 * Description: Deserializer for Student, backed by fastjson
 *
 * @author MrChen
 * @date 2019-12-04 17:13:11
 */
public class StudentDeserializer implements Deserializer<Student> {

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		// nothing to configure
	}

	@Override
	public Student deserialize(String topic, byte[] data) {
		// parse the JSON bytes back into a Student bean
		return JSON.parseObject(data, Student.class);
	}

	@Override
	public void close() {
		// nothing to release
	}

}
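
On the consumer side the matching deserializer is registered the same way (again a sketch; package name, group id and topic name are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StudentConsumerDemo {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "student-group");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the fastjson-based deserializer rebuilds the Student bean from the JSON bytes
        config.put("value.deserializer", "com.example.kafka.StudentDeserializer");

        KafkaConsumer<String, Student> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList("student-topic"));
        while (true) {
            ConsumerRecords<String, Student> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, Student> record : records) {
                System.out.println(record.value().getName());
            }
        }
    }
}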

Serialization based on protostuff

Utility class ProtostuffUtil

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by shirukai on 2018/8/14
 * protostuff serialization/deserialization utility
 */
public class ProtostuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * Serialize
     *
     * @param message object to serialize
     * @param tClass  target class
     * @param <T>     type
     * @return byte[]
     */
    public static <T> byte[] serializer(T message, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        return ProtostuffIOUtil.toByteArray(message, schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
    }

    /**
     * Deserialize
     *
     * @param bytes  serialized bytes
     * @param tClass target class
     * @param <T>    type
     * @return T
     */
    public static <T> T deserializer(byte[] bytes, Class<T> tClass) {
        Schema<T> schema = getSchema(tClass);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, message, schema);
        return message;
    }

    private static <T> Schema<T> getSchema(Class<T> tClass) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(tClass);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(tClass);
            cachedSchema.put(tClass, schema);
        }
        return schema;
    }
}
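
A quick round trip with the utility shows how it is meant to be used (a sketch; it assumes ProtostuffUtil and the Student entity above are on the classpath, and note that RuntimeSchema instantiates the target class reflectively, so bean-style classes with a no-arg constructor are the safest input):

public class ProtostuffUtilDemo {
    public static void main(String[] args) {
        Student student = new Student(1, "Tom", 18, true, new float[]{90.5f, 88f});

        byte[] bytes = ProtostuffUtil.serializer(student, Student.class);      // object -> byte[]
        Student restored = ProtostuffUtil.deserializer(bytes, Student.class);  // byte[] -> object
        System.out.println(restored.getName());                                // Tom
    }
}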

Serializer implementation: StudentProtostuffSerializer

import com.springboot.demo.kafka.entity.Student;
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class StudentProtostuffSerializer implements Serializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, Student data) {
        // delegate to the protostuff utility with the Student schema
        return ProtostuffUtil.serializer(data, Student.class);
    }

    @Override
    public void close() {

    }
}

Deserializer implementation: StudentProtostuffDeserializer

import com.springboot.demo.kafka.entity.Student;
import com.springboot.demo.utils.ProtostuffUtil;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by shirukai on 2018/8/25
 */
public class StudentProtostuffDeserializer implements Deserializer<Student> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Student deserialize(String topic, byte[] data) {
        // rebuild the Student from the protostuff bytes
        return ProtostuffUtil.deserializer(data, Student.class);
    }

    @Override
    public void close() {

    }
}
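
Wiring the protostuff pair in is, once more, just two config entries (class paths are illustrative); everything else on the producer and consumer side stays the same as in the fastjson example:

producerConfig.put("value.serializer", "com.example.kafka.StudentProtostuffSerializer");
consumerConfig.put("value.deserializer", "com.example.kafka.StudentProtostuffDeserializer");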

Summary

Serialize Kafka messages in whatever form your application needs by implementing a custom serializer that turns the payload into a byte[], together with a matching deserializer on the consumer side that turns that byte[] back into the original data.