-
KafkaAvroSerializer/Deserializer 를 이용하는 테스트개발/Spring 2022. 6. 1. 23:44
아래 글을 참고해서 거의 동일한 내용이지만, 코드를 열어보면서 더 대충 정리한 글입니다.
https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1
개요
Kafka 에 데이터를 전송할 때 바이트 배열로 변환해줄 Serializer 가 필요하다.
Json, Avro, Thrift 등등이 있는데, 현재 Avro 를 사용하고 있다.
테스트를 작성해야하는데, 최대한 간단하고 멱등성있는 테스트를 작성하고 싶었다.
- 인메모리 기반의 EmbeddedKafka가 있기 때문에 테스트컨테이너는 사용하지 않으려고 했다
- Schema-Registry 까지 굳이 테스트할 생각은 없었다. (카프카로 전송하는 과정만을 테스트하고 싶었으므로)
Error serializing Avro message
그래서 EmbeddedKafka 를 사용해서 테스트를 진행했다.
Error serializing Avro message org.apache.kafka.common.errors.SerializationException: Error serializing Avro message at app//io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:107) at app//io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:58) at app//org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) at app//org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:954) at app//org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) at app//org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993) at app//org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) at app//org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403)
그러나 단순히 publish 만 하는 테스트를 실행했을 때 위와 같은 에러가 발생한다.
원인은 CachedSchemaRegistryClient 라는 녀석에게 있다.
Stacktrace 에 찍힌 AbstractKafkaAvroSerializer.serializeImpl 부분을 보면 다음과 같은 코드가 있다.
// AbstractKafkaAvroSerializer.serializeImpl if (this.autoRegisterSchema) { restClientErrorMsg = "Error registering Avro schema: "; id = this.schemaRegistry.register(subject, new AvroSchema(schema)); } else { restClientErrorMsg = "Error retrieving Avro schema: "; id = this.schemaRegistry.getId(subject, new AvroSchema(schema)); }
저기서 schemaRegistry 의 기본 구현체 CachedSchemaRegistryClient 다.
스키마 레지스트리의 동작을 간단하게 풀어보면
1-1. 스키마 자동 등록 설정이 되어있으면, 등록하고 ID 를 받아온다.
1-2. 스키마 자동 등록 설정이 안되어있으면, 스키마로 ID 를 조회한다.
2-1. 스키마가 캐싱되어 있으면 가져온다.
2-2. 스키마가 캐싱되어 있지 않으면 원격 스키마 레지스트리를 조회해서 ID를 가져온다.
스키마 자동등록 설정이 되어있었기 때문에 schema.registry.url 에 적힌 url 로 접근하여 스키마를 register / get 하려고 시도한다.
그런데 Schema-Registry 는 테스트 대상이 아니므로 올바른 URL 을 입력했을리가 없고, 에러가 발생하는 것이다.
// CachedSchemaRegistryClient public synchronized int register(String subject, ParsedSchema schema, int version, int id) throws IOException, RestClientException { ... if (cachedId != null) { ... } else if (schemaIdMap.size() >= this.identityMapCapacity) { ... } else { int retrievedId = id >= 0 ? this.registerAndGetId(subject, schema, version, id) : this.registerAndGetId(subject, schema); ... } } private int registerAndGetId(String subject, ParsedSchema schema) throws IOException, RestClientException { return this.restService.registerSchema(schema.canonicalString(), schema.schemaType(), schema.references(), subject); } // RestService public int registerSchema(Map<String, String> requestProperties, RegisterSchemaRequest registerSchemaRequest, String subject) throws IOException, RestClientException { UriBuilder builder = UriBuilder.fromPath("/subjects/{subject}/versions"); String path = builder.build(new Object[]{subject}).toString(); // HTTP Request 를 한다 RegisterSchemaResponse response = (RegisterSchemaResponse)this.httpRequest(path, "POST", registerSchemaRequest.toJson().getBytes(StandardCharsets.UTF_8), requestProperties, REGISTER_RESPONSE_TYPE); return response.getId(); }
MockSchemaRegistryClient
CachedSchemaRegistryClient 대신, MockSchemaRegistryClient 를 사용하도록 한다.
MockSchemaRegistryClient 는 실제 Schema-Registry 를 다녀오지 않는다.
class CustomKafkaAvroSerializer extends KafkaAvroSerializer { CustomKafkaAvroSerializer() { super() super.schemaRegistry = new MockSchemaRegistryClient() } }
기존의 KafkaAvroSerializer 를 상속받아서 schemaRegistry 만 교체해주면 끝이다.
Deserializer
deserializer 도 같은 일이 발생한다. 그래서 마찬가지로 MockSchemaRegistryClient 를 사용해주면 된다.
차이점은 Serializer에서 생성시점에 schemaRegistry를 변경해주었지만,
Deserializer 는 topic 마다 다른 스키마를 내려줘야 하므로 deserialize 시점마다 분기를 태워 변경을 해주도록 한다.
class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer { @Override Object deserialize(String topic, byte[] bytes) { if (topic.equals("event-occured")) { this.schemaRegistry = getMockClient(EventOccured.SCHEMA$); } if (topic.equals("event-canceled") { this.schemaRegistry = getMockClient(EventCanceled.SCHEMA$); } return super.deserialize(topic, bytes) } private static SchemaRegistryClient getMockClient(final Schema schema$) { return new MockSchemaRegistryClient() { @Override synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException { return new AvroSchema(schema$) } } } }