/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALEditsReplay;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class WALEditsReplaySink {
    private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
    private static final int MAX_BATCH_SIZE = 1024;
    private final Configuration conf;
    private final ClusterConnection conn;
    private final TableName tableName;
    private final MetricsWALEditsReplay metrics;
    private final AtomicLong totalReplayedEdits = new AtomicLong();
    private final boolean skipErrors;
    private final int replayTimeout;
    private final RpcControllerFactory rpcControllerFactory;

    public WALEditsReplaySink(Configuration conf, TableName tableName, ClusterConnection conn) throws IOException {
        this.conf = conf;
        this.metrics = new MetricsWALEditsReplay();
        this.conn = conn;
        this.tableName = tableName;
        this.skipErrors = conf.getBoolean("hbase.hregion.edits.replay.skip.errors", false);
        this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
        this.rpcControllerFactory = RpcControllerFactory.instantiate((Configuration)conf);
    }

    public void replayEntries(List<Pair<HRegionLocation, WAL.Entry>> entries) throws IOException {
        if (entries.isEmpty()) {
            return;
        }
        int batchSize = entries.size();
        HashMap entriesByRegion = new HashMap();
        HRegionLocation loc = null;
        WAL.Entry entry = null;
        List<WAL.Entry> regionEntries = null;
        for (int i = 0; i < batchSize; ++i) {
            loc = (HRegionLocation)entries.get(i).getFirst();
            entry = (WAL.Entry)entries.get(i).getSecond();
            if (entriesByRegion.containsKey(loc.getRegionInfo())) {
                regionEntries = (List)entriesByRegion.get(loc.getRegionInfo());
            } else {
                regionEntries = new ArrayList();
                entriesByRegion.put(loc.getRegionInfo(), regionEntries);
            }
            regionEntries.add(entry);
        }
        long startTime = EnvironmentEdgeManager.currentTime();
        for (Map.Entry _entry : entriesByRegion.entrySet()) {
            HRegionInfo curRegion = (HRegionInfo)_entry.getKey();
            List allActions = (List)_entry.getValue();
            int totalActions = allActions.size();
            int curBatchSize = 0;
            for (int replayedActions = 0; replayedActions < totalActions; replayedActions += curBatchSize) {
                curBatchSize = totalActions > 1024 + replayedActions ? 1024 : totalActions - replayedActions;
                this.replayEdits(loc, curRegion, allActions.subList(replayedActions, replayedActions + curBatchSize));
            }
        }
        long endTime = EnvironmentEdgeManager.currentTime() - startTime;
        LOG.debug((Object)("number of rows:" + entries.size() + " are sent by batch! spent " + endTime + "(ms)!"));
        this.metrics.updateReplayTime(endTime);
        this.metrics.updateReplayBatchSize(batchSize);
        this.totalReplayedEdits.addAndGet(batchSize);
    }

    public String getStats() {
        return this.totalReplayedEdits.get() == 0L ? "" : "Sink: total replayed edits: " + this.totalReplayedEdits;
    }

    private void replayEdits(HRegionLocation regionLoc, HRegionInfo regionInfo, List<WAL.Entry> entries) throws IOException {
        try {
            RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate((Configuration)this.conf, null);
            ReplayServerCallable callable = new ReplayServerCallable((Connection)this.conn, this.rpcControllerFactory, this.tableName, regionLoc, entries);
            factory.newCaller().callWithRetries(callable, this.replayTimeout);
        }
        catch (IOException ie) {
            if (this.skipErrors) {
                LOG.warn((Object)("hbase.hregion.edits.replay.skip.errors=true so continuing replayEdits with error:" + ie.getMessage()));
            }
            throw ie;
        }
    }

    class ReplayServerCallable<R>
    extends ClientServiceCallable<AdminProtos.ReplicateWALEntryResponse> {
        private List<WAL.Entry> entries;

        ReplayServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, TableName tableName, HRegionLocation regionLoc, List<WAL.Entry> entries) {
            super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY, (RpcController)rpcControllerFactory.newController(), -1);
            this.entries = entries;
            this.setLocation(regionLoc);
        }

        protected AdminProtos.ReplicateWALEntryResponse rpcCall() throws Exception {
            if (this.entries.isEmpty()) {
                return null;
            }
            WAL.Entry[] entriesArray = new WAL.Entry[this.entries.size()];
            entriesArray = this.entries.toArray(entriesArray);
            AdminProtos.AdminService.BlockingInterface remoteSvr = WALEditsReplaySink.this.conn.getAdmin(this.getLocation().getServerName());
            Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
            this.setRpcControllerCellScanner((CellScanner)p.getSecond());
            return remoteSvr.replay(this.getRpcController(), (AdminProtos.ReplicateWALEntryRequest)p.getFirst());
        }

        public void prepare(boolean reload) throws IOException {
            if (!reload) {
                return;
            }
            boolean skip = false;
            for (WAL.Entry entry : this.entries) {
                WALEdit edit = entry.getEdit();
                ArrayList<Cell> cells = edit.getCells();
                Iterator iterator = cells.iterator();
                if (iterator.hasNext()) {
                    Cell cell = (Cell)iterator.next();
                    this.setLocation(WALEditsReplaySink.this.conn.locateRegion(WALEditsReplaySink.this.tableName, CellUtil.cloneRow((Cell)cell)));
                    skip = true;
                }
                if (!skip) continue;
                break;
            }
        }
    }
}

