/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.rabbitmq;

import com.alibaba.fastjson2.JSON;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.client.ConsumerBatchMessage;
import com.alibaba.otter.canal.client.rabbitmq.AliyunCredentialsProvider;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.CredentialsProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitMQCanalConnector
implements CanalMQConnector {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQCanalConnector.class);
    private String nameServer;
    private String vhost;
    private String queueName;
    private String accessKey;
    private String secretKey;
    private Long resourceOwnerId;
    private String username;
    private String password;
    private boolean flatMessage;
    private Connection connect;
    private Channel channel;
    private long batchProcessTimeout = 60000L;
    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
    private volatile ConsumerBatchMessage lastGetBatchMessage = null;

    public RabbitMQCanalConnector(String nameServer, String vhost, String queueName, String accessKey, String secretKey, String username, String password, Long resourceOwnerId, boolean flatMessage) {
        this.nameServer = nameServer;
        this.vhost = vhost;
        this.queueName = queueName;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.username = username;
        this.password = password;
        this.resourceOwnerId = resourceOwnerId;
        this.flatMessage = flatMessage;
        this.messageBlockingQueue = new LinkedBlockingQueue<ConsumerBatchMessage>(1024);
    }

    @Override
    public void connect() throws CanalClientException {
        ConnectionFactory factory = new ConnectionFactory();
        if (this.accessKey.length() > 0 && this.secretKey.length() > 0) {
            factory.setCredentialsProvider((CredentialsProvider)new AliyunCredentialsProvider(this.accessKey, this.secretKey, this.resourceOwnerId));
        } else {
            factory.setUsername(this.username);
            factory.setPassword(this.password);
        }
        factory.setHost(this.nameServer);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(this.vhost);
        try {
            this.connect = factory.newConnection();
            this.channel = this.connect.createChannel();
        }
        catch (IOException | TimeoutException e) {
            throw new CanalClientException("Start RabbitMQ producer error", (Throwable)e);
        }
    }

    @Override
    public void disconnect() throws CanalClientException {
        if (this.connect != null) {
            try {
                this.connect.close();
            }
            catch (IOException e) {
                throw new CanalClientException("stop connect error", (Throwable)e);
            }
        }
        if (this.channel != null) {
            try {
                this.channel.close();
            }
            catch (IOException | TimeoutException e) {
                throw new CanalClientException("stop channel error", (Throwable)e);
            }
        }
    }

    @Override
    public boolean checkValid() throws CanalClientException {
        return true;
    }

    @Override
    public void subscribe(String filter) throws CanalClientException {
        if (this.connect == null) {
            this.connect();
        }
        DefaultConsumer consumer = new DefaultConsumer(this.channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (body != null) {
                    RabbitMQCanalConnector.this.channel.basicAck(envelope.getDeliveryTag(), RabbitMQCanalConnector.this.process(body));
                }
            }
        };
        try {
            this.channel.basicConsume(this.queueName, false, (Consumer)consumer);
        }
        catch (IOException e) {
            throw new CanalClientException("error", (Throwable)e);
        }
    }

    @Override
    public void subscribe() throws CanalClientException {
        this.subscribe(null);
    }

    @Override
    public void unsubscribe() throws CanalClientException {
        this.disconnect();
    }

    @Override
    public Message get(int batchSize) throws CanalClientException {
        return null;
    }

    @Override
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        return null;
    }

    @Override
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public void ack(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public void rollback(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<Message> messages = this.getListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    @Override
    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }
            ConsumerBatchMessage batchMessage = this.messageBlockingQueue.poll(timeout, unit);
            if (batchMessage != null) {
                this.lastGetBatchMessage = batchMessage;
                return batchMessage.getData();
            }
        }
        catch (InterruptedException ex) {
            logger.warn("Get message timeout", (Throwable)ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        return Lists.newArrayList();
    }

    @Override
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = this.getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    @Override
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }
            ConsumerBatchMessage batchMessage = this.messageBlockingQueue.poll(timeout, unit);
            if (batchMessage != null) {
                this.lastGetBatchMessage = batchMessage;
                return batchMessage.getData();
            }
        }
        catch (InterruptedException ex) {
            logger.warn("Get message timeout", (Throwable)ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        return Lists.newArrayList();
    }

    private boolean process(byte[] messageData) {
        boolean isCompleted;
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", (Object)new String(messageData));
        }
        ArrayList<Object> messageList = new ArrayList<Object>();
        if (!this.flatMessage) {
            Message message = CanalMessageDeserializer.deserializer(messageData);
            messageList.add(message);
        } else {
            FlatMessage flatMessage = (FlatMessage)JSON.parseObject((byte[])messageData, FlatMessage.class);
            messageList.add(flatMessage);
        }
        ConsumerBatchMessage batchMessage = !this.flatMessage ? new ConsumerBatchMessage(messageList) : new ConsumerBatchMessage(messageList);
        try {
            this.messageBlockingQueue.put(batchMessage);
        }
        catch (InterruptedException e) {
            logger.error("Put message to queue error", (Throwable)e);
            throw new RuntimeException(e);
        }
        try {
            isCompleted = batchMessage.waitFinish(this.batchProcessTimeout);
        }
        catch (InterruptedException e) {
            logger.error("Interrupted when waiting messages to be finished.", (Throwable)e);
            throw new RuntimeException(e);
        }
        boolean isSuccess = batchMessage.isSuccess();
        return isCompleted && isSuccess;
    }

    @Override
    public void ack() throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.ack();
            }
        }
        catch (Throwable e) {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        }
        finally {
            this.lastGetBatchMessage = null;
        }
    }

    @Override
    public void rollback() throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        }
        finally {
            this.lastGetBatchMessage = null;
        }
    }
}

