Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka producer consumer #4

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions KafkaProducerConsumer/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions KafkaProducerConsumer/.idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions KafkaProducerConsumer/.idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions KafkaProducerConsumer/.idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions KafkaProducerConsumer/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions KafkaProducerConsumer/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions KafkaProducerConsumer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>KafkaProducerConsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>KafkaProducerConsumer</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.example.KafkaProducerConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

public class Consumer {
public static void main(String[] args) {

String topic = "topic82";
String brokers = "localhost:8082";
String stringSerializer = "org.apache.kafka.common.serialization.StringDeserializer";

Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringSerializer);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringSerializer);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

KafkaConsumer consumer = new KafkaConsumer<String, String>(config);

HashSet<String> topics = new HashSet<String>();
topics.add(topic);
consumer.subscribe(topics);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.example.KafkaProducerConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

import java.util.HashMap;
import java.util.Map;
public class Producer {

public static void main(String[] args) throws InterruptedException {
String topic = "foobar";
String brokers = "localhost:9092";
String IntegerSerializer = "org.apache.kafka.common.serialization.IntegerSerializer";
String StringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

Map<String, Object> config = new HashMap<String, Object>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer);

KafkaProducer<Integer,String> producer = new KafkaProducer<>(config);

System.out.println("PUBLISHING STARTED");
for(int i=0;i< 10;i++){
producer.send(new ProducerRecord<>(topic, i, "MSG#"+i));
}
producer.close();
System.out.println("PUBLISHING ENDED");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.example.KafkaProducerConsumer;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}

/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}

/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
5 changes: 5 additions & 0 deletions KafkaProducerConsumer/target/maven-archiver/pom.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#Generated by Maven
#Sat Apr 02 11:38:38 EDT 2022
groupId=com.example
artifactId=KafkaProducerConsumer
version=0.0.1-SNAPSHOT
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
com/example/KafkaProducerConsumer/Consumer.class
com/example/KafkaProducerConsumer/Producer.class
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/Users/rahulvaish/myRepos/Apache-Kafka/KafkaProducerConsumer/src/main/java/com/example/KafkaProducerConsumer/Consumer.java
/Users/rahulvaish/myRepos/Apache-Kafka/KafkaProducerConsumer/src/main/java/com/example/KafkaProducerConsumer/Producer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com/example/KafkaProducerConsumer/AppTest.class
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/Users/rahulvaish/myRepos/Apache-Kafka/KafkaProducerConsumer/src/test/java/com/example/KafkaProducerConsumer/AppTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8" ?>
<testsuite tests="1" failures="0" name="com.example.KafkaProducerConsumer.AppTest" time="0.004" errors="0" skipped="0">
<properties>
<property name="idea.version" value="2021.3.1"/>
<property name="java.runtime.name" value="Java(TM) SE Runtime Environment"/>
<property name="java.vm.version" value="17.0.2+8-LTS-86"/>
<property name="sun.boot.library.path" value="/Library/Java/JavaVirtualMachines/jdk-17.0.2.jdk/Contents/Home/lib"/>
<property name="maven.multiModuleProjectDirectory" value="/Users/rahulvaish/myRepos/Apache-Kafka/KafkaProducerConsumer"/>
<property name="java.vm.vendor" value="Oracle Corporation"/>
<property name="java.vendor.url" value="https://java.oracle.com/"/>
<property name="guice.disable.misplaced.annotation.check" value="true"/>
<property name="path.separator" value=":"/>
<property name="java.vm.name" value="Java HotSpot(TM) 64-Bit Server VM"/>
<property name="user.country" value="CA"/>
<property name="sun.java.launcher" value="SUN_STANDARD"/>
<property name="java.vm.specification.name" value="Java Virtual Machine Specification"/>
<property name="user.dir" value="/Users/rahulvaish/myRepos/Apache-Kafka/KafkaProducerConsumer"/>
<property name="java.vm.compressedOopsMode" value="Zero based"/>
<property name="java.runtime.version" value="17.0.2+8-LTS-86"/>
<property name="os.arch" value="x86_64"/>
<property name="java.io.tmpdir" value="/var/folders/9h/yk8hzhr13m3584xq2mxwqjg00000gn/T/"/>
<property name="line.separator" value="
"/>
<property name="java.vm.specification.vendor" value="Oracle Corporation"/>
<property name="os.name" value="Mac OS X"/>
<property name="maven.ext.class.path" value="/Applications/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven-event-listener.jar"/>
<property name="classworlds.conf" value="/Applications/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/bin/m2.conf"/>
<property name="sun.jnu.encoding" value="UTF-8"/>
<property name="java.library.path" value="/Users/rahulvaish/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:."/>
<property name="maven.conf" value="/Applications/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/conf"/>
<property name="jdk.debug" value="release"/>
<property name="java.class.version" value="61.0"/>
<property name="java.specification.name" value="Java Platform API Specification"/>
<property name="sun.management.compiler" value="HotSpot 64-Bit Tiered Compilers"/>
<property name="os.version" value="12.1"/>
<property name="http.nonProxyHosts" value="local|*.local|169.254/16|*.169.254/16"/>
<property name="user.home" value="/Users/rahulvaish"/>
<property name="user.timezone" value="America/Toronto"/>
<property name="file.encoding" value="UTF-8"/>
<property name="java.specification.version" value="17"/>
<property name="user.name" value="rahulvaish"/>
<property name="java.class.path" value="/Applications/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/boot/plexus-classworlds.license:/Applications/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/boot/plexus-classworlds-2.6.0.jar"/>
<property name="java.vm.specification.version" value="17"/>
<property name="sun.arch.data.model" value="64"/>
<property name="sun.java.command" value="org.codehaus.classworlds.Launcher -Didea.version=2021.3.1 install"/>
<property name="java.home" value="/Library/Java/JavaVirtualMachines/jdk-17.0.2.jdk/Contents/Home"/>
<property name="user.language" value="en"/>
<property name="java.specification.vendor" value="Oracle Corporation"/>
<property name="java.vm.info" value="mixed mode, sharing"/>
<property name="java.version" value="17.0.2"/>
<property name="native.encoding" value="UTF-8"/>
<property name="java.vendor" value="Oracle Corporation"/>
<property name="maven.home" value="/Applications/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3"/>
<property name="file.separator" value="/"/>
<property name="java.version.date" value="2022-01-18"/>
<property name="java.vendor.url.bug" value="https://bugreport.java.com/bugreport/"/>
<property name="sun.io.unicode.encoding" value="UnicodeBig"/>
<property name="sun.cpu.endian" value="little"/>
<property name="socksNonProxyHosts" value="local|*.local|169.254/16|*.169.254/16"/>
<property name="ftp.nonProxyHosts" value="local|*.local|169.254/16|*.169.254/16"/>
</properties>
<testcase classname="com.example.KafkaProducerConsumer.AppTest" name="testApp" time="0.004"/>
</testsuite>
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-------------------------------------------------------------------------------
Test set: com.example.KafkaProducerConsumer.AppTest
-------------------------------------------------------------------------------
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.021 sec
Binary file not shown.