Coder Social home page Coder Social logo

aegisthus's Introduction

Aegisthus

STATUS

Aegisthus has been archived and will receive no further updates.

OVERVIEW

A Bulk Data Pipeline out of Cassandra. Aegisthus implements a reader for the SSTable format and provides a map/reduce program to create a compacted snapshot of the data contained in a column family.

BUILDING

Aegisthus is built via Gradle (http://www.gradle.org). To build from the command line: ./gradlew build

RUNNING

Please see the wiki or checkout the scripts directory to use our sstable2json wrapper for individual sstables.

TESTING

To run the included tests from the command line: ./gradlew build

ENHANCEMENTS

  • Reading
    • Commit log readers
      • Code to do this previously existed in Aegisthus but was removed in commit 35a05e3f.
    • Split compressed input files
    • Add CQL support
      • This way the user doesn't have to add the key and column types as job parameters. Perhaps we will do this by requiring the table schema like SSTableExporter does.
  • Writing
    • Add an option to snappy compress output.
    • Add an output format for easier downstream processing.
      • See discussion on issue #36.
    • Add a pivot format
      • Create an output format that contains a column per row. This can be used to support very large rows without having to have all of the columns in memory at one time.
  • Packaging
    • Publish Aegisthus to Maven Central
    • Publish Shaded/Shadowed/FatJar version of Aegisthus as well

LICENSE

Copyright 2013 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

aegisthus's People

Contributors

charsmith avatar danchia avatar danielbwatson avatar jessicasw avatar roblovelock avatar rspieldenner avatar safletcher avatar sb2nov avatar sriramkrishnan avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

aegisthus's Issues

Composite row keys not read correctly

Hi,

I have a wide row column family which has composite rows of type (UTF8Type,UTF8Type). I' using -D"aegisthus.keytype=CompositeType(UTF8Type, UTF8Type)" parameter while invoking MR job. Aegisthus is returning "e2" as key for following composite "e2e:abxdsdsd". its because of following line in SSTableRecordReader class, because ":" is treated as separator in composite keys.

key.set(json.substring(2, json.indexOf(':') - 1));

Column values are not read correctly

Hi,
In my wide row column family, i am storing a byte[]. I have this CF exported in json format using Aegisthus to my HDFS. I have another MR job where i'm trying to read these column values. for some when i converted byte[](text to byte[] conversion), i'm not getting the same byte stream. So i'm not able to read the column values correctly. I'm getting following exception

Error: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readUTF(DataInputStream.java:609)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.csr.lts.analytics.mapred.transformation.json.RawDataTransformationReducer.reduce(RawDataTransformationReducer.java:79)

Thanks,
Nishi

"encountered EOF while reading checksum" Exception when working with large compressed data files

When running on an EMR Cluster and pulling files from s3 we are experiencing an issue trying to map large compressed data files.

The issue is in the CompressionInputStream. When reading the checksum you check to see that 4 bytes were read. However sometimes less bytes may have been read and no attempt is made to see if there are more bytes available.

We have implemented a fix for this, I can submit a pull request if you want.

The fix is to replace:

    private void readInput(int length) throws IOException {
        int offset = 0;
        while (offset < length) {
            int size = in.read(input, offset, length - offset);
            if (size == -1) {
                throw new EOFException("encountered EOF while reading block data");
            }
            offset += size;
        }
        // ignore checksum for now
        byte[] checksum = new byte[4];
        int size = in.read(checksum);

        if (size != 4) {
            throw new EOFException("encountered EOF while reading checksum");
        }

        valid = cm.compressor().uncompress(input, 0, length, buffer, 0);
        position = 0;
    }

with

    private void readInput(int length) throws IOException {
        int offset = 0;
        while (offset < length) {

            int size = in.read(input, offset, length - offset);
            if (size == -1) {
                throw new EOFException("encountered EOF while reading block data");
            }
            offset += size;
        }

        // ignore checksum for now
        readChecksum(4);

        valid = cm.compressor().uncompress(input, 0, length, buffer, 0);
        position = 0;
    }

    private void readChecksum(int length) throws IOException {
        byte[] checksum = new byte[length];
        int offset = 0;
        while (offset < length) {

            int size = in.read(checksum, offset, length - offset);
            if (size == -1) {
                throw new EOFException("encountered EOF while reading checksum. Chunk: " + cm.currentChunk() + ", length: " + cm.currentLength());
            }
            offset += size;
        }
    }

s3 support for data handling

Is it possible to use s3 and elastic mapreduce for processing sstables from backups? Do you push data into hdfs and then process it from -> to there?

Add a shutdown hook to kill the hadoop job.

If you submit Aegisthus with "hadoop jar aegisthus.jar com.netflix.Aegisthus" and then that process is killed the hadoop application running on the cluster is not stopped.

Handling very large sstables

I am attempting to run aegisthus against a Priam backup containing some very large sstables (~200GB in size). Being that these files are snappy compressed (in addition to the Cassandra snappy compression) and we are using Cassandra 1.2 I'm currently using the code on the coursera fork. I'm running into problems with this as it takes a very long time (3+ hours) to handle these very large files. Aegisthus appears to expect sstables which haven't been double compressed by Priam, and doesn't appear to split the files if the -CompressionInfo.db files are available.

Questions:

  1. Do you guys use Aegisthus with similarly large sstables?
  2. Do you get any splitting on sstables that have the internal Cassandra snappy data? (i.e. -CompressionInfo.db files are available)

Using the new SSTable format

@danielbwatson given that we're trying to deprecate the JSON output format, I wonder what's the best way for people to write downstream jobs that want to process data in a row manner?

It seems to be that there are two options:

(1) Run the same Mapper and Reducer used in aegisthus, but use a ChainReducer so that we can add a custom map stage after to do the application specific processing.

(2) The SSTables output by Aegisthus are actually special, since it's guaranteed that rows are non-overlapping and the columns are sorted in the right order. I'm wondering if we could expose this to a mapper in some smart way (and avoid the reduce step).

What do you think?

Timestamp clustering keys lose precision

First off, thanks for this project as I'm making great use of it to bring in C* data to our warehouse.

I'm running into an issue with timestamp clustering keys and was hoping someone could point me to the right part of the code base to investigate it further.

Say I have the following table:

CREATE TABLE t (
col1 int,
col2 timestamp,
col3 varchar,
PRIMARY KEY ((col1), col2);

I'm passing this DDL into Aegisthus using the "-D aegisthus.cql_schema" option. In the JSON output, the formatting of col2 is being truncated to the minute. So if the source value was "2016-09-21 12:48:05" the output would look like "2016-09-21 12\:48Z". This becomes an issue when you have two rows with identical primary keys except for the timestamps occurring at different seconds -- now the output looks like it has duplicate primary keys due to the loss of timestamp precision.

It's possible to work around this issue by changing the DDL to use "bigint" instead of "timestamp", which leads me to believe this is simply an output formatting issue.

Any idea where this might be happening or how to fix? My first guess was https://github.com/Netflix/aegisthus/blob/master/aegisthus-hadoop/src/main/java/com/netflix/aegisthus/output/SSTableOutputFormat.java#L104 but not sure about that.

Thank you.

Possible Hadoop Version Issue

Hi,

When I run aegisthus I get the following exception:

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at com.netflix.aegisthus.input.AegisthusInputFormat.initConvertors(AegisthusInputFormat.java:83)
at com.netflix.aegisthus.input.AegisthusInputFormat.getSplits(AegisthusInputFormat.java:178)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
at com.netflix.Aegisthus.run(Aegisthus.java:149)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.netflix.Aegisthus.main(Aegisthus.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

I am using Hadoop version 2.2.0, but the gradle build file seems to rely on version 1.1.0 (unless I'm misreading it). Is this the problem or is this something else?

Thank you,

Document how to run tests

There is a test project, but it's not immediately obvious to me how one would run the tests. It would be good if the README documented how to do so.

Cassandra 2.x support

Cassandra moves on and we need to upgrade. This task needs a little research and at least a dependency change.

Incorrect handling of Cassandra Snappy compressed SSTables.

Aegisthus will actually miss out rows in compressed SSTables, as it does not correctly track the sstable file size.

The SSTableRecordReader tracks the position in file to determine if there are still records to read. However, based on SSTableReader it's tracking the position in the uncompressed stream.

However, in AegSplit after initializing the compressedInputStream, we don't adjust end to be the actual uncompressed stream length (available in CompressionMetadata).

I sort of have a working patch, will submit in a PR in a few days.

Any non-pig decoder/reader for reading aegisthus output json?

I am evaluating aegisthus for converting sstable files to json. I got it to run and I see some output as pasted below but it seems to be encoded in some way i.e. it is not human readable. I saw that there is a pig loader for the output json. But in our data pipeline we use map reduce and it will be useful if we can read and process data in map reduce. Is there any reader/decoder for the aegisthus output json?

fffaa747-5d2d-d775-9be1-3b0866ddbbe4 {"fffaa747-5d2d-d775-9be1-3b0866ddbbe4":
{"deletedAt":-9223372036854775808,"columns":[["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t0008:004g\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808],["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t0009:003n\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808],["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t000a:004f\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808],["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t000e:004d\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808]]}

Running MRMiniCluster with aegisthus

Hi,

I want to debug my RecordReader, so trying to setup a mini MR Cluster. With this following gradle dependencies,

dependencies {
configurations.includeInJar {
transitive = false
}
includeInJar project(':aegisthus-core')
includeInJar 'org.apache.cassandra:cassandra-all:2.0.7'
includeInJar 'org.apache.pig:pig:0.11.1'
includeInJar 'org.xerial.snappy:snappy-java:1.0.4.1'

    compile 'org.slf4j:slf4j-api:1.6.3'
    compile 'org.apache.hadoop:hadoop-client:2.3.0-cdh5.0.1'
    testCompile 'org.apache.mrunit:mrunit:0.9.0-incubating:hadoop2'
    testCompile 'org.apache.hadoop:hadoop-minicluster:2.3.0-cdh5.0.1'
    testCompile 'org.apache.hadoop:hadoop-test:2.3.0-mr1-cdh5.0.1'


    configurations.compile.extendsFrom(configurations.includeInJar)
}

Do we need hadoop-client dependency or hadoop-core ?

I am getting problem loading JobTracker class.

2014-08-18 15:44:34,535 ERROR [Thread-95] mapred.MiniMRCluster (MiniMRCluster.java:run(122)) - Job tracker crashed
java.lang.NoSuchMethodError: org.apache.hadoop.mapred.JobTracker.startTracker(Lorg/apache/hadoop/mapred/JobConf;Ljava/lang/String;)Lorg/apache/hadoop/mapred/JobTracker;
at org.apache.hadoop.mapred.MiniMRCluster$JobTrackerRunner$1.run(MiniMRCluster.java:117)
at org.apache.hadoop.mapred.MiniMRCluster$JobTrackerRunner$1.run(MiniMRCluster.java:115)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.MiniMRCluster$JobTrackerRunner.run(MiniMRCluster.java:115)
at java.lang.Thread.run(Thread.java:744)

SSTableScanner not able to convert column names correctly.

Have a schema ,

CREATE TABLE containerdefinition (
containerdefinitionid text,
availabilityrules text,
availabilityruleslockedbyuid text,
containerdefinitionname text,
containernamespaceid text,
createdby text,
createdtimestamp timestamp,
deletetime bigint,
environment text,
ipaddressallowed list,
metadata text,
partner text,
policymetadata text,
policymetadatalockedbyuid text,
properties list,
swmcomponent text,
updatedby text,
updatedtimestamp timestamp,
PRIMARY KEY (containerdefinitionid)
)

when i set my
job.getConfiguration().set("aegisthus.keytype", "UTF8Type");
job.getConfiguration().set("aegisthus.columntype", "UTF8Type");

Can someone help me with this issue? The same works with if all the columns are "text". If i don't give any column type, it is converting it to BytesType which is very difficult to decrypt.

I am getting error in generating column name.

2014-09-23 18:23:25,079 WARN [Thread-141] mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local1118817108_0001
java.lang.Exception: org.apache.cassandra.serializers.MarshalException: String didn't validate.
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.cassandra.serializers.MarshalException: String didn't validate.
at org.apache.cassandra.serializers.UTF8Serializer.validate(UTF8Serializer.java:35)
at org.apache.cassandra.db.marshal.AbstractType.getString(AbstractType.java:154)
at xx.aegisthus.io.sstable.SSTableScanner.convertColumnName(SSTableScanner.java:333)
at xx.aegisthus.io.sstable.SSTableScanner.serializeColumns(SSTableScanner.java:272)
at xx.aegisthus.io.sstable.SSTableScanner.next(SSTableScanner.java:214)
at xx.aegisthus.input.readers.SSTableRecordReader.nextKeyValue(SSTableRecordReader.java:96)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:533)

java.io.EOFException while processing SSTables

We are trying out POC for bulk load of data from Cassandra (version 3.0.9) to HDFS. But running into EOFException. Please find the command that we are using. Any help in this matter is greatly appreciated.

hadoop jar /opt/aegisthus/aegisthus-hadoop-0.3.0-SNAPSHOT.jar com.netflix.Aegisthus -inputDir /raw/cassandra/sstables -output /raw/cassandra/json -Daegisthus.columntype=UTF8Type -Daegisthus.keytype=UTF8Type -libjars lz4-1.2.0.jar

Input to the reducer should be the columns for the row sorted by comparator.

Currently Aegisthus operates on a row level. This means we have the following flow

map -> [row, row, row] -> reducer

This worked great before Cassandra1.2 In 1.2 we have RangeTombstones. They need to be handled in order in the row. We could handle this by just reading all the tombstones and handling it per row, but ideally we would work as Cassandra does.

Additionally this would resolve the issue with expiring columns as we could delete them from the snapshot when they have required, making the incrementals work better with that type of data.

Deleted columns reach output in case of non-deleted rows.

Some versions of Cassandra (1.2 for sure, 1.1 possibly) initialize the "deleted at" field for rows in SSTables to Long.MIN_VALUE (which is a very large negative number).

When the CassReducer checks for deleted columns, it is comparing column deletion timestamps to the row deletion timestamp. In case the row has not been deleted, its "deleted at" will be Long.MIN_VALUE and the appropriate condition will not hold (signed negative number deletedAt is not larger than column deletion time ts).

This way, deleted columns will find their way to the output.

Cassandra 1.2 support

I will expand this issue, but currently I believe there are some compiling issues as well as a new file format.

Instructions to get Aegisthus Running

Hi,

I'm pretty new to Aegisthus so I apologize for the novice questions.

I have a 10 node Hadoop Cassandra cluster that I would like to test Aegisthus on. Question is, I'm not exactly sure what steps are invlolved to get the command mentioned in the wiki running: hadoop jar aegisthus-hadoop-0.1.jar com.netflix.Aegisthus -inputDir sstables -output json.

Currently running this:
hadoop jar aegisthus-hadoop-0.1.jar com.netflix.Aegisthus -inputDir sstables -output json

results in the following error:
-bash: hadoop: command not found

Do we have to have gradle installed on the server that we will run the command from?

Also, is it required to have a standalone server that just has Hadoop on it, or should it work with Cassandra running? I have downloaded the Aegisthus zipped files on GitHub and placed it on the server however running the ./gradlew build (as mentioned in the readme) results in the below error:

./gradlew build
-bash: ./gradlew: No such file or directory

I guess all I'm looking for is some basic novice instructions just to get Aegisthus installed to where I can run the hadoop command.

I appreciate your patience and any help you can provide.

Kind Regards,
Cassie

Aegisthus spends a lot of time deserializing AegisthusKey

On the map stage, we tend to be pretty CPU bound, and I took a quick profile, which shows SpillThread using most of the time, and in SpillThread it seems like use a lot of type copying bytes.

@danielbwatson I'm going to experiment with making AegisthusKey RawComparable, and will report back how that goes.

Flat profile of 262.48 secs (10192 total ticks): SpillThread

Interpreted + native Method
11.6% 0 + 778 org.apache.hadoop.io.compress.snappy.SnappyCompressor.compressBytesDirect
1.9% 0 + 128 java.io.FileOutputStream.writeBytes
0.2% 14 + 0 org.apache.hadoop.util.DataChecksum.update
0.2% 13 + 0 org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill
0.2% 11 + 0 org.apache.hadoop.io.compress.snappy.SnappyCompressor.compress
0.1% 7 + 0 com.netflix.aegisthus.io.writable.AegisthusKey.readFields
0.1% 7 + 0 org.apache.cassandra.utils.ByteBufferUtil.compareUnsigned
0.1% 7 + 0 org.apache.cassandra.db.marshal.AbstractCompositeType.compare
0.1% 4 + 0 org.apache.hadoop.io.compress.BlockCompressorStream.finish
0.1% 4 + 0 org.apache.hadoop.util.HeapSort.sort
0.1% 4 + 0 org.apache.hadoop.mapred.IFile$Writer.append
0.0% 3 + 0 org.apache.cassandra.utils.ByteBufferUtil.readBytes
0.0% 3 + 0 org.apache.hadoop.io.compress.BlockCompressorStream.rawWriteInt
0.0% 3 + 0 org.apache.hadoop.mapred.MapTask$MapOutputBuffer.swap
0.0% 3 + 0 org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare
0.0% 1 + 1 java.io.UnixFileSystem.getBooleanAttributes0
0.0% 2 + 0 org.apache.cassandra.db.marshal.Int32Type.compare
0.0% 2 + 0 org.apache.cassandra.db.marshal.ReversedType.compare
0.0% 2 + 0 org.apache.cassandra.utils.ByteBufferUtil.getShortLength
0.0% 2 + 0 org.apache.hadoop.io.compress.CompressorStream.write
0.0% 2 + 0 com.netflix.aegisthus.io.writable.AegisthusKey.compareTo
0.0% 2 + 0 org.apache.hadoop.io.compress.CompressorStream.
0.0% 2 + 0 org.apache.hadoop.io.WritableComparator.compare
0.0% 2 + 0 java.io.BufferedOutputStream.write
0.0% 2 + 0 java.io.ByteArrayInputStream.read
15.5% 136 + 908 Total interpreted (including elided)

 Compiled + native   Method                        

30.5% 2051 + 0 java.io.DataInputStream.readFully
21.8% 1341 + 128 com.netflix.aegisthus.io.writable.AegisthusKey.readFields
13.5% 894 + 15 org.apache.cassandra.db.marshal.AbstractCompositeType.compare
9.8% 658 + 0 org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare
2.6% 173 + 0 org.apache.hadoop.util.QuickSort.sortInternal
2.2% 148 + 0 org.apache.hadoop.io.WritableComparator.compare
1.3% 90 + 0 org.apache.hadoop.io.compress.BlockCompressorStream.write
0.8% 52 + 1 org.apache.hadoop.mapred.IFileOutputStream.write
0.5% 34 + 0 org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill
0.5% 31 + 0 org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write
0.2% 13 + 0 org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write
0.2% 0 + 11 org.apache.cassandra.utils.ByteBufferUtil.readBytesWithShortLength
0.1% 9 + 0 org.apache.hadoop.io.compress.snappy.SnappyCompressor.setInput
0.1% 8 + 0 org.apache.cassandra.db.marshal.AbstractCompositeType.compare
0.1% 4 + 0 org.apache.hadoop.util.HeapSort.downHeap
0.0% 3 + 0 java.nio.Bits.copyToArray
0.0% 3 + 0 java.nio.DirectByteBuffer.get
0.0% 1 + 0 org.apache.hadoop.util.QuickSort.fix
0.0% 0 + 1 java.util.Vector.addElement
0.0% 1 + 0 java.io.DataInputStream.readInt
0.0% 1 + 0 org.apache.hadoop.mapred.IFile$Writer.append
84.2% 5515 + 156 Total compiled

Data Discrepancy

I followed following steps to test a simple scenario using Aegisthus....

Step 1: Prepare Test data in cqlsh

CREATE TABLE hoteldetails (
... key text,
... zip text,
... PRIMARY KEY (key)
... ) WITH COMPACT STORAGE AND
... bloom_filter_fp_chance=0.010000 AND
... caching='KEYS_ONLY' AND
... comment='' AND
... dclocal_read_repair_chance=0.000000 AND
... gc_grace_seconds=864000 AND
... index_interval=128 AND
... read_repair_chance=0.100000 AND
... replicate_on_write='true' AND
... populate_io_cache_on_flush='false' AND
... default_time_to_live=0 AND
... speculative_retry='NONE' AND
... memtable_flush_period_in_ms=0 AND
... compaction={'class': 'SizeTieredCompactionStrategy'} AND
... compression={'sstable_compression': ''};

insert into hoteldetails (key,zip) values('123','456');
insert into hoteldetails (key,zip) values('1234','4567');

select * from hotelks1.hoteldetails;

key | zip
------+------
123 | 456
1234 | 4567

./ nodetool flush

Step 2 : Upload sstables to hdfs...

step 3 : hadoop jar aegisthus-hadoop-0.1.2.jar com.netflix.Aegisthus -inputDir /tmp/jsontest/ -output /tmp/outfile

Step 4 : hadoop fs -cat /tmp/outfile/part*

313233 {"313233": {"deletedAt": 979018199334916, "columns": []}}

NOt able to understand why am i not getting actual data in json format. Please guide

Use of rxJava without backpressure leads to excessive memory consumption

SSTableColumnScanner creates an observable that produces data as fast as it can read from disk.
In SSTableRecordReader, we convert this observable to an iterator, which internally buffers data in a LinkedBlockingQueue of unbounded size.

If for whatever reason the mapper falls behind (e.g. doing a spill and sort), then the blockingQueue can grow to unbounded size. I've (somewhat hackily) fixed this in coursera@807a0f4 by using a custom conversion toIterator that makes use of a bounded blocking queue. It's not very in the spirit of RxJava, however, as the toIterator observer blocks the observable when the queue is full.

Combine Input

Netflix runs with a max number of mappers allowed per job which incremental processing can exceed in some cases. Combining small SSTables will fix this without decreasing performance.

Deleted columns in output.

Assuming a row is present in only one of the input SSTables, it's corresponding value set in the reducer will have only one element. The optimization condition will be triggered and therefore the execution path will not go through the logic removing deleted columns. This will result in deleted columns appearing in the output.

EOF exception

Hi,

I am new to cassandra and aegisthus ... am executing this:

hadoop jar aegisthus-hadoop-0.1.2.jar com.netflix.Aegisthus -libjars lz4-1.1.1.jar -inputDir /tmp/jsontest -output /tmp/outfile

And this leads to error

Error: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:180)
at java.io.DataInputStream.readFully(DataInputStream.java:152)

I have uploaded sstable to hdfs and the executing above code.. please help out...

Process data in a columnar manner

Long rows cause oom problems as we put both the row and the json into memory concurrently. Additionally we may be able to better handle TTL and expiring tombstones.

Functional example?

Is there any doc on a functional example? I'm trying to just process a test data set:

cqlsh> SELECT * FROM mattest.users;

 userid      | emails              | first_name | last_name | todo | top_scores
-------------+---------------------+------------+-----------+------+------------
 matschaffer | {'[email protected]'} |        mat |  schaffer | null |       null

(1 rows)

And running this on EMR:

hadoop jar aegisthus-hadoop-0.3.0-SNAPSHOT.jar com.netflix.Aegisthus -input s3://bucket/mattest2.0/mattest/users/snapshots/1460510053352/mattest-users-jb-1-Data.db -output s3://bucket/outputs2.0/1

The job completes, but all I get back is one JSON file that doesn't look like I can do anything with it:

6d61747363686166666572  {"6d61747363686166666572":{"deletedAt":-9223372036854775808,"columns":[["000000","",1460510043566000],["0006656d61696c7300000f6d61744073636861666665722e6d6500","",1460510043566000],["00096c6173745f6e616d6500","7363686166666572",1460510043566000],["000a66697273745f6e616d6500","6d6174",1460510043566000]]}}

I tested this on snapshots from both 2.0 and 2.1 (via the https://hub.docker.com/_/cassandra/ containers) with the same result. Here's the Aegisthus build line:

Running Aegisthus version 0.3.0-SNAPSHOT built from change 3305276 on host ip-10-0-2-78 on 2016-04-12_20:34:02 with Java 1.8.0_77

Thanks in advance!

Add commit log readers

Previously Aegisthus had readers for commit logs. These were removed in commit 35a05e3 to prepare for C* 2.0.x support.

Reimplement the Commit Log readers with support for reading C* 1.2.x and 2.0.x formats.

java.io.EOFException while running Aegisthus

Hi I am very new to Aegisthus.
Any help in resolving this exception is highly appreciated.

I tried running to process my SSTables using this command. The SSTables are from Cassandra 2.0.9
(Note: without the lz4 jar I encountered a ClassNotFoundException..is this the correct way?)
hadoop jar aegisthus-hadoop-0.2.4.jar com.netflix.Aegisthus -Daegisthus.columntype=UTF8Type -Daegisthus.keytype=UTF8Type -libjars lz4-1.2.0.jar -inputDir myinputdir -output json
My inputDir "myinputdir" has the Data.db and the CompressionInfo.db

Inspiron-N7110 ~/git/aegisthus2/aegisthus-hadoop/build/libs $ hadoop jar aegisthus-hadoop-0.2.4.jar com.netflix.Aegisthus -Daegisthus.columntype=UTF8Type -Daegisthus.keytype=UTF8Type -libjars lz4-1.2.0.jar -inputDir myinputdir -output json 
14/12/07 23:24:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/07 23:24:54 INFO tools.DirectoryWalker: hdfs://localhost:9000/user/stax/csst/node1 :    2 file(s)
14/12/07 23:24:54 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/12/07 23:24:55 INFO input.FileInputFormat: Total input paths to process : 1
14/12/07 23:24:55 INFO input.AegisthusInputFormat: aegisthus.keytype: UTF8Type
14/12/07 23:24:55 INFO input.AegisthusInputFormat: aegisthus.columntype: UTF8Type
14/12/07 23:24:55 INFO input.AegisthusInputFormat: end path: csst-users-jb-1-Data.db:0:79
14/12/07 23:24:55 INFO input.AegSplit: start: 0, end: 79
14/12/07 23:24:55 INFO input.AegisthusCombinedInputFormat: sstable AegSplits: 1
14/12/07 23:24:55 INFO input.AegisthusCombinedInputFormat: sstables Added AegSplits: 1
14/12/07 23:24:55 INFO input.AegisthusCombinedInputFormat: other AegSplits: 0
14/12/07 23:24:55 INFO input.AegisthusCombinedInputFormat: AegCombinedSplits: 1
14/12/07 23:24:55 INFO mapreduce.JobSubmitter: number of splits:1
14/12/07 23:24:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1417989836400_0003
14/12/07 23:24:56 INFO impl.YarnClientImpl: Submitted application application_1417989836400_0003
14/12/07 23:24:56 INFO mapreduce.Job: The url to track the job: http://stax-Dell-System-Inspiron-N7110:8088/proxy/application_1417989836400_0003/
job_1417989836400_0003
Inspiron-N7110:8088/proxy/application_1417989836400_0003/
14/12/07 23:24:56 INFO mapreduce.Job: Running job: job_1417989836400_0003
14/12/07 23:25:02 INFO mapreduce.Job: Job job_1417989836400_0003 running in uber mode : false
14/12/07 23:25:02 INFO mapreduce.Job:  map 0% reduce 0%
14/12/07 23:25:06 INFO mapreduce.Job: Task Id : attempt_1417989836400_0003_m_000000_0, Status : FAILED
Error: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:395)
    at org.apache.cassandra.utils.ByteBufferUtil.readWithShortLength(ByteBufferUtil.java:371)
    at org.apache.cassandra.db.OnDiskAtom$Serializer.deserializeFromSSTable(OnDiskAtom.java:74)
    at org.apache.cassandra.db.OnDiskAtom$Serializer.deserializeFromSSTable(OnDiskAtom.java:69)
    at com.netflix.aegisthus.io.sstable.SSTableScanner.serializeColumns(SSTableScanner.java:231)
    at com.netflix.aegisthus.io.sstable.SSTableScanner.next(SSTableScanner.java:205)
    at com.netflix.aegisthus.input.readers.SSTableRecordReader.nextKeyValue(SSTableRecordReader.java:94)
    at com.netflix.aegisthus.input.readers.CombineSSTableReader.nextKeyValue(CombineSSTableReader.java:50)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:533)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)```

aegisthus-pig map-columns not using hex value of ascii column name

When running the aegisthus-pig example listed in the wiki using map_columns#'[column-name]'.value to get the value the pig script will not find a json element although it reports the conversion happens in pig output.

However when I use map_columns#'[hex-value-of-column]'.value the correct value is returned from the json object.

I also noticed the json in the files generated by the MR job the column names are still hex strings; However in the CasseducerTest test case code the columns are ASCII

running latest version from master

Style clarification

Hi guys, I was wondering if you could clarify what's the official style rules being followed for this codebase?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.