package backtype.storm.spout;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import net.lag.kestrel.thrift.Item;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.openimaj.kestrel.KestrelServerSpec;
import org.openimaj.time.Timer;

/* loaded from: input_file:backtype/storm/spout/UnreliableKestrelThriftSpout.class */
/**
 * Storm spout that pulls items from a named Kestrel queue on one or more servers through
 * the Kestrel Thrift client and emits them as tuples.
 *
 * <p>The spout is "unreliable": each batch fetched from a server is confirmed on that
 * server as soon as it has been buffered locally, i.e. before the tuples are emitted, and
 * {@link #fail(Object)} only logs. A tuple that fails downstream is therefore never
 * replayed.
 *
 * <p>Optionally (see {@link #setAckQueue(String)}), an {@code AckStats} record serialized
 * as JSON is written to a second Kestrel queue every {@code HOLD_ITEMS} acknowledgements.
 */
public class UnreliableKestrelThriftSpout extends BaseRichSpout {
    private static final long serialVersionUID = -3531693744499668571L;

    /** Target size of the local tuple buffer; also the number of acks between stats reports. */
    private static final int HOLD_ITEMS = 1000;

    /** Timeout value handed to the Kestrel {@code get} call (units are defined by that API). */
    private static final int TIMEOUT = 100;

    private static final Logger logger = Logger.getLogger(UnreliableKestrelThriftSpout.class);
    private static final Gson gson = new GsonBuilder().create();

    /** Server specs rebuilt in {@link #open} from {@link #hosts} and {@link #ports}. */
    private List<KestrelServerSpec> clients;
    private Scheme scheme;
    /** Locally buffered tuples waiting to be emitted. */
    private Queue<EmitItem> tuples;
    /** Name of the Kestrel queue to read from. */
    private String queue;
    /** Maximum number of items requested from a single server per fetch. */
    private int MAX_ITEMS_PER_QUEUE;
    private SpoutOutputCollector collector;
    /** Port of the last server spec; used as fallback when no per-host port is recorded. */
    private int port;
    /** Name of the Kestrel queue receiving ack statistics, or {@code null} to disable. */
    private String ackQueue;
    private Iterator<KestrelThriftClient> ackIterator;
    /** Started on the first ack; used to compute the ack rate. */
    private Timer ackTimer;

    /** Running count of items read from Kestrel. */
    int readTotal = 0;
    /** Running count of acknowledged tuples. */
    int acked = 0;

    /** Host of every configured server, in constructor order. */
    private List<String> hosts = new ArrayList<String>();
    /** Port of every configured server, parallel to {@link #hosts}. */
    private List<Integer> ports = new ArrayList<Integer>();

    /** A deserialized tuple together with the id of the Kestrel item it came from. */
    public class EmitItem {
        public KestrelSourceId sourceId;
        public List<Object> tuple;

        public EmitItem(List<Object> list, KestrelSourceId kestrelSourceId) {
            this.tuple = list;
            this.sourceId = kestrelSourceId;
        }
    }

    /**
     * Message id attached to emitted tuples: the index of the server (position in the
     * server list) the item was read from, plus the item's Kestrel id.
     */
    public static class KestrelSourceId {
        int index;
        long id;

        public KestrelSourceId(int i, long j) {
            this.index = i;
            this.id = j;
        }

        @Override
        public String toString() {
            return String.format("{client:%s,id:%s}", Integer.valueOf(this.index), Long.valueOf(this.id));
        }
    }

    /**
     * Creates a spout reading from the given servers.
     *
     * <p>Only the host and port of each spec are retained; the specs themselves are
     * recreated in {@link #open}.
     *
     * @param list   the Kestrel servers to read from
     * @param scheme deserializer turning item payloads into tuples; also supplies the output fields
     * @param str    name of the Kestrel queue to read from
     */
    public UnreliableKestrelThriftSpout(List<KestrelServerSpec> list, Scheme scheme, String str) {
        this.scheme = scheme;
        this.queue = str;
        this.port = -1;
        for (KestrelServerSpec spec : list) {
            this.hosts.add(spec.host);
            // Keep each server's own port so servers on different ports stay reachable.
            this.ports.add(Integer.valueOf(spec.port));
            this.port = spec.port;
        }
        this.ackQueue = null;
    }

    /**
     * Sets the Kestrel queue that receives ack statistics.
     *
     * @param str queue name, or {@code null} to disable statistics reporting
     */
    public void setAckQueue(String str) {
        this.ackQueue = str;
    }

    /**
     * Rebuilds the server specs, sizes the per-server fetch batch and creates the local buffer.
     *
     * @throws IllegalStateException if the spout was constructed without any server
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        if (this.hosts.isEmpty()) {
            // Fail with a clear message instead of an ArithmeticException from the division below.
            throw new IllegalStateException("No Kestrel servers configured for queue '" + this.queue + "'");
        }
        this.collector = spoutOutputCollector;
        this.clients = new ArrayList<KestrelServerSpec>(this.hosts.size());
        for (int idx = 0; idx < this.hosts.size(); idx++) {
            boolean hasOwnPort = this.ports != null && idx < this.ports.size();
            int hostPort = hasOwnPort ? this.ports.get(idx).intValue() : this.port;
            this.clients.add(new KestrelServerSpec(this.hosts.get(idx), hostPort));
        }
        // Never let the batch size reach 0, otherwise nothing would ever be fetched.
        this.MAX_ITEMS_PER_QUEUE = Math.max(1, HOLD_ITEMS / this.clients.size());
        this.tuples = new LinkedList<EmitItem>();
        this.ackIterator = KestrelServerSpec.thriftClientIterator(this.clients);
    }

    /** Closes every server spec created in {@link #open}; does nothing if the spout was never opened. */
    public void close() {
        if (this.clients == null) {
            return;
        }
        for (KestrelServerSpec spec : this.clients) {
            spec.close();
        }
    }

    /**
     * Tops up the local buffer if needed, then emits one buffered tuple with its
     * {@link KestrelSourceId} as message id, or calls {@code Utils.sleep(10L)} when the
     * buffer is empty.
     */
    public void nextTuple() {
        getSomeMoreTuples();
        EmitItem next = this.tuples.poll();
        if (next == null) {
            Utils.sleep(10L);
            return;
        }
        this.collector.emit(next.tuple, next.sourceId);
    }

    /**
     * Fetches one batch from every server unless the buffer is already nearly full.
     *
     * <p>All fetched item ids are confirmed on the server right after buffering, including
     * items whose payload deserialized to {@code null} and was dropped. A Thrift failure on
     * one server is logged and does not prevent the remaining servers from being polled.
     */
    private void getSomeMoreTuples() {
        if (this.tuples.size() > HOLD_ITEMS - (this.MAX_ITEMS_PER_QUEUE / 2)) {
            return;
        }
        int clientIndex = 0;
        for (KestrelServerSpec spec : this.clients) {
            try {
                KestrelThriftClient client = spec.getValidClient();
                List<Item> items = client.get(this.queue, this.MAX_ITEMS_PER_QUEUE, TIMEOUT,
                        TIMEOUT * this.MAX_ITEMS_PER_QUEUE);
                this.readTotal += items.size();
                if (logger.isDebugEnabled()) {
                    logger.debug("Read total: " + this.readTotal);
                }
                HashSet<Long> ids = new HashSet<Long>();
                for (Item item : items) {
                    long itemId = item.get_id();
                    ids.add(Long.valueOf(itemId));
                    List<Object> tuple = this.scheme.deserialize(item.get_data());
                    if (tuple != null) {
                        this.tuples.add(new EmitItem(tuple, new KestrelSourceId(clientIndex, itemId)));
                    }
                }
                client.confirm(this.queue, ids);
            } catch (TException e) {
                // Best effort: keep polling the other servers, but do not hide the failure.
                logger.warn("Failed to read from Kestrel server #" + clientIndex
                        + " (queue '" + this.queue + "')", e);
            }
            clientIndex++;
        }
    }

    /** Declares the output fields supplied by the configured {@link Scheme}. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.scheme.getOutputFields());
    }

    /**
     * Counts the acknowledgement; the timer starts on the first ack and statistics are
     * reported every {@code HOLD_ITEMS} acks.
     */
    public void ack(Object obj) {
        if (this.acked == 0) {
            this.ackTimer = Timer.timer();
        }
        this.acked++;
        if (this.acked % HOLD_ITEMS == 0) {
            emitToAckQueue();
        }
    }

    /**
     * Writes an {@code AckStats} JSON record to the ack queue, if one is configured.
     * The reported value is the ack count divided by the timer duration over 1000
     * (acks per second, assuming the timer reports milliseconds).
     */
    private void emitToAckQueue() {
        if (logger.isDebugEnabled()) {
            logger.debug("Acked: " + this.acked);
        }
        if (this.ackQueue == null) {
            return;
        }
        float elapsed = ((float) this.ackTimer.duration()) / 1000.0f;
        float rate = this.acked / elapsed;
        try {
            getNextValidClient().put(this.ackQueue, gson.toJson(new AckStats(rate)), 0);
        } catch (TException e) {
            logger.error("Failed to write acknowledgement", e);
        }
    }

    /** Returns the next client from the iterator created in {@link #open}. */
    private KestrelThriftClient getNextValidClient() {
        return this.ackIterator.next();
    }

    /** Logs the failed message id; the tuple is not replayed. */
    public void fail(Object obj) {
        if (logger.isDebugEnabled()) {
            // String concatenation calls toString() itself, so no cast is needed.
            logger.debug("Failing: " + obj);
        }
    }
}
