package org.openimaj.kestrel;

import backtype.storm.spout.KestrelThriftClient;
import com.hp.hpl.jena.graph.Triple;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;
import org.openimaj.rdf.storm.eddying.topology.builder.ExampleEddySteMTopologyBuilder;
import org.openimaj.util.parallel.Operation;
import org.openimaj.util.parallel.Parallel;
import org.openjena.atlas.lib.Sink;
import org.openjena.riot.RiotReader;

/* loaded from: input_file:org/openimaj/kestrel/KestrelTupleWriter.class */
public abstract class KestrelTupleWriter implements Sink<Triple> {
    protected static final Logger logger = Logger.getLogger(KestrelTupleWriter.class);
    private List<InputStream> tripleSources;
    private String[] queues;
    private List<KestrelThriftClient> clients;
    private ArrayList<Triple> tripleCache;
    private int tripleCacheSizeLimit;
    int currentIndex;

    public KestrelTupleWriter(URL url) throws IOException {
        this.currentIndex = 0;
        this.tripleSources.add(url.openStream());
    }

    public KestrelTupleWriter(InputStream inputStream) throws IOException {
        this.currentIndex = 0;
        this.tripleSources.add(inputStream);
    }

    public KestrelTupleWriter(ArrayList<URL> arrayList) throws IOException {
        this.currentIndex = 0;
        this.tripleSources = new ArrayList();
        Iterator<URL> it = arrayList.iterator();
        while (it.hasNext()) {
            this.tripleSources.add(it.next().openStream());
        }
    }

    public void write(List<KestrelServerSpec> list, String... strArr) throws TException, IOException {
        logger.debug("Opening kestrel client");
        this.clients = new ArrayList();
        for (KestrelServerSpec kestrelServerSpec : list) {
            this.clients.add(new KestrelThriftClient(kestrelServerSpec.host, kestrelServerSpec.port));
        }
        this.queues = strArr;
        this.tripleCache = new ArrayList<>();
        this.tripleCacheSizeLimit = ExampleEddySteMTopologyBuilder.STEMSIZE;
        Parallel.forEach(this.tripleSources, new Operation<InputStream>() { // from class: org.openimaj.kestrel.KestrelTupleWriter.1
            public void perform(InputStream inputStream) {
                RiotReader.createParserNTriples(inputStream, KestrelTupleWriter.this).parse();
                KestrelTupleWriter.logger.debug("Finished parsing");
            }
        });
        close();
    }

    public void close() {
        flushTripleCache();
        Iterator<KestrelThriftClient> it = this.clients.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public void send(Triple triple) {
        this.tripleCache.add(triple);
        if (this.tripleCache.size() >= this.tripleCacheSizeLimit) {
            flushTripleCache();
        }
    }

    private void flushTripleCache() {
        send((List<Triple>) this.tripleCache);
        this.tripleCache.clear();
    }

    public abstract void send(List<Triple> list);

    public void flush() {
        logger.debug("Queue flushed");
    }

    public KestrelThriftClient getNextClient() {
        KestrelThriftClient kestrelThriftClient = this.clients.get(this.currentIndex);
        this.currentIndex++;
        if (this.currentIndex == this.clients.size()) {
            this.currentIndex = 0;
        }
        return kestrelThriftClient;
    }

    public String[] getQueues() {
        return this.queues;
    }
}
