package org.openimaj.picslurper;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.openimaj.twitter.collection.StreamJSONStatusList;

/* loaded from: input_file:org/openimaj/picslurper/LocalTweetSpout.class */
/**
 * Base spout that emits tweets read from a sequence of input streams supplied
 * by {@link #nextInputStream()}.
 *
 * <p>Each tweet is emitted as a single-field tuple ({@code "tweet"}) using the
 * tweet's {@code "id"} entry as the message id. Ids that have been emitted but
 * not yet acked/failed are kept in a pending set.
 *
 * <p>Completion is tracked through two <em>static</em> flags, so the state
 * reported by {@link #isFinished()} is shared by every instance of this class
 * loaded in the same JVM.
 */
public abstract class LocalTweetSpout implements IRichSpout {
    private static final long serialVersionUID = 202196766956713428L;

    /** Set once {@link #nextInputStream()} has returned {@code null}. */
    private static boolean isStreamsFinished = false;

    /** True while no emitted message id is awaiting an ack/fail. */
    private static boolean isProcessingFinished = false;

    /** Iterator over the tweets of the stream currently being consumed; {@code null} when none. */
    private Iterator<StreamJSONStatusList.ReadableWritableJSON> iterator;

    /** Message ids that were emitted and have not been acked or failed yet. */
    private Set<Object> waitingFor = new HashSet<>();

    protected SpoutOutputCollector collector;

    /**
     * Stores the collector used to emit tuples.
     *
     * @param map                  topology configuration (unused here)
     * @param topologyContext      topology context (unused here)
     * @param spoutOutputCollector collector that {@link #nextTuple()} emits to
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /** No-op. */
    public void close() {
    }

    /** No-op. */
    public void activate() {
    }

    /** No-op. */
    public void deactivate() {
    }

    /**
     * @return {@code null}; this spout declares no component-specific configuration
     */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Emits the next available tweet, if any, using its {@code "id"} entry as
     * the message id.
     *
     * <p>The id is only recorded as pending when it is non-null. Per Storm's
     * documented contract a tuple emitted with a null message id is not
     * tracked and never produces an ack/fail callback, so recording a null id
     * would leave the pending set permanently non-empty and prevent
     * {@link #isFinished()} from ever becoming true.
     */
    public void nextTuple() {
        StreamJSONStatusList.ReadableWritableJSON tweet = nextTweet();
        if (tweet == null) {
            return;
        }
        Object messageId = tweet.get("id");
        this.collector.emit(Arrays.<Object>asList(tweet), messageId);
        if (messageId != null) {
            addWaitingFor(messageId);
        }
    }

    /**
     * Declares the single output field {@code "tweet"}.
     *
     * @param outputFieldsDeclarer declarer to register the field with
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("tweet"));
    }

    /**
     * Returns the next tweet, moving on to the next input stream whenever the
     * current one is exhausted.
     *
     * <p>Streams that are empty, or for which opening/reading throws, are
     * skipped. This is done iteratively so that a long run of empty or
     * unreadable streams cannot exhaust the call stack.
     *
     * <p>When {@link #nextInputStream()} returns {@code null} the streams are
     * marked finished, processing is marked finished if nothing is pending,
     * and {@code null} is returned.
     *
     * @return the next tweet, or {@code null} when no further stream is available
     */
    protected StreamJSONStatusList.ReadableWritableJSON nextTweet() {
        while (true) {
            if (this.iterator != null && this.iterator.hasNext()) {
                return this.iterator.next();
            }
            try {
                InputStream stream = nextInputStream();
                if (stream == null) {
                    setStreamFinished(true);
                    if (this.waitingFor.isEmpty()) {
                        setProcessingFinished(true);
                    }
                    this.iterator = null;
                    return null;
                }
                this.iterator = StreamJSONStatusList.read(stream).iterator();
                if (this.iterator.hasNext()) {
                    return this.iterator.next();
                }
                // Empty stream: fall through and try the next one.
            } catch (Exception ignored) {
                // Best effort: a stream that cannot be opened or read is
                // skipped and the next one is tried on the following pass.
            }
        }
    }

    private static synchronized void setStreamFinished(boolean z) {
        isStreamsFinished = z;
    }

    private static synchronized void setProcessingFinished(boolean z) {
        isProcessingFinished = z;
    }

    /**
     * Supplies the next stream of tweets to read.
     *
     * @return the next input stream, or {@code null} when there are no more
     * @throws Exception if the stream cannot be provided; {@link #nextTweet()}
     *                   skips the failure and asks again
     */
    protected abstract InputStream nextInputStream() throws Exception;

    /**
     * @return {@code true} once the streams are exhausted and no emitted
     *         message id is still pending
     */
    public static synchronized boolean isFinished() {
        return isStreamsFinished && isProcessingFinished;
    }

    /**
     * Records a message id as pending and marks processing as unfinished.
     *
     * @param obj message id of a tuple that has been emitted
     */
    protected void addWaitingFor(Object obj) {
        this.waitingFor.add(obj);
        setProcessingFinished(false);
    }

    /**
     * Removes the message id from the pending set and marks processing as
     * finished when nothing else is pending.
     *
     * @param obj message id being acknowledged
     */
    public void ack(Object obj) {
        this.waitingFor.remove(obj);
        if (this.waitingFor.isEmpty()) {
            setProcessingFinished(true);
        }
    }

    /**
     * Handles a failed tuple exactly like an acked one: the id is dropped
     * from the pending set and the tuple is not re-emitted.
     *
     * @param obj message id of the failed tuple
     */
    public void fail(Object obj) {
        ack(obj);
    }
}
