HowTo: Implement Avro Schema Inheritance
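Avro schemas do not support inheritance, but with some effort we can emulate it: a schema refers to another record type by its full name, and the referenced schema's definition is inlined when the schemas are loaded.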
Before getting started, clone the git repo:
git clone https://github.com/mykidong/implement-avro-inheritance.git
Implement Avro Schema Inheritance
Let’s look at the implementation class:
package io.shunters.coda.protocol;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Loads Avro schema ({@code .avsc}) files from the classpath and emulates schema inheritance
 * by inlining referenced schemas.
 *
 * <p>A schema file may refer to another record type by its quoted full name, e.g.
 * {@code "io.shunters.coda.avro.api.RequestHeader"}. Before a file is parsed, every such
 * reference to an already-loaded schema is replaced with that schema's JSON definition.
 * Files that cannot be parsed yet are retried in further passes, so input order does not
 * matter. Loaded schemas are keyed by {@link Schema#getFullName()}.
 *
 * <p>Instances are obtained through {@link #singleton(String)} or
 * {@link #singletonForSchemaPaths(String...)}. The first call to either one decides which
 * schemas are loaded; later calls return that same instance and ignore their arguments.
 *
 * Created by mykidong on 2017-08-24.
 */
public class AvroSchemaLoader {

    private static final Logger log = LoggerFactory.getLogger(AvroSchemaLoader.class);

    /** Default classpath directory for the {@code .avsc} files. */
    public static final String DEFAULT_AVRO_SCHEMA_DIR_PATH = "/META-INF/avro";

    /** Parsed schemas keyed by full name; written only while the loader is being constructed. */
    private final Map<String, Schema> schemas = new HashMap<>();

    private static final Object lock = new Object();

    // volatile is required for double-checked locking: without it another thread may see a
    // non-null reference to a loader whose schema map has not been safely published yet.
    private static volatile AvroSchemaLoader avroSchemaLoader;

    /**
     * Returns the shared loader, creating it on first use from every {@code *.avsc} file found
     * in the given classpath directory.
     *
     * @param pathDir classpath directory, e.g. {@link #DEFAULT_AVRO_SCHEMA_DIR_PATH};
     *     ignored if the shared loader already exists
     * @return the shared loader
     * @throws IllegalStateException if some schemas can never be resolved
     */
    public static AvroSchemaLoader singleton(String pathDir) {
        AvroSchemaLoader instance = avroSchemaLoader;
        if (instance == null) {
            synchronized (lock) {
                instance = avroSchemaLoader;
                if (instance == null) {
                    instance = new AvroSchemaLoader(pathDir);
                    avroSchemaLoader = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Returns the shared loader, creating it on first use from the listed classpath files.
     *
     * @param schemaPaths classpath locations of {@code .avsc} files, in any order;
     *     ignored if the shared loader already exists
     * @return the shared loader
     * @throws IllegalStateException if some schemas can never be resolved
     * @throws UncheckedIOException if a listed file cannot be read
     */
    public static AvroSchemaLoader singletonForSchemaPaths(String... schemaPaths) {
        AvroSchemaLoader instance = avroSchemaLoader;
        if (instance == null) {
            synchronized (lock) {
                instance = avroSchemaLoader;
                if (instance == null) {
                    instance = new AvroSchemaLoader(schemaPaths);
                    avroSchemaLoader = instance;
                }
            }
        }
        return instance;
    }

    /** Loads the schemas stored at the given classpath file locations. */
    private AvroSchemaLoader(String... schemaPaths) {
        List<String> jsonList = new ArrayList<>(schemaPaths.length);
        for (String schemaPath : schemaPaths) {
            jsonList.add(fileToString(schemaPath));
        }
        resolveSchemaRepeatedly(jsonList);
    }

    /** Loads every {@code *.avsc} schema found directly under the given classpath directory. */
    private AvroSchemaLoader(String pathDir) {
        List<String> jsonList = new ArrayList<>();
        for (String fileName : readFileNamesFromClasspath(pathDir)) {
            jsonList.add(fileToString(pathDir + "/" + fileName));
        }
        resolveSchemaRepeatedly(jsonList);
    }

    /**
     * Lists the file names of all {@code *.avsc} resources in {@code pathDir}, across every
     * classpath root.
     *
     * @return the file names, or an empty list if the classpath could not be scanned
     */
    private List<String> readFileNamesFromClasspath(String pathDir) {
        List<String> fileNames = new ArrayList<>();
        ResourcePatternResolver resolver =
                new PathMatchingResourcePatternResolver(getClass().getClassLoader());
        String pattern = "classpath*:" + pathDir + "/*.avsc";
        try {
            for (Resource resource : resolver.getResources(pattern)) {
                fileNames.add(resource.getFilename());
            }
        } catch (IOException e) {
            // Best effort: a scan failure yields no files instead of aborting, but is logged.
            log.error("Failed to list Avro schema files matching [{}]", pattern, e);
        }
        return fileNames;
    }

    /**
     * Parses all schema JSON documents, repeating passes until every one of them is resolved.
     *
     * @throws IllegalStateException if a pass makes no progress, i.e. the remaining documents
     *     reference types that are never defined or are not valid Avro schemas
     */
    private void resolveSchemaRepeatedly(List<String> jsonList) {
        List<String> unresolved = putSchemaToMap(jsonList);
        while (!unresolved.isEmpty()) {
            int pending = unresolved.size();
            unresolved = putSchemaToMap(unresolved);
            // Nothing parsed means the schema map did not change, so every further pass would
            // fail identically: stop instead of looping forever.
            if (unresolved.size() == pending) {
                throw unresolvableSchemas(unresolved);
            }
        }
    }

    /**
     * Tries to parse each document, registering successes in {@link #schemas}.
     *
     * @return the documents that could not be parsed in this pass, in their original order
     */
    private List<String> putSchemaToMap(List<String> jsonList) {
        List<String> unresolved = new ArrayList<>();
        for (String json : jsonList) {
            try {
                Schema schema = parseResolved(json);
                schemas.put(schema.getFullName(), schema);
            } catch (RuntimeException e) {
                // Typically a reference to a type not loaded yet; retried in the next pass.
                unresolved.add(json);
            }
        }
        return unresolved;
    }

    /** Inlines known schemas into {@code json} and parses the result. */
    private Schema parseResolved(String json) {
        return new Schema.Parser().parse(resolveSchema(json));
    }

    /** Builds the error for documents that can never be resolved, chaining the first failure. */
    private IllegalStateException unresolvableSchemas(List<String> unresolved) {
        IllegalStateException failure = new IllegalStateException(unresolved.size()
                + " Avro schema(s) could not be resolved: they reference unknown types"
                + " or are not valid Avro schemas");
        try {
            parseResolved(unresolved.get(0));
        } catch (RuntimeException e) {
            failure.initCause(e);
        }
        return failure;
    }

    /**
     * Reads a classpath resource as UTF-8 text.
     *
     * @throws UncheckedIOException if the resource is missing or unreadable
     */
    private String fileToString(String filePath) {
        try (InputStream inputStream = new ClassPathResource(filePath).getInputStream()) {
            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        } catch (IOException ie) {
            throw new UncheckedIOException("Failed to read Avro schema file [" + filePath + "]", ie);
        }
    }

    /**
     * Looks up a loaded schema.
     *
     * @param name the schema's full name, e.g. {@code io.shunters.coda.avro.api.ProduceRequest}
     * @return the schema, or {@code null} if no schema with that name was loaded
     */
    public Schema getSchema(String name) {
        return schemas.get(name);
    }

    /**
     * Returns all loaded schemas keyed by full name.
     *
     * <p>This is the loader's live internal map, not a copy; changes to it affect later lookups.
     */
    public Map<String, Schema> getSchemas() {
        return schemas;
    }

    /**
     * Replaces every quoted full-name reference to an already-loaded schema in {@code json}
     * with that schema's JSON definition.
     */
    private String resolveSchema(String json) {
        String resolved = json;
        for (Map.Entry<String, Schema> entry : schemas.entrySet()) {
            // Literal replacement: the key contains '.', a regex wildcard, and schema JSON may
            // contain '\' or '$', which replaceAll would treat as escapes / group references.
            resolved = resolved.replace("\"" + entry.getKey() + "\"", entry.getValue().toString());
        }
        return resolved;
    }
}
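This implementation reads Avro schema (.avsc) files from the classpath and loads them as Avro Schema instances, keyed by each schema's full name. Before a file is parsed, every quoted reference to the full name of an already-loaded schema is replaced with that schema's JSON definition. Files that cannot be parsed yet are retried until all of them are resolved.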
The Avro .avsc files are located in resources/META-INF/avro. Let’s look at the produce-request.avsc file:
{
"namespace":"io.shunters.coda.avro.api",
"type":"record",
"doc":"Produce Request",
"name":"ProduceRequest",
"fields":[
{
"name":"requestHeader",
"type":"io.shunters.coda.avro.api.RequestHeader"
},
{
"name":"requiredAcks",
"type":"int"
},
{
"name":"timeout",
"type":"int"
},
{
"name":"produceRequestMessageArray",
"type":{
"type":"array",
"items":{
"type":"record",
"name":"ProduceRequestMessage",
"namespace":"io.shunters.coda.avro.api",
"fields":[
{
"name":"topicName",
"type":"string"
},
{
"name":"produceRequestSubMessageArray",
"type":{
"type":"array",
"items":{
"type":"record",
"name":"ProduceRequestSubMessage",
"namespace":"io.shunters.coda.avro.api",
"fields":[
{
"name":"partition",
"type":"int"
},
{
"name":"records",
"type":"io.shunters.coda.avro.api.Records"
}
]
}
}
}
]
}
}
}
]
}
Take a look at the types “io.shunters.coda.avro.api.RequestHeader” and “io.shunters.coda.avro.api.Records”. Each of these references will be replaced with the Avro schema JSON definition shown below.
request-header.avsc for the type “io.shunters.coda.avro.api.RequestHeader”:
{
"namespace":"io.shunters.coda.avro.api",
"type":"record",
"doc":"Request Header",
"name":"RequestHeader",
"fields":[
{
"name":"correlationId",
"type":"int"
},
{
"name":"clientId",
"type":"string"
}
]
}
records.avsc file for the type “io.shunters.coda.avro.api.Records”:
{
"namespace":"io.shunters.coda.avro.api",
"type":"record",
"doc":"Record Array",
"name":"Records",
"fields":[
{
"name":"firstOffset",
"type":"long"
},
{
"name":"partitionLeaderEpoch",
"type":"int"
},
{
"name":"magic",
"type":"int"
},
{
"name":"crc",
"type":"int"
},
{
"name":"attributes",
"type":"int"
},
{
"name":"lastOffsetDelta",
"type":"int"
},
{
"name":"firstTimestamp",
"type":"long"
},
{
"name":"maxTimestamp",
"type":"long"
},
{
"name":"producerId",
"type":"long"
},
{
"name":"producerEpoch",
"type":"int"
},
{
"name":"firstSequence",
"type":"int"
},
{
"name":"records",
"type":{
"type":"array",
"items":"io.shunters.coda.avro.api.Record"
}
}
]
}
In records.avsc, the array item type “io.shunters.coda.avro.api.Record” is likewise replaced with its own Avro schema JSON definition, just as above.
Run Avro Schema Loader
Let’s invoke the AvroSchemaLoader implemented above.
// The files may be listed in any order: a schema whose referenced types are not loaded yet
// is retried after the other files have been parsed.
List<String> pathList = new ArrayList<>();
pathList.add("/META-INF/avro/request-header.avsc");
pathList.add("/META-INF/avro/record-header.avsc");
pathList.add("/META-INF/avro/record.avsc");
pathList.add("/META-INF/avro/records.avsc");
pathList.add("/META-INF/avro/produce-request.avsc");
AvroSchemaLoader avroSchemaLoader = AvroSchemaLoader.singletonForSchemaPaths(pathList.toArray(new String[0]));
String schemaKey = "io.shunters.coda.avro.api.ProduceRequest";
Schema schema = avroSchemaLoader.getSchema(schemaKey);
// getSchema returns null for an unknown key; fail with a clear message rather than an NPE.
if (schema == null) {
    throw new IllegalStateException("No Avro schema loaded for key [" + schemaKey + "]");
}
System.out.println("schema key: [" + schemaKey + "]\n" + schema.toString(true));
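All of the classpath .avsc files in the list are loaded as Avro Schema instances, which can be looked up by their full schema name, for instance “io.shunters.coda.avro.api.ProduceRequest”. The files may be listed in any order, because schemas that reference not-yet-loaded types are retried after the others have been parsed.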
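Alternatively, you can pass a classpath directory, and every .avsc file in it will be loaded as an Avro Schema instance. Note that the loader is a singleton: whichever factory method is called first determines which schemas are loaded, and later calls return that same instance.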
// Every *.avsc file found directly under this classpath directory is loaded.
String pathDir = AvroSchemaLoader.DEFAULT_AVRO_SCHEMA_DIR_PATH; // "/META-INF/avro"
AvroSchemaLoader avroSchemaLoader = AvroSchemaLoader.singleton(pathDir);
String schemaKey = "io.shunters.coda.avro.api.ProduceRequest";
Schema schema = avroSchemaLoader.getSchema(schemaKey);
// getSchema returns null for an unknown key; fail with a clear message rather than an NPE.
if (schema == null) {
    throw new IllegalStateException("No Avro schema loaded for key [" + schemaKey + "]");
}
// Assumes an SLF4J Logger named `log` is declared in the enclosing class.
log.info("schema key: [{}]\n{}", schemaKey, schema.toString(true));
Conclusion
Even though Avro does not support schema inheritance, it can be emulated as shown above. This eliminates duplicated record schema definitions across .avsc files.