package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import com.jayway.jsonpath.Filter;
import com.jayway.jsonpath.JsonPath;
import gnu.trove.map.hash.TObjectIntHashMap;
import gnu.trove.procedure.TLongObjectProcedure;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.joda.time.DateTime;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.TextLongByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.JsonPathFilterSet;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.twitter.USMFStatus;

/* loaded from: input_file:org/openimaj/hadoop/tools/twitter/token/mode/dfidf/CountTweetsInTimeperiod.class */
public class CountTweetsInTimeperiod extends StageProvider {
    /** Arguments stored in the job configuration under the "TOKEN_ARGS" key by the stage's setup. */
    private String[] nonHadoopArgs;
    /** When true the stage uses {@link InMemoryCombiningReducer} and leaves the reduce task count untouched. */
    private boolean inmemoryCombine;
    /** When true (both constructors set it so) the stage uses {@link TimeIndexReducer} with 10 reduce tasks. */
    private boolean buildTimeIndex;
    /** Time slot width; the mapper multiplies it by 60 * 1000 to obtain milliseconds, i.e. it is in minutes. */
    private long timedelta;
    /** Output name reported by the stage. */
    public static final String TIMECOUNT_DIR = "timeperiodTweet";
    /** Name of the time index entry, resolved relative to the stage's output location. */
    public static final String TIMEINDEX_FILE = "timeindex";
    /** Name of the file, relative to the stage's output location, the job counters are written to. */
    public static final String GLOBAL_STATS_FILE = "globalstats";
    /** Configuration key under which {@link #timedelta} is passed to the mapper. */
    private static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timedelta";
    /** Configuration key under which the time index location is passed to {@link TimeIndexReducer}. */
    public static final String TIMEINDEX_LOCATION_PROP = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timeindex";

    /* loaded from: input_file:org/openimaj/hadoop/tools/twitter/token/mode/dfidf/CountTweetsInTimeperiod$InMemoryCombiningReducer.class */
    public static class InMemoryCombiningReducer extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {
        protected void reduce(LongWritable longWritable, Iterable<BytesWritable> iterable, Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            TweetCountWordMap tweetCountWordMap = new TweetCountWordMap();
            Iterator<BytesWritable> it = iterable.iterator();
            while (it.hasNext()) {
                tweetCountWordMap.combine(IOUtils.read(new ByteArrayInputStream(it.next().getBytes()), TweetCountWordMap.class));
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            IOUtils.writeBinary(byteArrayOutputStream, tweetCountWordMap);
            context.write(longWritable, new BytesWritable(byteArrayOutputStream.toByteArray()));
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((LongWritable) obj, (Iterable<BytesWritable>) iterable, (Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/openimaj/hadoop/tools/twitter/token/mode/dfidf/CountTweetsInTimeperiod$Map.class */
    /**
     * Mapper that buckets tweets into fixed-width time slots and, per slot, accumulates the
     * number of tweets and the number of tweets each token appeared in. Nothing is emitted
     * from {@link #map}; the accumulated slots are written out in {@link #cleanup}.
     */
    public static class Map extends Mapper<LongWritable, Text, LongWritable, BytesWritable> {
        /** Key under which every slot's map is additionally emitted. */
        public static final LongWritable END_TIME = new LongWritable(-1);
        public static final LongWritable TOTAL_TIME = new LongWritable(-2);
        private HadoopTwitterTokenToolOptions options;
        private long timeDeltaMillis;
        private JsonPath jsonPath;
        private JsonPathFilterSet filters;
        /** Slot start time (millis) to the counts accumulated for that slot. */
        private HashMap<Long, TweetCountWordMap> tweetWordMap;

        /**
         * Loads the tool options, filters, JSON path and slot width from the job configuration.
         * Does nothing when the options have already been loaded.
         *
         * @param context context whose configuration holds "TOKEN_ARGS" and the time delta
         * @throws IOException if the options cannot be parsed or prepared, or the time delta is zero
         */
        protected synchronized void loadOptions(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException {
            if (this.options != null) {
                return;
            }
            Configuration conf = context.getConfiguration();
            // The delta is configured in minutes (default 60); slots are computed in milliseconds.
            long deltaMillis = conf.getLong(CountTweetsInTimeperiod.TIMEDELTA, 60L) * 60 * 1000;
            if (deltaMillis == 0) {
                // A zero width would make every slot computation divide by zero, and map()
                // swallows per-record failures, so the job would silently produce nothing.
                throw new IOException("Time delta must be non-zero (" + CountTweetsInTimeperiod.TIMEDELTA + ")");
            }
            try {
                HadoopTwitterTokenToolOptions parsed = new HadoopTwitterTokenToolOptions(conf.getStrings("TOKEN_ARGS"));
                parsed.prepare();
                JsonPathFilterSet parsedFilters = parsed.getFilters();
                JsonPath compiledPath = JsonPath.compile(parsed.getJsonPath(), new Filter[0]);
                // Publish only once everything loaded, so a failure leaves no half-initialised state.
                this.filters = parsedFilters;
                this.jsonPath = compiledPath;
                this.timeDeltaMillis = deltaMillis;
                this.options = parsed;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        /**
         * Loads the options and starts with an empty set of time slots.
         *
         * @param context Hadoop mapper context
         * @throws IOException if the options cannot be loaded
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            loadOptions(context);
            this.tweetWordMap = new HashMap<>();
        }

        /**
         * Parses one input line as a status, extracts its tokens with the configured JSON path
         * and adds them to the slot containing the status' creation time. Each distinct token
         * is counted once per tweet. Statuses that are invalid or rejected by the filters are
         * skipped; statuses lacking tokens or a creation time only bump the matching counter.
         *
         * @param longWritable input key (unused)
         * @param text one raw status line
         * @param context Hadoop mapper context, used for counters only
         * @throws IOException declared by the overridden method
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            try {
                String json = text.toString();
                USMFStatus status = new USMFStatus(this.options.getStatusType().type());
                status.fillFromString(json);
                if (status.isInvalid() || !this.filters.filter(json)) {
                    return;
                }
                @SuppressWarnings("unchecked")
                List<String> tokens = (List<String>) this.jsonPath.read(json);
                if (tokens == null) {
                    context.getCounter(TextEntryType.INVALID_JSON).increment(1L);
                    return;
                }
                if (tokens.isEmpty()) {
                    context.getCounter(TextEntryType.INVALID_ZEROLENGTH).increment(1L);
                    return;
                }
                DateTime createdAt = status.createdAt();
                if (createdAt == null) {
                    context.getCounter(TextEntryType.INVALID_TIME).increment(1L);
                    return;
                }
                // Round the creation time down to the start of its slot.
                long slotStart = (createdAt.getMillis() / this.timeDeltaMillis) * this.timeDeltaMillis;
                TweetCountWordMap slot = this.tweetWordMap.get(Long.valueOf(slotStart));
                if (slot == null) {
                    slot = new TweetCountWordMap();
                    this.tweetWordMap.put(Long.valueOf(slotStart), slot);
                }
                TObjectIntHashMap<String> wordCounts = slot.getTweetWordMap();
                slot.incrementTweetCount(1);
                // Count each token at most once per tweet.
                Set<String> seen = new HashSet<>();
                for (String token : tokens) {
                    if (seen.add(token)) {
                        wordCounts.adjustOrPutValue(token, 1, 1);
                    }
                }
                context.getCounter(TextEntryType.VALID).increment(1L);
            } catch (Exception ignored) {
                // Best effort: a record that cannot be parsed or processed is skipped so that
                // a single malformed line does not fail the whole task.
            }
        }

        /**
         * Serialises every accumulated slot and emits it twice: once under {@link #END_TIME}
         * and once under the slot's own start time.
         *
         * @param context Hadoop mapper context the records are written to
         * @throws IOException if a slot cannot be serialised or written
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            System.out.println("Cleaing up mapper, seen " + this.tweetWordMap.size() + " time slots");
            // Fully qualified: inside this class the simple name Map denotes the mapper itself.
            for (java.util.Map.Entry<Long, TweetCountWordMap> slotEntry : this.tweetWordMap.entrySet()) {
                Long slotStart = slotEntry.getKey();
                TweetCountWordMap slot = slotEntry.getValue();
                System.out.println("... time( " + slotStart + ") seen " + slot.getTweetWordMap().size() + " words");
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                IOUtils.writeBinary(buffer, slot);
                BytesWritable payload = new BytesWritable(buffer.toByteArray());
                context.write(END_TIME, payload);
                context.write(new LongWritable(slotStart.longValue()), payload);
                context.getCounter(TextEntryType.ACUAL_EMITS).increment(1L);
            }
        }
    }

    /* loaded from: input_file:org/openimaj/hadoop/tools/twitter/token/mode/dfidf/CountTweetsInTimeperiod$TimeIndexReducer.class */
    public static class TimeIndexReducer extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {
        private TimeFrequencyHolder timeMap = new TimeFrequencyHolder();

        protected void reduce(LongWritable longWritable, Iterable<BytesWritable> iterable, Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            if (longWritable.get() == Map.END_TIME.get()) {
                Iterator<BytesWritable> it = iterable.iterator();
                while (it.hasNext()) {
                    context.write(longWritable, it.next());
                }
                return;
            }
            TweetCountWordMap tweetCountWordMap = new TweetCountWordMap();
            for (BytesWritable bytesWritable : iterable) {
                tweetCountWordMap.combine(IOUtils.read(new ByteArrayInputStream(bytesWritable.getBytes()), TweetCountWordMap.class));
                context.write(longWritable, bytesWritable);
            }
            TimeFrequencyHolder.TimeFrequency timeFrequency = new TimeFrequencyHolder.TimeFrequency(longWritable.get(), tweetCountWordMap.getNTweets());
            this.timeMap.put(timeFrequency.time, timeFrequency);
        }

        protected void cleanup(Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            Path path = new Path(context.getConfiguration().getStrings(CountTweetsInTimeperiod.TIMEINDEX_LOCATION_PROP)[0] + "/" + context.getTaskAttemptID());
            System.out.println("Writing time index to: " + path);
            System.out.println("Timemap contains: " + this.timeMap.size());
            CountTweetsInTimeperiod.writeTimeIndex(this.timeMap, path);
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((LongWritable) obj, (Iterable<BytesWritable>) iterable, (Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context) context);
        }
    }

    /**
     * Creates a provider that does not use the in-memory combining reducer.
     *
     * @param strArr arguments handed to the job under the "TOKEN_ARGS" configuration key
     * @param j time slot width passed to the mapper
     */
    public CountTweetsInTimeperiod(String[] strArr, long j) {
        this(strArr, false, j);
    }

    /**
     * Creates a provider with an explicit choice of reducer strategy. Building the time
     * index is always enabled.
     *
     * @param strArr arguments handed to the job under the "TOKEN_ARGS" configuration key
     * @param z whether to use the in-memory combining reducer
     * @param j time slot width passed to the mapper
     */
    public CountTweetsInTimeperiod(String[] strArr, boolean z, long j) {
        this.nonHadoopArgs = strArr;
        this.timedelta = j;
        this.inmemoryCombine = z;
        this.buildTimeIndex = true;
    }

    /* renamed from: stage, reason: merged with bridge method [inline-methods] */
    /**
     * Builds the stage that counts tweets and tokens per time slot.
     * <p>
     * The stage runs {@link Map} as its mapper and picks the reducer from this provider's
     * flags. When the job finishes, the {@link TextEntryType} counters are written to
     * {@link #GLOBAL_STATS_FILE} inside the stage's output location.
     *
     * @return a new stage bound to this provider's settings
     */
    public TextLongByteStage m5stage() {
        return new TextLongByteStage() { // from class: org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountTweetsInTimeperiod.1
            /** Output location captured in {@link #stage}; NOTE(review): assumes stage() runs before setup() and finished() - confirm. */
            private Path actualOutputLocation;

            /** Copies this provider's settings into the job configuration and sets the reduce task count. */
            public void setup(Job job) {
                Configuration conf = job.getConfiguration();
                conf.setStrings("TOKEN_ARGS", CountTweetsInTimeperiod.this.nonHadoopArgs);
                conf.setLong(CountTweetsInTimeperiod.TIMEDELTA, CountTweetsInTimeperiod.this.timedelta);
                conf.setStrings(CountTweetsInTimeperiod.TIMEINDEX_LOCATION_PROP, new String[]{new Path(this.actualOutputLocation, CountTweetsInTimeperiod.TIMEINDEX_FILE).toString()});
                // With in-memory combining the reduce task count is left untouched.
                if (!CountTweetsInTimeperiod.this.inmemoryCombine) {
                    job.setNumReduceTasks(CountTweetsInTimeperiod.this.buildTimeIndex ? 10 : 0);
                }
            }

            public Class<? extends Mapper<LongWritable, Text, LongWritable, BytesWritable>> mapper() {
                return Map.class;
            }

            /** In-memory combining takes precedence over time index building; otherwise the inherited reducer is used. */
            @Override
            public Class<? extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>> reducer() {
                if (CountTweetsInTimeperiod.this.inmemoryCombine) {
                    return InMemoryCombiningReducer.class;
                }
                if (CountTweetsInTimeperiod.this.buildTimeIndex) {
                    return TimeIndexReducer.class;
                }
                return super.reducer();
            }

            /** Remembers the output location before delegating to the inherited stage construction. */
            @Override
            public Job stage(Path[] pathArr, Path path, Configuration configuration) throws Exception {
                this.actualOutputLocation = path;
                return super.stage(pathArr, path, configuration);
            }

            public String outname() {
                return CountTweetsInTimeperiod.TIMECOUNT_DIR;
            }

            /**
             * Writes the job's {@link TextEntryType} counters to the global stats file.
             * Failures are reported on stderr but do not fail the caller.
             */
            public void finished(Job job) {
                Counters counters;
                try {
                    counters = job.getCounters();
                } catch (IOException e) {
                    System.err.println("Could not read job counters, global stats not written: " + e);
                    return;
                }
                Path statsPath = new Path(this.actualOutputLocation, CountTweetsInTimeperiod.GLOBAL_STATS_FILE);
                // try-with-resources so the created file is closed even when writing fails.
                try (OutputStream out = HadoopToolsUtil.getFileSystem(statsPath).create(statsPath)) {
                    IOUtils.writeASCII(out, new WritableEnumCounter<TextEntryType>(counters, TextEntryType.values()) { // from class: org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountTweetsInTimeperiod.1.1
                        @Override // org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter
                        public TextEntryType valueOf(String str) {
                            return TextEntryType.valueOf(str);
                        }
                    });
                } catch (IOException e) {
                    System.err.println("Could not write global stats to " + statsPath + ": " + e);
                }
            }
        };
    }

    /**
     * Writes a time index in binary form to {@code path}, overwriting any existing file.
     *
     * @param timeFrequencyHolder the index to serialise
     * @param path destination file
     * @throws IOException if the file cannot be created or written
     */
    public static void writeTimeIndex(TimeFrequencyHolder timeFrequencyHolder, Path path) throws IOException {
        // try-with-resources: if create() itself fails there is no stream to close, so the
        // original failure propagates instead of being masked by a NullPointerException.
        try (OutputStream out = HadoopToolsUtil.getFileSystem(path).create(path, true)) {
            IOUtils.writeBinary(out, timeFrequencyHolder);
            out.flush();
        }
    }

    /**
     * Reads every index part found under {@code path}, merges them into one holder and
     * recalculates its cumulative frequencies.
     *
     * @param path location containing the index part files
     * @return the merged index, or {@code null} when {@code path} does not exist
     * @throws IOException if a part cannot be listed, opened or read
     */
    public static TimeFrequencyHolder readTimeIndex(Path path) throws IOException {
        if (!HadoopToolsUtil.fileExists(path.toString())) {
            return null;
        }
        System.out.println("Reading time index from: " + path);
        final TimeFrequencyHolder merged = new TimeFrequencyHolder();
        FileSystem fileSystem = HadoopToolsUtil.getFileSystem(path);
        for (FileStatus part : fileSystem.listStatus(path)) {
            System.out.println("Reading index part: " + part.getPath());
            // try-with-resources: if open() fails there is no stream to close, so the
            // original failure propagates instead of being masked by a NullPointerException.
            try (InputStream in = fileSystem.open(part.getPath())) {
                TimeFrequencyHolder partIndex = IOUtils.read(in, TimeFrequencyHolder.class);
                partIndex.forEachEntry(new TLongObjectProcedure<TimeFrequencyHolder.TimeFrequency>() { // from class: org.openimaj.hadoop.tools.twitter.token.mode.dfidf.CountTweetsInTimeperiod.2
                    @Override
                    public boolean execute(long time, TimeFrequencyHolder.TimeFrequency frequency) {
                        merged.put(time, frequency);
                        // true keeps the iteration going over the remaining entries
                        return true;
                    }
                });
            }
        }
        merged.recalculateCumulativeFrequencies();
        return merged;
    }

    /**
     * Resolves the time index location below an output root:
     * {@code <path>/timeperiodTweet/timeindex}.
     *
     * @param path the output root
     * @return the path of the time index below {@code path}
     */
    public static Path constructIndexPath(Path path) {
        Path timeCountDir = new Path(path, TIMECOUNT_DIR);
        return new Path(timeCountDir, TIMEINDEX_FILE);
    }
}
