kafka使用例子导入eclipse:https://cwiki.apache.org/KAFKA/developer-setup.html
kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。
kakfa的consumer使用拉的方式工作。
安装kafka
下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。
使用kafka
- importjava.util.Arrays;
- importjava.util.List;
- importjava.util.Properties;
- importkafka.javaapi.producer.SyncProducer;
- importkafka.javaapi.message.ByteBufferMessageSet;
- importkafka.message.Message;
- importkafka.producer.SyncProducerConfig;
- ...
- Propertiesprops=newProperties();
- props.put(“zk.connect”,“127.0.0.1:2181”);
- props.put("serializer.class","kafka.serializer.StringEncoder");
- ProducerConfigconfig=newProducerConfig(props);
- Producer<String,String>producer=newProducer<String,String>(config);
- Sendasinglemessage
- ProducerData<String,String>data=newProducerData<String,String>("test-topic","test-message");
- producer.send(data);
- producer.close();
这样就是一个标准的producer。
consumer的代码
- Propertiesprops=newProperties();
- props.put("zk.connect","localhost:2181");
- props.put("zk.connectiontimeout.ms","1000000");
- props.put("groupid","test_group");
- ConsumerConfigconsumerConfig=newConsumerConfig(props);
- ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);
- Map<String,List<KafkaMessageStream<Message>>>topicMessageStreams=
- consumerConnector.createMessageStreams(ImmutableMap.of("test",4));
- List<KafkaMessageStream<Message>>streams=topicMessageStreams.get("test");
- ExecutorServiceexecutor=Executors.newFixedThreadPool(4);
- for(finalKafkaMessageStream<Message>stream:streams){
- executor.submit(newRunnable(){
- publicvoidrun(){
- for(Messagemessage:stream){
- }
- }
- });
- }
原创文章如转载,请注明:转载自
五四陈科学院[
http://www.54chen.com]
分享到:
相关推荐
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...
Kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程...
Kafka是分布式的消息队列,作为云计算服务的基石,它广泛的应用在实时数据流方面,是实时数据处理的数据中枢,广泛应用在很多互联网企业,例如:linkedin,facebook,腾讯,百度,阿里等。实时数据流是现在互联网...
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...
Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。在大数据系统中,常常会...
Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activitystream)和运营数据处理管道(pipeline)的基础。现在它已为多家不同类型的公司作为多种类型的数据管道(datapipeline)和消息系统使用。...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Kafka是一个高吞吐量分布式消息系统。linkedin开源的kafka。 Kafka就跟这个名字一样,设计非常独特。 首先,kafka的开发者们认为不需要在内存里缓存什么数据,操作系统的文件缓存已经足够完善和强大,只要你不搞...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Kafka社区非常活跃,从0.9版本开始,Kafka的标语已经从“一个高吞吐量,分布式的消息系统”改为"一个分布式流平台"。 Kafka和传统的消息系统不同在于: Kafka是一个分布式系统,易于向外扩展。 它同时为发布和订阅...
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种场景需求:比如基于hadoop...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
其中ActiveMQ是Apache公司开源的消息系统,使用Java语言开发,功能 较为完善,被大量开源项目所使用。而RocketMQ是阿里开源的消息中间件,他也是纯Java开发,具有高吞 吐量、高可用性、适合大规模分布式系统应用的...
Kafka是一个分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。