Sunday, April 27, 2014

Storm-YARN at Yahoo! - Realtime streaming data processing over HDFS at Hadoop Summit

At Hadoop Summit last year, Andrew Feng of Yahoo! gave a presentation on Storm-on-YARN, using the personalization of their web pages as the use case.

Apache Storm is a streaming data processing system aimed at real-time processing over big data. It is fault-tolerant (if a node fails, its tasks are respawned on other nodes). We talked recently about Lambda Architectures (and about Lambdoop at the same time), and about the seminal article by Nathan Marz. Storm is actually led by nathanmarz, and it proposes forming a topology of tasks, split into Spouts (sources of data) and Bolts (processors with inputs and outputs), arranged in a DAG, which reminds me of Apache Tez.

I've found the Storm explanation on the Hortonworks page quite concise, concrete and to the point, so I'll just leave it there if you want to know more about Storm. I also recommend browsing the concepts on the Storm home page, now at the Apache Incubator.



What I actually find interesting in the presentation is the explanation of how Storm works internally, given by way of showing how storm-yarn makes your life easier.

PS. They actually tell everyone that Yahoo! had been running YARN (by July 2013) on a 10,000-node cluster. This is for those who doubt the stability and production-readiness of the system that brings that extra flex to HDFS.

PS2. All due respect, I find Mr. Feng's accent a bit hard to follow, but gotta say that as the presentation advanced I began to like it :)

Thursday, April 24, 2014

Hadoop number of reducers rule of thumb explained in map reduce

Reviewing my documentation for my CCDH exam in a few weeks, I have encountered the topic of the recommended number of reducers.
There is a widely known rule of thumb: 0.95 or 1.75 times (#nodes * mapred.tasktracker.reduce.tasks.maximum).

With the 0.95 factor, all reduce tasks can run at the same time, in a single wave, but each one typically takes longer. With the 1.75 factor, each reduce task finishes faster, but some reduce tasks wait in a queue until a slot frees up. Prefer the 1.75 factor for better load balancing.
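
As a quick worked example of the rule above, here is a minimal Java sketch. The class and method names are my own, and the cluster figures (20 nodes, 4 reduce slots per node) are made-up values, not a recommendation. The factor is passed as a percentage so the arithmetic stays in integers.

```java
// Sketch of the 0.95 / 1.75 rule of thumb; names and cluster figures are hypothetical.
final class ReducerRuleOfThumb {

    /** Total reduce slots in the cluster: nodes times reduce slots per node. */
    static int reduceSlots(int nodes, int reduceSlotsPerNode) {
        return nodes * reduceSlotsPerNode;
    }

    /** Applies a factor given as a percentage (95 or 175), rounding down, never below 1. */
    static int reducersFor(int factorPercent, int nodes, int reduceSlotsPerNode) {
        return Math.max(1, reduceSlots(nodes, reduceSlotsPerNode) * factorPercent / 100);
    }

    public static void main(String[] args) {
        int nodes = 20;             // hypothetical cluster size
        int reduceSlotsPerNode = 4; // hypothetical per-node reduce slot setting
        System.out.println("0.95 factor: " + reducersFor(95, nodes, reduceSlotsPerNode));  // 76
        System.out.println("1.75 factor: " + reducersFor(175, nodes, reduceSlotsPerNode)); // 140
    }
}
```

With 80 slots, the 0.95 factor leaves a few slots free so every reducer can start at once, while the 1.75 factor schedules almost two waves of smaller reducers.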

From Cloudera documentation:

Typical way to determine how many Reducers to specify:
– Test the job with a relatively small test data set
– Extrapolate to calculate the amount of intermediate data expected from the 'real' input data.
– Use that to calculate the number of Reducers which should be specified.

However, this doesn't fit all, or even most, problems and environments. Looking for a deeper explanation and hints from industry leaders, I found, in this interesting conversation on Quora, the following advice from Allen Wittenauer of LinkedIn:

 I tend to tell users that their ideal reducers should be the optimal value that gets them closest to:

- A multiple of the block size
- A task time between 5 and 15 minutes
- Creates the fewest files possible

Anything other than that means there is a good chance your reducers are less than great.  There is a tremendous tendency for users to use a REALLY high value ("More parallelism means faster!") or a REALLY low value ("I don't want to blow my namespace quota!").  Both are equally dangerous, resulting in one or more of:

- Terrible performance on the next phase of the workflow
- Terrible performance due to the shuffle
- Terrible overall performance because you've overloaded the namenode with objects that are ultimately useless
- Destroying disk IO for no really sane reason
- Lots of network transfers due to dealing with crazy amounts of CFIF/MFIF work

Now, there are always exceptions and special cases. One particular special case is that if following that advice makes the next step in the workflow do ridiculous things, then we need to likely 'be an exception' in the above general rules of thumb.
[...]
One other thing to mention:  if you are changing the reducer count just to make the partitioning non-skewy in order to influence the partition, you're just plain "doing it wrong".

There is another issue: being in a shared cluster. If you use the FairScheduler, you can be affected by other jobs' and users' quotas, so you'd need to adjust your "#nodes" value accordingly.

More insight from Pere Ferrera from Datasalt:

- # reducers not too few because if they are very big and one fails, retrying it will take much more time than retrying a smaller one.
- # reducers is not generally the problem. If there is one, it is partitioning. It doesn't matter whether you use 100 reducers if only 1 is going to handle all the work.

Two good points. I like the second: partitioning the data correctly is way more important than the number of "slots" where you are sending the data to be reduced. 
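
To make the partitioning point concrete, here is a small, self-contained Java sketch. It applies the same formula as Hadoop's default HashPartitioner (hash code, masked to be non-negative, modulo the number of reducers), but it runs on plain Strings rather than Writable keys, and the class name and data are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: simulates how keys are spread over reducers by hash partitioning.
final class PartitionSkewDemo {

    /** Hash partitioning: non-negative hash code modulo the number of reducers. */
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    /** Counts how many records each reducer would receive. */
    static int[] recordsPerReducer(List<String> keys, int numReducers) {
        int[] counts = new int[numReducers];
        for (String key : keys) {
            counts[partitionFor(key, numReducers)]++;
        }
        return counts;
    }

    /** Number of reducers that receive at least one record. */
    static long busyReducers(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        // 1000 records that all share one key: a single reducer does all the work.
        List<String> skewed = Collections.nCopies(1000, "GOOG");
        System.out.println("busy reducers (one key): "
                + busyReducers(recordsPerReducer(skewed, 100)));

        // 1000 records with distinct keys spread over many reducers.
        List<String> distinct = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            distinct.add("STOCK-" + i);
        }
        System.out.println("busy reducers (distinct keys): "
                + busyReducers(recordsPerReducer(distinct, 100)));
    }
}
```

With a single key, 99 of the 100 reducers sit idle no matter how many you configure, which is exactly the situation Pere describes.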

Tuesday, April 15, 2014

What is Apache Tez? A Hadoop YARN DAG-based framework for speed and flexibility

A very nice presentation on Apache Tez by Bikas Saha, Hadoop and Tez committer currently at Hortonworks, at QCon 2013 in NY. The slides are quite dense, but illuminating about what Tez (and YARN) can offer to the Hadoop ecosystem.

Apache Tez: Accelerating Hadoop Query Processing:
http://www.infoq.com/presentations/apache-tez

Tez is an Apache project that aims to accelerate Hadoop querying using YARN's possibilities. It is a client-side application (more on this later). But it is not an end-user application: it is meant as a foundation for building applications on top of it.

CHARACTERISTICS

Expressive dataflow definition APIs.

It uses sampling to partition data better.

Flexible Input-Output-Processor runtime model

The idea is to have a library of inputs, outputs and processors that can be plugged into one another dynamically to perform the computations.
For example, you can define a mapper as a sequence like this:
INPUT: HDFSInput
PROCESSOR: MapProcessor
OUTPUT: FileSortedOutput
or a reducer as:
INPUT: ShuffleInput
PROCESSOR: ReduceProcessor
OUTPUT: HDFSOutput
This is an expression of what actually happens behind the scenes in MRv1, but made flexible to accommodate other tasks.
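
The mapper definition above could be modelled roughly as in the following Java sketch. These interfaces are hypothetical and written only for this post; they are NOT the Tez API. They just show the idea of wiring any input, processor and output together.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interfaces for illustration only; these are not Tez classes.
interface Input<T> { Iterable<T> read(); }
interface Output<T> { void write(T record); }
interface Processor<I, O> { void run(Input<I> in, Output<O> out); }

final class PluggableTask {

    /** Wires any input, processor and output together, as a task definition would. */
    static <I, O> void execute(Input<I> in, Processor<I, O> processor, Output<O> out) {
        processor.run(in, out);
    }

    /** A tiny "mapper": reads lines, splits them into words, collects the words. */
    static List<String> demo() {
        List<String> sink = new ArrayList<>();
        Input<String> lines = () -> Arrays.asList("a b", "c");   // stands in for HDFSInput
        Processor<String, String> splitter = (in, out) -> {       // stands in for MapProcessor
            for (String line : in.read()) {
                for (String word : line.split(" ")) {
                    out.write(word);
                }
            }
        };
        Output<String> collector = sink::add;                     // stands in for FileSortedOutput
        execute(lines, splitter, collector);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c]
    }
}
```

Swapping the input or output for another implementation changes what the task does without touching the processor, which is the flexibility the model is after.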

Data type agnostic

Nothing enforced. Just bytes. KV, tuples, whatever; it's up to your application.

Simplifying Deployment

Upload the libraries and use Tez client, nothing installed.

EXECUTION PERFORMANCE

Performance gains over MapReduce

Because the job is expressed as a single DAG, the write barrier between computations is removed, along with launch overhead, etc.

Plan reconfiguration at runtime

We can change the topology of the graph at runtime! This means we can choose different topologies depending on, e.g., the volume of data: change the number of reducers, or make joins in a different order. It really gives a great deal of flex!

Optimal resource management

By reusing YARN containers, for new tasks or to keep shared objects in memory for further task executions.

Dynamic physical flow decisions

Optimizations like "process locally if possible", "can we use in-memory datastore?", etc. This is in the pipeline, but it does open possibilities.








Friday, April 11, 2014

Hadoop Ownership: administration and development teams inside companies survey

A research document, Integrating Hadoop into Business Intelligence and Data Warehousing, offers some valuable insight into the adoption of Big Data technologies (well, really, a Hadoop-biased view of big data) in companies out there. It is sponsored by Cloudera, EMC Greenplum, Hortonworks, ParAccel, SAP, SAS, Tableau Software, and Teradata, so don't try to find MongoDB or Cassandra mentions there.

Among the many interesting facts, I highlight this one on Hadoop ownership; maybe my interest is spurred because, at the moment of writing this, I am assessing a move into administration projects :D
There is a bias towards big corporations too, which makes sense if you look at some of the questions (many companies have no choice about which team this falls to).



Ownership of Hadoop


Hadoop environments may be owned and primarily used by a number of organizational units.
In enterprises with multiple Hadoop environments, ownership can be quite diverse.

Data warehouse group (54%) In organizations that are intent on integrating Hadoop into the practices and infrastructure for BI, DW, DI, and analytics, it makes the most sense that the DW group or an equivalent BI team own and (perhaps) maintain a Hadoop environment.

Central IT (35%) Every organization is unique, but more and more, TDWI sees central IT evolving into a provider of IT infrastructure, especially networks, data storage, and server resources. This means that application-specific teams are organized elsewhere, instead of being under IT. In that spirit, it’s possible that Hadoop in the future will be just another infrastructure provided by central IT, with a wide range of applications tapped into it, not just analytic ones.

Application group (29%) Many types of big data are associated with specific applications and the technical or business teams that use them. Just think about Web logs and other Web data being generated or captured by Web applications created by a Web applications development team. In those cases, it makes sense that an application group should have its own Hadoop environment.

Research or analysis group (25%) Many user organizations have dedicated teams of business analysts (recently joined by data scientists), who tackle tough new business questions and problems as they arise. Similar teams support product developers and other researchers who depend heavily on data. These relatively small teams of analysts regularly have their own tools and platforms, and it seems that Hadoop is joining such portfolios.

Business unit or department (15%) Most analytic applications have an obvious connection to a department, such as customer-base segmentation for the marketing department or supplier analytics for the procurement department. This explains why most analytic applications are deployed as departmentally owned silos. This precedence continues with big data analytic applications, as seen in the examples given in the previous two bullets.



Thursday, April 10, 2014

Spider.io and Storm: A storming love story with a happy ending

At spider.io, they have made (what I consider) a lot of changes to their core architecture, to adapt to a rapidly-changing business environment. The gotchas on these slides are worth a look (and the humour is appreciated too).


 
Storm at spider.io - London Storm Meetup 2013-06-18 from Ashley Brown

The fact that they changed technology so often, instead of suggesting a lack of vision, can be seen as great flexibility and an ability to adapt to their demands.

The concepts layered are also very challenging and are clearly explained. I've found these slides very rewarding for the time invested. 

Wednesday, April 9, 2014

Bitcoin hacking: MongoDB infrastructure choice led to a hackable system in finance


It's a dirty little secret that everyone knows: Bitcoin exchanges built on top of first-generation NoSQL infrastructure lack even the most basic measures to guarantee the integrity of their accounts.

A very interesting article on some Bitcoin exchanges that, based on NoSQL technologies (specifically MongoDB), have suffered from concurrency in their systems and have been (successfully) hacked.

What happened at Flexcoin, or Poloniex, or any of the other Bitcoin exchanges beset by technical problems (and I'm looking at you Coinbase!), was not an outlier or unexpected or unforeseeable. The infrastructure is broken. And it is broken by design.

Flexcoin recently suffered a hacker attack that may well be shown to students in concurrency classes, so they take an interest in how easy money can be made by exploiting other people's failures.
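
For readers who have not met this kind of bug, here is a generic, in-memory Java sketch of a read-check-write race on a balance, next to an atomic alternative. It is my own illustration under simplifying assumptions: it is not the exchanges' code, it does not use any MongoDB API, and the first method replays the unlucky interleaving step by step instead of using real threads, so the result is repeatable.

```java
import java.util.concurrent.atomic.AtomicLong;

// Generic illustration of a check-then-act race; not taken from any exchange's code.
final class WithdrawalRace {

    /** Replays two withdrawal requests that both read the balance before either writes. */
    static long paidOutWithStaleReads(long balance, long amount) {
        long readByA = balance; // request A reads the balance
        long readByB = balance; // request B reads it before A has written anything back
        long paidOut = 0;
        if (readByA >= amount) paidOut += amount; // A passes the check and pays out
        if (readByB >= amount) paidOut += amount; // B passes the same stale check
        return paidOut;
    }

    /** Withdraws only if the stored balance is still the one that was checked. */
    static boolean withdraw(AtomicLong balance, long amount) {
        while (true) {
            long current = balance.get();
            if (current < amount) {
                return false; // insufficient funds
            }
            if (balance.compareAndSet(current, current - amount)) {
                return true;  // nobody changed the balance between check and write
            }
            // Another request got in first: re-read and check again.
        }
    }

    public static void main(String[] args) {
        System.out.println("paid out with stale reads: " + paidOutWithStaleReads(100, 100)); // 200

        AtomicLong balance = new AtomicLong(100);
        System.out.println("first atomic withdrawal:  " + withdraw(balance, 100)); // true
        System.out.println("second atomic withdrawal: " + withdraw(balance, 100)); // false
    }
}
```

The point is only that the check and the update must happen as one indivisible step, whatever mechanism the datastore offers for it.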

Bitcoin meets NoSQL: The story of Flexcoin and Poloniex


This article provides some ideas on fixing the problem, and it also had the merit, in my case, of introducing me to HyperDex, a system I didn't know, which provides consistency + scalability and which they proactively evangelise.

I now work for a firm specialized in finance, so I'm going to pay extra attention to this kind of article.


Extra Ball from Hacking Distributed: How-to make documentation on infrastructure.

Monday, April 7, 2014

Lambdoop: Lambda architecture to go

Lambdoop is a product made in Spain that tries to put the famous Lambda Architecture into place with minimal distortion. I haven't tried it, but I have to say that I love the name.

At the moment, their website is minimal too. But this presentation in NoSQLMatters Conference in Barcelona a few months ago shows the present and future of the product, which according to their words, is already in place at some (happy) customers'. 





The choice of technology stack for Lambda's layers has been Hadoop + HBase for the batch layer, and Spark + Redis for the real-time processing layer. It also provides a Java abstraction and facilities to hybridize the processing.

The Lambdoop ecosystem also comprises a set of tools to make life easier for the developers (and non-developers, since there is a workflow designer tool too - where I'd ask why not to use or adapt Oozie).

It remains to be seen whether the product will live up to such a good name and, more specifically, whether it will position itself in the niche. It also remains to be seen whether the community needs a product that ships a whole Lambda Architecture instead of having the flex of arranging the layers separately.
With plenty of solutions around Lambda Architecture out there and more to come, they'd better hurry.


Thursday, April 3, 2014

Deep Learning with H2O: Neural networks, machine learning and big data

A very interesting presentation from H2O/0xdata, a product that aims to simplify Machine Learning over HDFS, whether on MapReduce or YARN. A nice explanation with ideas on neural networks and how to optimize them in the Hadoop ecosystem.

I am currently taking Dr. Andrew Ng's Stanford Machine Learning course on Coursera, and by chance I am now on the neural networks chapter, so it's been good to see an application in big data that joins both ideas.

Presentation on H2O Deep Learning

Embedding slideshare seems to be failing in this layout, so I leave the link to the presentation instead.

Monday, March 10, 2014

Big Data Architecture Patterns - Eddie Satterly from Splunk

A nice, basic introduction to the BigData / NoSQL ecosystem on Big Data Patterns.
The word pattern can be a little misleading: when I first came to the video, I expected more about big data application structures. It is more of a basic introduction to the state of things, followed by an intro to some NoSQL systems (Mongo, Cassandra) and Hadoop.  After that, Eddie Satterly from Splunk speaks about real-world applications, so those already initiated can skip the first half hour... or maybe skip the whole talk :)


Monday, February 17, 2014

Flume versus Chukwa comparison

I was looking into the differences between Chukwa and Flume.

Jonathan Hsieh at Cloudera posted an explanation on the Flume Google group; a biased one, obviously, since Flume ships with Cloudera's distribution.
It's worth reading the full conversation, but I'll post the last part of it here.

-------------------------------------------------

> While the story with the Chukwa MR job seem fairly clear, with the new HBase
> story, does Chukwa depend on HBase  to demux different kinds of data from
> different sources?  Is it based on the data's key somehow?

The new HBase Writer uses the same mapreduce demux parser classes,
which runs on the collector to extract and filter data at real time.
The composition of row key is based on the decision put on the demux
parser.

> Does Chukwa depend on getting data into HBase before performing extraction
> or analytic operations?

It can do both or sending as raw bytes, but it is preferred to perform
extraction before sending to HBase to trim down the storage bytes.

> > 2) Reliability
>
> > > Flume's reliability levels are tunable, just pick the appropriate sink
> > > to specify the mode you want to collect data at.  It offers three
> > > levels -- best effort, store+retry on failure, and end-to-end mode
> > > that uses acks and a write ahead log.
>

All three methods exist in Chukwa.  The first attempt, SeqFileWriter
uses HDFS client to write data with flush, then ack to the source
where data has deposited.  This method is fairly slow because HDFS
sync can not be called too frequently.  It was decided to sync every
15 seconds.  If there is a collector system crash without closing the
file, the data is still lost without written.  The second method was
to use LocalWriter, where data is written to local disk of the
collector, and uploaded to HDFS as one shot, when the chunks of data
has been close after n minute.  This is also a problem when data
arrival rate is faster than HDFS can write, data are buffering faster
than sinked to hdfs, which causes the collector to become disk full.
The improved version is having agent to write ahead and retry if no
ack received, collector does not buffer any data.  This allows
collector to go down without losing data and open up the possibility
to implement other writer like HbaseWriter.  This model works the best
for us so far.  I think both team has done the same research but in
the different order.  End-to-end reliability mode did not work for
Chukwa because hdfs flush was no ops in Hadoop 0.18, and hasn't been
improved in 0.20.

> As I alluded to, I couldn't find much about Chukwa's reliability story on
> its web pages -- and I stand corrected about a Chukwa collector writing to
> local disk.  Do you have a pointer or write up on the different modes?  Is
> the WAL mode the default?

There are SeqFileWriter (HDFS), SocketTeeWriter (in memory),
LocalWriter (local disk), and soon HbaseWriter (in memory).  All of
them are using agent write ahead, and retry on no ack.

> Flume's logs are durable and is intended to allow and recover from multiple
> failures and  the cluster of machines.   Flume supports a hot failover
> mechanism to different collectors ( I'd imagine that Chukwa supports this).
>  Moreover, Flume also supports  dynamic reconfiguration of nodes -- this
> allows us to allocate more collectors at the system's master and take load
> from other collectors in an automated fashion.

Chukwa doesn't have dynamic allocation of more collectors, but this
can be easily implemented.

> > 3) Manageability
>
> Chukwa's core original mechanisms still requires  MapReduce jobs to run to
> demux data.  I can see that if you drop the console/monitoring portion then
> you can drop the mysql part.    Chukwa still has have specialized an agent
> daemons  and a collectors daemons.

The separation of agent and collector is important because collector
is generally work in push model, where agent can utilize both pull
model or push model.  This has several benefits.  First, this model
reduces the chance of operator error for flipping incorrect data flow
direction.  Second, agent only open one tcp connection to one collector
at any given time.  It reduces the number of TCP connections open to
avoid tcp in-cast problem.  Third, the source can decide to buffer
data for reliability reason without making this decision in middle of
the pipe to cause spills.  Fourth, each component is optimized for the
assigned task without generalization trade off, collector does not
need to implement logic for tracking rotated log, and agent does not
need to worry about filter or mirror of data, this reduces memory foot
print to live on source nodes.  Chukwa agent can run with only 4mb of
jvm heap size.

> Flume essentially has a data plane that have one kind of daemon (nodes) and
> a control plane that has another daemon (masters).  With the control plan we
> can dynamically change the role of a nodes and its behavior from a
> centralized point.  This means a flume master can react and deal with newly
>  provisioned nodes in the system, or re-purpose nodes to be come collectors
> in different data flows.

Chukwa has the managing aspect open ended, and it could be added by
using zookeeper to keep track of collector list, in 5 minutes.  With
hope, there may be more experience sharing and collaboration among
Flume and Chukwa.

regards,
Eric
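
To picture the "agent writes ahead and retries if no ack is received" model that Eric describes, here is a minimal Java sketch. All the types are hypothetical and written for this post; they are not Chukwa or Flume classes, and an in-memory queue stands in for the agent's on-disk log.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical types for illustration only; not Chukwa or Flume classes.
interface Collector {
    /** Returns true when the chunk was stored and acknowledged. */
    boolean send(String chunk);
}

final class WriteAheadAgent {
    private final Deque<String> pending = new ArrayDeque<>(); // stands in for the on-disk log
    private final Collector collector;

    WriteAheadAgent(Collector collector) {
        this.collector = collector;
    }

    /** Records the chunk locally first, so it survives a collector outage. */
    void append(String chunk) {
        pending.addLast(chunk);
    }

    /** Sends pending chunks in order; a chunk leaves the log only after an ack. */
    int flush(int maxAttemptsPerChunk) {
        int delivered = 0;
        while (!pending.isEmpty()) {
            String chunk = pending.peekFirst();
            boolean acked = false;
            for (int attempt = 0; attempt < maxAttemptsPerChunk && !acked; attempt++) {
                acked = collector.send(chunk);
            }
            if (!acked) {
                break; // keep this chunk and everything behind it for a later flush
            }
            pending.removeFirst();
            delivered++;
        }
        return delivered;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because the collector holds no buffered data in this model, it can go down at any moment: the agent simply keeps the unacknowledged chunks and tries again on the next flush.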

Friday, February 14, 2014

Java Hadoop: Reducer Key counter-intuitive behavior

One of the more curious gotchas once you're into a Hadoop developer career is a funny, counter-intuitive behaviour that occurs in the Java Reducer.

A SecondarySort example to illustrate


Suppose you have a dataset such as the typical Stock Market data:

Stock  Timestamp  Price

GOOG 2014-02-14 15:51  1000.0
GOOG 2014-02-14 15:52  1001.3
GOOG 2014-02-14 15:53  1001.2
...

Of course with different stocks, not only GOOG. And of course again, not necessarily sorted.

You want to sort by <Stock, Timestamp> to properly do your calculations of how fast GOOG stocks are rising (they never go down, do they?). In other words, you want to know the differences between each and the next so you can draw a slope curve to show off in front of your significant other.

You would make a composite key including Stock and Timestamp. Call it CompositeKey, implement its toString as the concatenation of the two values, and everything else as you should. I omit this code for brevity.
You would perform a secondary sort so you receive in each reducer your key, with all the values gathered.


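public void reduce(CompositeKey key, Iterable<DoubleWritable> values, Context context)
      throws IOException, InterruptedException
{
    // Price of the previous record; unused until the first value has been read.
    double previous = 0;
    boolean first = true;
    for (DoubleWritable value : values) {
        double current = value.get();
        // The first record has no predecessor, so its difference is 0.
        double difference = first ? 0 : current - previous;
        context.write(key, new DoubleWritable(difference));
        previous = current;
        first = false;
    }
}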

Here we should be emitting something like:

GOOG 2014-02-14 15:51  0
GOOG 2014-02-14 15:52  1.3
GOOG 2014-02-14 15:53  -0.1

Are we really doing so?


Look at the variable key.
We are iterating the values. But apparently we are not iterating the key. Inside the reducer, the object key seems to be untouched, unchanged. So, according to regular intuition and Java standards, we should be emitting the same key every time (that would be, for example, the first key, <GOOG, 2014-02-14 15:51> ). So for a non-hadoop Java programmer, the output should be something like:

GOOG 2014-02-14 15:51  0
GOOG 2014-02-14 15:51  1.3
GOOG 2014-02-14 15:51  -0.1

So why does the above code work?
It works because of the way the values are iterated.
With each new iteration over the values, the values obviously change. But so does the key: when the iterator's next() method is called, the key object is updated as well.

Why is this not made explicit? This is a good question. And I bet this is a question that appears in many introductory Hadoop courses from those curious enough as to take the steps to delve into Hadoop's guts.

What if this is what I want?


If you want to keep the keys (something you may want to ask yourself twice), you can only do it by making a deep copy (or hard clone). You can do this elegantly by implementing the clone() method in your CompositeKey, returning a newly created object with the attributes deep-copied into it.
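
Here is a self-contained Java sketch of that idea. It does not use Hadoop at all: the CompositeKey below is a simplified stand-in with plain String fields, and the loop only mimics what the framework does, namely overwriting one key instance for every record.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the post's CompositeKey (no Hadoop types involved).
final class CompositeKey implements Cloneable {
    String stock;
    String timestamp;

    CompositeKey(String stock, String timestamp) {
        this.stock = stock;
        this.timestamp = timestamp;
    }

    /** Returns an independent copy; Strings are immutable, so copying the references is enough. */
    @Override
    public CompositeKey clone() {
        return new CompositeKey(stock, timestamp);
    }

    @Override
    public String toString() {
        return stock + " " + timestamp;
    }
}

final class KeyReuseDemo {

    /** Mimics the framework: one key instance is overwritten for every record. */
    static List<String> collect(boolean copy) {
        String[] timestamps = {"15:51", "15:52", "15:53"};
        CompositeKey reused = new CompositeKey("GOOG", null);
        List<CompositeKey> kept = new ArrayList<>();
        for (String timestamp : timestamps) {
            reused.timestamp = timestamp; // what advancing the iterator does to the key
            kept.add(copy ? reused.clone() : reused);
        }
        List<String> result = new ArrayList<>();
        for (CompositeKey key : kept) {
            result.add(key.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collect(false)); // [GOOG 15:53, GOOG 15:53, GOOG 15:53]
        System.out.println(collect(true));  // [GOOG 15:51, GOOG 15:52, GOOG 15:53]
    }
}
```

Without the copy, every reference you kept points at the same object, so they all show the last key; with the copy, each kept key is frozen at the moment it was stored.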