package backtype.storm.spout;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import net.lag.kestrel.thrift.Item;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

/* loaded from: input_file:backtype/storm/spout/KestrelThriftSpout.class */
public class KestrelThriftSpout extends BaseRichSpout {
    public static Logger LOG;
    public static final long BLACKLIST_TIME_MS = 60000;
    public static final int BATCH_SIZE = 4000;
    private List<String> _hosts;
    private int _port;
    private String _queueName;
    private SpoutOutputCollector _collector;
    private Scheme _scheme;
    private List<KestrelClientInfo> _kestrels;
    private int _emitIndex;
    private Queue<EmitItem> _emitBuffer;
    int _messageTimeoutMillis;
    int countTriples;
    int lastEmit;
    int emptyIterations;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:backtype/storm/spout/KestrelThriftSpout$EmitItem.class */
    public class EmitItem {
        public KestrelSourceId sourceId;
        public List<Object> tuple;

        public EmitItem(List<Object> list, KestrelSourceId kestrelSourceId) {
            this.tuple = list;
            this.sourceId = kestrelSourceId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:backtype/storm/spout/KestrelThriftSpout$KestrelClientInfo.class */
    public static class KestrelClientInfo {
        public String host;
        public int port;
        public Long blacklistTillTimeMs = 0L;
        private KestrelThriftClient client = null;

        public KestrelClientInfo(String str, int i) {
            this.host = str;
            this.port = i;
        }

        public KestrelThriftClient getValidClient() throws TException {
            if (this.client == null) {
                KestrelThriftSpout.LOG.info("Attempting reconnect to kestrel " + this.host + ":" + this.port);
                this.client = new KestrelThriftClient(this.host, this.port);
            }
            return this.client;
        }

        public void closeClient() {
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:backtype/storm/spout/KestrelThriftSpout$KestrelSourceId.class */
    public static class KestrelSourceId {
        int index;
        long id;

        public KestrelSourceId(int i, long j) {
            this.index = i;
            this.id = j;
        }
    }

    public KestrelThriftSpout(List<String> list, int i, String str, Scheme scheme) {
        this._hosts = null;
        this._port = -1;
        this._queueName = null;
        this._emitBuffer = new LinkedList();
        this.countTriples = 1;
        this.lastEmit = this.countTriples;
        this.emptyIterations = 0;
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Must configure at least one host");
        }
        this._port = i;
        this._hosts = list;
        this._queueName = str;
        this._scheme = scheme;
    }

    public KestrelThriftSpout(String str, int i, String str2, Scheme scheme) {
        this((List<String>) Arrays.asList(str), i, str2, scheme);
    }

    public KestrelThriftSpout(String str, int i, String str2) {
        this(str, i, str2, (Scheme) new RawScheme());
    }

    public KestrelThriftSpout(List<String> list, int i, String str) {
        this(list, i, str, (Scheme) new RawScheme());
    }

    public Fields getOutputFields() {
        return this._scheme.getOutputFields();
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._messageTimeoutMillis = 1000 * ((Number) map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        this._collector = spoutOutputCollector;
        this._emitIndex = 0;
        this._kestrels = new ArrayList();
        int size = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        int thisTaskIndex = topologyContext.getThisTaskIndex();
        int size2 = this._hosts.size();
        if (size >= size2) {
            this._kestrels.add(new KestrelClientInfo(this._hosts.get(thisTaskIndex % size2), this._port));
        } else {
            Iterator<String> it = this._hosts.iterator();
            while (it.hasNext()) {
                this._kestrels.add(new KestrelClientInfo(it.next(), this._port));
            }
        }
    }

    public void close() {
        Iterator<KestrelClientInfo> it = this._kestrels.iterator();
        while (it.hasNext()) {
            it.next().closeClient();
        }
        this._emitBuffer.clear();
        this._kestrels.clear();
    }

    public boolean bufferKestrelGet(int i) {
        if (!$assertionsDisabled && this._emitBuffer.size() != 0) {
            throw new AssertionError();
        }
        KestrelClientInfo kestrelClientInfo = this._kestrels.get(i);
        if (System.currentTimeMillis() <= kestrelClientInfo.blacklistTillTimeMs.longValue()) {
            return false;
        }
        try {
            List<Item> list = kestrelClientInfo.getValidClient().get(this._queueName, BATCH_SIZE, 0, this._messageTimeoutMillis);
            if (!$assertionsDisabled && list.size() > 4000) {
                throw new AssertionError();
            }
            HashSet hashSet = new HashSet();
            for (Item item : list) {
                List deserialize = this._scheme.deserialize(item.get_data());
                if (deserialize != null) {
                    if (!this._emitBuffer.offer(new EmitItem(deserialize, new KestrelSourceId(i, item.get_id())))) {
                        throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
                    }
                } else {
                    hashSet.add(Long.valueOf(item.get_id()));
                }
            }
            if (hashSet.size() > 0) {
                try {
                    kestrelClientInfo.client.confirm(this._queueName, hashSet);
                } catch (TException e) {
                    blacklist(kestrelClientInfo, e);
                }
            }
            return list.size() > 0;
        } catch (TException e2) {
            LOG.error("Error reading from client: " + e2.getMessage());
            blacklist(kestrelClientInfo, e2);
            return false;
        }
    }

    public void tryEachKestrelUntilBufferFilled() {
        int i = 0;
        while (true) {
            if (i >= this._kestrels.size()) {
                break;
            }
            int size = (this._emitIndex + i) % this._kestrels.size();
            if (bufferKestrelGet(size)) {
                this._emitIndex = size;
                break;
            }
            i++;
        }
        this._emitIndex = (this._emitIndex + 1) % this._kestrels.size();
    }

    public void nextTuple() {
        if (this._emitBuffer.isEmpty()) {
            tryEachKestrelUntilBufferFilled();
        }
        if (this.countTriples % 1000 == 0 && this.countTriples != this.lastEmit) {
            LOG.debug("Number of triples emitted: " + this.countTriples);
            LOG.debug("Number of empty iterations: " + this.emptyIterations);
            this.emptyIterations = 0;
            this.lastEmit = this.countTriples;
        }
        EmitItem poll = this._emitBuffer.poll();
        if (poll != null) {
            this.countTriples++;
            this._collector.emit(poll.tuple, poll.sourceId);
        } else {
            this.emptyIterations++;
            Utils.sleep(10L);
        }
    }

    private void blacklist(KestrelClientInfo kestrelClientInfo, Throwable th) {
        kestrelClientInfo.closeClient();
        kestrelClientInfo.blacklistTillTimeMs = Long.valueOf(System.currentTimeMillis() + BLACKLIST_TIME_MS);
        int indexOf = this._kestrels.indexOf(kestrelClientInfo);
        Iterator<EmitItem> it = this._emitBuffer.iterator();
        while (it.hasNext()) {
            if (it.next().sourceId.index == indexOf) {
                it.remove();
            }
        }
    }

    public void ack(Object obj) {
        KestrelSourceId kestrelSourceId = (KestrelSourceId) obj;
        KestrelClientInfo kestrelClientInfo = this._kestrels.get(kestrelSourceId.index);
        try {
            if (kestrelClientInfo.client != null) {
                HashSet hashSet = new HashSet();
                hashSet.add(Long.valueOf(kestrelSourceId.id));
                kestrelClientInfo.client.confirm(this._queueName, hashSet);
            }
        } catch (TException e) {
            blacklist(kestrelClientInfo, e);
        }
    }

    public void fail(Object obj) {
        KestrelSourceId kestrelSourceId = (KestrelSourceId) obj;
        KestrelClientInfo kestrelClientInfo = this._kestrels.get(kestrelSourceId.index);
        try {
            if (kestrelClientInfo.client != null) {
                HashSet hashSet = new HashSet();
                hashSet.add(Long.valueOf(kestrelSourceId.id));
                kestrelClientInfo.client.abort(this._queueName, hashSet);
            }
        } catch (TException e) {
            blacklist(kestrelClientInfo, e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(getOutputFields());
    }

    static {
        $assertionsDisabled = !KestrelThriftSpout.class.desiredAssertionStatus();
        LOG = Logger.getLogger(KestrelThriftSpout.class);
    }
}
