Monday, February 17, 2014

Flume versus Chukwa comparison

I was looking into the differences between Chukwa and Flume.

Jonathan Hsieh at Cloudera posted a biased explanation on the Flume Google group (understandably, since Flume is shipped with Cloudera's distribution). It's worth reading the full conversation, but I'll post the last part of it here.

-------------------------------------------------

> While the story with the Chukwa MR job seems fairly clear, with the new HBase
> story, does Chukwa depend on HBase to demux different kinds of data from
> different sources?  Is it based on the data's key somehow?

The new HBase Writer uses the same MapReduce demux parser classes,
which run on the collector to extract and filter data in real time.
The composition of the row key is decided by the demux parser.

> Does Chukwa depend on getting data into HBase before performing extraction
> or analytic operations?

It can do either, or send the data as raw bytes, but it is preferable
to perform the extraction before sending to HBase, to trim down the
storage size.

> > 2) Reliability
>
> > > Flume's reliability levels are tunable, just pick the appropriate sink
> > > to specify the mode you want to collect data at.  It offers three
> > > levels -- best effort, store+retry on failure, and end-to-end mode
> > > that uses acks and a write ahead log.
>

All three methods exist in Chukwa.  The first attempt, SeqFileWriter,
uses the HDFS client to write data with flush, then acks to the source
once the data has been deposited.  This method is fairly slow because
HDFS sync cannot be called too frequently; it was decided to sync every
15 seconds.  If the collector crashes without closing the file, the
data is still lost because it was never actually written.  The second
method was to use LocalWriter, where data is written to the local disk
of the collector and uploaded to HDFS in one shot once the chunks have
been closed, after n minutes.  This is also a problem when the data
arrival rate is faster than HDFS can absorb: data buffers up faster
than it is sunk to HDFS, and the collector's disk fills up.  The
improved version has the agent write ahead and retry when no ack is
received, so the collector does not buffer any data at all.  This
allows a collector to go down without losing data and opens up the
possibility of implementing other writers, like HbaseWriter.  This
model has worked best for us so far.  I think both teams have done the
same research, just in a different order.  An end-to-end reliability
mode did not work for Chukwa because HDFS flush was a no-op in Hadoop
0.18, and it hasn't improved in 0.20.
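
(A quick aside from me, to make the write-ahead/retry pattern Eric describes concrete. This is only a sketch of the general shape, not Chukwa's actual classes; every name below is made up for illustration.)

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Agent-side write-ahead loop: every chunk is persisted locally before any
// network I/O, and the checkpoint only advances when the collector acks.
public class WriteAheadAgent {

    private final Path wal = Paths.get("agent.wal");          // hypothetical WAL file
    private final Path checkpoint = Paths.get("agent.ckpt");  // index of last acked chunk

    /** Stand-in for the real transport; returns true when the collector acks. */
    public interface Collector {
        boolean send(String chunk);
    }

    public void append(String chunk) throws IOException {
        // 1. Write ahead: persist the chunk before trying to ship it anywhere.
        Files.write(wal, (chunk + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public void flushToCollector(Collector collector) throws IOException {
        long acked = readCheckpoint();
        List<String> chunks = Files.readAllLines(wal, StandardCharsets.UTF_8);
        for (int i = (int) acked; i < chunks.size(); i++) {
            // 2. Send and wait for an ack; if none arrives, stop here and retry
            //    later from the same position. The collector buffers nothing.
            if (!collector.send(chunks.get(i))) {
                return;
            }
            writeCheckpoint(i + 1);  // 3. Advance only after the ack.
        }
    }

    private long readCheckpoint() throws IOException {
        if (!Files.exists(checkpoint)) return 0;
        return Long.parseLong(Files.readAllLines(checkpoint, StandardCharsets.UTF_8).get(0).trim());
    }

    private void writeCheckpoint(long next) throws IOException {
        Files.write(checkpoint, Long.toString(next).getBytes(StandardCharsets.UTF_8));
    }
}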

> As I alluded to, I couldn't find much about Chukwa's reliability story on
> its web pages -- and I stand corrected about a Chukwa collector writing to
> local disk.  Do you have a pointer or write up on the different modes?  Is
> the WAL mode the default?

There are SeqFileWriter (HDFS), SocketTeeWriter (in memory),
LocalWriter (local disk), and soon HbaseWriter (in memory).  All of
them use the agent write-ahead, with a retry when no ack is received.

> Flume's logs are durable and are intended to allow recovery from multiple
> failures across the cluster of machines.  Flume supports a hot failover
> mechanism to different collectors (I'd imagine that Chukwa supports this).
> Moreover, Flume also supports dynamic reconfiguration of nodes -- this
> allows us to allocate more collectors at the system's master and take load
> from other collectors in an automated fashion.

Chukwa doesn't have dynamic allocation of more collectors, but this
can be easily implemented.

> > 3) Manageability
>
> Chukwa's core original mechanisms still require MapReduce jobs to run to
> demux data.  I can see that if you drop the console/monitoring portion then
> you can drop the mysql part.  Chukwa still has specialized agent daemons
> and collector daemons.

The separation of agent and collector is important because the
collector generally works in a push model, while the agent can use
either a pull or a push model.  This has several benefits.  First, this
model reduces the chance of an operator error flipping the data flow in
the wrong direction.  Second, the agent only opens one TCP connection
to one collector at any given time.  This keeps the number of open TCP
connections down and avoids the TCP incast problem.  Third, the source
can decide to buffer data for reliability reasons, without that
decision being made in the middle of the pipe where it could cause
spills.  Fourth, each component is optimized for its assigned task
without generalization trade-offs: the collector does not need to
implement logic for tracking rotated logs, and the agent does not need
to worry about filtering or mirroring data.  This reduces the memory
footprint on the source nodes; a Chukwa agent can run with only 4 MB of
JVM heap.

> Flume essentially has a data plane that has one kind of daemon (nodes) and
> a control plane that has another daemon (masters).  With the control plane
> we can dynamically change the role of a node and its behavior from a
> centralized point.  This means a Flume master can react and deal with newly
> provisioned nodes in the system, or re-purpose nodes to become collectors
> in different data flows.

Chukwa has the management aspect open ended; it could be added in five
minutes by using ZooKeeper to keep track of the collector list.
Hopefully there will be more experience sharing and collaboration
between Flume and Chukwa.

regards,
Eric
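
-------------------------------------------------

A note from me on the ZooKeeper idea Eric mentions at the end: it really does map to very little code. The sketch below is only an illustration (the znode path and the class name are made up): each collector registers itself as an ephemeral znode, and agents list and watch that path to keep their collector list current.

import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Sketch of collector discovery with ZooKeeper: collectors create ephemeral
// znodes under a well-known path; agents list and watch that path.
public class CollectorRegistry implements Watcher {

    private static final String ROOT = "/chukwa/collectors";  // illustrative path, assumed to exist
    private final ZooKeeper zk;

    public CollectorRegistry(String connectString) throws IOException {
        this.zk = new ZooKeeper(connectString, 30000, this);
    }

    /** Called by a collector on startup; the znode vanishes if the collector dies. */
    public void register(String hostPort) throws KeeperException, InterruptedException {
        zk.create(ROOT + "/" + hostPort, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    /** Called by an agent to fetch the live collector list and re-arm the watch. */
    public List<String> liveCollectors() throws KeeperException, InterruptedException {
        return zk.getChildren(ROOT, true);
    }

    @Override
    public void process(WatchedEvent event) {
        // A real agent would refresh its collector list here when the children change.
    }
}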

Friday, February 14, 2014

Java Hadoop: Reducer Key counter-intuitive behavior

One of the more curious gotchas once you're into a Hadoop developer career is a funny, counter-intuitive behaviour that occurs in the Java Reducer.

A SecondarySort example to illustrate


Suppose you have a dataset such as the typical Stock Market data:

Stock  Timestamp  Price

GOOG 2014-02-14 15:51  1000.0
GOOG 2014-02-14 15:52  1001.3
GOOG 2014-02-14 15:53  1001.2
...

Of course with different stocks, not only GOOG. And of course again, not necessarily sorted.

You want to sort by <Stock, Timestamp> to properly do your calculations of how fast GOOG stocks are rising (they never go down, do they?). In other words, you want to know the difference between each price and the next, so you can draw a slope curve to show off in front of your significant other.

You would make a composite key including Stock and Timestamp. Call it CompositeKey, implement its toString() as the concatenation of the two values, and everything else as you should. I omit this code for brevity (there is a sketch of it below).
You would then perform a secondary sort, so that each reduce call receives your key with all of its values gathered together.
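
Since the CompositeKey code is omitted above, here is a minimal sketch of what it might look like (the field names and the natural ordering are just one reasonable choice):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Minimal composite key: natural order is by stock, then by timestamp,
// which is what the secondary sort relies on.
public class CompositeKey implements WritableComparable<CompositeKey> {

    private String stock;
    private String timestamp;

    public CompositeKey() { }                        // required by Hadoop

    public CompositeKey(String stock, String timestamp) {
        this.stock = stock;
        this.timestamp = timestamp;
    }

    public String getStock()     { return stock; }
    public String getTimestamp() { return timestamp; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(stock);
        out.writeUTF(timestamp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        stock = in.readUTF();
        timestamp = in.readUTF();
    }

    @Override
    public int compareTo(CompositeKey other) {
        int cmp = stock.compareTo(other.stock);
        return cmp != 0 ? cmp : timestamp.compareTo(other.timestamp);
    }

    @Override
    public String toString() {
        return stock + " " + timestamp;              // the concatenation mentioned above
    }
}

For the secondary sort you would also need a partitioner and a grouping comparator that look only at the stock, so that all timestamps of one stock end up in the same reduce call; I leave those out here as well. With that in place, the reduce method looks like this: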


public void reduce(CompositeKey key, Iterable<DoubleWritable> values, Context context)
      throws IOException, InterruptedException 
{
    double price = -1;                        // price of the previous record
    for (DoubleWritable value : values) {
        if (price == -1)                      // first record: difference will be 0
            price = value.get();
        double difference = value.get() - price;
        context.write(key, new DoubleWritable(difference));
        price = value.get();                  // remember this price for the next record
    }
}

Here we should be emitting something like:

GOOG 2014-02-14 15:51  0
GOOG 2014-02-14 15:52  1.3
GOOG 2014-02-14 15:53  -0.1

Are we really doing so?


Look at the variable key.
We are iterating over the values, but apparently we are not iterating over the key. Inside the reducer, the key object seems to be untouched, unchanged. So, according to regular intuition and Java standards, we should be emitting the same key every time (that would be, for example, the first key, <GOOG, 2014-02-14 15:51>). So for a non-Hadoop Java programmer, the output should be something like:

GOOG 2014-02-14 15:51  0
GOOG 2014-02-14 15:51  1.3
GOOG 2014-02-14 15:51  -0.1

So why does the above code work?
It works because of how the iteration over the values is implemented.
With each new iteration over the values, the values obviously change. But so does the key: when the next() method is called, the key object is also updated, because Hadoop reuses the same key and value objects and simply refills their contents on every call.
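
You can see the reuse for yourself with a small experiment (a sketch for observation only, not production code): store the key references during the iteration and compare them afterwards.

@Override
public void reduce(CompositeKey key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException
{
    List<CompositeKey> seenKeys = new ArrayList<>();   // java.util imports assumed
    List<String> seenAsText = new ArrayList<>();
    for (DoubleWritable value : values) {
        seenKeys.add(key);                 // stores the SAME object reference each time
        seenAsText.add(key.toString());    // snapshots the current field contents
    }
    // Every entry of seenKeys points at one single object, whose fields now hold
    // the last key, while seenAsText shows the contents did change on every next().
    System.out.println(seenKeys.get(0) == seenKeys.get(seenKeys.size() - 1));  // true
    System.out.println(seenAsText);
}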

Why is this not made explicit? That is a good question, and I bet it is one that comes up in many introductory Hadoop courses from those curious enough to delve into Hadoop's guts.

What if this is what I want?


If you want to keep the keys (something you may want to ask yourself twice about), you can only do it by performing a deep copy (a hard clone). You can do this elegantly by adding a clone or copy method to your CompositeKey that returns a newly created object with the attributes deep-copied.
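
A minimal sketch of that deep copy (the copy() name is my choice; you could equally override clone(), or use Hadoop's WritableUtils.clone(key, conf) to do the same thing generically):

// In CompositeKey: return a detached copy, so the framework's reuse of the
// original object cannot change it behind our back.
public CompositeKey copy() {
    // String fields are immutable, so copying the references is already a deep copy;
    // mutable fields (e.g. a Text or a Date) would have to be duplicated explicitly.
    return new CompositeKey(stock, timestamp);
}

And in the reducer, keep the copies instead of the reused object:

List<CompositeKey> keptKeys = new ArrayList<>();
for (DoubleWritable value : values) {
    keptKeys.add(key.copy());   // or: WritableUtils.clone(key, context.getConfiguration())
    // ... rest of the processing ...
}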