ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • KafkaAvroSerializer/Deserializer 를 이용하는 테스트
    개발/Spring 2022. 6. 1. 23:44

    아래 글을 참고해서 거의 동일한 내용이지만, 코드를 열어보면서 더 대충 정리한 글입니다.

    https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1

     

    개요

    Kafka 에 데이터를 전송할 때 바이트 배열로 변환해줄 Serializer 가 필요하다.

    Json, Avro, Thrift 등등이 있는데, 현재 Avro 를 사용하고 있다. 

     

     

    테스트를 작성해야하는데, 최대한 간단하고 멱등성있는 테스트를 작성하고 싶었다.

    • 인메모리 기반의 EmbeddedKafka가 있기 때문에 테스트컨테이너는 사용하지 않으려고 했다
    • Schema-Registry 까지 굳이 테스트할 생각은 없었다. (카프카로 전송하는 과정만을 테스트하고 싶었으므로)

     

    Error serializing Avro message 

    그래서 EmbeddedKafka 를 사용해서 테스트를 진행했다.

    Error serializing Avro message
    org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    	at app//io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:107)
    	at app//io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:58)
    	at app//org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    	at app//org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:954)
    	at app//org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914)
    	at app//org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993)
    	at app//org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655)
    	at app//org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403)

    그러나 단순히 publish 만 하는 테스트를 실행했을 때 위와 같은 에러가 발생한다.

     

     

    원인은 CachedSchemaRegistryClient 라는 녀석에게 있다.

    Stacktrace 에 찍힌 AbstractKafkaAvroSerializer.serializeImpl 부분을 보면 다음과 같은 코드가 있다.

    // AbstractKafkaAvroSerializer.serializeImpl
    if (this.autoRegisterSchema) {
        restClientErrorMsg = "Error registering Avro schema: ";
        id = this.schemaRegistry.register(subject, new AvroSchema(schema));
    } else {
        restClientErrorMsg = "Error retrieving Avro schema: ";
        id = this.schemaRegistry.getId(subject, new AvroSchema(schema));
    }

    저기서 schemaRegistry 의 기본 구현체 CachedSchemaRegistryClient 다.

     

    스키마 레지스트리의 동작을 간단하게 풀어보면

    1-1. 스키마 자동 등록 설정이 되어있으면, 등록하고 ID 를 받아온다.

    1-2. 스키마 자동 등록 설정이 안되어있으면, 스키마로 ID 를 조회한다.

    2-1.  스키마가 캐싱되어 있으면 가져온다.

    2-2. 스키마가 캐싱되어 있지 않으면 원격 스키마 레지스트리를 조회해서 ID를 가져온다.

     

    스키마 자동등록 설정이 되어있었기 때문에 schema.registry.url 에 적힌 url 로 접근하여 스키마를 register / get 하려고 시도한다.

    그런데 Schema-Registry 는 테스트 대상이 아니므로 올바른 URL 을 입력했을리가 없고, 에러가 발생하는 것이다.

    // CachedSchemaRegistryClient
    public synchronized int register(String subject, ParsedSchema schema, int version, int id) throws IOException, RestClientException {
            ...
            if (cachedId != null) {
               ...
            } else if (schemaIdMap.size() >= this.identityMapCapacity) {
                ...
            } else {
                int retrievedId = id >= 0 ? this.registerAndGetId(subject, schema, version, id) : this.registerAndGetId(subject, schema);
    			...
            }
        }
    
    private int registerAndGetId(String subject, ParsedSchema schema) throws IOException, RestClientException {
        return this.restService.registerSchema(schema.canonicalString(), schema.schemaType(), schema.references(), subject);
    }
    
    // RestService
    public int registerSchema(Map<String, String> requestProperties, RegisterSchemaRequest registerSchemaRequest, String subject) throws IOException, RestClientException {
        UriBuilder builder = UriBuilder.fromPath("/subjects/{subject}/versions");
        String path = builder.build(new Object[]{subject}).toString();
        // HTTP Request 를 한다
        RegisterSchemaResponse response = (RegisterSchemaResponse)this.httpRequest(path, "POST", registerSchemaRequest.toJson().getBytes(StandardCharsets.UTF_8), requestProperties, REGISTER_RESPONSE_TYPE);
        return response.getId();
    }

     

    MockSchemaRegistryClient

    CachedSchemaRegistryClient 대신, MockSchemaRegistryClient 를 사용하도록 한다.

    MockSchemaRegistryClient 는 실제 Schema-Registry 를 다녀오지 않는다.

    class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
        CustomKafkaAvroSerializer() {
            super()
            super.schemaRegistry = new MockSchemaRegistryClient()
        }
    }

    기존의 KafkaAvroSerializer 를 상속받아서 schemaRegistry 만 교체해주면 끝이다.

     

     

    Deserializer

    deserializer 도 같은 일이 발생한다. 그래서 마찬가지로 MockSchemaRegistryClient 를 사용해주면 된다.

    차이점은 Serializer에서 생성시점에 schemaRegistry를 변경해주었지만,

    Deserializer 는 topic 마다 다른 스키마를 내려줘야 하므로 deserialize 시점마다 분기를 태워 변경을 해주도록 한다. 

    class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
        @Override
        Object deserialize(String topic, byte[] bytes) {
            if (topic.equals("event-occured")) {
                this.schemaRegistry = getMockClient(EventOccured.SCHEMA$);
            }
            if (topic.equals("event-canceled") {
                this.schemaRegistry = getMockClient(EventCanceled.SCHEMA$);
            }
            return super.deserialize(topic, bytes)
        }
    
        private static SchemaRegistryClient getMockClient(final Schema schema$) {
            return new MockSchemaRegistryClient() {
                @Override
                synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException {
                    return new AvroSchema(schema$)
                }
            }
        }
    }

     

    댓글

Designed by Tistory.