package databus_apicurio_testing.examples;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
import io.apicurio.registry.utils.serde.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.utils.serde.JsonSchemaKafkaSerializer;
import io.apicurio.registry.utils.serde.JsonSchemaSerDeConstants;

/**
 * This example demonstrates how to use the Apicurio Registry in a very simple publish/subscribe
 * scenario with JSON as the serialization type (and JSON Schema for validation).  Because JSON
 * Schema is only used for validation (not actual serialization), it can be enabled and disabled
 * without affecting the functionality of the serializers and deserializers.  However, if
 * validation is disabled, then invalid data could be produced or consumed without being detected.
 *
 * <p>The following aspects are demonstrated:
 * <ol>
 *   <li>Configuring a Kafka Serializer for use with Apicurio Registry</li>
 *   <li>Configuring a Kafka Deserializer for use with Apicurio Registry</li>
 *   <li>Data sent as a CloudEventBean</li>
 *   <li>Data consumed as a CloudEventBean</li>
 * </ol>
 *
 * <p>Pre-requisites:
 * <ul>
 *   <li>Kafka must be running and reachable over SSL at the {@code <bootstrap_server>} given on
 *       the command line (for example localhost:9092)</li>
 *   <li>Apicurio Registry must be running at the {@code <registry_url>} given on the command
 *       line (for example http://localhost:8080/api)</li>
 * </ul>
 */
public class CloudEventJsonSchemaExample2 {

    // Hard-coded connection settings, left commented out for reference only: main reads the
    // bootstrap servers, the registry URL and the topic name from its command-line arguments.
    //private static final String REGISTRY_URL = "http://localhost:8080/api";
    //private static final String SERVERS = "localhost:9092";
    //private static final String TOPIC_NAME = "InventoryEvents2";
    /** Value that main passes as the record key of every message it produces. */
    private static final String SUBJECT_NAME = "SampleInventoryEvent";
    public static final void main(String [] args) throws Exception {
        if (args.length < 5) {
	        System.out.println("Usage: CloudEventJsonSchemaExample2 <bootstrap_server> <registry_url> <topic_name> <truststore_location> <truststore_password>");
        System.out.println("Starting example " + CloudEventJsonSchemaExample2.class.getSimpleName());
        String topicName = args[2];
        String subjectName = SUBJECT_NAME;
        // Create the producer.
        Producer<Object, Object> producer = createKafkaProducer(args[0],args[1],args[2],args[3],args[4]);
        // Produce 5 messages.
        int producedMessages = 0;
        try {
            System.out.println("Producing (5) messages.");
            for (int idx = 0; idx < 5; idx++) {
                // Create the message to send
            		EntitySpecification entitySpecification = new EntitySpecification();
            	    InventoryUpdateRequest inventoryUpdateRequest = new InventoryUpdateRequest();
                CloudEventBean message = new CloudEventBean();

                // Send/produce the message on the Kafka Producer
                ProducerRecord<Object, Object> producedRecord = new ProducerRecord<>(topicName, subjectName, message);
                System.out.println("ProducerRecord = "+producedRecord);
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if(e != null) {
                                } else {
                                   System.out.println("The offset of the record we just sent is: " + metadata.offset());

                System.out.println("Messages produced :"+message.getData());
            System.out.println("Messages successfully produced.");
        } catch(Exception e) {
        finally {
            System.out.println("Closing the producer.");

        // Create the consumer
        System.out.println("Creating the consumer.");
        KafkaConsumer<Long, CloudEventBean> consumer = createKafkaConsumer(args[0],args[1],args[2],args[3],args[4]);

        // Subscribe to the topic
        System.out.println("Subscribing to topic " + topicName);

        // Consume the 5 messages.
        try {
            int messageCount = 0;
            System.out.println("Consuming (5) messages.");
            while (messageCount < 5) {
                final ConsumerRecords<Long, CloudEventBean> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Do nothing - no messages waiting.
                    System.out.println("No messages waiting...");
                } else records.forEach(record -> {
                		CloudEventBean msg = record.value();
                		System.out.println("Consumed a message: " + msg.getSource());
        } finally {

        System.out.println("Done (success).");

    // Creates the Kafka producer.
    private static Producer<Object, Object> createKafkaProducer(String SERVERS,String REGISTRY_URL,String TOPIC_NAME,String TRUSTSTORE_LOCATION,String TRUSTSTORE_PASSWORD) {
        Properties props = new Properties();

        // Configure the following three settings for SSL Encryption
        props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        // Configure kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "InventoryProducer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Use the Apicurio Registry provided Kafka Serializer for JSON Schema
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName());
        // Configure Apicurio Registry location
        props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);
        // Map the topic name (plus -key/value) to the artifactId in the registry

        // Get an existing schema or auto-register if not found
        props.putIfAbsent(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, "true");

        // Create the Kafka producer
        Producer<Object, Object> producer = new KafkaProducer<Object, Object>(props);
        return producer;

    // Creates the Kafka consumer.
    private static KafkaConsumer<Long, CloudEventBean> createKafkaConsumer(String SERVERS,String REGISTRY_URL,String TOPIC_NAME,String TRUSTSTORE_LOCATION,String TRUSTSTORE_PASSWORD) {
        Properties props = new Properties();

        // Configure the following three settings for SSL Encryption
        props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        // Configure Kafka
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "InventoryConsumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for JSON Schema
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName());
        // Configure Apicurio Registry location
        props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);
        props.putIfAbsent(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, "true");

        // Create the Kafka Consumer
        KafkaConsumer<Long, CloudEventBean> consumer = new KafkaConsumer<Long, CloudEventBean>(props);
        return consumer;

